From 6a6718c9fd6443952abed79bac2b6f2acdff05e4 Mon Sep 17 00:00:00 2001 From: Kevin Date: Sat, 25 Jul 2020 22:28:32 -0500 Subject: [PATCH] Moved onlinePeers to KV to further reduce coupling --- src/communicator/__init__.py | 7 ++++--- src/communicator/onlinepeers/onlinepeers.py | 9 +++++---- src/communicator/onlinepeers/pickonlinepeers.py | 11 ++++++++--- src/communicator/onlinepeers/removeonlinepeer.py | 7 ++++++- src/communicatorutils/announcenode.py | 3 ++- src/communicatorutils/connectnewpeers.py | 6 +++--- src/communicatorutils/cooldownpeer.py | 8 +++++++- src/communicatorutils/lookupblocks.py | 2 +- src/communicatorutils/netcheck.py | 7 ++++++- src/communicatorutils/onionrcommunicatortimers.py | 2 +- src/communicatorutils/uploadblocks/__init__.py | 4 +++- src/communicatorutils/uploadblocks/sessionmanager.py | 5 ++++- src/httpapi/miscpublicapi/upload.py | 2 +- src/onionrstatistics/devreporting/__init__.py | 6 ++++-- src/onionrstatistics/serializeddata.py | 2 +- 15 files changed, 56 insertions(+), 25 deletions(-) diff --git a/src/communicator/__init__.py b/src/communicator/__init__.py index e6404efc..0d7aabf9 100755 --- a/src/communicator/__init__.py +++ b/src/communicator/__init__.py @@ -60,8 +60,10 @@ class OnionrCommunicatorDaemon: self.shared_state = shared_state # TooManyObjects module # populate kv values - self.shared_state.get_by_string('DeadSimpleKV').put('blockQueue', {}) - self.shared_state.get_by_string('DeadSimpleKV').put('shutdown', False) + self.kv = self.shared_state.get_by_string('DeadSimpleKV') + self.kv.put('blockQueue', {}) + self.kv.put('shutdown', False) + self.kv.put('onlinePeers', []) if config.get('general.offline_mode', False): self.isOnline = False @@ -82,7 +84,6 @@ class OnionrCommunicatorDaemon: self.delay = 1 # lists of connected peers and peers we know we can't reach currently - self.onlinePeers = [] self.offlinePeers = [] self.cooldownPeer = {} self.connectTimes = {} diff --git a/src/communicator/onlinepeers/onlinepeers.py b/src/communicator/onlinepeers/onlinepeers.py index 894ec6da..c9cb6026 100644 --- a/src/communicator/onlinepeers/onlinepeers.py +++ b/src/communicator/onlinepeers/onlinepeers.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING from etc import humanreadabletime import logger if TYPE_CHECKING: + from deadsimplekv import DeadSimpleKV from communicator import OnionrCommunicatorDaemon """ This program is free software: you can redistribute it and/or modify @@ -26,7 +27,7 @@ if TYPE_CHECKING: def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): - """Manage the comm_inst.onlinePeers attribute list. + """Manage the kv.get('onlinePeers') attribute list. Connect to more peers if we have none connected """ @@ -37,7 +38,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): return logger.debug('Refreshing peer pool...') max_peers = int(config.get('peers.max_connect', 10)) - needed = max_peers - len(comm_inst.onlinePeers) + needed = max_peers - len(kv.get('onlinePeers')) last_seen = 'never' if not isinstance(comm_inst.lastNodeSeen, type(None)): @@ -45,7 +46,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): comm_inst.lastNodeSeen) for _ in range(needed): - if len(comm_inst.onlinePeers) == 0: + if len(kv.get('onlinePeers')) == 0: comm_inst.connectNewPeer(useBootstrap=True) else: comm_inst.connectNewPeer() @@ -53,7 +54,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): if kv.get('shutdown'): break else: - if len(comm_inst.onlinePeers) == 0: + if len(kv.get('onlinePeers')) == 0: logger.debug('Couldn\'t connect to any peers.' + f' Last node seen {last_seen} ago.') try: diff --git a/src/communicator/onlinepeers/pickonlinepeers.py b/src/communicator/onlinepeers/pickonlinepeers.py index 738ba3c5..4a3d6216 100644 --- a/src/communicator/onlinepeers/pickonlinepeers.py +++ b/src/communicator/onlinepeers/pickonlinepeers.py @@ -4,8 +4,12 @@ Onionr - Private P2P Communication. pick online peers in a communicator instance """ import secrets +from typing import TYPE_CHECKING import onionrexceptions + +if TYPE_CHECKING: + from deadsimplekv import DeadSimpleKV """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,18 +28,19 @@ import onionrexceptions def pick_online_peer(comm_inst): """Randomly picks peer from pool without bias (using secrets module).""" + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") ret_data = '' - peer_length = len(comm_inst.onlinePeers) + peer_length = len(kv.get('onlinePeers')) if peer_length <= 0: raise onionrexceptions.OnlinePeerNeeded while True: - peer_length = len(comm_inst.onlinePeers) + peer_length = len(kv.get('onlinePeers')) try: # Get a random online peer, securely. # May get stuck in loop if network is lost - ret_data = comm_inst.onlinePeers[secrets.randbelow(peer_length)] + ret_data = kv.get('onlinePeers')[secrets.randbelow(peer_length)] except IndexError: pass else: diff --git a/src/communicator/onlinepeers/removeonlinepeer.py b/src/communicator/onlinepeers/removeonlinepeer.py index 5d06f4ae..925aae13 100644 --- a/src/communicator/onlinepeers/removeonlinepeer.py +++ b/src/communicator/onlinepeers/removeonlinepeer.py @@ -2,6 +2,10 @@ remove an online peer from the pool in a communicator instance """ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from deadsimplekv import DeadSimpleKV """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -20,6 +24,7 @@ remove an online peer from the pool in a communicator instance def remove_online_peer(comm_inst, peer): """Remove an online peer.""" + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") try: del comm_inst.connectTimes[peer] except KeyError: @@ -29,6 +34,6 @@ def remove_online_peer(comm_inst, peer): except KeyError: pass try: - comm_inst.onlinePeers.remove(peer) + kv.get('onlinePeers').remove(peer) except ValueError: pass diff --git a/src/communicatorutils/announcenode.py b/src/communicatorutils/announcenode.py index f8e45a81..743dafa9 100755 --- a/src/communicatorutils/announcenode.py +++ b/src/communicatorutils/announcenode.py @@ -30,6 +30,7 @@ import onionrexceptions def announce_node(daemon): """Announce our node to our peers.""" ret_data = False + kv: "DeadSimpleKV" = daemon.shared_state.get_by_string("DeadSimpleKV") # Do not let announceCache get too large if len(daemon.announceCache) >= 10000: @@ -37,7 +38,7 @@ def announce_node(daemon): if daemon.config.get('general.security_level', 0) == 0: # Announce to random online peers - for i in daemon.onlinePeers: + for i in kv.get('onlinePeers'): if i not in daemon.announceCache and\ i not in daemon.announceProgress: peer = i diff --git a/src/communicatorutils/connectnewpeers.py b/src/communicatorutils/connectnewpeers.py index cc6755de..d357e378 100755 --- a/src/communicatorutils/connectnewpeers.py +++ b/src/communicatorutils/connectnewpeers.py @@ -75,7 +75,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False): if its already been tried/connected, or if its cooled down """ if len(address) == 0 or address in tried \ - or address in comm_inst.onlinePeers \ + or address in kv.get('onlinePeers') \ or address in comm_inst.cooldownPeer: continue if kv.get('shutdown'): @@ -87,9 +87,9 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False): if address not in mainPeerList: # Add a peer to our list if it isn't already since it connected networkmerger.mergeAdders(address) - if address not in comm_inst.onlinePeers: + if address not in kv.get('onlinePeers'): logger.info('Connected to ' + address, terminal=True) - comm_inst.onlinePeers.append(address) + kv.get('onlinePeers').append(address) comm_inst.connectTimes[address] = epoch.get_epoch() retData = address diff --git a/src/communicatorutils/cooldownpeer.py b/src/communicatorutils/cooldownpeer.py index d415c648..e44c448b 100755 --- a/src/communicatorutils/cooldownpeer.py +++ b/src/communicatorutils/cooldownpeer.py @@ -2,8 +2,13 @@ Select random online peer in a communicator instance and have them "cool down" """ +from typing import TYPE_CHECKING + from onionrutils import epoch from communicator import onlinepeers + +if TYPE_CHECKING: + from deadsimplekv import DeadSimpleKV """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -22,8 +27,9 @@ from communicator import onlinepeers def cooldown_peer(comm_inst): """Randomly add an online peer to cooldown, so we can connect a new one.""" + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") config = comm_inst.config - online_peer_amount = len(comm_inst.onlinePeers) + online_peer_amount = len(kv.get('onlinePeers')) minTime = 300 cooldown_time = 600 to_cool = '' diff --git a/src/communicatorutils/lookupblocks.py b/src/communicatorutils/lookupblocks.py index fcf347c8..db147f8c 100755 --- a/src/communicatorutils/lookupblocks.py +++ b/src/communicatorutils/lookupblocks.py @@ -66,7 +66,7 @@ def lookup_blocks_from_communicator(comm_inst): continue # if we've already tried all the online peers this time around, stop if peer in triedPeers: - if len(comm_inst.onlinePeers) == len(triedPeers): + if len(kv.get('onlinePeers')) == len(triedPeers): break else: continue diff --git a/src/communicatorutils/netcheck.py b/src/communicatorutils/netcheck.py index 63aad540..b56a0f33 100755 --- a/src/communicatorutils/netcheck.py +++ b/src/communicatorutils/netcheck.py @@ -9,6 +9,11 @@ import logger from utils import netutils from onionrutils import localcommand, epoch from . import restarttor + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from deadsimplekv import DeadSimpleKV """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -33,7 +38,7 @@ def net_check(comm_inst): # for detecting if we have received incoming connections recently rec = False kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") - if len(comm_inst.onlinePeers) == 0: + if len(kv.get('onlinePeers')) == 0: try: if (epoch.get_epoch() - int(localcommand.local_command ('/lastconnect'))) <= 60: diff --git a/src/communicatorutils/onionrcommunicatortimers.py b/src/communicatorutils/onionrcommunicatortimers.py index 2fbca2eb..9b91ac2f 100755 --- a/src/communicatorutils/onionrcommunicatortimers.py +++ b/src/communicatorutils/onionrcommunicatortimers.py @@ -66,7 +66,7 @@ class OnionrCommunicatorTimers: if self.count == self.frequency and not self.kv.get('shutdown'): try: if self.requires_peer and \ - len(self.daemon_inst.onlinePeers) == 0: + len(self.kv.get('onlinePeers')) == 0: raise onionrexceptions.OnlinePeerNeeded except onionrexceptions.OnlinePeerNeeded: return diff --git a/src/communicatorutils/uploadblocks/__init__.py b/src/communicatorutils/uploadblocks/__init__.py index bf83e301..78dddace 100755 --- a/src/communicatorutils/uploadblocks/__init__.py +++ b/src/communicatorutils/uploadblocks/__init__.py @@ -17,6 +17,7 @@ from onionrutils import stringvalidators, basicrequests import onionrcrypto from communicator import onlinepeers if TYPE_CHECKING: + from deadsimplekv import DeadSimpleKV from communicator import OnionrCommunicatorDaemon """ This program is free software: you can redistribute it and/or modify @@ -38,6 +39,7 @@ def upload_blocks_from_communicator(comm_inst: 'OnionrCommunicatorDaemon'): """Accept a communicator instance + upload blocks from its upload queue.""" """when inserting a block, we try to upload it to a few peers to add some deniability & increase functionality""" + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") TIMER_NAME = "upload_blocks_from_communicator" session_manager: sessionmanager.BlockUploadSessionManager @@ -63,7 +65,7 @@ def upload_blocks_from_communicator(comm_inst: 'OnionrCommunicatorDaemon'): comm_inst.decrementThreadCount(TIMER_NAME) return session = session_manager.add_session(bl) - for _ in range(min(len(comm_inst.onlinePeers), 6)): + for _ in range(min(len(kv.get('onlinePeers')), 6)): try: peer = onlinepeers.pick_online_peer(comm_inst) except onionrexceptions.OnlinePeerNeeded: diff --git a/src/communicatorutils/uploadblocks/sessionmanager.py b/src/communicatorutils/uploadblocks/sessionmanager.py index cb46c429..aa797db4 100644 --- a/src/communicatorutils/uploadblocks/sessionmanager.py +++ b/src/communicatorutils/uploadblocks/sessionmanager.py @@ -4,6 +4,7 @@ Manager for upload 'sessions' """ from typing import List, Union, TYPE_CHECKING if TYPE_CHECKING: + from deadsimplekv import DeadSimpleKV from session import UploadSession from onionrutils import bytesconverter @@ -84,10 +85,12 @@ class BlockUploadSessionManager: comm_inst: 'OnionrCommunicatorDaemon' # type: ignore comm_inst = self._too_many.get_by_string( # pylint: disable=E1101 type: ignore "OnionrCommunicatorDaemon") + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string( + "DeadSimpleKV") sessions_to_delete = [] if comm_inst.getUptime() < 120: return - onlinePeerCount = len(comm_inst.onlinePeers) + onlinePeerCount = len(kv.get('onlinePeers')) # If we have no online peers right now, if onlinePeerCount == 0: diff --git a/src/httpapi/miscpublicapi/upload.py b/src/httpapi/miscpublicapi/upload.py index 6a4e102a..8c268726 100755 --- a/src/httpapi/miscpublicapi/upload.py +++ b/src/httpapi/miscpublicapi/upload.py @@ -40,7 +40,7 @@ def accept_upload(request): try: b_hash = blockimporter.import_block_from_data(data) if b_hash: - if g.too_many.get_by_string("OnionrCommunicatorDaemon").onlinePeers: + if g.too_many.get_by_string("DeadSimpleKV").get('onlinePeers'): spawn( localcommand.local_command, f'/daemon-event/upload_event', diff --git a/src/onionrstatistics/devreporting/__init__.py b/src/onionrstatistics/devreporting/__init__.py index 6a9bb422..a31af860 100644 --- a/src/onionrstatistics/devreporting/__init__.py +++ b/src/onionrstatistics/devreporting/__init__.py @@ -28,13 +28,15 @@ from onionrutils import epoch def statistics_reporter(shared_state): server = config.get('statistics.server', '') if not config.get('statistics.i_dont_want_privacy', False) or \ - not server: return + not server: + return def compile_data(): return { 'time': epoch.get_epoch(), 'adders': get_transports(), - 'peers': shared_state.get_by_string('OnionrCommunicatorDaemon').onlinePeers + 'peers': shared_state.get_by_string( + 'DeadSimpleKV').get('onlinePeers') } while True: diff --git a/src/onionrstatistics/serializeddata.py b/src/onionrstatistics/serializeddata.py index d04b637a..5a13d4ed 100755 --- a/src/onionrstatistics/serializeddata.py +++ b/src/onionrstatistics/serializeddata.py @@ -54,7 +54,7 @@ class SerializedData: comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,)) kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") connected = [] - [connected.append(x) for x in comm_inst.onlinePeers if x not in connected] + [connected.append(x) for x in kv.get('onlinePeers') if x not in connected] stats['uptime'] = comm_inst.getUptime() stats['connectedNodes'] = '\n'.join(connected) stats['blockCount'] = len(blockmetadb.get_block_list())