fixed nasty bug where tor circuits was causing gevent to leak threads

This commit is contained in:
Kevin Froman 2020-03-20 03:50:48 -05:00
parent 022fbaa1af
commit e6181bdd1f
12 changed files with 47 additions and 39 deletions

View File

@ -50,11 +50,12 @@ def clean_old_blocks(comm_inst):
oldest = blockmetadb.get_block_list()[0] oldest = blockmetadb.get_block_list()[0]
except IndexError: except IndexError:
break break
blacklist.addToDB(oldest) else:
removeblock.remove_block(oldest) blacklist.addToDB(oldest)
onionrstorage.deleteBlock(oldest) removeblock.remove_block(oldest)
__remove_from_upload(comm_inst, oldest) onionrstorage.deleteBlock(oldest)
logger.info('Deleted block: %s' % (oldest,)) __remove_from_upload(comm_inst, oldest)
logger.info('Deleted block: %s' % (oldest,))
comm_inst.decrementThreadCount('clean_old_blocks') comm_inst.decrementThreadCount('clean_old_blocks')

View File

@ -26,15 +26,16 @@ import onionrexceptions, logger
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Callable, NewType, Iterable from typing import Callable, NewType, Iterable
from psutil import Process
if TYPE_CHECKING: if TYPE_CHECKING:
from communicator import OnionrCommunicatorDaemon from communicator import OnionrCommunicatorDaemon
CallFreqSeconds = NewType('CallFreqSeconds', int) CallFreqSeconds = NewType('CallFreqSeconds', int)
class OnionrCommunicatorTimers: class OnionrCommunicatorTimers:
def __init__(self, daemon_inst: OnionrCommunicatorDaemon, def __init__(self, daemon_inst: OnionrCommunicatorDaemon,
timer_function: Callable, frequency: CallFreqSeconds, timer_function: Callable, frequency: CallFreqSeconds,
make_thread:bool=True, thread_amount:int=1, max_threads:int=5, make_thread:bool=True, thread_amount:int=1, max_threads:int=5,
requires_peer:bool=False, my_args:Iterable=[]): requires_peer:bool=False, my_args:Iterable=[]):
self.timer_function = timer_function self.timer_function = timer_function
self.frequency = frequency self.frequency = frequency
@ -70,7 +71,7 @@ class OnionrCommunicatorTimers:
logger.debug('%s is currently using the maximum number of threads, not starting another.' % self.timer_function.__name__) logger.debug('%s is currently using the maximum number of threads, not starting another.' % self.timer_function.__name__)
else: else:
self.daemon_inst.threadCounts[self.timer_function.__name__] += 1 self.daemon_inst.threadCounts[self.timer_function.__name__] += 1
newThread = threading.Thread(target=self.timer_function, args=self.args, daemon=True, newThread = threading.Thread(target=self.timer_function, args=self.args, daemon=True,
name=self.timer_function.__name__ + ' - ' + str(uuid.uuid4())) name=self.timer_function.__name__ + ' - ' + str(uuid.uuid4()))
newThread.start() newThread.start()
else: else:

View File

@ -49,21 +49,21 @@ class DirectConnectionManagement:
if pubkey in communicator.direct_connection_clients: if pubkey in communicator.direct_connection_clients:
resp = communicator.direct_connection_clients[pubkey] resp = communicator.direct_connection_clients[pubkey]
return Response(resp) return Response(resp)
@direct_conn_management_bp.route('/dc-client/connect/<pubkey>') @direct_conn_management_bp.route('/dc-client/connect/<pubkey>')
def make_new_connection(pubkey): def make_new_connection(pubkey):
communicator = _get_communicator(g) communicator = _get_communicator(g)
resp = "pending" resp = "pending"
if pubkey in communicator.shared_state.get(pool.ServicePool).bootstrap_pending: if pubkey in communicator.shared_state.get(pool.ServicePool).bootstrap_pending:
return Response(resp) return Response(resp)
if pubkey in communicator.direct_connection_clients: if pubkey in communicator.direct_connection_clients:
resp = communicator.direct_connection_clients[pubkey] resp = communicator.direct_connection_clients[pubkey]
else: else:
"""Spawn a thread that will create the client and eventually add it to the """Spawn a thread that will create the client and eventually add it to the
communicator.active_services communicator.active_services
""" """
threading.Thread(target=onionrservices.OnionrServices().create_client, threading.Thread(target=onionrservices.OnionrServices().create_client,
args=[pubkey, communicator], daemon=True).start() args=[pubkey, communicator], daemon=True).start()
return Response(resp) return Response(resp)

View File

@ -50,6 +50,7 @@ def event(event_name, data = {}, threaded = True):
''' '''
if threaded: if threaded:
print('threaded event', event_name)
thread = Thread(target = __event_caller, args = (event_name, data)) thread = Thread(target = __event_caller, args = (event_name, data))
thread.start() thread.start()
return thread return thread

View File

@ -99,7 +99,6 @@ class POW:
self.mainHash = '0' * 64 self.mainHash = '0' * 64
self.puzzle = self.mainHash[0:min(self.difficulty, len(self.mainHash))] self.puzzle = self.mainHash[0:min(self.difficulty, len(self.mainHash))]
for i in range(max(1, threadCount)): for i in range(max(1, threadCount)):
t = threading.Thread(name = 'thread%s' % i, target = self.pow, args = (True,)) t = threading.Thread(name = 'thread%s' % i, target = self.pow, args = (True,))
t.start() t.start()

View File

@ -41,7 +41,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300):
''' '''
if not stringvalidators.validate_pub_key(peer): if not stringvalidators.validate_pub_key(peer):
raise ValueError('Peer must be valid base32 ed25519 public key') raise ValueError('Peer must be valid base32 ed25519 public key')
connection_pool = None connection_pool = None
# here we use a lambda for the timeout thread to set to true # here we use a lambda for the timeout thread to set to true
@ -60,7 +60,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300):
else: else:
comm_inst.service_greenlets.append(http_server) comm_inst.service_greenlets.append(http_server)
connection_pool = comm_inst.shared_state.get(pool.ServicePool) connection_pool = comm_inst.shared_state.get(pool.ServicePool)
bootstrap_address = '' bootstrap_address = ''
shutdown = False shutdown = False
bs_id = str(uuid.uuid4()) bs_id = str(uuid.uuid4())
@ -93,9 +93,9 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300):
controller.authenticate(config.get('tor.controlpassword')) controller.authenticate(config.get('tor.controlpassword'))
# Create the v3 onion service # Create the v3 onion service
response = controller.create_ephemeral_hidden_service({80: bootstrap_port}, key_type = 'NEW', key_content = 'ED25519-V3', await_publication = True) response = controller.create_ephemeral_hidden_service({80: bootstrap_port}, key_type = 'NEW', key_content = 'ED25519-V3', await_publication = True)
onionrblocks.insert(response.service_id, header='con', sign=True, encryptType='asym', onionrblocks.insert(response.service_id, header='con', sign=True, encryptType='asym',
asymPeer=peer, disableForward=True, expire=(epoch.get_epoch() + bootstrap_timeout)) asymPeer=peer, disableForward=True, expire=(epoch.get_epoch() + bootstrap_timeout))
threading.Thread(target=__bootstrap_timeout, args=[http_server, bootstrap_timeout, timed_out], daemon=True).start() threading.Thread(target=__bootstrap_timeout, args=[http_server, bootstrap_timeout, timed_out], daemon=True).start()
# Run the bootstrap server # Run the bootstrap server

View File

@ -1,8 +1,14 @@
""" """Onionr - Private P2P Communication.
Onionr - Private P2P Communication
This module serializes various data pieces for use in other modules, in particular the web api Serialize various node information
""" """
import json
from gevent import sleep
from psutil import Process
from coredb import blockmetadb
import communicator
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -18,11 +24,6 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
import json # noqa
import time # noqa
from coredb import blockmetadb # noqa
import communicator # noqa
class SerializedData: class SerializedData:
def __init__(self): def __init__(self):
@ -34,19 +35,21 @@ class SerializedData:
etc etc
} }
""" """
def get_stats(self): def get_stats(self):
"""Return statistics about our node""" """Return statistics about our node"""
stats = {} stats = {}
proc = Process()
try: try:
self._too_many self._too_many
except AttributeError: except AttributeError:
time.sleep(1) sleep(1)
comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,)) comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,))
connected = [] connected = []
[connected.append(x) for x in comm_inst.onlinePeers if x not in connected] [connected.append(x) for x in comm_inst.onlinePeers if x not in connected]
stats['uptime'] = comm_inst.getUptime() stats['uptime'] = comm_inst.getUptime()
stats['connectedNodes'] = '\n'.join(connected) stats['connectedNodes'] = '\n'.join(connected)
stats['blockCount'] = len(blockmetadb.get_block_list()) stats['blockCount'] = len(blockmetadb.get_block_list())
stats['blockQueueCount'] = len(comm_inst.blockQueue) stats['blockQueueCount'] = len(comm_inst.blockQueue)
stats['threads'] = proc.num_threads()
return json.dumps(stats) return json.dumps(stats)

View File

@ -3,7 +3,6 @@
""" """
import json import json
from gevent import sleep
from stem import CircStatus from stem import CircStatus
@ -29,6 +28,7 @@ class TorStats:
def __init__(self): def __init__(self):
self.circuits = {} self.circuits = {}
self.json_data = "" self.json_data = ""
self.controller = None
def get_json(self): def get_json(self):
"""Refresh circuits then serialize them into form: """Refresh circuits then serialize them into form:
@ -36,6 +36,8 @@ class TorStats:
"nodes": list of tuples containing fingerprint and nickname strings" "nodes": list of tuples containing fingerprint and nickname strings"
"purpose": https://stem.torproject.org/api/control.html#stem.CircPurpose "purpose": https://stem.torproject.org/api/control.html#stem.CircPurpose
""" """
if self.controller is None:
self.controller = get_controller()
self.get_circuits() self.get_circuits()
json_serialized = {} json_serialized = {}
for circuit in self.circuits.keys(): for circuit in self.circuits.keys():
@ -52,7 +54,7 @@ class TorStats:
def get_circuits(self): def get_circuits(self):
"""Update the circuit dictionary""" """Update the circuit dictionary"""
circuits = {} circuits = {}
for circ in list(sorted(get_controller().get_circuits())): for circ in list(sorted(self.controller.get_circuits())):
if circ.status != CircStatus.BUILT: if circ.status != CircStatus.BUILT:
continue continue
circuits[circ.id] = (circ.path, circ.purpose) circuits[circ.id] = (circ.path, circ.purpose)

View File

@ -62,10 +62,10 @@ class OnionrRunTestManager:
for i in RUN_TESTS: for i in RUN_TESTS:
last = i last = i
logger.info("[RUNTIME TEST] " + last.__name__ + " started", logger.info("[RUNTIME TEST] " + last.__name__ + " started",
terminal=True) terminal=True, timestamp=True)
i(self) i(self)
logger.info("[RUNTIME TEST] " + last.__name__ + " passed", logger.info("[RUNTIME TEST] " + last.__name__ + " passed",
terminal=True) terminal=True, timestamp=True)
except (ValueError, AttributeError): except (ValueError, AttributeError):
logger.error(last.__name__ + ' failed assertions', terminal=True) logger.error(last.__name__ + ' failed assertions', terminal=True)
except Exception as e: except Exception as e:

View File

@ -2,13 +2,10 @@
Ensure that clearnet cannot be reached Ensure that clearnet cannot be reached
""" """
from threading import Thread
from onionrutils.basicrequests import do_get_request from onionrutils.basicrequests import do_get_request
from onionrutils import localcommand from onionrutils import localcommand
import logger import logger
import config import config
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -30,7 +27,7 @@ def test_clearnet_tor_request(testmanager):
Does not run if Tor is being reused Does not run if Tor is being reused
""" """
config.reload() config.reload()
leak_result = "" leak_result = ""

View File

@ -10,11 +10,15 @@ def test_lan_server(testmanager):
try: try:
if requests.get(f"http://{best_ip}:{i}/ping").text == 'pong!': if requests.get(f"http://{best_ip}:{i}/ping").text == 'pong!':
bl = insert('test data') bl = insert('test data')
if bl not in requests.get(f"http://{best_ip}:{i}/blist/0").text: bl2 = insert('test data2')
l = requests.get(f"http://{best_ip}:{i}/blist/0").text
if bl not in l or bl2 not in l:
raise ValueError raise ValueError
if onionrblockapi.Block(bl).raw != requests.get(f"http://{best_ip}:{i}/get/{bl}").content: if onionrblockapi.Block(bl).raw != requests.get(f"http://{best_ip}:{i}/get/{bl}").content:
raise ValueError raise ValueError
break break
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:
pass pass
else: else:

View File

@ -1 +1 @@
1584597517 1584652661