refactored communicator a bit
This commit is contained in:
parent
a4d6dc5fa5
commit
2879595c66
onionr/communicatorutils
0
onionr/communicatorutils/__init__.py
Normal file
0
onionr/communicatorutils/__init__.py
Normal file
95
onionr/communicatorutils/downloadblocks.py
Normal file
95
onionr/communicatorutils/downloadblocks.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
import communicator, onionrexceptions
|
||||||
|
import logger
|
||||||
|
|
||||||
|
def download_blocks_from_communicator(comm_inst):
|
||||||
|
assert isinstance(comm_inst, communicator.OnionrCommunicatorDaemon)
|
||||||
|
for blockHash in list(comm_inst.blockQueue):
|
||||||
|
triedQueuePeers = [] # List of peers we've tried for a block
|
||||||
|
try:
|
||||||
|
blockPeers = list(comm_inst.blockQueue[blockHash])
|
||||||
|
except KeyError:
|
||||||
|
blockPeers = []
|
||||||
|
removeFromQueue = True
|
||||||
|
if comm_inst.shutdown or not comm_inst.isOnline:
|
||||||
|
# Exit loop if shutting down or offline
|
||||||
|
break
|
||||||
|
# Do not download blocks being downloaded or that are already saved (edge cases)
|
||||||
|
if blockHash in comm_inst.currentDownloading:
|
||||||
|
#logger.debug('Already downloading block %s...' % blockHash)
|
||||||
|
continue
|
||||||
|
if blockHash in comm_inst._core.getBlockList():
|
||||||
|
#logger.debug('Block %s is already saved.' % (blockHash,))
|
||||||
|
try:
|
||||||
|
del comm_inst.blockQueue[blockHash]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
continue
|
||||||
|
if comm_inst._core._blacklist.inBlacklist(blockHash):
|
||||||
|
continue
|
||||||
|
if comm_inst._core._utils.storageCounter.isFull():
|
||||||
|
break
|
||||||
|
comm_inst.currentDownloading.append(blockHash) # So we can avoid concurrent downloading in other threads of same block
|
||||||
|
if len(blockPeers) == 0:
|
||||||
|
peerUsed = comm_inst.pickOnlinePeer()
|
||||||
|
else:
|
||||||
|
blockPeers = comm_inst._core._crypto.randomShuffle(blockPeers)
|
||||||
|
peerUsed = blockPeers.pop(0)
|
||||||
|
|
||||||
|
if not comm_inst.shutdown and peerUsed.strip() != '':
|
||||||
|
logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed))
|
||||||
|
content = comm_inst.peerAction(peerUsed, 'getdata/' + blockHash) # block content from random peer (includes metadata)
|
||||||
|
if content != False and len(content) > 0:
|
||||||
|
try:
|
||||||
|
content = content.encode()
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
realHash = comm_inst._core._crypto.sha3Hash(content)
|
||||||
|
try:
|
||||||
|
realHash = realHash.decode() # bytes on some versions for some reason
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
if realHash == blockHash:
|
||||||
|
content = content.decode() # decode here because sha3Hash needs bytes above
|
||||||
|
metas = comm_inst._core._utils.getBlockMetadataFromData(content) # returns tuple(metadata, meta), meta is also in metadata
|
||||||
|
metadata = metas[0]
|
||||||
|
if comm_inst._core._utils.validateMetadata(metadata, metas[2]): # check if metadata is valid, and verify nonce
|
||||||
|
if comm_inst._core._crypto.verifyPow(content): # check if POW is enough/correct
|
||||||
|
logger.info('Attempting to save block %s...' % blockHash[:12])
|
||||||
|
try:
|
||||||
|
comm_inst._core.setData(content)
|
||||||
|
except onionrexceptions.DiskAllocationReached:
|
||||||
|
logger.error('Reached disk allocation allowance, cannot save block %s.' % blockHash)
|
||||||
|
removeFromQueue = False
|
||||||
|
else:
|
||||||
|
comm_inst._core.addToBlockDB(blockHash, dataSaved=True)
|
||||||
|
comm_inst._core._utils.processBlockMetadata(blockHash) # caches block metadata values to block database
|
||||||
|
else:
|
||||||
|
logger.warn('POW failed for block %s.' % blockHash)
|
||||||
|
else:
|
||||||
|
if comm_inst._core._blacklist.inBlacklist(realHash):
|
||||||
|
logger.warn('Block %s is blacklisted.' % (realHash,))
|
||||||
|
else:
|
||||||
|
logger.warn('Metadata for block %s is invalid.' % blockHash)
|
||||||
|
comm_inst._core._blacklist.addToDB(blockHash)
|
||||||
|
else:
|
||||||
|
# if block didn't meet expected hash
|
||||||
|
tempHash = comm_inst._core._crypto.sha3Hash(content) # lazy hack, TODO use var
|
||||||
|
try:
|
||||||
|
tempHash = tempHash.decode()
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
# Punish peer for sharing invalid block (not always malicious, but is bad regardless)
|
||||||
|
onionrpeers.PeerProfiles(peerUsed, comm_inst._core).addScore(-50)
|
||||||
|
if tempHash != 'ed55e34cb828232d6c14da0479709bfa10a0923dca2b380496e6b2ed4f7a0253':
|
||||||
|
# Dumb hack for 404 response from peer. Don't log it if 404 since its likely not malicious or a critical error.
|
||||||
|
logger.warn('Block hash validation failed for ' + blockHash + ' got ' + tempHash)
|
||||||
|
else:
|
||||||
|
removeFromQueue = False # Don't remove from queue if 404
|
||||||
|
if removeFromQueue:
|
||||||
|
try:
|
||||||
|
del comm_inst.blockQueue[blockHash] # remove from block queue both if success or false
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
comm_inst.currentDownloading.remove(blockHash)
|
||||||
|
comm_inst.decrementThreadCount('getBlocks')
|
60
onionr/communicatorutils/lookupblocks.py
Normal file
60
onionr/communicatorutils/lookupblocks.py
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
import logger, onionrproofs
|
||||||
|
def lookup_blocks_from_communicator(comm_inst):
|
||||||
|
logger.info('Looking up new blocks...')
|
||||||
|
tryAmount = 2
|
||||||
|
newBlocks = ''
|
||||||
|
existingBlocks = comm_inst._core.getBlockList()
|
||||||
|
triedPeers = [] # list of peers we've tried this time around
|
||||||
|
maxBacklog = 1560 # Max amount of *new* block hashes to have already in queue, to avoid memory exhaustion
|
||||||
|
lastLookupTime = 0 # Last time we looked up a particular peer's list
|
||||||
|
for i in range(tryAmount):
|
||||||
|
listLookupCommand = 'getblocklist' # This is defined here to reset it each time
|
||||||
|
if len(comm_inst.blockQueue) >= maxBacklog:
|
||||||
|
break
|
||||||
|
if not comm_inst.isOnline:
|
||||||
|
break
|
||||||
|
# check if disk allocation is used
|
||||||
|
if comm_inst._core._utils.storageCounter.isFull():
|
||||||
|
logger.debug('Not looking up new blocks due to maximum amount of allowed disk space used')
|
||||||
|
break
|
||||||
|
peer = comm_inst.pickOnlinePeer() # select random online peer
|
||||||
|
# if we've already tried all the online peers this time around, stop
|
||||||
|
if peer in triedPeers:
|
||||||
|
if len(comm_inst.onlinePeers) == len(triedPeers):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
triedPeers.append(peer)
|
||||||
|
|
||||||
|
# Get the last time we looked up a peer's stamp to only fetch blocks since then.
|
||||||
|
# Saved in memory only for privacy reasons
|
||||||
|
try:
|
||||||
|
lastLookupTime = comm_inst.dbTimestamps[peer]
|
||||||
|
except KeyError:
|
||||||
|
lastLookupTime = 0
|
||||||
|
else:
|
||||||
|
listLookupCommand += '?date=%s' % (lastLookupTime,)
|
||||||
|
try:
|
||||||
|
newBlocks = comm_inst.peerAction(peer, listLookupCommand) # get list of new block hashes
|
||||||
|
except Exception as error:
|
||||||
|
logger.warn('Could not get new blocks from %s.' % peer, error = error)
|
||||||
|
newBlocks = False
|
||||||
|
else:
|
||||||
|
comm_inst.dbTimestamps[peer] = comm_inst._core._utils.getRoundedEpoch(roundS=60)
|
||||||
|
if newBlocks != False:
|
||||||
|
# if request was a success
|
||||||
|
for i in newBlocks.split('\n'):
|
||||||
|
if comm_inst._core._utils.validateHash(i):
|
||||||
|
# if newline seperated string is valid hash
|
||||||
|
if not i in existingBlocks:
|
||||||
|
# if block does not exist on disk and is not already in block queue
|
||||||
|
if i not in comm_inst.blockQueue:
|
||||||
|
if onionrproofs.hashMeetsDifficulty(i) and not comm_inst._core._blacklist.inBlacklist(i):
|
||||||
|
if len(comm_inst.blockQueue) <= 1000000:
|
||||||
|
comm_inst.blockQueue[i] = [peer] # add blocks to download queue
|
||||||
|
else:
|
||||||
|
if peer not in comm_inst.blockQueue[i]:
|
||||||
|
if len(comm_inst.blockQueue[i]) < 10:
|
||||||
|
comm_inst.blockQueue[i].append(peer)
|
||||||
|
comm_inst.decrementThreadCount('lookupBlocks')
|
||||||
|
return
|
Loading…
Reference in New Issue
Block a user