From a801960179344506c4de25686b5f6932184907a5 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Mon, 6 Jan 2020 06:06:27 -0600 Subject: [PATCH] moved all but shutdown over to new daemon events system --- src/communicator/daemoneventhooks/__init__.py | 9 ++ src/communicatorutils/daemonqueuehandler.py | 69 -------------- src/communicatorutils/netcheck.py | 27 +++--- .../uploadblocks/mixmate/__init__.py | 1 + src/coredb/daemonqueue/__init__.py | 93 ------------------- src/coredb/dbfiles.py | 1 - src/httpapi/miscclientapi/endpoints.py | 21 ----- src/httpapi/miscpublicapi/upload.py | 10 +- src/netcontroller/torcontrol/rebuildtor.py | 33 ++++++- src/onionrblocks/insert.py | 4 +- src/onionrcommands/runtimetestcmd.py | 12 ++- 11 files changed, 76 insertions(+), 204 deletions(-) delete mode 100755 src/communicatorutils/daemonqueuehandler.py delete mode 100644 src/coredb/daemonqueue/__init__.py diff --git a/src/communicator/daemoneventhooks/__init__.py b/src/communicator/daemoneventhooks/__init__.py index aa37d82c..fe46f15b 100644 --- a/src/communicator/daemoneventhooks/__init__.py +++ b/src/communicator/daemoneventhooks/__init__.py @@ -9,6 +9,7 @@ from typing import TYPE_CHECKING from gevent import sleep from communicatorutils.uploadblocks import mixmate +from communicatorutils import restarttor if TYPE_CHECKING: from toomanyobjs import TooMany @@ -61,6 +62,14 @@ def daemon_event_handlers(shared_state: 'TooMany'): pass return "removed" + def restart_tor(): + restarttor.restart(comm_inst) + comm_inst.offlinePeers = [] + + def test_runtime(): + comm_inst.shared_state.get_by_string( + "OnionrRunTestManager").run_tests() + events_api.register_listener(remove_from_insert_queue_wrapper) events_api.register_listener(print_test) events_api.register_listener(upload_event) diff --git a/src/communicatorutils/daemonqueuehandler.py b/src/communicatorutils/daemonqueuehandler.py deleted file mode 100755 index 2c2cda75..00000000 --- a/src/communicatorutils/daemonqueuehandler.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Onionr - P2P Anonymous Storage Network. - -Handle daemon queue commands in the communicator -""" -import logger -from onionrplugins import onionrevents as events -from onionrutils import localcommand -from coredb import daemonqueue -import filepaths -from . import restarttor -from communicatorutils.uploadblocks import mixmate -""" - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . -""" - - -def handle_daemon_commands(comm_inst): - # Deprecated in favor of daemon events - cmd = daemonqueue.daemon_queue() - response = '' - if cmd is not False: - events.event('daemon_command', data = {'cmd': cmd}) - if cmd[0] == 'shutdown': - comm_inst.shutdown = True - elif cmd[0] == 'runtimeTest': - comm_inst.shared_state.get_by_string( - "OnionrRunTestManager").run_tests() - elif cmd[0] == 'remove_from_insert_list': - try: - comm_inst.generating_blocks.remove(cmd[1]) - except ValueError: - pass - elif cmd[0] == 'restartTor': - restarttor.restart(comm_inst) - comm_inst.offlinePeers = [] - elif cmd[0] == 'uploadBlock': - comm_inst.blocksToUpload.append(cmd[1]) - elif cmd[0] == 'uploadEvent': - try: - mixmate.block_mixer(comm_inst.blocksToUpload, cmd[1]) - except ValueError: - pass - else: - localcommand.local_command( - '/waitforshare/' + cmd[1], post=True, maxWait=5) - else: - logger.debug( - 'Received daemon queue command unable to be handled: %s' % - (cmd[0],)) - - if cmd[0] not in ('', None): - if response != '': - localcommand.local_command( - 'queueResponseAdd/' + cmd[4], - post=True, postData={'data': response}) - response = '' - - comm_inst.decrementThreadCount('handle_daemon_commands') diff --git a/src/communicatorutils/netcheck.py b/src/communicatorutils/netcheck.py index 3371d97d..415c2d1e 100755 --- a/src/communicatorutils/netcheck.py +++ b/src/communicatorutils/netcheck.py @@ -1,10 +1,15 @@ -''' +""" Onionr - Private P2P Communication - Determine if our node is able to use Tor based on the status of a communicator instance + Determine if our node is able to use Tor based + on the status of a communicator instance and the result of pinging onion http servers -''' -''' +""" +import logger +from utils import netutils +from onionrutils import localcommand, epoch +from . import restarttor +""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or @@ -17,14 +22,12 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . -''' -import logger -from utils import netutils -from onionrutils import localcommand, epoch -from . import restarttor +""" + + def net_check(comm_inst): - '''Check if we are connected to the internet - or not when we can't connect to any peers''' + """Check if we are connected to the internet + or not when we can't connect to any peers""" # for detecting if we have received incoming connections recently rec = False if len(comm_inst.onlinePeers) == 0: @@ -46,4 +49,4 @@ def net_check(comm_inst): comm_inst.isOnline = False else: comm_inst.isOnline = True - comm_inst.decrementThreadCount('net_check') \ No newline at end of file + comm_inst.decrementThreadCount('net_check') diff --git a/src/communicatorutils/uploadblocks/mixmate/__init__.py b/src/communicatorutils/uploadblocks/mixmate/__init__.py index ce50be7e..4160b0ec 100644 --- a/src/communicatorutils/uploadblocks/mixmate/__init__.py +++ b/src/communicatorutils/uploadblocks/mixmate/__init__.py @@ -38,6 +38,7 @@ def block_mixer(upload_list: List[onionrtypes.BlockHash], to the said block list """ bl = onionrblockapi.Block(block_to_mix) + if time.time() - bl.claimedTime > onionrvalues.BLOCK_POOL_MAX_AGE: raise ValueError diff --git a/src/coredb/daemonqueue/__init__.py b/src/coredb/daemonqueue/__init__.py deleted file mode 100644 index ee4d9943..00000000 --- a/src/coredb/daemonqueue/__init__.py +++ /dev/null @@ -1,93 +0,0 @@ -''' - Onionr - Private P2P Communication - - Write and read the daemon queue, which is how messages are passed into the onionr daemon in a more - direct way than the http api -''' -''' - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . -''' -import sqlite3, os -from onionrplugins import onionrevents as events -from onionrutils import localcommand, epoch -from .. import dbfiles -from onionrsetup import dbcreator -from etc import onionrvalues - -def daemon_queue()->str: - ''' - Gives commands to the communication proccess/daemon by reading an sqlite3 database - - This function intended to be used by the client. Queue to exchange data between "client" and server. - ''' - - retData = False - if not os.path.exists(dbfiles.daemon_queue_db): - dbcreator.createDaemonDB() - else: - conn = sqlite3.connect(dbfiles.daemon_queue_db, timeout=onionrvalues.DATABASE_LOCK_TIMEOUT) - c = conn.cursor() - try: - for row in c.execute('SELECT command, data, date, min(ID), responseID FROM commands group by id'): - retData = row - break - except sqlite3.OperationalError: - dbcreator.createDaemonDB() - else: - if retData != False: - c.execute('DELETE FROM commands WHERE id=?;', (retData[3],)) - conn.commit() - conn.close() - - return retData - -def daemon_queue_add(command: str, data='', responseID: str =''): - ''' - Add a command to the daemon queue, used by the communication daemon (communicator.py) - ''' - - retData = True - - date = epoch.get_epoch() - conn = sqlite3.connect(dbfiles.daemon_queue_db, timeout=onionrvalues.DATABASE_LOCK_TIMEOUT) - c = conn.cursor() - t = (command, data, date, responseID) - try: - c.execute('INSERT INTO commands (command, data, date, responseID) VALUES(?, ?, ?, ?)', t) - conn.commit() - except sqlite3.OperationalError: - retData = False - daemon_queue() - conn.close() - return retData - -def daemon_queue_get_response(responseID=''): - ''' - Get a response sent by communicator to the API, by requesting to the API - ''' - if len(responseID) == 0: raise ValueError('ResponseID should not be empty') - resp = localcommand.local_command(dbfiles.daemon_queue_db, 'queueResponse/' + responseID) - return resp - -def clear_daemon_queue(): - ''' - Clear the daemon queue (somewhat dangerous) - ''' - conn = sqlite3.connect(dbfiles.daemon_queue_db, timeout=onionrvalues.DATABASE_LOCK_TIMEOUT) - c = conn.cursor() - - c.execute('DELETE FROM commands;') - conn.commit() - - conn.close() diff --git a/src/coredb/dbfiles.py b/src/coredb/dbfiles.py index fedbfdfb..3d1e8383 100644 --- a/src/coredb/dbfiles.py +++ b/src/coredb/dbfiles.py @@ -5,7 +5,6 @@ if not home.endswith('/'): home += '/' block_meta_db = '%sblock-metadata.db' % (home) block_data_db = '%s/block-data.db' % (filepaths.block_data_location,) -daemon_queue_db = '%sdaemon-queue.db' % (home,) address_info_db = '%saddress.db' % (home,) user_id_info_db = '%susers.db' % (home,) forward_keys_db = '%sforward-keys.db' % (home,) diff --git a/src/httpapi/miscclientapi/endpoints.py b/src/httpapi/miscclientapi/endpoints.py index e92bfe05..0c92f11f 100644 --- a/src/httpapi/miscclientapi/endpoints.py +++ b/src/httpapi/miscclientapi/endpoints.py @@ -52,27 +52,6 @@ class PrivateEndpoints: def get_hit_count(): return Response(str(client_api.publicAPI.hitCount)) - @private_endpoints_bp.route('/queueResponseAdd/', methods=['post']) - def queueResponseAdd(name): - # Responses from the daemon. TODO: change to direct var access instead of http endpoint - client_api.queueResponse[name] = request.form['data'] - return Response('success') - - @private_endpoints_bp.route('/queueResponse/') - def queueResponse(name): - # Fetch a daemon queue response - resp = 'failure' - try: - resp = client_api.queueResponse[name] - except KeyError: - pass - else: - del client_api.queueResponse[name] - if resp == 'failure': - return resp, 404 - else: - return resp - @private_endpoints_bp.route('/ping') def ping(): # Used to check if client api is working diff --git a/src/httpapi/miscpublicapi/upload.py b/src/httpapi/miscpublicapi/upload.py index e2c3fe47..9c3359f2 100755 --- a/src/httpapi/miscpublicapi/upload.py +++ b/src/httpapi/miscpublicapi/upload.py @@ -3,6 +3,7 @@ Accept block uploads to the public API server ''' +from gevent import spawn from gevent import threading import sys @@ -10,7 +11,6 @@ from flask import Response from flask import abort from onionrutils import localcommand -from coredb import daemonqueue from onionrblocks import blockimporter import onionrexceptions import logger @@ -40,7 +40,13 @@ def accept_upload(request): try: b_hash = blockimporter.import_block_from_data(data) if b_hash: - daemonqueue.daemon_queue_add('uploadEvent', b_hash) + spawn( + localcommand.local_command, + f'/daemon-event/upload_event', + post=True, + is_json=True, + postData={'block': b_hash} + ).get(timeout=5) resp = 'success' else: resp = 'failure' diff --git a/src/netcontroller/torcontrol/rebuildtor.py b/src/netcontroller/torcontrol/rebuildtor.py index 219141d3..f063d9a1 100644 --- a/src/netcontroller/torcontrol/rebuildtor.py +++ b/src/netcontroller/torcontrol/rebuildtor.py @@ -1,5 +1,34 @@ +"""Onionr - P2P Anonymous Storage Network. + +Send Tor restart command +""" import time -from coredb import daemonqueue + +from gevent import spawn + +from onionrutils import localcommand +""" + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +""" + def rebuild(): - daemonqueue.daemon_queue_add('restartTor') + """Send Tor restart command""" + spawn( + localcommand.local_command, + f'/daemon-event/restart_tor', + post=True, + is_json=True, + postData={} + ).get(10) diff --git a/src/onionrblocks/insert.py b/src/onionrblocks/insert.py index 4f869c27..3b89354f 100644 --- a/src/onionrblocks/insert.py +++ b/src/onionrblocks/insert.py @@ -197,7 +197,6 @@ def insert_block(data: Union[str, bytes], header: str = 'txt', retData = False else: # Tell the api server through localCommand to wait for the daemon to upload this block to make statistical analysis more difficult - #coredb.daemonqueue.daemon_queue_add('uploadEvent', retData) spawn( localcommand.local_command, f'/daemon-event/upload_event', @@ -224,6 +223,7 @@ def insert_block(data: Union[str, bytes], header: str = 'txt', spawn( localcommand.local_command, '/daemon-event/remove_from_insert_queue_wrapper', - post=True + post=True, + postData={'block_hash': retData} ).get(timeout=5) return retData diff --git a/src/onionrcommands/runtimetestcmd.py b/src/onionrcommands/runtimetestcmd.py index 7af3bbec..5004e37d 100644 --- a/src/onionrcommands/runtimetestcmd.py +++ b/src/onionrcommands/runtimetestcmd.py @@ -2,7 +2,9 @@ Command to tell daemon to do run time tests """ -from coredb import daemonqueue +from gevent import spawn + +from onionrutils import localcommand """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,7 +23,13 @@ from coredb import daemonqueue def do_runtime_test(): """Send runtime test daemon queue command.""" - daemonqueue.daemon_queue_add("runtimeTest") + spawn( + localcommand.local_command, + f'/daemon-event/test_runtime', + post=True, + is_json=True, + postData={} + ).get(10) do_runtime_test.onionr_help = "If Onionr is running, " # type: ignore