From 2879595c66a1ff8fc6f1ae96c7a1d289acf379ec Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Tue, 7 May 2019 22:28:06 -0500 Subject: [PATCH] refactored communicator a bit --- onionr/communicatorutils/__init__.py | 0 onionr/communicatorutils/downloadblocks.py | 95 ++++++++++++++++++++++ onionr/communicatorutils/lookupblocks.py | 60 ++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 onionr/communicatorutils/__init__.py create mode 100644 onionr/communicatorutils/downloadblocks.py create mode 100644 onionr/communicatorutils/lookupblocks.py diff --git a/onionr/communicatorutils/__init__.py b/onionr/communicatorutils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/onionr/communicatorutils/downloadblocks.py b/onionr/communicatorutils/downloadblocks.py new file mode 100644 index 00000000..f8ab7d54 --- /dev/null +++ b/onionr/communicatorutils/downloadblocks.py @@ -0,0 +1,95 @@ +import communicator, onionrexceptions +import logger + +def download_blocks_from_communicator(comm_inst): + assert isinstance(comm_inst, communicator.OnionrCommunicatorDaemon) + for blockHash in list(comm_inst.blockQueue): + triedQueuePeers = [] # List of peers we've tried for a block + try: + blockPeers = list(comm_inst.blockQueue[blockHash]) + except KeyError: + blockPeers = [] + removeFromQueue = True + if comm_inst.shutdown or not comm_inst.isOnline: + # Exit loop if shutting down or offline + break + # Do not download blocks being downloaded or that are already saved (edge cases) + if blockHash in comm_inst.currentDownloading: + #logger.debug('Already downloading block %s...' % blockHash) + continue + if blockHash in comm_inst._core.getBlockList(): + #logger.debug('Block %s is already saved.' % (blockHash,)) + try: + del comm_inst.blockQueue[blockHash] + except KeyError: + pass + continue + if comm_inst._core._blacklist.inBlacklist(blockHash): + continue + if comm_inst._core._utils.storageCounter.isFull(): + break + comm_inst.currentDownloading.append(blockHash) # So we can avoid concurrent downloading in other threads of same block + if len(blockPeers) == 0: + peerUsed = comm_inst.pickOnlinePeer() + else: + blockPeers = comm_inst._core._crypto.randomShuffle(blockPeers) + peerUsed = blockPeers.pop(0) + + if not comm_inst.shutdown and peerUsed.strip() != '': + logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed)) + content = comm_inst.peerAction(peerUsed, 'getdata/' + blockHash) # block content from random peer (includes metadata) + if content != False and len(content) > 0: + try: + content = content.encode() + except AttributeError: + pass + + realHash = comm_inst._core._crypto.sha3Hash(content) + try: + realHash = realHash.decode() # bytes on some versions for some reason + except AttributeError: + pass + if realHash == blockHash: + content = content.decode() # decode here because sha3Hash needs bytes above + metas = comm_inst._core._utils.getBlockMetadataFromData(content) # returns tuple(metadata, meta), meta is also in metadata + metadata = metas[0] + if comm_inst._core._utils.validateMetadata(metadata, metas[2]): # check if metadata is valid, and verify nonce + if comm_inst._core._crypto.verifyPow(content): # check if POW is enough/correct + logger.info('Attempting to save block %s...' % blockHash[:12]) + try: + comm_inst._core.setData(content) + except onionrexceptions.DiskAllocationReached: + logger.error('Reached disk allocation allowance, cannot save block %s.' % blockHash) + removeFromQueue = False + else: + comm_inst._core.addToBlockDB(blockHash, dataSaved=True) + comm_inst._core._utils.processBlockMetadata(blockHash) # caches block metadata values to block database + else: + logger.warn('POW failed for block %s.' % blockHash) + else: + if comm_inst._core._blacklist.inBlacklist(realHash): + logger.warn('Block %s is blacklisted.' % (realHash,)) + else: + logger.warn('Metadata for block %s is invalid.' % blockHash) + comm_inst._core._blacklist.addToDB(blockHash) + else: + # if block didn't meet expected hash + tempHash = comm_inst._core._crypto.sha3Hash(content) # lazy hack, TODO use var + try: + tempHash = tempHash.decode() + except AttributeError: + pass + # Punish peer for sharing invalid block (not always malicious, but is bad regardless) + onionrpeers.PeerProfiles(peerUsed, comm_inst._core).addScore(-50) + if tempHash != 'ed55e34cb828232d6c14da0479709bfa10a0923dca2b380496e6b2ed4f7a0253': + # Dumb hack for 404 response from peer. Don't log it if 404 since its likely not malicious or a critical error. + logger.warn('Block hash validation failed for ' + blockHash + ' got ' + tempHash) + else: + removeFromQueue = False # Don't remove from queue if 404 + if removeFromQueue: + try: + del comm_inst.blockQueue[blockHash] # remove from block queue both if success or false + except KeyError: + pass + comm_inst.currentDownloading.remove(blockHash) + comm_inst.decrementThreadCount('getBlocks') \ No newline at end of file diff --git a/onionr/communicatorutils/lookupblocks.py b/onionr/communicatorutils/lookupblocks.py new file mode 100644 index 00000000..c8b17b56 --- /dev/null +++ b/onionr/communicatorutils/lookupblocks.py @@ -0,0 +1,60 @@ +import logger, onionrproofs +def lookup_blocks_from_communicator(comm_inst): + logger.info('Looking up new blocks...') + tryAmount = 2 + newBlocks = '' + existingBlocks = comm_inst._core.getBlockList() + triedPeers = [] # list of peers we've tried this time around + maxBacklog = 1560 # Max amount of *new* block hashes to have already in queue, to avoid memory exhaustion + lastLookupTime = 0 # Last time we looked up a particular peer's list + for i in range(tryAmount): + listLookupCommand = 'getblocklist' # This is defined here to reset it each time + if len(comm_inst.blockQueue) >= maxBacklog: + break + if not comm_inst.isOnline: + break + # check if disk allocation is used + if comm_inst._core._utils.storageCounter.isFull(): + logger.debug('Not looking up new blocks due to maximum amount of allowed disk space used') + break + peer = comm_inst.pickOnlinePeer() # select random online peer + # if we've already tried all the online peers this time around, stop + if peer in triedPeers: + if len(comm_inst.onlinePeers) == len(triedPeers): + break + else: + continue + triedPeers.append(peer) + + # Get the last time we looked up a peer's stamp to only fetch blocks since then. + # Saved in memory only for privacy reasons + try: + lastLookupTime = comm_inst.dbTimestamps[peer] + except KeyError: + lastLookupTime = 0 + else: + listLookupCommand += '?date=%s' % (lastLookupTime,) + try: + newBlocks = comm_inst.peerAction(peer, listLookupCommand) # get list of new block hashes + except Exception as error: + logger.warn('Could not get new blocks from %s.' % peer, error = error) + newBlocks = False + else: + comm_inst.dbTimestamps[peer] = comm_inst._core._utils.getRoundedEpoch(roundS=60) + if newBlocks != False: + # if request was a success + for i in newBlocks.split('\n'): + if comm_inst._core._utils.validateHash(i): + # if newline seperated string is valid hash + if not i in existingBlocks: + # if block does not exist on disk and is not already in block queue + if i not in comm_inst.blockQueue: + if onionrproofs.hashMeetsDifficulty(i) and not comm_inst._core._blacklist.inBlacklist(i): + if len(comm_inst.blockQueue) <= 1000000: + comm_inst.blockQueue[i] = [peer] # add blocks to download queue + else: + if peer not in comm_inst.blockQueue[i]: + if len(comm_inst.blockQueue[i]) < 10: + comm_inst.blockQueue[i].append(peer) + comm_inst.decrementThreadCount('lookupBlocks') + return \ No newline at end of file