diff --git a/docs/dev/daemon-events.md b/docs/dev/daemon-events.md index 25769344..2bfd95fb 100644 --- a/docs/dev/daemon-events.md +++ b/docs/dev/daemon-events.md @@ -7,30 +7,3 @@ Observer pattern Register listeners dynamically per event Spawn new greenlets - -------------------- - -## Attributes - -events: dict - -schema: - -{ - "event_id": dict{ - "event_name": string, - "result_data": bytes, - "started": epoch, - "finished": epoch, - "done": bool - } -} - - --------------------- - -MsgPack schema: - -event_name: string -event_id: uuid4 - diff --git a/src/communicator/__init__.py b/src/communicator/__init__.py index e0be6382..3e217e1e 100755 --- a/src/communicator/__init__.py +++ b/src/communicator/__init__.py @@ -36,6 +36,7 @@ from coredb import daemonqueue from coredb import dbfiles from netcontroller import NetController from . import bootstrappeers +from . import daemoneventhooks """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -247,6 +248,8 @@ class OnionrCommunicatorDaemon: bootstrappeers.add_bootstrap_list_to_peer_list( self, [], db_only=True) + daemoneventhooks.daemon_event_handlers(shared_state) + if not config.get('onboarding.done', True): logger.info( 'First run detected. Run openhome to get setup.', diff --git a/src/communicator/daemoneventhooks/__init__.py b/src/communicator/daemoneventhooks/__init__.py index b696faee..09a9d2d4 100644 --- a/src/communicator/daemoneventhooks/__init__.py +++ b/src/communicator/daemoneventhooks/__init__.py @@ -2,6 +2,8 @@ Hooks to handle daemon events """ +from .removefrominsertqueue import remove_from_insert_queue + from typing import TYPE_CHECKING from gevent import sleep @@ -10,6 +12,7 @@ if TYPE_CHECKING: from toomanyobjs import TooMany from communicator import OnionrCommunicatorDaemon from httpapi.daemoneventsapi import DaemonEventsBP + from onionrtypes import BlockHash """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -33,6 +36,17 @@ def daemon_event_handlers(shared_state: 'TooMany'): return shared_state.get_by_string(class_name) except KeyError: sleep(0.2) + comm_inst = _get_inst('OnionrCommunicatorDaemon') events_api: 'DaemonEventsBP' = _get_inst('DaemonEventsBP') - + + def remove_from_insert_queue_wrapper(block_hash: 'BlockHash'): + print(f'removed {block_hash} from upload') + remove_from_insert_queue(comm_inst, block_hash) + + def print_test(text=''): + print("It works!", text) + return f"It works! {text}" + + events_api.register_listener(remove_from_insert_queue_wrapper) + events_api.register_listener(print_test) diff --git a/src/communicator/daemoneventhooks/removefrominsertqueue.py b/src/communicator/daemoneventhooks/removefrominsertqueue.py new file mode 100644 index 00000000..ff22b1e7 --- /dev/null +++ b/src/communicator/daemoneventhooks/removefrominsertqueue.py @@ -0,0 +1,31 @@ +"""Onionr - P2P Anonymous Storage Network. + +Remove block hash from daemon's upload list. +""" +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from communicator import OnionrCommunicatorDaemon + from onionrtypes import BlockHash +""" + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +""" + + +def remove_from_insert_queue(comm_inst: "OnionrCommunicatorDaemon", + b_hash: "BlockHash"): + """Remove block hash from daemon's upload list.""" + try: + comm_inst.generating_blocks.remove(b_hash) + except ValueError: + pass diff --git a/src/communicatorutils/daemonqueuehandler.py b/src/communicatorutils/daemonqueuehandler.py index 9cddc793..2c2cda75 100755 --- a/src/communicatorutils/daemonqueuehandler.py +++ b/src/communicatorutils/daemonqueuehandler.py @@ -26,42 +26,24 @@ from communicatorutils.uploadblocks import mixmate def handle_daemon_commands(comm_inst): + # Deprecated in favor of daemon events cmd = daemonqueue.daemon_queue() response = '' if cmd is not False: - events.event('daemon_command', data = {'cmd' : cmd}) + events.event('daemon_command', data = {'cmd': cmd}) if cmd[0] == 'shutdown': comm_inst.shutdown = True elif cmd[0] == 'runtimeTest': - comm_inst.shared_state.get_by_string("OnionrRunTestManager").run_tests() + comm_inst.shared_state.get_by_string( + "OnionrRunTestManager").run_tests() elif cmd[0] == 'remove_from_insert_list': try: comm_inst.generating_blocks.remove(cmd[1]) except ValueError: pass - elif cmd[0] == 'announceNode': - if len(comm_inst.onlinePeers) > 0: - comm_inst.announce(cmd[1]) - else: - logger.debug("No nodes connected. Will not introduce node.") - elif cmd[0] == 'runCheck': # deprecated - logger.debug('Status check; looks good.') - open(filepaths.run_check_file + '.runcheck', 'w+').close() - elif cmd[0] == 'connectedPeers': - response = '\n'.join(list(comm_inst.onlinePeers)).strip() - if response == '': - response = 'none' - elif cmd[0] == 'localCommand': - response = localcommand.local_command(cmd[1]) - elif cmd[0] == 'clearOffline': - comm_inst.offlinePeers = [] elif cmd[0] == 'restartTor': restarttor.restart(comm_inst) comm_inst.offlinePeers = [] - elif cmd[0] == 'pex': - for i in comm_inst.timers: - if i.timerFunction.__name__ == 'lookupAdders': - i.count = (i.frequency - 1) elif cmd[0] == 'uploadBlock': comm_inst.blocksToUpload.append(cmd[1]) elif cmd[0] == 'uploadEvent': @@ -70,13 +52,18 @@ def handle_daemon_commands(comm_inst): except ValueError: pass else: - localcommand.local_command('/waitforshare/' + cmd[1], post=True, maxWait=5) + localcommand.local_command( + '/waitforshare/' + cmd[1], post=True, maxWait=5) else: - logger.debug('Received daemon queue command unable to be handled: %s' % (cmd[0],)) + logger.debug( + 'Received daemon queue command unable to be handled: %s' % + (cmd[0],)) if cmd[0] not in ('', None): if response != '': - localcommand.local_command('queueResponseAdd/' + cmd[4], post=True, postData={'data': response}) + localcommand.local_command( + 'queueResponseAdd/' + cmd[4], + post=True, postData={'data': response}) response = '' comm_inst.decrementThreadCount('handle_daemon_commands') diff --git a/src/communicatorutils/uploadblocks/sessionmanager.py b/src/communicatorutils/uploadblocks/sessionmanager.py index 81faa668..763300c2 100644 --- a/src/communicatorutils/uploadblocks/sessionmanager.py +++ b/src/communicatorutils/uploadblocks/sessionmanager.py @@ -84,7 +84,7 @@ class BlockUploadSessionManager: comm_inst: 'OnionrCommunicatorDaemon' # type: ignore comm_inst = self._too_many.get_by_string( # pylint: disable=E1101 type: ignore - "OnionrCommunicatorDaemon") + "OnionrCommunicatorDaemon") sessions_to_delete = [] if comm_inst.getUptime() < 120: return @@ -106,7 +106,10 @@ class BlockUploadSessionManager: if (sess.total_success_count / onlinePeerCount) >= onionrvalues.MIN_BLOCK_UPLOAD_PEER_PERCENT: sessions_to_delete.append(sess) for sess in sessions_to_delete: - self.sessions.remove(session) + try: + self.sessions.remove(session) + except ValueError: + pass # TODO cleanup to one round of search # Remove the blocks from the sessions, upload list, # and waitforshare list diff --git a/src/httpapi/daemoneventsapi/__init__.py b/src/httpapi/daemoneventsapi/__init__.py index 31b20201..0863f410 100644 --- a/src/httpapi/daemoneventsapi/__init__.py +++ b/src/httpapi/daemoneventsapi/__init__.py @@ -2,9 +2,12 @@ Event driven interface to trigger events in communicator """ -import json -from flask import Blueprint, request, Response -import config +from typing import Callable + +from flask import Blueprint, request, Response, abort +from werkzeug.exceptions import BadRequest +from gevent import spawn + """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,28 +29,28 @@ class DaemonEventsBP: """Create DaemonEvents instance, intended to be a singleton. Attributes: - events: dict of current/finished events listeners: callables that are called when a new event is added. The callables name should match the event name _too_many: TooManyObjects instance set by external code """ event_BP = Blueprint('event_BP', __name__) - self.events = {} - self.listeners = {} + self.listeners = set([]) self.flask_bp = event_BP event_BP = self.flask_bp @event_BP.route('/daemon-event/', methods=['POST']) def daemon_event_handler(name): - if name in self.listeners: - - - @event_BP.route('/daemon-event/bp-enabled') - def bp_enabled() -> Response: - return Response('true') - - def clean_old(self): - """Deletes old daemon events based on their completion date.""" - pass + handler: Callable + try: + json_data = request.get_json(force=True) + except BadRequest: + json_data = {} + for handler in self.listeners: + if handler.__name__ == name: + return Response( + spawn(handler, **json_data).get(timeout=120)) + abort(404) + def register_listener(self, listener: Callable): + self.listeners.add(listener) diff --git a/src/onionrblocks/insert.py b/src/onionrblocks/insert.py index 29300b03..5aeeaa6f 100644 --- a/src/onionrblocks/insert.py +++ b/src/onionrblocks/insert.py @@ -20,6 +20,8 @@ from typing import Union import json +from gevent import spawn + from onionrutils import bytesconverter, epoch import filepaths, onionrstorage from . import storagecounter @@ -211,5 +213,10 @@ def insert_block(data: Union[str, bytes], header: str = 'txt', events.event('insertdeniable', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True) else: events.event('insertblock', {'content': plaintext, 'meta': plaintextMeta, 'hash': retData, 'peer': bytesconverter.bytes_to_str(asymPeer)}, threaded = True) - coredb.daemonqueue.daemon_queue_add('remove_from_insert_list', data= dataNonce) + #coredb.daemonqueue.daemon_queue_add('remove_from_insert_list', data= dataNonce) + spawn( + localcommand.local_command, + '/daemon-event/remove_from_insert_queue_wrapper', + post=True, timeout=10 + ) return retData