diff --git a/src/communicator/__init__.py b/src/communicator/__init__.py index cfb8f2f7..0ab796f5 100755 --- a/src/communicator/__init__.py +++ b/src/communicator/__init__.py @@ -59,6 +59,9 @@ class OnionrCommunicatorDaemon: self.isOnline = True # Assume we're connected to the internet self.shared_state = shared_state # TooManyObjects module + # populate kv values + self.shared_state.get_by_string('DeadSimpleKV').put('blockQueue', {}) + if config.get('general.offline_mode', False): self.isOnline = False @@ -97,11 +100,7 @@ class OnionrCommunicatorDaemon: # set true when shutdown command received self.shutdown = False - # list of new blocks to download - # added to when new block lists are fetched from peers - self.blockQueue = {} - - # list of blocks currently downloading, avoid s + # list of blocks currently downloading self.currentDownloading = [] # timestamp when the last online node was seen diff --git a/src/communicatorutils/downloadblocks/__init__.py b/src/communicatorutils/downloadblocks/__init__.py index e7e865de..ea9cb6b6 100755 --- a/src/communicatorutils/downloadblocks/__init__.py +++ b/src/communicatorutils/downloadblocks/__init__.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from communicator import OnionrCommunicatorDaemon + from deadsimplekv import DeadSimpleKV from gevent import spawn @@ -45,15 +46,16 @@ storage_counter = storagecounter.StorageCounter() def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): """Use communicator instance to download blocks in the comms's queue""" blacklist = onionrblacklist.OnionrBlackList() + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter count: int = 0 metadata_validation_result: bool = False # Iterate the block queue in the communicator - for blockHash in list(comm_inst.blockQueue): + for blockHash in list(kv.get('blockQueue')): count += 1 try: - blockPeers = list(comm_inst.blockQueue[blockHash]) + blockPeers = list(kv.get('blockQueue')[blockHash]) except KeyError: blockPeers = [] removeFromQueue = True @@ -61,7 +63,8 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): if not shoulddownload.should_download(comm_inst, blockHash): continue - if comm_inst.shutdown or not comm_inst.isOnline or storage_counter.is_full(): + if comm_inst.shutdown or not comm_inst.isOnline or \ + storage_counter.is_full(): # Exit loop if shutting down or offline, or disk allocation reached break # Do not download blocks being downloaded @@ -82,8 +85,12 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): peerUsed = blockPeers.pop(0) if not comm_inst.shutdown and peerUsed.strip() != '': - logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed)) - content = peeraction.peer_action(comm_inst, peerUsed, 'getdata/' + blockHash, max_resp_size=3000000) # block content from random peer (includes metadata) + logger.info( + f"Attempting to download %s from {peerUsed}..." % (blockHash[:12],)) + content = peeraction.peer_action( + comm_inst, peerUsed, + 'getdata/' + blockHash, + max_resp_size=3000000) # block content from random peer if content is not False and len(content) > 0: try: @@ -151,10 +158,10 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): removeFromQueue = False # Don't remove from queue if 404 if removeFromQueue: try: - del comm_inst.blockQueue[blockHash] # remove from block queue both if success or false + del kv.get('blockQueue')[blockHash] # remove from block queue both if success or false if count == LOG_SKIP_COUNT: logger.info('%s blocks remaining in queue' % - [len(comm_inst.blockQueue)], terminal=True) + [len(kv.get('blockQueue'))], terminal=True) count = 0 except KeyError: pass diff --git a/src/communicatorutils/downloadblocks/shoulddownload.py b/src/communicatorutils/downloadblocks/shoulddownload.py index 2e296eb0..95fe4209 100644 --- a/src/communicatorutils/downloadblocks/shoulddownload.py +++ b/src/communicatorutils/downloadblocks/shoulddownload.py @@ -25,6 +25,7 @@ def should_download(comm_inst, block_hash) -> bool: """Return bool for if a (assumed to exist) block should be downloaded.""" blacklist = onionrblacklist.OnionrBlackList() should = True + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") if block_hash in blockmetadb.get_block_list(): # Don't download block we have should = False @@ -35,7 +36,7 @@ def should_download(comm_inst, block_hash) -> bool: if should is False: # Remove block from communicator queue if it shouldn't be downloaded try: - del comm_inst.blockQueue[block_hash] + del kv.get('blockQueue')[block_hash] except KeyError: pass return should diff --git a/src/communicatorutils/lookupblocks.py b/src/communicatorutils/lookupblocks.py index f86bdfef..fcf347c8 100755 --- a/src/communicatorutils/lookupblocks.py +++ b/src/communicatorutils/lookupblocks.py @@ -45,10 +45,11 @@ def lookup_blocks_from_communicator(comm_inst): maxBacklog = 1560 lastLookupTime = 0 # Last time we looked up a particular peer's list new_block_count = 0 + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") for i in range(tryAmount): # Defined here to reset it each time, time offset is added later listLookupCommand = 'getblocklist' - if len(comm_inst.blockQueue) >= maxBacklog: + if len(kv.get('blockQueue')) >= maxBacklog: break if not comm_inst.isOnline: break @@ -100,19 +101,19 @@ def lookup_blocks_from_communicator(comm_inst): # if block does not exist on disk + is not already in queue if i not in existingBlocks: - if i not in comm_inst.blockQueue: + if i not in kv.get('blockQueue'): if onionrproofs.hashMeetsDifficulty(i) and \ not blacklist.inBlacklist(i): - if len(comm_inst.blockQueue) <= 1000000: + if len(kv.get('blockQueue')) <= 1000000: # add blocks to download queue - comm_inst.blockQueue[i] = [peer] + kv.get('blockQueue')[i] = [peer] new_block_count += 1 comm_inst.dbTimestamps[peer] = \ epoch.get_rounded_epoch(roundS=60) else: - if peer not in comm_inst.blockQueue[i]: - if len(comm_inst.blockQueue[i]) < 10: - comm_inst.blockQueue[i].append(peer) + if peer not in kv.get('blockQueue')[i]: + if len(kv.get('blockQueue')[i]) < 10: + kv.get('blockQueue')[i].append(peer) if new_block_count > 0: block_string = "" if new_block_count > 1: diff --git a/src/onionrcommands/daemonlaunch/__init__.py b/src/onionrcommands/daemonlaunch/__init__.py index 96d7954d..54cd2c22 100755 --- a/src/onionrcommands/daemonlaunch/__init__.py +++ b/src/onionrcommands/daemonlaunch/__init__.py @@ -10,6 +10,7 @@ from threading import Thread from stem.connection import IncorrectPassword import toomanyobjs import filenuke +from deadsimplekv import DeadSimpleKV import config import onionrstatistics @@ -70,7 +71,7 @@ def _show_info_messages(): (logger.colors.underline + getourkeypair.get_keypair()[0][:52])) -def _setup_online_mode(use_existing_tor: bool, +def _setup_online_mode(use_existing_tor: bool, net: NetController, security_level: int): if config.get('transports.tor', True): @@ -131,6 +132,9 @@ def daemon(): shared_state = toomanyobjs.TooMany() + # Add DeadSimpleKV for quasi-global variables (ephemeral key-value) + shared_state.get(DeadSimpleKV) + shared_state.get(daemoneventsapi.DaemonEventsBP) Thread(target=shared_state.get(apiservers.ClientAPI).start, @@ -144,7 +148,7 @@ def daemon(): # Run time tests are not normally run shared_state.get(runtests.OnionrRunTestManager) - # Create singleton + # Create singleton shared_state.get(serializeddata.SerializedData) shared_state.share_object() # share the parent object to the threads @@ -169,8 +173,7 @@ def daemon(): if not offline_mode: # we need to setup tor for use _setup_online_mode(use_existing_tor, net, security_level) - - + _show_info_messages() events.event('init', threaded=False) diff --git a/src/onionrstatistics/serializeddata.py b/src/onionrstatistics/serializeddata.py index 359d35ea..d04b637a 100755 --- a/src/onionrstatistics/serializeddata.py +++ b/src/onionrstatistics/serializeddata.py @@ -52,12 +52,13 @@ class SerializedData: except AttributeError: sleep(1) comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,)) + kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") connected = [] [connected.append(x) for x in comm_inst.onlinePeers if x not in connected] stats['uptime'] = comm_inst.getUptime() stats['connectedNodes'] = '\n'.join(connected) stats['blockCount'] = len(blockmetadb.get_block_list()) - stats['blockQueueCount'] = len(comm_inst.blockQueue) + stats['blockQueueCount'] = len(kv.get('blockQueue')) stats['threads'] = proc.num_threads() stats['ramPercent'] = proc.memory_percent() stats['fd'] = get_open_files()