From e6181bdd1fb802dc14187724510057959ea9035c Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Fri, 20 Mar 2020 03:50:48 -0500 Subject: [PATCH] fixed nasty bug where tor circuits was causing gevent to leak threads --- src/communicatorutils/housekeeping.py | 11 ++++---- .../onionrcommunicatortimers.py | 9 ++++--- src/httpapi/directconnections/__init__.py | 8 +++--- src/onionrplugins/onionrevents.py | 1 + src/onionrproofs/__init__.py | 1 - src/onionrservices/bootstrapservice.py | 8 +++--- src/onionrstatistics/serializeddata.py | 25 +++++++++++-------- .../transports/tor/__init__.py | 6 +++-- src/runtests/__init__.py | 4 +-- src/runtests/clearnettor.py | 5 +--- src/runtests/lanservertest.py | 6 ++++- tests/runtime-result.txt | 2 +- 12 files changed, 47 insertions(+), 39 deletions(-) diff --git a/src/communicatorutils/housekeeping.py b/src/communicatorutils/housekeeping.py index ba8fe82d..950e2813 100755 --- a/src/communicatorutils/housekeeping.py +++ b/src/communicatorutils/housekeeping.py @@ -50,11 +50,12 @@ def clean_old_blocks(comm_inst): oldest = blockmetadb.get_block_list()[0] except IndexError: break - blacklist.addToDB(oldest) - removeblock.remove_block(oldest) - onionrstorage.deleteBlock(oldest) - __remove_from_upload(comm_inst, oldest) - logger.info('Deleted block: %s' % (oldest,)) + else: + blacklist.addToDB(oldest) + removeblock.remove_block(oldest) + onionrstorage.deleteBlock(oldest) + __remove_from_upload(comm_inst, oldest) + logger.info('Deleted block: %s' % (oldest,)) comm_inst.decrementThreadCount('clean_old_blocks') diff --git a/src/communicatorutils/onionrcommunicatortimers.py b/src/communicatorutils/onionrcommunicatortimers.py index 7ffaa355..2a464bd7 100755 --- a/src/communicatorutils/onionrcommunicatortimers.py +++ b/src/communicatorutils/onionrcommunicatortimers.py @@ -26,15 +26,16 @@ import onionrexceptions, logger from typing import TYPE_CHECKING from typing import Callable, NewType, Iterable +from psutil import Process if TYPE_CHECKING: from communicator import OnionrCommunicatorDaemon CallFreqSeconds = NewType('CallFreqSeconds', int) class OnionrCommunicatorTimers: - def __init__(self, daemon_inst: OnionrCommunicatorDaemon, - timer_function: Callable, frequency: CallFreqSeconds, - make_thread:bool=True, thread_amount:int=1, max_threads:int=5, + def __init__(self, daemon_inst: OnionrCommunicatorDaemon, + timer_function: Callable, frequency: CallFreqSeconds, + make_thread:bool=True, thread_amount:int=1, max_threads:int=5, requires_peer:bool=False, my_args:Iterable=[]): self.timer_function = timer_function self.frequency = frequency @@ -70,7 +71,7 @@ class OnionrCommunicatorTimers: logger.debug('%s is currently using the maximum number of threads, not starting another.' % self.timer_function.__name__) else: self.daemon_inst.threadCounts[self.timer_function.__name__] += 1 - newThread = threading.Thread(target=self.timer_function, args=self.args, daemon=True, + newThread = threading.Thread(target=self.timer_function, args=self.args, daemon=True, name=self.timer_function.__name__ + ' - ' + str(uuid.uuid4())) newThread.start() else: diff --git a/src/httpapi/directconnections/__init__.py b/src/httpapi/directconnections/__init__.py index 146a90f2..65d527b5 100644 --- a/src/httpapi/directconnections/__init__.py +++ b/src/httpapi/directconnections/__init__.py @@ -49,21 +49,21 @@ class DirectConnectionManagement: if pubkey in communicator.direct_connection_clients: resp = communicator.direct_connection_clients[pubkey] return Response(resp) - + @direct_conn_management_bp.route('/dc-client/connect/') def make_new_connection(pubkey): communicator = _get_communicator(g) resp = "pending" if pubkey in communicator.shared_state.get(pool.ServicePool).bootstrap_pending: return Response(resp) - + if pubkey in communicator.direct_connection_clients: resp = communicator.direct_connection_clients[pubkey] else: """Spawn a thread that will create the client and eventually add it to the - communicator.active_services + communicator.active_services """ - threading.Thread(target=onionrservices.OnionrServices().create_client, + threading.Thread(target=onionrservices.OnionrServices().create_client, args=[pubkey, communicator], daemon=True).start() return Response(resp) \ No newline at end of file diff --git a/src/onionrplugins/onionrevents.py b/src/onionrplugins/onionrevents.py index b0bba935..4f7ba198 100755 --- a/src/onionrplugins/onionrevents.py +++ b/src/onionrplugins/onionrevents.py @@ -50,6 +50,7 @@ def event(event_name, data = {}, threaded = True): ''' if threaded: + print('threaded event', event_name) thread = Thread(target = __event_caller, args = (event_name, data)) thread.start() return thread diff --git a/src/onionrproofs/__init__.py b/src/onionrproofs/__init__.py index e76fd9d4..82d47af0 100755 --- a/src/onionrproofs/__init__.py +++ b/src/onionrproofs/__init__.py @@ -99,7 +99,6 @@ class POW: self.mainHash = '0' * 64 self.puzzle = self.mainHash[0:min(self.difficulty, len(self.mainHash))] - for i in range(max(1, threadCount)): t = threading.Thread(name = 'thread%s' % i, target = self.pow, args = (True,)) t.start() diff --git a/src/onionrservices/bootstrapservice.py b/src/onionrservices/bootstrapservice.py index a493290b..c41295df 100755 --- a/src/onionrservices/bootstrapservice.py +++ b/src/onionrservices/bootstrapservice.py @@ -41,7 +41,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300): ''' if not stringvalidators.validate_pub_key(peer): raise ValueError('Peer must be valid base32 ed25519 public key') - + connection_pool = None # here we use a lambda for the timeout thread to set to true @@ -60,7 +60,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300): else: comm_inst.service_greenlets.append(http_server) connection_pool = comm_inst.shared_state.get(pool.ServicePool) - + bootstrap_address = '' shutdown = False bs_id = str(uuid.uuid4()) @@ -93,9 +93,9 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300): controller.authenticate(config.get('tor.controlpassword')) # Create the v3 onion service response = controller.create_ephemeral_hidden_service({80: bootstrap_port}, key_type = 'NEW', key_content = 'ED25519-V3', await_publication = True) - onionrblocks.insert(response.service_id, header='con', sign=True, encryptType='asym', + onionrblocks.insert(response.service_id, header='con', sign=True, encryptType='asym', asymPeer=peer, disableForward=True, expire=(epoch.get_epoch() + bootstrap_timeout)) - + threading.Thread(target=__bootstrap_timeout, args=[http_server, bootstrap_timeout, timed_out], daemon=True).start() # Run the bootstrap server diff --git a/src/onionrstatistics/serializeddata.py b/src/onionrstatistics/serializeddata.py index 35e14fc3..2d03ed77 100755 --- a/src/onionrstatistics/serializeddata.py +++ b/src/onionrstatistics/serializeddata.py @@ -1,8 +1,14 @@ -""" - Onionr - Private P2P Communication +"""Onionr - Private P2P Communication. - This module serializes various data pieces for use in other modules, in particular the web api +Serialize various node information """ +import json +from gevent import sleep + +from psutil import Process + +from coredb import blockmetadb +import communicator """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -18,11 +24,6 @@ along with this program. If not, see . """ -import json # noqa -import time # noqa - -from coredb import blockmetadb # noqa -import communicator # noqa class SerializedData: def __init__(self): @@ -34,19 +35,21 @@ class SerializedData: etc } """ - + def get_stats(self): """Return statistics about our node""" stats = {} + proc = Process() try: self._too_many except AttributeError: - time.sleep(1) + sleep(1) comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,)) connected = [] - [connected.append(x) for x in comm_inst.onlinePeers if x not in connected] + [connected.append(x) for x in comm_inst.onlinePeers if x not in connected] stats['uptime'] = comm_inst.getUptime() stats['connectedNodes'] = '\n'.join(connected) stats['blockCount'] = len(blockmetadb.get_block_list()) stats['blockQueueCount'] = len(comm_inst.blockQueue) + stats['threads'] = proc.num_threads() return json.dumps(stats) diff --git a/src/onionrstatistics/transports/tor/__init__.py b/src/onionrstatistics/transports/tor/__init__.py index 15ada77c..577d1620 100644 --- a/src/onionrstatistics/transports/tor/__init__.py +++ b/src/onionrstatistics/transports/tor/__init__.py @@ -3,7 +3,6 @@ """ import json -from gevent import sleep from stem import CircStatus @@ -29,6 +28,7 @@ class TorStats: def __init__(self): self.circuits = {} self.json_data = "" + self.controller = None def get_json(self): """Refresh circuits then serialize them into form: @@ -36,6 +36,8 @@ class TorStats: "nodes": list of tuples containing fingerprint and nickname strings" "purpose": https://stem.torproject.org/api/control.html#stem.CircPurpose """ + if self.controller is None: + self.controller = get_controller() self.get_circuits() json_serialized = {} for circuit in self.circuits.keys(): @@ -52,7 +54,7 @@ class TorStats: def get_circuits(self): """Update the circuit dictionary""" circuits = {} - for circ in list(sorted(get_controller().get_circuits())): + for circ in list(sorted(self.controller.get_circuits())): if circ.status != CircStatus.BUILT: continue circuits[circ.id] = (circ.path, circ.purpose) diff --git a/src/runtests/__init__.py b/src/runtests/__init__.py index bbb85119..123c4e51 100644 --- a/src/runtests/__init__.py +++ b/src/runtests/__init__.py @@ -62,10 +62,10 @@ class OnionrRunTestManager: for i in RUN_TESTS: last = i logger.info("[RUNTIME TEST] " + last.__name__ + " started", - terminal=True) + terminal=True, timestamp=True) i(self) logger.info("[RUNTIME TEST] " + last.__name__ + " passed", - terminal=True) + terminal=True, timestamp=True) except (ValueError, AttributeError): logger.error(last.__name__ + ' failed assertions', terminal=True) except Exception as e: diff --git a/src/runtests/clearnettor.py b/src/runtests/clearnettor.py index 30419f9c..3c7d7119 100644 --- a/src/runtests/clearnettor.py +++ b/src/runtests/clearnettor.py @@ -2,13 +2,10 @@ Ensure that clearnet cannot be reached """ -from threading import Thread - from onionrutils.basicrequests import do_get_request from onionrutils import localcommand import logger import config - """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -30,7 +27,7 @@ def test_clearnet_tor_request(testmanager): Does not run if Tor is being reused """ - + config.reload() leak_result = "" diff --git a/src/runtests/lanservertest.py b/src/runtests/lanservertest.py index 99e4ce31..97347883 100644 --- a/src/runtests/lanservertest.py +++ b/src/runtests/lanservertest.py @@ -10,11 +10,15 @@ def test_lan_server(testmanager): try: if requests.get(f"http://{best_ip}:{i}/ping").text == 'pong!': bl = insert('test data') - if bl not in requests.get(f"http://{best_ip}:{i}/blist/0").text: + bl2 = insert('test data2') + l = requests.get(f"http://{best_ip}:{i}/blist/0").text + if bl not in l or bl2 not in l: raise ValueError if onionrblockapi.Block(bl).raw != requests.get(f"http://{best_ip}:{i}/get/{bl}").content: raise ValueError + break + except requests.exceptions.ConnectionError: pass else: diff --git a/tests/runtime-result.txt b/tests/runtime-result.txt index ab89ffbd..69cddc63 100644 --- a/tests/runtime-result.txt +++ b/tests/runtime-result.txt @@ -1 +1 @@ -1584597517 \ No newline at end of file +1584652661 \ No newline at end of file