From fefec8bdc8e8ecb9c18f7dae33d9624183212284 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Thu, 10 Mar 2022 01:10:13 -0600 Subject: [PATCH] added block store function used when not in stem phase --- src/gossip/client/__init__.py | 27 +++++++++++++++------ src/gossip/client/dandelionstem.py | 5 +++- src/gossip/client/storeblocks.py | 39 ++++++++++++++++++++++++++++++ src/gossip/server/acceptstem.py | 2 -- 4 files changed, 63 insertions(+), 10 deletions(-) create mode 100644 src/gossip/client/storeblocks.py diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 10e7fdf5..c2b6b96a 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -4,16 +4,20 @@ Dandelion ++ Gossip client logic """ import traceback from typing import TYPE_CHECKING -from typing import Set +from typing import Set, List from time import sleep from queue import Queue + +from onionrblocks import Block + +from gossip.client.storeblocks import store_blocks + from ..constants import DANDELION_EPOCH_LENGTH from ..connectpeer import connect_peer if TYPE_CHECKING: - from onionrblocks import Block from ..peer import Peer import logger @@ -21,6 +25,8 @@ import onionrplugins from ..commands import GossipCommands from gossip.dandelion.phase import DandelionPhase from onionrthreads import add_onionr_thread +from blockdb import store_vdf_block + from .announce import do_announce from .dandelionstem import stem_out @@ -44,7 +50,7 @@ along with this program. If not, see . def gossip_client( peer_set: Set['Peer'], - block_queue: Queue['Block'], + block_queues: List[Queue['Block']], dandelion_seed: bytes): """ Gossip client does the following: @@ -52,9 +58,14 @@ def gossip_client( Stem new blocks we created or downloaded *during stem phase* Stream new blocks """ - + bl: Block do_announce(peer_set) + # Start a thread that runs every 1200 secs to + # Ask peers for a subset for their peer set + # The transport addresses for said peers will + # be passed to a plugin event where the respective + # transport plugin handles the new peer add_onionr_thread( get_new_peers, 1200, peer_set, initial_sleep=5) @@ -64,14 +75,16 @@ def gossip_client( while True: while not len(peer_set): sleep(0.2) + if dandelion_phase.remaining_time <= 10: + sleep(dandelion_phase.remaining_time) if dandelion_phase.is_stem_phase(): try: # Stem out blocks for (roughly) remaining epoch time stem_out( - block_queue, peer_set, dandelion_phase) + block_queues, peer_set, dandelion_phase) except TimeoutError: continue continue else: - pass - + # Add block to primary block db, where the diffuser can read it + store_blocks(block_queues, dandelion_phase) diff --git a/src/gossip/client/dandelionstem.py b/src/gossip/client/dandelionstem.py index 861e3e2e..7df89ebe 100644 --- a/src/gossip/client/dandelionstem.py +++ b/src/gossip/client/dandelionstem.py @@ -12,7 +12,10 @@ def stem_out( block_queue: Queue['Block'], peer_set: Set['Block'], d_phase: 'DandelionPhase'): - block = block_queue.get(block=True, timeout=5) + # Deep copy the block queues so that everything gets + # stemmed out if we run out of time in epoch + # Also spawn a thread with block set to add to db after time for black hole attack + block = block_queue.get(block=True, timeout=d_phase.remaining_time) raw_block = block.raw block_size = len(block.raw) block_id = block.id diff --git a/src/gossip/client/storeblocks.py b/src/gossip/client/storeblocks.py new file mode 100644 index 00000000..ac4265ca --- /dev/null +++ b/src/gossip/client/storeblocks.py @@ -0,0 +1,39 @@ +from typing import TYPE_CHECKING, List +from threading import Thread +from queue import Queue + +import blockdb + +if TYPE_CHECKING: + from onionrblocks import Block + from ..dandelion.phase import DandelionPhase + + +def store_blocks( + block_queues: List[Queue['Block']], + dandelion_phase: 'DandelionPhase'): + + new_queue: Queue['Block'] = Queue() + + def _watch_queue(block_queue: Queue['Block']): + # Copy all incoming blocks into 1 queue which gets processed to db + while not dandelion_phase.is_stem_phase() \ + and dandelion_phase.remaining_time() > 1: + try: + new_queue.put( + block_queue.get(timeout=dandelion_phase.remaining_time())) + except TimeoutError: + pass + + for block_queue in block_queues: + Thread(target=_watch_queue, args=block_queue, daemon=True).start() + + while not dandelion_phase.is_stem_phase() \ + and dandelion_phase.remaining_time() > 1: + try: + blockdb.store_vdf_block( + new_queue.get(timeout=dandelion_phase.remaining_time()) + ) + except TimeoutError: + pass + diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index 49f1c682..0cd2e7bb 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -37,7 +37,6 @@ async def accept_stem_blocks( q = Queue() appended_queue = False - #block_queues.append(q) for _ in range(MAX_STEM_BLOCKS_PER_STREAM): block_id = ( @@ -67,4 +66,3 @@ async def accept_stem_blocks( appended_queue = True read_routine = reader.read(BLOCK_ID_SIZE) -