fixed unsynced flow cache

This commit is contained in:
Kevin Froman 2019-08-25 21:18:09 -05:00
parent 9fc6e35fe4
commit 2c61380ffa
7 changed files with 37 additions and 25 deletions

View File

@ -113,7 +113,7 @@ class OnionrCommunicatorDaemon:
OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, myArgs=[self]) OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, myArgs=[self])
# Timer to cleanup old blocks # Timer to cleanup old blocks
blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 65, myArgs=[self]) blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, myArgs=[self])
# Timer to discover new peers # Timer to discover new peers
OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requiresPeer=True, myArgs=[self], maxThreads=2) OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requiresPeer=True, myArgs=[self], maxThreads=2)

View File

@ -102,7 +102,7 @@ def download_blocks_from_communicator(comm_inst):
blacklist.addToDB(blockHash) blacklist.addToDB(blockHash)
else: else:
# if block didn't meet expected hash # if block didn't meet expected hash
tempHash = crypto.sha3Hash(content) # lazy hack, TODO use var tempHash = onionrcrypto.hashers.sha3_hash(content) # lazy hack, TODO use var
try: try:
tempHash = tempHash.decode() tempHash = tempHash.decode()
except AttributeError: except AttributeError:

View File

@ -25,6 +25,13 @@ from coredb import blockmetadb, dbfiles
import onionrstorage import onionrstorage
from onionrstorage import removeblock from onionrstorage import removeblock
import onionrblacklist import onionrblacklist
def __remove_from_upload(comm_inst, block_hash: str):
try:
comm_inst.blocksToUpload.remove(block_hash)
except ValueError:
pass
def clean_old_blocks(comm_inst): def clean_old_blocks(comm_inst):
'''Delete old blocks if our disk allocation is full/near full, and also expired blocks''' '''Delete old blocks if our disk allocation is full/near full, and also expired blocks'''
blacklist = onionrblacklist.OnionrBlackList() blacklist = onionrblacklist.OnionrBlackList()
@ -33,13 +40,15 @@ def clean_old_blocks(comm_inst):
blacklist.addToDB(bHash) blacklist.addToDB(bHash)
removeblock.remove_block(bHash) removeblock.remove_block(bHash)
onionrstorage.deleteBlock(bHash) onionrstorage.deleteBlock(bHash)
__remove_from_upload(comm_inst, bHash)
logger.info('Deleted block: %s' % (bHash,)) logger.info('Deleted block: %s' % (bHash,))
while comm_inst.storage_counter.isFull(): while comm_inst.storage_counter.isFull():
oldest = blockmetadb.get_block_list()[0] oldest = blockmetadb.get_block_list()[0]
blacklist.addToDB(oldest) blacklist.addToDB(oldest)
removeblock.remove_block(oldest) removeblock.remove_block(oldest)
onionrstorage.deleteBlock(bHash) onionrstorage.deleteBlock(oldest)
__remove_from_upload.remove(comm_inst, oldest)
logger.info('Deleted block: %s' % (oldest,)) logger.info('Deleted block: %s' % (oldest,))
comm_inst.decrementThreadCount('clean_old_blocks') comm_inst.decrementThreadCount('clean_old_blocks')

View File

@ -26,9 +26,14 @@ import deadsimplekv
import filepaths import filepaths
import onionrservices import onionrservices
from onionrservices import pool
def _get_communicator(g): def _get_communicator(g):
return g.too_many.get_by_string("OnionrCommunicatorDaemon") while True:
try:
return g.too_many.get_by_string("OnionrCommunicatorDaemon")
except KeyError:
pass
class DirectConnectionManagement: class DirectConnectionManagement:
def __init__(self, client_api): def __init__(self, client_api):
@ -49,7 +54,7 @@ class DirectConnectionManagement:
def make_new_connection(pubkey): def make_new_connection(pubkey):
communicator = _get_communicator(g) communicator = _get_communicator(g)
resp = "pending" resp = "pending"
if pubkey in communicator.shared_state.get_by_string("ServicePool").bootstrap_pending: if pubkey in communicator.shared_state.get(pool.ServicePool).bootstrap_pending:
return Response(resp) return Response(resp)
if pubkey in communicator.direct_connection_clients: if pubkey in communicator.direct_connection_clients:

View File

@ -17,17 +17,17 @@
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
import time, threading, uuid import time, threading, uuid, os
from gevent.pywsgi import WSGIServer, WSGIHandler from gevent.pywsgi import WSGIServer, WSGIHandler
from stem.control import Controller from stem.control import Controller
from flask import Flask, Response from flask import Flask, Response
from netcontroller import get_open_port from netcontroller import get_open_port
from . import httpheaders from . import httpheaders
from onionrutils import stringvalidators, epoch from onionrutils import stringvalidators, epoch
import logger
import config, onionrblocks, filepaths import config, onionrblocks, filepaths
import onionrexceptions import onionrexceptions
import deadsimplekv as simplekv import deadsimplekv as simplekv
import warden
from . import pool from . import pool
def __bootstrap_timeout(server: WSGIServer, timeout: int, signal_object): def __bootstrap_timeout(server: WSGIServer, timeout: int, signal_object):
@ -39,7 +39,6 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300):
''' '''
Bootstrap client services Bootstrap client services
''' '''
if not stringvalidators.validate_pub_key(peer): if not stringvalidators.validate_pub_key(peer):
raise ValueError('Peer must be valid base32 ed25519 public key') raise ValueError('Peer must be valid base32 ed25519 public key')
@ -89,7 +88,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300):
return Response("") return Response("")
with Controller.from_port(port=config.get('tor.controlPort')) as controller: with Controller.from_port(port=config.get('tor.controlPort')) as controller:
connection_pool.bootstrap_pending.append(peer) if not connection_pool is None: connection_pool.bootstrap_pending.append(peer)
# Connect to the Tor process for Onionr # Connect to the Tor process for Onionr
controller.authenticate(config.get('tor.controlpassword')) controller.authenticate(config.get('tor.controlpassword'))
# Create the v3 onion service # Create the v3 onion service
@ -97,7 +96,7 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300):
onionrblocks.insert(response.service_id, header='con', sign=True, encryptType='asym', onionrblocks.insert(response.service_id, header='con', sign=True, encryptType='asym',
asymPeer=peer, disableForward=True, expire=(epoch.get_epoch() + bootstrap_timeout)) asymPeer=peer, disableForward=True, expire=(epoch.get_epoch() + bootstrap_timeout))
threading.Thread(target=__bootstrap_timeout, args=[http_server, bootstrap_timeout], daemon=True) threading.Thread(target=__bootstrap_timeout, args=[http_server, bootstrap_timeout, timed_out], daemon=True).start()
# Run the bootstrap server # Run the bootstrap server
try: try:
@ -107,11 +106,11 @@ def bootstrap_client_service(peer, comm_inst=None, bootstrap_timeout=300):
# This line reached when server is shutdown by being bootstrapped # This line reached when server is shutdown by being bootstrapped
# Add the address to the client pool # Add the address to the client pool
if not comm_inst is None: if not comm_inst is None:
connection_pool.bootstrap_pending.remove(peer)
if timed_out.timed_out:
logger.warn('Could not connect to %s due to timeout' % (peer,))
return None
comm_inst.direct_connection_clients[peer] = response.service_id comm_inst.direct_connection_clients[peer] = response.service_id
connection_pool.bootstrap_pending.remove(peer)
if timed_out.timed_out:
raise onionrexceptions.Timeout
# Now that the bootstrap server has received a server, return the address # Now that the bootstrap server has received a server, return the address
return key_store.get(bs_id) return key_store.get(bs_id)

View File

@ -25,9 +25,11 @@ flask_blueprint = Blueprint('flow', __name__)
@flask_blueprint.route('/flow/getpostsbyboard/<board>') @flask_blueprint.route('/flow/getpostsbyboard/<board>')
def get_post_by_board(board): def get_post_by_board(board):
board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json') board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json', flush_on_exit=False)
board_cache.refresh() board_cache.refresh()
posts = board_cache.get(board) posts = board_cache.get(board)
if posts is None: if posts is None:
posts = '' posts = ''
else:
posts = ','.join(posts)
return Response(posts) return Response(posts)

View File

@ -111,9 +111,10 @@ def on_processblocks(api, data=None):
metadata = data['block'].bmetadata # Get the block metadata metadata = data['block'].bmetadata # Get the block metadata
if data['type'] != 'brd': if data['type'] != 'brd':
return return
b_hash = reconstructhash.deconstruct_hash(data['block'].hash) # Get the 0-truncated block hash
board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json') # get the board index cache
b_hash = reconstructhash.deconstruct_hash(data['block'].hash) # Get the 0-truncated block hash
board_cache = simplekv.DeadSimpleKV(identifyhome.identify_home() + '/board-index.cache.json', flush_on_exit=False) # get the board index cache
board_cache.refresh()
# Validate the channel name is sane for caching # Validate the channel name is sane for caching
try: try:
ch = metadata['ch'] ch = metadata['ch']
@ -127,11 +128,7 @@ def on_processblocks(api, data=None):
existing_posts = board_cache.get(ch) existing_posts = board_cache.get(ch)
if existing_posts is None: if existing_posts is None:
existing_posts = '' existing_posts = []
existing_posts.append(data['block'].hash)
check_list = existing_posts.split(',') board_cache.put(ch, existing_posts)
if len(check_list) > 30: board_cache.flush()
check_list.pop(0)
existing_posts = ','.join(check_list)
board_cache.put(ch, '%s,%s' % (existing_posts, b_hash))
board_cache.flush()