From 84c13ade51f244329c89d2d7283e6bb2c17042fa Mon Sep 17 00:00:00 2001 From: Kevin F Date: Tue, 19 Jul 2022 00:32:54 -0500 Subject: [PATCH] gossip fixes --- src/blockdb/__init__.py | 4 --- src/gossip/client/dandelionstem/__init__.py | 4 +-- src/gossip/client/dandelionstem/stemstream.py | 11 +++++++- src/gossip/client/streamblocks.py | 8 ++++++ src/gossip/constants.py | 2 +- src/gossip/server/diffuseblocks.py | 26 +++++++++---------- src/httpapi/addblock/__init__.py | 6 +++-- 7 files changed, 37 insertions(+), 24 deletions(-) diff --git a/src/blockdb/__init__.py b/src/blockdb/__init__.py index 09dfc60c..a27c113a 100644 --- a/src/blockdb/__init__.py +++ b/src/blockdb/__init__.py @@ -14,10 +14,6 @@ def add_block_to_db(block: Block): # Raises db.DuplicateKey if dupe db.set_if_new(block_db_path, block.id, block.raw) - for func in block_storage_observers: - func(block) - - def get_blocks_by_type(block_type: str) -> "Generator[Block]": block_db = db.get_db_obj(block_db_path, 'u') for block_hash in db.list_keys(block_db_path): diff --git a/src/gossip/client/dandelionstem/__init__.py b/src/gossip/client/dandelionstem/__init__.py index ae600100..54d6c853 100644 --- a/src/gossip/client/dandelionstem/__init__.py +++ b/src/gossip/client/dandelionstem/__init__.py @@ -84,7 +84,7 @@ async def stem_out(d_phase: 'DandelionPhase'): def blackhole_protection(q): for bl in q: - add_block_to_db(q) + add_block_to_db(bl) # Spawn threads with deep copied block queue to add to db after time @@ -122,7 +122,7 @@ async def stem_out(d_phase: 'DandelionPhase'): logger.error( "Did not stem out any blocks in time, " + "if this happens regularly you may be under attack", - terminal=True) + terminal=False) for s in peer_sockets: if s: s.close() diff --git a/src/gossip/client/dandelionstem/stemstream.py b/src/gossip/client/dandelionstem/stemstream.py index 4dfbd281..03d53f71 100644 --- a/src/gossip/client/dandelionstem/stemstream.py +++ b/src/gossip/client/dandelionstem/stemstream.py @@ -1,3 +1,5 @@ +from asyncio import sleep +from queue import Empty from typing import TYPE_CHECKING import logger @@ -22,7 +24,14 @@ async def do_stem_stream( while remaining_time > 5 and my_phase_id == d_phase.phase_id: # Primary client component that communicate's with gossip.server.acceptstem remaining_time = d_phase.remaining_time() - bl: 'Block' = block_queue.get(block=True, timeout=remaining_time) + while remaining_time: + try: + # queues can't block because we're in async + bl = block_queue.get(block=False) + except Empty: + await sleep(1) + else: + break logger.info("Sending block over dandelion++", terminal=True) block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN) diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py index 18aef4cd..f286287e 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks.py @@ -5,6 +5,7 @@ Download blocks that are being diffused doesn't apply for blocks in the gossip queue that are awaiting descision to fluff or stem """ +from ast import Index from threading import Thread, Semaphore from random import SystemRandom from time import sleep @@ -120,6 +121,12 @@ def stream_from_peers(): while True: need_socket_lock.acquire() available_set = gossip_peer_set - tried_peers + if not len(available_set) and len(tried_peers): + try: + tried_peers.pop() + except IndexError: + pass + available_set = gossip_peer_set - tried_peers peers = sys_rand.sample( available_set, min(MAX_STREAMS, len(available_set))) @@ -137,3 +144,4 @@ def stream_from_peers(): except IndexError: need_socket_lock.release() break + diff --git a/src/gossip/constants.py b/src/gossip/constants.py index f4276886..5e760f71 100644 --- a/src/gossip/constants.py +++ b/src/gossip/constants.py @@ -14,4 +14,4 @@ MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris OUTBOUND_DANDELION_EDGES = 2 MAX_STEM_BLOCKS_PER_STREAM = 1000 -BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3 \ No newline at end of file +BLACKHOLE_EVADE_TIMER_SECS = 10 \ No newline at end of file diff --git a/src/gossip/server/diffuseblocks.py b/src/gossip/server/diffuseblocks.py index 93b35383..edd7ab4b 100644 --- a/src/gossip/server/diffuseblocks.py +++ b/src/gossip/server/diffuseblocks.py @@ -20,6 +20,7 @@ from ..constants import BLOCK_MAX_SIZE, BLOCK_SIZE_LEN from ..constants import BLOCK_STREAM_OFFSET_DIGITS import logger +import blockdb from blockdb import get_blocks_after_timestamp, block_storage_observers """ This program is free software: you can redistribute it and/or modify @@ -53,13 +54,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): raise ValueError( "Peer's specified time offset skewed too far into the future") - newly_stored_blocks = Queue() - def _add_to_queue(bl): - newly_stored_blocks.put_nowait(bl) - block_storage_observers.append( - _add_to_queue - ) async def _send_block(block: 'Block'): writer.write(block.id) @@ -90,17 +85,20 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): ) except IncompleteReadError: keep_writing = False + time_offset = time() # Diffuse blocks stored since we started this stream while keep_writing: - await _send_block(await newly_stored_blocks.get()) - try: - keep_writing = bool( - int.from_bytes(await reader.readexactly(1), 'big') - ) - except IncompleteReadError: - keep_writing = False + bls = blockdb.get_blocks_after_timestamp(time_offset) + for bl in bls: + await _send_block(bl) + try: + keep_writing = bool( + int.from_bytes(await reader.readexactly(1), 'big') + ) + except IncompleteReadError: + keep_writing = False + break except Exception: logger.warn(traceback.format_exc(), terminal=True) - block_storage_observers.remove(_add_to_queue) diff --git a/src/httpapi/addblock/__init__.py b/src/httpapi/addblock/__init__.py index a09c48c5..0eabefdc 100644 --- a/src/httpapi/addblock/__init__.py +++ b/src/httpapi/addblock/__init__.py @@ -8,6 +8,7 @@ import secrets from flask import Blueprint, Response, request from onionrblocks import Block +import blockdb import logger from gossip import blockqueues @@ -38,6 +39,7 @@ def block_serialized(): req_data = request.data block_id = req_data[:BLOCK_ID_SIZE] block_data = req_data[BLOCK_ID_SIZE:] - blockqueues.gossip_block_queues[stream_to_use].put( - Block(block_id, block_data, auto_verify=False)) + blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False)) + #blockqueues.gossip_block_queues[stream_to_use].put( + #Block(block_id, block_data, auto_verify=False), block=False) return "ok"