Onionr/src/sneakernet/__init__.py

56 lines
1.9 KiB
Python
Raw Normal View History

"""Onionr - Private P2P Communication.
Detect new block files in a given directory
"""
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import config
from filepaths import block_data_location
from etc.onionrvalues import BLOCK_EXPORT_FILE_EXT
from onionrblocks.blockimporter import import_block_from_data
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
watch_paths = config.get('transports.sneakernet.paths', list([]))
if block_data_location not in watch_paths:
watch_paths.append(block_data_location)
class _Importer(FileSystemEventHandler):
@staticmethod
def on_created(event):
if not event.src_path.endswith(BLOCK_EXPORT_FILE_EXT):
return
with open(event.src_path, 'rb') as block_file:
import_block_from_data(block_file.read())
if block_data_location in event.src_path:
try:
os.remove(event.src_path)
except FileNotFoundError:
pass
def sneakernet_import_thread():
2020-03-31 02:14:48 +00:00
"""Add block data dir & confed paths to fs observer to watch for new bls"""
observer = Observer()
for path in watch_paths:
observer.schedule(_Importer(), path, recursive=True)
observer.start()
while observer.isAlive():
2020-03-31 02:14:48 +00:00
# call import func with timeout
observer.join(60)