From 2c61380ffa3ae41907d4c631e23788b99d33e652 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Sun, 25 Aug 2019 21:18:09 -0500 Subject: [PATCH] fixed unsynced flow cache --- onionr/communicator/__init__.py | 2 +- .../downloadblocks/__init__.py | 2 +- onionr/communicatorutils/housekeeping.py | 11 ++++++++++- onionr/httpapi/directconnections/__init__.py | 9 +++++++-- onionr/onionrservices/bootstrapservice.py | 17 ++++++++--------- .../static-data/default-plugins/flow/flowapi.py | 4 +++- onionr/static-data/default-plugins/flow/main.py | 17 +++++++---------- 7 files changed, 37 insertions(+), 25 deletions(-) diff --git a/onionr/communicator/__init__.py b/onionr/communicator/__init__.py index 72816fb3..25f7f3b5 100755 --- a/onionr/communicator/__init__.py +++ b/onionr/communicator/__init__.py @@ -113,7 +113,7 @@ class OnionrCommunicatorDaemon: OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, myArgs=[self]) # Timer to cleanup old blocks - blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 65, myArgs=[self]) + blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, myArgs=[self]) # Timer to discover new peers OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requiresPeer=True, myArgs=[self], maxThreads=2) diff --git a/onionr/communicatorutils/downloadblocks/__init__.py b/onionr/communicatorutils/downloadblocks/__init__.py index 3cffa526..18abd423 100755 --- a/onionr/communicatorutils/downloadblocks/__init__.py +++ b/onionr/communicatorutils/downloadblocks/__init__.py @@ -102,7 +102,7 @@ def download_blocks_from_communicator(comm_inst): blacklist.addToDB(blockHash) else: # if block didn't meet expected hash - tempHash = crypto.sha3Hash(content) # lazy hack, TODO use var + tempHash = onionrcrypto.hashers.sha3_hash(content) # lazy hack, TODO use var try: tempHash = tempHash.decode() except AttributeError: diff --git a/onionr/communicatorutils/housekeeping.py b/onionr/communicatorutils/housekeeping.py index e13ca39f..e4475bb5 100755 --- a/onionr/communicatorutils/housekeeping.py +++ b/onionr/communicatorutils/housekeeping.py @@ -25,6 +25,13 @@ from coredb import blockmetadb, dbfiles import onionrstorage from onionrstorage import removeblock import onionrblacklist + +def __remove_from_upload(comm_inst, block_hash: str): + try: + comm_inst.blocksToUpload.remove(block_hash) + except ValueError: + pass + def clean_old_blocks(comm_inst): '''Delete old blocks if our disk allocation is full/near full, and also expired blocks''' blacklist = onionrblacklist.OnionrBlackList() @@ -33,13 +40,15 @@ def clean_old_blocks(comm_inst): blacklist.addToDB(bHash) removeblock.remove_block(bHash) onionrstorage.deleteBlock(bHash) + __remove_from_upload(comm_inst, bHash) logger.info('Deleted block: %s' % (bHash,)) while comm_inst.storage_counter.isFull(): oldest = blockmetadb.get_block_list()[0] blacklist.addToDB(oldest) removeblock.remove_block(oldest) - onionrstorage.deleteBlock(bHash) + onionrstorage.deleteBlock(oldest) + __remove_from_upload.remove(comm_inst, oldest) logger.info('Deleted block: %s' % (oldest,)) comm_inst.decrementThreadCount('clean_old_blocks') diff --git a/onionr/httpapi/directconnections/__init__.py b/onionr/httpapi/directconnections/__init__.py index b0a75f72..146a90f2 100644 --- a/onionr/httpapi/directconnections/__init__.py +++ b/onionr/httpapi/directconnections/__init__.py @@ -26,9 +26,14 @@ import deadsimplekv import filepaths import onionrservices +from onionrservices import pool def _get_communicator(g): - return g.too_many.get_by_string("OnionrCommunicatorDaemon") + while True: + try: + return g.too_many.get_by_string("OnionrCommunicatorDaemon") + except KeyError: + pass class DirectConnectionManagement: def __init__(self, client_api): @@ -49,7 +54,7 @@ class DirectConnectionManagement: def make_new_connection(pubkey): communicator = _get_communicator(g) resp = "pending" - if pubkey in communicator.shared_state.get_by_string("ServicePool").bootstrap_pending: + if pubkey in communicator.shared_state.get(pool.ServicePool).bootstrap_pending: return Response(resp) if pubkey in communicator.direct_connection_clients: diff --git a/onionr/onionrservices/bootstrapservice.py b/onionr/onionrservices/bootstrapservice.py index 0ef8b42a..7e1322e9 100755 --- a/onionr/onionrservices/bootstrapservice.py +++ b/onionr/onionrservices/bootstrapservice.py @@ -17,17 +17,17 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . ''' -import time, threading, uuid +import time, threading, uuid, os from gevent.pywsgi import WSGIServer, WSGIHandler from stem.control import Controller from flask import Flask, Response from netcontroller import get_open_port from . import httpheaders from onionrutils import stringvalidators, epoch +import logger import config, onionrblocks, filepaths import onionrexceptions import deadsimplekv as simplekv -import warden from . import pool def __bootstrap_timeout(server: WSGIServer, timeout: int, signal_object): @@ -39,7 +39,6 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300): ''' Bootstrap client services ''' - if not stringvalidators.validate_pub_key(peer): raise ValueError('Peer must be valid base32 ed25519 public key') @@ -89,7 +88,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300): return Response("") with Controller.from_port(port=config.get('tor.controlPort')) as controller: - connection_pool.bootstrap_pending.append(peer) + if not connection_pool is None: connection_pool.bootstrap_pending.append(peer) # Connect to the Tor process for Onionr controller.authenticate(config.get('tor.controlpassword')) # Create the v3 onion service @@ -97,7 +96,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300): onionrblocks.insert(response.service_id, header='con', sign=True, encryptType='asym', asymPeer=peer, disableForward=True, expire=(epoch.get_epoch() + bootstrap_timeout)) - threading.Thread(target=__bootstrap_timeout, args=[http_server, bootstrap_timeout], daemon=True) + threading.Thread(target=__bootstrap_timeout, args=[http_server, bootstrap_timeout, timed_out], daemon=True).start() # Run the bootstrap server try: @@ -107,11 +106,11 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300): # This line reached when server is shutdown by being bootstrapped # Add the address to the client pool if not comm_inst is None: + connection_pool.bootstrap_pending.remove(peer) + if timed_out.timed_out: + logger.warn('Could not connect to %s due to timeout' % (peer,)) + return None comm_inst.direct_connection_clients[peer] = response.service_id - - connection_pool.bootstrap_pending.remove(peer) - if timed_out.timed_out: - raise onionrexceptions.Timeout # Now that the bootstrap server has received a server, return the address return key_store.get(bs_id) diff --git a/onionr/static-data/default-plugins/flow/flowapi.py b/onionr/static-data/default-plugins/flow/flowapi.py index d52743c5..704e3b27 100755 --- a/onionr/static-data/default-plugins/flow/flowapi.py +++ b/onionr/static-data/default-plugins/flow/flowapi.py @@ -25,9 +25,11 @@ flask_blueprint = Blueprint('flow', __name__) @flask_blueprint.route('/flow/getpostsbyboard/') def get_post_by_board(board): - board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json') + board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json', flush_on_exit=False) board_cache.refresh() posts = board_cache.get(board) if posts is None: posts = '' + else: + posts = ','.join(posts) return Response(posts) \ No newline at end of file diff --git a/onionr/static-data/default-plugins/flow/main.py b/onionr/static-data/default-plugins/flow/main.py index 1c0cf974..b0263f8c 100755 --- a/onionr/static-data/default-plugins/flow/main.py +++ b/onionr/static-data/default-plugins/flow/main.py @@ -111,9 +111,10 @@ def on_processblocks(api, data=None): metadata = data['block'].bmetadata # Get the block metadata if data['type'] != 'brd': return - b_hash = reconstructhash.deconstruct_hash(data['block'].hash) # Get the 0-truncated block hash - board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json') # get the board index cache + b_hash = reconstructhash.deconstruct_hash(data['block'].hash) # Get the 0-truncated block hash + board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json', flush_on_exit=False) # get the board index cache + board_cache.refresh() # Validate the channel name is sane for caching try: ch = metadata['ch'] @@ -127,11 +128,7 @@ def on_processblocks(api, data=None): existing_posts = board_cache.get(ch) if existing_posts is None: - existing_posts = '' - - check_list = existing_posts.split(',') - if len(check_list) > 30: - check_list.pop(0) - existing_posts = ','.join(check_list) - board_cache.put(ch, '%s,%s' % (existing_posts, b_hash)) - board_cache.flush() \ No newline at end of file + existing_posts = [] + existing_posts.append(data['block'].hash) + board_cache.put(ch, existing_posts) + board_cache.flush()