work on converting communicator to multithreads

This commit is contained in:
Kevin Froman 2018-05-03 17:41:12 -05:00
parent 1a02124122
commit 26fd456702
No known key found for this signature in database
GPG Key ID: 0D414D0FE405B63B
2 changed files with 54 additions and 8 deletions

View File

@ -19,7 +19,7 @@ and code to operate as a daemon, getting commands from the command queue databas
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
import sqlite3, requests, hmac, hashlib, time, sys, os, math, logger, urllib.parse, base64, binascii, random, json import sqlite3, requests, hmac, hashlib, time, sys, os, math, logger, urllib.parse, base64, binascii, random, json, threading
import core, onionrutils, onionrcrypto, netcontroller, onionrproofs, btc, config, onionrplugins as plugins import core, onionrutils, onionrcrypto, netcontroller, onionrproofs, btc, config, onionrplugins as plugins
class OnionrCommunicate: class OnionrCommunicate:
@ -40,6 +40,11 @@ class OnionrCommunicate:
self.ignoredHashes = [] self.ignoredHashes = []
self.highFailureAmount = 7 self.highFailureAmount = 7
self.communicatorThreads = []
self.blocksProcessing = [] # list of blocks currently processing, to avoid trying a block twice at once in 2 seperate threads
self.peerStatus = {} # network actions (active requests) for peers used mainly to prevent conflicting actions in threads
''' '''
logger.info('Starting Bitcoin Node... with Tor socks port:' + str(sys.argv[2]), timestamp=True) logger.info('Starting Bitcoin Node... with Tor socks port:' + str(sys.argv[2]), timestamp=True)
try: try:
@ -86,14 +91,31 @@ class OnionrCommunicate:
if self.peerData[i]['failCount'] >= self.highFailureAmount: if self.peerData[i]['failCount'] >= self.highFailureAmount:
self.peerData[i]['failCount'] -= 1 self.peerData[i]['failCount'] -= 1
if pexTimer == pexCount: if pexTimer == pexCount:
self.getNewPeers() pT1 = threading.Thread(target=self.getNewPeers, name="pT1")
pT1.start()
pT2 = threading.Thread(target=self.getNewPeers, name="pT2")
pT2.start()
pexCount = 0 # TODO: do not reset timer if low peer count pexCount = 0 # TODO: do not reset timer if low peer count
if heartBeatRate == heartBeatTimer: if heartBeatRate == heartBeatTimer:
logger.debug('Communicator heartbeat') logger.debug('Communicator heartbeat')
heartBeatTimer = 0 heartBeatTimer = 0
if blockProcessTimer == blockProcessAmount: if blockProcessTimer == blockProcessAmount:
self.lookupBlocks() lT1 = threading.Thread(target=self.lookupBlocks, name="lt1")
self.processBlocks() lT2 = threading.Thread(target=self.lookupBlocks, name="lt2")
lT3 = threading.Thread(target=self.lookupBlocks, name="lt3")
lT4 = threading.Thread(target=self.lookupBlocks, name="lt4")
pbT1 = threading.Thread(target=self.processBlocks, name='pbT1')
pbT2 = threading.Thread(target=self.processBlocks, name='pbT2')
pbT3 = threading.Thread(target=self.processBlocks, name='pbT3')
pbT4 = threading.Thread(target=self.processBlocks, name='pbT4')
lT1.start()
lT2.start()
lT3.start()
lT4.start()
pbT1.start()
pbT2.start()
pbT3.start()
pbT4.start()
blockProcessTimer = 0 blockProcessTimer = 0
if command != False: if command != False:
if command[0] == 'shutdown': if command[0] == 'shutdown':
@ -138,7 +160,7 @@ class OnionrCommunicate:
def getNewPeers(self): def getNewPeers(self):
''' '''
Get new peers and keys Get new peers and ed25519 keys
''' '''
peersCheck = 5 # Amount of peers to ask for new peers + keys peersCheck = 5 # Amount of peers to ask for new peers + keys
peersChecked = 0 peersChecked = 0
@ -157,6 +179,13 @@ class OnionrCommunicate:
while peersCheck > peersChecked: while peersCheck > peersChecked:
#i = secrets.randbelow(maxN) # cant use prior to 3.6 #i = secrets.randbelow(maxN) # cant use prior to 3.6
i = random.randint(0, maxN) i = random.randint(0, maxN)
try:
if self.peerStatusTaken(peerList[i], 'pex') or self.peerStatusTaken(peerList[i], 'kex'):
continue
except IndexError:
pass
logger.info('Using ' + peerList[i] + ' to find new peers', timestamp=True) logger.info('Using ' + peerList[i] + ' to find new peers', timestamp=True)
try: try:
newAdders = self.performGet('pex', peerList[i], skipHighFailureAddress=True) newAdders = self.performGet('pex', peerList[i], skipHighFailureAddress=True)
@ -188,6 +217,8 @@ class OnionrCommunicate:
peerList = self._core.listAdders() peerList = self._core.listAdders()
blocks = '' blocks = ''
for i in peerList: for i in peerList:
if self.peerStatusTaken(i, 'getBlockHashes') or self.peerStatusTaken(i, 'getDBHash'):
continue
try: try:
if self.peerData[i]['failCount'] >= self.highFailureAmount: if self.peerData[i]['failCount'] >= self.highFailureAmount:
continue continue
@ -245,8 +276,10 @@ class OnionrCommunicate:
for i in self._core.getBlockList(unsaved=True).split("\n"): for i in self._core.getBlockList(unsaved=True).split("\n"):
if i != "": if i != "":
if i in self.ignoredHashes: if i in self.blocksProcessing or i in self.ignoredHashes:
continue continue
else:
self.blocksProcessing.append(i)
try: try:
self.newHashes[i] self.newHashes[i]
except KeyError: except KeyError:
@ -294,12 +327,13 @@ class OnionrCommunicate:
try: try:
logger.info('Block type is ' + blockMetadata['type']) logger.info('Block type is ' + blockMetadata['type'])
self._core.updateBlockInfo(i, 'dataType', blockMetadata['type']) self._core.updateBlockInfo(i, 'dataType', blockMetadata['type'])
self.blocksProcessing.pop(i)
except KeyError: except KeyError:
logger.warn('Block has no type') logger.warn('Block has no type')
pass pass
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logger.warn('Could not decode block metadata') logger.warn('Could not decode block metadata')
pass self.blocksProcessing.pop(i)
return return
def downloadBlock(self, hash, peerTries=3): def downloadBlock(self, hash, peerTries=3):
@ -381,6 +415,7 @@ class OnionrCommunicate:
retData = False retData = False
logger.debug('Skipping ' + peer + ' because of high failure rate') logger.debug('Skipping ' + peer + ' because of high failure rate')
else: else:
self.peerStatus[peer] = action
logger.debug('Contacting ' + peer + ' on port ' + socksPort) logger.debug('Contacting ' + peer + ' on port ' + socksPort)
r = requests.get(url, headers=headers, proxies=proxies, timeout=(15, 30)) r = requests.get(url, headers=headers, proxies=proxies, timeout=(15, 30))
retData = r.text retData = r.text
@ -395,7 +430,17 @@ class OnionrCommunicate:
self.peerData[peer]['failCount'] -= 1 self.peerData[peer]['failCount'] -= 1
self.peerData[peer]['lastConnectTime'] = math.floor(time.time()) self.peerData[peer]['lastConnectTime'] = math.floor(time.time())
return retData return retData
def peerStatusTaken(self, peer, status):
'''
Returns if a peer is currently performing a specified action
'''
try:
if self.peerStatus[peer] == status:
return True
except KeyError:
pass
return False
shouldRun = False shouldRun = False
debug = True debug = True

View File

@ -33,6 +33,7 @@ class POW:
blockCheck = 300000 # How often the hasher should check if the bitcoin block is updated (slows hashing but prevents less wasted work) blockCheck = 300000 # How often the hasher should check if the bitcoin block is updated (slows hashing but prevents less wasted work)
blockCheckCount = 0 blockCheckCount = 0
block = '' #self.bitcoinNode.getBlockHash(self.bitcoinNode.getLastBlockHeight()) block = '' #self.bitcoinNode.getBlockHash(self.bitcoinNode.getLastBlockHeight())
print('thread started')
while self.hashing: while self.hashing:
''' '''
if blockCheckCount == blockCheck: if blockCheckCount == blockCheck: