more work on thread logic

This commit is contained in:
Kevin Froman 2018-05-04 22:39:00 -05:00
parent c3bf73d821
commit ab1cce3616
No known key found for this signature in database
GPG Key ID: 0D414D0FE405B63B
1 changed files with 22 additions and 17 deletions

View File

@ -43,6 +43,8 @@ class OnionrCommunicate:
self.communicatorThreads = 0 self.communicatorThreads = 0
self.maxThreads = 75 self.maxThreads = 75
self.processBlocksThreads = 0
self.lookupBlocksThreads = 0
self.blocksProcessing = [] # list of blocks currently processing, to avoid trying a block twice at once in 2 seperate threads self.blocksProcessing = [] # list of blocks currently processing, to avoid trying a block twice at once in 2 seperate threads
self.peerStatus = {} # network actions (active requests) for peers used mainly to prevent conflicting actions in threads self.peerStatus = {} # network actions (active requests) for peers used mainly to prevent conflicting actions in threads
@ -59,11 +61,11 @@ class OnionrCommunicate:
#exit(1) #exit(1)
blockProcessTimer = 0 blockProcessTimer = 0
blockProcessAmount = 5 blockProcessAmount = 20
highFailureTimer = 0 highFailureTimer = 0
highFailureRate = 10 highFailureRate = 10
heartBeatTimer = 0 heartBeatTimer = 0
heartBeatRate = 0 heartBeatRate = 10
pexTimer = 120 # How often we should check for new peers pexTimer = 120 # How often we should check for new peers
pexCount = 0 pexCount = 0
logger.debug('Communicator debugging enabled.') logger.debug('Communicator debugging enabled.')
@ -101,14 +103,14 @@ class OnionrCommunicate:
logger.debug('Communicator heartbeat') logger.debug('Communicator heartbeat')
heartBeatTimer = 0 heartBeatTimer = 0
if blockProcessTimer == blockProcessAmount: if blockProcessTimer == blockProcessAmount:
lT1 = threading.Thread(target=self.lookupBlocks, name="lt1") lT1 = threading.Thread(target=self.lookupBlocks, name="lt1", args=(isThread=True,))
lT2 = threading.Thread(target=self.lookupBlocks, name="lt2") lT2 = threading.Thread(target=self.lookupBlocks, name="lt2", args=(isThread=True,))
lT3 = threading.Thread(target=self.lookupBlocks, name="lt3") lT3 = threading.Thread(target=self.lookupBlocks, name="lt3", args=(isThread=True,))
lT4 = threading.Thread(target=self.lookupBlocks, name="lt4") lT4 = threading.Thread(target=self.lookupBlocks, name="lt4", args=(isThread=True,))
pbT1 = threading.Thread(target=self.processBlocks, name='pbT1') pbT1 = threading.Thread(target=self.processBlocks, name='pbT1', args=(isThread=True,))
pbT2 = threading.Thread(target=self.processBlocks, name='pbT2') pbT2 = threading.Thread(target=self.processBlocks, name='pbT2', args=(isThread=True,))
pbT3 = threading.Thread(target=self.processBlocks, name='pbT3') pbT3 = threading.Thread(target=self.processBlocks, name='pbT3', args=(isThread=True,))
pbT4 = threading.Thread(target=self.processBlocks, name='pbT4') pbT4 = threading.Thread(target=self.processBlocks, name='pbT4', args=(isThread=True,))
if (self.maxThreads - 8) >= threading.active_count(): if (self.maxThreads - 8) >= threading.active_count():
lT1.start() lT1.start()
lT2.start() lT2.start()
@ -118,10 +120,10 @@ class OnionrCommunicate:
pbT2.start() pbT2.start()
pbT3.start() pbT3.start()
pbT4.start() pbT4.start()
blockProcessTimer = 0
else: else:
logger.debug(threading.active_count()) logger.debug(threading.active_count())
logger.debug('Too many threads.') logger.debug('Too many threads.')
blockProcessTimer = 0
if command != False: if command != False:
if command[0] == 'shutdown': if command[0] == 'shutdown':
logger.info('Daemon received exit command.', timestamp=True) logger.info('Daemon received exit command.', timestamp=True)
@ -390,7 +392,7 @@ class OnionrCommunicate:
Get new peers and ed25519 keys Get new peers and ed25519 keys
''' '''
peersCheck = 2 # Amount of peers to ask for new peers + keys peersCheck = 1 # Amount of peers to ask for new peers + keys
peersChecked = 0 peersChecked = 0
peerList = list(self._core.listAdders()) # random ordered list of peers peerList = list(self._core.listAdders()) # random ordered list of peers
newKeys = [] newKeys = []
@ -439,11 +441,12 @@ class OnionrCommunicate:
peersChecked += 1 peersChecked += 1
return return
def lookupBlocks(self): def lookupBlocks(self, isThread=False):
''' '''
Lookup blocks and merge new ones Lookup blocks and merge new ones
''' '''
if isThread:
self.lookupBlocksThreads += 1
peerList = self._core.listAdders() peerList = self._core.listAdders()
blocks = '' blocks = ''
@ -504,16 +507,17 @@ class OnionrCommunicate:
self.newHashes[i] = 0 self.newHashes[i] = 0
logger.debug('Adding ' + i + ' to hash database...') logger.debug('Adding ' + i + ' to hash database...')
self._core.addToBlockDB(i) self._core.addToBlockDB(i)
self.lookupBlocksThreads -= 1
return return
def processBlocks(self): def processBlocks(self, isThread=False):
''' '''
Work with the block database and download any missing blocks Work with the block database and download any missing blocks
This is meant to be called from the communicator daemon on its timer. This is meant to be called from the communicator daemon on its timer.
''' '''
if isThread:
self.processBlocksThreads += 1
for i in self._core.getBlockList(unsaved=True).split("\n"): for i in self._core.getBlockList(unsaved=True).split("\n"):
if i != "": if i != "":
if i in self.blocksProcessing or i in self.ignoredHashes: if i in self.blocksProcessing or i in self.ignoredHashes:
@ -582,6 +586,7 @@ class OnionrCommunicate:
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logger.warn('Could not decode block metadata') logger.warn('Could not decode block metadata')
self.removeBlockFromProcessingList(i) self.removeBlockFromProcessingList(i)
self.processBlocksThreads -= 1
return return
def removeBlockFromProcessingList(self, block): def removeBlockFromProcessingList(self, block):