diff --git a/src/bigbrother/ministry/ofdisk.py b/src/bigbrother/ministry/ofdisk.py index 136e8f09..ad2dd550 100644 --- a/src/bigbrother/ministry/ofdisk.py +++ b/src/bigbrother/ministry/ofdisk.py @@ -14,7 +14,5 @@ def detect_disk_access(info): return if identify_home() not in info[0]: - if 'proc' in info[0]: - logger.warn(f'[DISK MINISTRY] {info} - probably built in Onionr stats') - else: + if 'proc' not in info[0]: # if it is, it is onionr stats logger.warn(f'[DISK MINISTRY] {info}') diff --git a/src/communicator/__init__.py b/src/communicator/__init__.py index 89a7ff29..d712b318 100755 --- a/src/communicator/__init__.py +++ b/src/communicator/__init__.py @@ -86,14 +86,6 @@ class OnionrCommunicatorDaemon: # extends our upload list and saves our list when Onionr exits uploadqueue.UploadQueue(self) - - # Set timers, function reference, seconds - # requires_peer True means the timer function won't fire if we - # have no connected peers - peerPoolTimer = OnionrCommunicatorTimers( - self, onlinepeers.get_online_peers, 60, max_threads=1, - my_args=[self]) - # Timers to periodically lookup new blocks and download them lookup_blocks_timer = OnionrCommunicatorTimers( self, @@ -184,7 +176,6 @@ class OnionrCommunicatorDaemon: self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1) # Adjust initial timer triggers - peerPoolTimer.count = (peerPoolTimer.frequency - 1) cleanupTimer.count = (cleanupTimer.frequency - 60) blockCleanupTimer.count = (blockCleanupTimer.frequency - 2) lookup_blocks_timer = (lookup_blocks_timer.frequency - 2) @@ -193,7 +184,7 @@ class OnionrCommunicatorDaemon: if config.get('general.use_bootstrap_list', True): bootstrappeers.add_bootstrap_list_to_peer_list( - self, [], db_only=True) + self.kv, [], db_only=True) daemoneventhooks.daemon_event_handlers(shared_state) @@ -257,11 +248,6 @@ class OnionrCommunicatorDaemon: except KeyError: pass - def connectNewPeer(self, peer='', useBootstrap=False): - """Adds a new random online peer to self.onlinePeers""" - connectnewpeers.connect_new_peer_to_communicator( - self, peer, useBootstrap) - def peerCleanup(self): """This just calls onionrpeers.cleanupPeers. diff --git a/src/communicator/bootstrappeers.py b/src/communicator/bootstrappeers.py index 6a183737..7b07fc25 100644 --- a/src/communicator/bootstrappeers.py +++ b/src/communicator/bootstrappeers.py @@ -4,9 +4,6 @@ add bootstrap peers to the communicator peer list """ from typing import TYPE_CHECKING -if TYPE_CHECKING: - from deadsimplekv import DeadSimpleKV - from utils import readstatic, gettransports from coredb import keydb """ @@ -27,9 +24,8 @@ from coredb import keydb bootstrap_peers = readstatic.read_static('bootstrap-nodes.txt').split(',') -def add_bootstrap_list_to_peer_list(comm_inst, peerList, db_only=False): +def add_bootstrap_list_to_peer_list(kv, peerList, db_only=False): """Add the bootstrap list to the peer list (no duplicates).""" - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") for i in bootstrap_peers: if i not in peerList and i not in kv.get('offlinePeers') \ and i not in gettransports.get() and len(str(i).strip()) > 0: diff --git a/src/communicator/onlinepeers/onlinepeers.py b/src/communicator/onlinepeers/onlinepeers.py index c616e8b0..1bbc5d6a 100644 --- a/src/communicator/onlinepeers/onlinepeers.py +++ b/src/communicator/onlinepeers/onlinepeers.py @@ -5,11 +5,12 @@ get online peers in a communicator instance import time from typing import TYPE_CHECKING +import config from etc.humanreadabletime import human_readable_time +from communicatorutils.connectnewpeers import connect_new_peer_to_communicator import logger if TYPE_CHECKING: from deadsimplekv import DeadSimpleKV - from communicator import OnionrCommunicatorDaemon """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,17 +27,15 @@ if TYPE_CHECKING: """ -def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): +def get_online_peers(shared_state): """Manage the kv.get('onlinePeers') attribute list. Connect to more peers if we have none connected """ - config = comm_inst.config - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") + kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV") if config.get('general.offline_mode', False): - comm_inst.decrementThreadCount('get_online_peers') return - logger.debug('Refreshing peer pool...') + logger.info('Refreshing peer pool...') max_peers = int(config.get('peers.max_connect', 10)) needed = max_peers - len(kv.get('onlinePeers')) @@ -46,9 +45,9 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): for _ in range(needed): if len(kv.get('onlinePeers')) == 0: - comm_inst.connectNewPeer(useBootstrap=True) + connect_new_peer_to_communicator(shared_state, useBootstrap=True) else: - comm_inst.connectNewPeer() + connect_new_peer_to_communicator(shared_state) if kv.get('shutdown'): break @@ -57,9 +56,8 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'): logger.debug('Couldn\'t connect to any peers.' + f' Last node seen {last_seen} ago.') try: - get_online_peers(comm_inst) + get_online_peers(kv) except RecursionError: pass else: kv.put('lastNodeSeen', time.time()) - comm_inst.decrementThreadCount('get_online_peers') diff --git a/src/communicator/onlinepeers/removeonlinepeer.py b/src/communicator/onlinepeers/removeonlinepeer.py index fcd42328..0eb0b338 100644 --- a/src/communicator/onlinepeers/removeonlinepeer.py +++ b/src/communicator/onlinepeers/removeonlinepeer.py @@ -22,9 +22,8 @@ if TYPE_CHECKING: """ -def remove_online_peer(comm_inst, peer): +def remove_online_peer(kv, peer): """Remove an online peer.""" - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") try: del kv.get('connectTimes')[peer] except KeyError: diff --git a/src/communicator/peeraction.py b/src/communicator/peeraction.py index f26b6a92..fcb5f8e3 100644 --- a/src/communicator/peeraction.py +++ b/src/communicator/peeraction.py @@ -2,11 +2,15 @@ This file implements logic for performing requests to Onionr peers """ +from typing import TYPE_CHECKING + import streamedrequests import logger from onionrutils import epoch, basicrequests from coredb import keydb from . import onlinepeers +from onionrtypes import OnionAddressString +from onionrpeers.peerprofiles import PeerProfiles """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -23,17 +27,27 @@ from . import onlinepeers """ -def peer_action(comm_inst, peer, action, +def get_peer_profile(kv, address: OnionAddressString) -> 'PeerProfiles': + profile_inst_list = kv.get('peerProfiles') + for profile in profile_inst_list: + if profile.address == address: + return profile + p = PeerProfiles(address) + return p + + + +def peer_action(shared_state, peer, action, returnHeaders=False, max_resp_size=5242880): """Perform a get request to a peer.""" penalty_score = -10 - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") + kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV") if len(peer) == 0: return False url = 'http://%s/%s' % (peer, action) try: - ret_data = basicrequests.do_get_request(url, port=comm_inst.proxyPort, + ret_data = basicrequests.do_get_request(url, port=kv.get('proxyPort'), max_size=max_resp_size) except streamedrequests.exceptions.ResponseLimitReached: logger.warn( @@ -44,14 +58,14 @@ def peer_action(comm_inst, peer, action, # if request failed, (error), mark peer offline if ret_data is False: try: - comm_inst.getPeerProfileInstance(peer).addScore(penalty_score) - onlinepeers.remove_online_peer(comm_inst, peer) + get_peer_profile(kv, peer).addScore(penalty_score) + onlinepeers.remove_online_peer(kv, peer) keydb.transportinfo.set_address_info( peer, 'lastConnectAttempt', epoch.get_epoch()) if action != 'ping' and not kv.get('shutdown'): logger.warn(f'Lost connection to {peer}', terminal=True) # Will only add a new peer to pool if needed - onlinepeers.get_online_peers(comm_inst) + onlinepeers.get_online_peers(kv) except ValueError: pass else: diff --git a/src/communicatorutils/connectnewpeers.py b/src/communicatorutils/connectnewpeers.py index a660e5ed..a95e724d 100755 --- a/src/communicatorutils/connectnewpeers.py +++ b/src/communicatorutils/connectnewpeers.py @@ -13,6 +13,7 @@ from utils import networkmerger, gettransports from onionrutils import stringvalidators, epoch from communicator import peeraction, bootstrappeers from coredb import keydb +import config """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -29,10 +30,9 @@ from coredb import keydb """ -def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False): - config = comm_inst.config +def connect_new_peer_to_communicator(shared_state, peer='', useBootstrap=False): retData = False - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") + kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV") tried = kv.get('offlinePeers') transports = gettransports.get() if peer != '': @@ -63,7 +63,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False): if len(peerList) == 0 or useBootstrap: # Avoid duplicating bootstrap addresses in peerList if config.get('general.use_bootstrap_list', True): - bootstrappeers.add_bootstrap_list_to_peer_list(comm_inst, peerList) + bootstrappeers.add_bootstrap_list_to_peer_list(kv, peerList) for address in peerList: address = address.strip() @@ -81,7 +81,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False): if kv.get('shutdown'): return # Ping a peer, - ret = peeraction.peer_action(comm_inst, address, 'ping') + ret = peeraction.peer_action(shared_state, address, 'ping') if ret == 'pong!': time.sleep(0.1) if address not in mainPeerList: diff --git a/src/communicatorutils/lookupadders.py b/src/communicatorutils/lookupadders.py index d474af2d..ed8f8098 100755 --- a/src/communicatorutils/lookupadders.py +++ b/src/communicatorutils/lookupadders.py @@ -28,12 +28,12 @@ if TYPE_CHECKING: """ -def lookup_new_peer_transports_with_communicator(comm_inst): +def lookup_new_peer_transports_with_communicator(shared_state): logger.info('Looking up new addresses...') tryAmount = 1 newPeers = [] transports = gettransports.get() - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") + kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV") for i in range(tryAmount): # Download new peer address list from random online peers @@ -41,7 +41,7 @@ def lookup_new_peer_transports_with_communicator(comm_inst): # Don't get new peers if we have too many queued up break try: - peer = onlinepeers.pick_online_peer(comm_inst) + peer = onlinepeers.pick_online_peer() newAdders = peeraction.peer_action(comm_inst, peer, action='pex') except onionrexceptions.OnlinePeerNeeded: continue diff --git a/src/onionrcommands/daemonlaunch/__init__.py b/src/onionrcommands/daemonlaunch/__init__.py index f1fb5106..d605502e 100755 --- a/src/onionrcommands/daemonlaunch/__init__.py +++ b/src/onionrcommands/daemonlaunch/__init__.py @@ -40,6 +40,7 @@ from lan.server import LANServer from sneakernet import sneakernet_import_thread from onionrstatistics.devreporting import statistics_reporter from setupkvvars import setup_kv +from .spawndaemonthreads import spawn_client_threads """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -139,6 +140,7 @@ def daemon(): # Initialize the quasi-global variables setup_kv(shared_state.get(DeadSimpleKV)) + spawn_client_threads(shared_state) shared_state.get(daemoneventsapi.DaemonEventsBP) Thread(target=shared_state.get(apiservers.ClientAPI).start, diff --git a/src/onionrstatistics/serializeddata.py b/src/onionrstatistics/serializeddata.py index 005f7ed0..fece1d14 100755 --- a/src/onionrstatistics/serializeddata.py +++ b/src/onionrstatistics/serializeddata.py @@ -12,7 +12,7 @@ import ujson as json from coredb import blockmetadb from utils.sizeutils import size, human_size from utils.identifyhome import identify_home -import communicator +from onionrutils.epoch import get_epoch if TYPE_CHECKING: from deadsimplekv import DeadSimpleKV @@ -57,13 +57,11 @@ class SerializedData: self._too_many except AttributeError: sleep(1) - comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, - args=(self._too_many,)) - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") + kv: "DeadSimpleKV" = self._too_many.get_by_string("DeadSimpleKV") connected = [] [connected.append(x) for x in kv.get('onlinePeers') if x not in connected] - stats['uptime'] = kv.get('getUptime') + stats['uptime'] = get_epoch() - kv.get('startTime') stats['connectedNodes'] = '\n'.join(connected) stats['blockCount'] = len(blockmetadb.get_block_list()) stats['blockQueueCount'] = len(kv.get('blockQueue')) diff --git a/src/onionrthreads/__init__.py b/src/onionrthreads/__init__.py index aaef3956..2a0c25c9 100644 --- a/src/onionrthreads/__init__.py +++ b/src/onionrthreads/__init__.py @@ -3,23 +3,24 @@ from typing import Iterable from threading import Thread -from utils.bettersleep import better_sleep +from time import sleep def _onionr_thread(func: Callable, args: Iterable, - sleep: int, initial_sleep): - better_sleep(initial_sleep) + sleep_secs: int, initial_sleep): + if initial_sleep: + sleep(initial_sleep) while True: func(*args) - better_sleep(sleep) + sleep(sleep_secs) def add_onionr_thread( func: Callable, args: Iterable, - sleep: int, initial_sleep: int = 5): + sleep_secs: int, initial_sleep: int = 5): """Spawn a new onionr thread that exits when the main thread does. Runs in an infinite loop with sleep between calls Passes in an interable args and sleep variables""" Thread(target=_onionr_thread, - args=(func, args, sleep, initial_sleep), daemon=True).start() + args=(func, args, sleep_secs, initial_sleep), daemon=True).start()