From ab1cce3616adaa7b1ef8d5fcc5c198c5b8771116 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Fri, 4 May 2018 22:39:00 -0500 Subject: [PATCH] more work on thread logic --- onionr/communicator.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/onionr/communicator.py b/onionr/communicator.py index aeb0e90b..95d83bbf 100755 --- a/onionr/communicator.py +++ b/onionr/communicator.py @@ -43,6 +43,8 @@ class OnionrCommunicate: self.communicatorThreads = 0 self.maxThreads = 75 + self.processBlocksThreads = 0 + self.lookupBlocksThreads = 0 self.blocksProcessing = [] # list of blocks currently processing, to avoid trying a block twice at once in 2 seperate threads self.peerStatus = {} # network actions (active requests) for peers used mainly to prevent conflicting actions in threads @@ -59,11 +61,11 @@ class OnionrCommunicate: #exit(1) blockProcessTimer = 0 - blockProcessAmount = 5 + blockProcessAmount = 20 highFailureTimer = 0 highFailureRate = 10 heartBeatTimer = 0 - heartBeatRate = 0 + heartBeatRate = 10 pexTimer = 120 # How often we should check for new peers pexCount = 0 logger.debug('Communicator debugging enabled.') @@ -101,14 +103,14 @@ class OnionrCommunicate: logger.debug('Communicator heartbeat') heartBeatTimer = 0 if blockProcessTimer == blockProcessAmount: - lT1 = threading.Thread(target=self.lookupBlocks, name="lt1") - lT2 = threading.Thread(target=self.lookupBlocks, name="lt2") - lT3 = threading.Thread(target=self.lookupBlocks, name="lt3") - lT4 = threading.Thread(target=self.lookupBlocks, name="lt4") - pbT1 = threading.Thread(target=self.processBlocks, name='pbT1') - pbT2 = threading.Thread(target=self.processBlocks, name='pbT2') - pbT3 = threading.Thread(target=self.processBlocks, name='pbT3') - pbT4 = threading.Thread(target=self.processBlocks, name='pbT4') + lT1 = threading.Thread(target=self.lookupBlocks, name="lt1", args=(isThread=True,)) + lT2 = threading.Thread(target=self.lookupBlocks, name="lt2", args=(isThread=True,)) + lT3 = threading.Thread(target=self.lookupBlocks, name="lt3", args=(isThread=True,)) + lT4 = threading.Thread(target=self.lookupBlocks, name="lt4", args=(isThread=True,)) + pbT1 = threading.Thread(target=self.processBlocks, name='pbT1', args=(isThread=True,)) + pbT2 = threading.Thread(target=self.processBlocks, name='pbT2', args=(isThread=True,)) + pbT3 = threading.Thread(target=self.processBlocks, name='pbT3', args=(isThread=True,)) + pbT4 = threading.Thread(target=self.processBlocks, name='pbT4', args=(isThread=True,)) if (self.maxThreads - 8) >= threading.active_count(): lT1.start() lT2.start() @@ -118,10 +120,10 @@ class OnionrCommunicate: pbT2.start() pbT3.start() pbT4.start() + blockProcessTimer = 0 else: logger.debug(threading.active_count()) logger.debug('Too many threads.') - blockProcessTimer = 0 if command != False: if command[0] == 'shutdown': logger.info('Daemon received exit command.', timestamp=True) @@ -390,7 +392,7 @@ class OnionrCommunicate: Get new peers and ed25519 keys ''' - peersCheck = 2 # Amount of peers to ask for new peers + keys + peersCheck = 1 # Amount of peers to ask for new peers + keys peersChecked = 0 peerList = list(self._core.listAdders()) # random ordered list of peers newKeys = [] @@ -439,11 +441,12 @@ class OnionrCommunicate: peersChecked += 1 return - def lookupBlocks(self): + def lookupBlocks(self, isThread=False): ''' Lookup blocks and merge new ones ''' - + if isThread: + self.lookupBlocksThreads += 1 peerList = self._core.listAdders() blocks = '' @@ -504,16 +507,17 @@ class OnionrCommunicate: self.newHashes[i] = 0 logger.debug('Adding ' + i + ' to hash database...') self._core.addToBlockDB(i) - + self.lookupBlocksThreads -= 1 return - def processBlocks(self): + def processBlocks(self, isThread=False): ''' Work with the block database and download any missing blocks This is meant to be called from the communicator daemon on its timer. ''' - + if isThread: + self.processBlocksThreads += 1 for i in self._core.getBlockList(unsaved=True).split("\n"): if i != "": if i in self.blocksProcessing or i in self.ignoredHashes: @@ -582,6 +586,7 @@ class OnionrCommunicate: except json.decoder.JSONDecodeError: logger.warn('Could not decode block metadata') self.removeBlockFromProcessingList(i) + self.processBlocksThreads -= 1 return def removeBlockFromProcessingList(self, block):