diff --git a/src/communicator/__init__.py b/src/communicator/__init__.py index 80f8bc6d..788d7c6f 100755 --- a/src/communicator/__init__.py +++ b/src/communicator/__init__.py @@ -90,19 +90,15 @@ class OnionrCommunicatorDaemon: lookupblocks.lookup_blocks_from_communicator, [self.shared_state], 25, 3) - - """The block download timer is accessed by the block lookup function - to trigger faster download starts""" - self.download_blocks_timer = OnionrCommunicatorTimers( - self, self.getBlocks, config.get('timers.getBlocks', 10), - requires_peer=True, max_threads=5) + add_onionr_thread( + downloadblocks.download_blocks_from_communicator, + [self.shared_state], + config.get('timers.getBlocks', 10), 1) add_onionr_thread(onlinepeers.clear_offline_peer, [self.kv], 58) - # Timer to cleanup old blocks - blockCleanupTimer = OnionrCommunicatorTimers( - self, housekeeping.clean_old_blocks, 20, my_args=[self], - max_threads=1) + add_onionr_thread( + housekeeping.clean_old_blocks, self.shared_state, 20, 1) # Timer to discover new peers OnionrCommunicatorTimers( @@ -173,7 +169,6 @@ class OnionrCommunicatorDaemon: # Adjust initial timer triggers cleanupTimer.count = (cleanupTimer.frequency - 60) - blockCleanupTimer.count = (blockCleanupTimer.frequency - 2) shared_state.add(self) @@ -228,10 +223,6 @@ class OnionrCommunicatorDaemon: except KeyboardInterrupt: pass - def getBlocks(self): - """Download new blocks in queue.""" - downloadblocks.download_blocks_from_communicator(self) - def decrementThreadCount(self, threadName): """Decrement amount of a thread name if more than zero. diff --git a/src/communicatorutils/downloadblocks/__init__.py b/src/communicatorutils/downloadblocks/__init__.py index 6f944096..08cf9a1e 100755 --- a/src/communicatorutils/downloadblocks/__init__.py +++ b/src/communicatorutils/downloadblocks/__init__.py @@ -44,10 +44,10 @@ from . import shoulddownload storage_counter = storagecounter.StorageCounter() -def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): +def download_blocks_from_communicator(shared_state: "TooMany"): """Use communicator instance to download blocks in the comms's queue""" blacklist = onionrblacklist.OnionrBlackList() - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") + kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV") LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter count: int = 0 metadata_validation_result: bool = False @@ -61,7 +61,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): blockPeers = [] removeFromQueue = True - if not shoulddownload.should_download(comm_inst, blockHash): + if not shoulddownload.should_download(shared_state, blockHash): continue if kv.get('shutdown') or not kv.get('isOnline') or \ @@ -90,7 +90,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): logger.info( f"Attempting to download %s from {peerUsed}..." % (blockHash[:12],)) content = peeraction.peer_action( - comm_inst.shared_state, peerUsed, + shared_state, peerUsed, 'getdata/' + blockHash, max_resp_size=3000000) # block content from random peer @@ -171,4 +171,3 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"): except KeyError: pass kv.get('currentDownloading').remove(blockHash) - comm_inst.decrementThreadCount('getBlocks') diff --git a/src/communicatorutils/downloadblocks/shoulddownload.py b/src/communicatorutils/downloadblocks/shoulddownload.py index 95fe4209..ddf11624 100644 --- a/src/communicatorutils/downloadblocks/shoulddownload.py +++ b/src/communicatorutils/downloadblocks/shoulddownload.py @@ -21,11 +21,11 @@ from onionrblocks import onionrblacklist """ -def should_download(comm_inst, block_hash) -> bool: +def should_download(shared_state, block_hash) -> bool: """Return bool for if a (assumed to exist) block should be downloaded.""" blacklist = onionrblacklist.OnionrBlackList() should = True - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") + kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV") if block_hash in blockmetadb.get_block_list(): # Don't download block we have should = False diff --git a/src/communicatorutils/housekeeping.py b/src/communicatorutils/housekeeping.py index 4d002b48..f87c28d3 100755 --- a/src/communicatorutils/housekeeping.py +++ b/src/communicatorutils/housekeeping.py @@ -36,15 +36,15 @@ from etc.onionrvalues import DATABASE_LOCK_TIMEOUT storage_counter = StorageCounter() -def __remove_from_upload(comm_inst, block_hash: str): - kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") +def __remove_from_upload(shared_state, block_hash: str): + kv: "DeadSimpleKV" = shared_state.get_by_string("DeadSimpleKV") try: kv.get('blocksToUpload').remove(block_hash) except ValueError: pass -def clean_old_blocks(comm_inst): +def clean_old_blocks(shared_state): """Delete expired blocks + old blocks if disk allocation is near full""" blacklist = onionrblacklist.OnionrBlackList() # Delete expired blocks @@ -52,7 +52,7 @@ def clean_old_blocks(comm_inst): blacklist.addToDB(bHash) removeblock.remove_block(bHash) onionrstorage.deleteBlock(bHash) - __remove_from_upload(comm_inst, bHash) + __remove_from_upload(shared_state, bHash) logger.info('Deleted block: %s' % (bHash,)) while storage_counter.is_full(): @@ -64,11 +64,9 @@ def clean_old_blocks(comm_inst): blacklist.addToDB(oldest) removeblock.remove_block(oldest) onionrstorage.deleteBlock(oldest) - __remove_from_upload(comm_inst, oldest) + __remove_from_upload(shared_state, oldest) logger.info('Deleted block: %s' % (oldest,)) - comm_inst.decrementThreadCount('clean_old_blocks') - def clean_keys(comm_inst): """Delete expired forward secrecy keys""" diff --git a/src/onionrthreads/__init__.py b/src/onionrthreads/__init__.py index 48a2c8fd..95d86300 100644 --- a/src/onionrthreads/__init__.py +++ b/src/onionrthreads/__init__.py @@ -3,6 +3,7 @@ from typing import Iterable import traceback from threading import Thread +from uuid import uuid4 from time import sleep @@ -11,6 +12,7 @@ import logger def _onionr_thread(func: Callable, args: Iterable, sleep_secs: int, initial_sleep): + thread_id = str(uuid4()) if initial_sleep: sleep(initial_sleep) while True: @@ -18,7 +20,8 @@ def _onionr_thread(func: Callable, args: Iterable, func(*args) except Exception as _: # noqa logger.warn( - "Onionr thread exception \n" + traceback.format_exc(), + f"Onionr thread exception in {thread_id} \n" + + traceback.format_exc(), terminal=True) sleep(sleep_secs)