From 26fd456702d14762339fc02083967086339a5526 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Thu, 3 May 2018 17:41:12 -0500 Subject: [PATCH] work on converting communicator to multithreads --- onionr/communicator.py | 61 ++++++++++++++++++++++++++++++++++++------ onionr/onionrproofs.py | 1 + 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/onionr/communicator.py b/onionr/communicator.py index a5476507..685efa94 100755 --- a/onionr/communicator.py +++ b/onionr/communicator.py @@ -19,7 +19,7 @@ and code to operate as a daemon, getting commands from the command queue databas You should have received a copy of the GNU General Public License along with this program. If not, see . ''' -import sqlite3, requests, hmac, hashlib, time, sys, os, math, logger, urllib.parse, base64, binascii, random, json +import sqlite3, requests, hmac, hashlib, time, sys, os, math, logger, urllib.parse, base64, binascii, random, json, threading import core, onionrutils, onionrcrypto, netcontroller, onionrproofs, btc, config, onionrplugins as plugins class OnionrCommunicate: @@ -40,6 +40,11 @@ class OnionrCommunicate: self.ignoredHashes = [] self.highFailureAmount = 7 + + self.communicatorThreads = [] + + self.blocksProcessing = [] # list of blocks currently processing, to avoid trying a block twice at once in 2 seperate threads + self.peerStatus = {} # network actions (active requests) for peers used mainly to prevent conflicting actions in threads ''' logger.info('Starting Bitcoin Node... with Tor socks port:' + str(sys.argv[2]), timestamp=True) try: @@ -86,14 +91,31 @@ class OnionrCommunicate: if self.peerData[i]['failCount'] >= self.highFailureAmount: self.peerData[i]['failCount'] -= 1 if pexTimer == pexCount: - self.getNewPeers() + pT1 = threading.Thread(target=self.getNewPeers, name="pT1") + pT1.start() + pT2 = threading.Thread(target=self.getNewPeers, name="pT2") + pT2.start() pexCount = 0 # TODO: do not reset timer if low peer count if heartBeatRate == heartBeatTimer: logger.debug('Communicator heartbeat') heartBeatTimer = 0 if blockProcessTimer == blockProcessAmount: - self.lookupBlocks() - self.processBlocks() + lT1 = threading.Thread(target=self.lookupBlocks, name="lt1") + lT2 = threading.Thread(target=self.lookupBlocks, name="lt2") + lT3 = threading.Thread(target=self.lookupBlocks, name="lt3") + lT4 = threading.Thread(target=self.lookupBlocks, name="lt4") + pbT1 = threading.Thread(target=self.processBlocks, name='pbT1') + pbT2 = threading.Thread(target=self.processBlocks, name='pbT2') + pbT3 = threading.Thread(target=self.processBlocks, name='pbT3') + pbT4 = threading.Thread(target=self.processBlocks, name='pbT4') + lT1.start() + lT2.start() + lT3.start() + lT4.start() + pbT1.start() + pbT2.start() + pbT3.start() + pbT4.start() blockProcessTimer = 0 if command != False: if command[0] == 'shutdown': @@ -138,7 +160,7 @@ class OnionrCommunicate: def getNewPeers(self): ''' - Get new peers and keys + Get new peers and ed25519 keys ''' peersCheck = 5 # Amount of peers to ask for new peers + keys peersChecked = 0 @@ -157,6 +179,13 @@ class OnionrCommunicate: while peersCheck > peersChecked: #i = secrets.randbelow(maxN) # cant use prior to 3.6 i = random.randint(0, maxN) + + try: + if self.peerStatusTaken(peerList[i], 'pex') or self.peerStatusTaken(peerList[i], 'kex'): + continue + except IndexError: + pass + logger.info('Using ' + peerList[i] + ' to find new peers', timestamp=True) try: newAdders = self.performGet('pex', peerList[i], skipHighFailureAddress=True) @@ -188,6 +217,8 @@ class OnionrCommunicate: peerList = self._core.listAdders() blocks = '' for i in peerList: + if self.peerStatusTaken(i, 'getBlockHashes') or self.peerStatusTaken(i, 'getDBHash'): + continue try: if self.peerData[i]['failCount'] >= self.highFailureAmount: continue @@ -245,8 +276,10 @@ class OnionrCommunicate: for i in self._core.getBlockList(unsaved=True).split("\n"): if i != "": - if i in self.ignoredHashes: + if i in self.blocksProcessing or i in self.ignoredHashes: continue + else: + self.blocksProcessing.append(i) try: self.newHashes[i] except KeyError: @@ -294,12 +327,13 @@ class OnionrCommunicate: try: logger.info('Block type is ' + blockMetadata['type']) self._core.updateBlockInfo(i, 'dataType', blockMetadata['type']) + self.blocksProcessing.pop(i) except KeyError: logger.warn('Block has no type') pass except json.decoder.JSONDecodeError: logger.warn('Could not decode block metadata') - pass + self.blocksProcessing.pop(i) return def downloadBlock(self, hash, peerTries=3): @@ -381,6 +415,7 @@ class OnionrCommunicate: retData = False logger.debug('Skipping ' + peer + ' because of high failure rate') else: + self.peerStatus[peer] = action logger.debug('Contacting ' + peer + ' on port ' + socksPort) r = requests.get(url, headers=headers, proxies=proxies, timeout=(15, 30)) retData = r.text @@ -395,7 +430,17 @@ class OnionrCommunicate: self.peerData[peer]['failCount'] -= 1 self.peerData[peer]['lastConnectTime'] = math.floor(time.time()) return retData - + + def peerStatusTaken(self, peer, status): + ''' + Returns if a peer is currently performing a specified action + ''' + try: + if self.peerStatus[peer] == status: + return True + except KeyError: + pass + return False shouldRun = False debug = True diff --git a/onionr/onionrproofs.py b/onionr/onionrproofs.py index e6e2f137..0eb860e2 100644 --- a/onionr/onionrproofs.py +++ b/onionr/onionrproofs.py @@ -33,6 +33,7 @@ class POW: blockCheck = 300000 # How often the hasher should check if the bitcoin block is updated (slows hashing but prevents less wasted work) blockCheckCount = 0 block = '' #self.bitcoinNode.getBlockHash(self.bitcoinNode.getLastBlockHeight()) + print('thread started') while self.hashing: ''' if blockCheckCount == blockCheck: