diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index b2ed51ae..8ab5a920 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -6,6 +6,7 @@ import traceback from typing import TYPE_CHECKING from typing import Set, Tuple from time import sleep +import asyncio from queue import Queue @@ -79,13 +80,17 @@ def gossip_client( if dandelion_phase.remaining_time() <= 10: sleep(dandelion_phase.remaining_time()) if dandelion_phase.is_stem_phase(): + logger.debug("Entering stem phase", terminal=True) try: # Stem out blocks for (roughly) remaining epoch time - await stem_out( - block_queues, peer_set, dandelion_phase) + asyncio.run(stem_out( + block_queues, peer_set, dandelion_phase)) except TimeoutError: continue + except Exception: + logger.error(traceback.format_exc(), terminal=True) continue else: + logger.debug("Entering fluff phase", terminal=True) # Add block to primary block db, where the diffuser can read it store_blocks(block_queues, dandelion_phase) diff --git a/src/gossip/client/dandelionstem/__init__.py b/src/gossip/client/dandelionstem/__init__.py index 51ea7802..e970642b 100644 --- a/src/gossip/client/dandelionstem/__init__.py +++ b/src/gossip/client/dandelionstem/__init__.py @@ -5,6 +5,8 @@ import traceback from typing import TYPE_CHECKING, Coroutine, Tuple, List +from ordered_set import OrderedSet + from onionrthreads import add_delayed_thread from blockdb import add_block_to_db import logger @@ -16,7 +18,7 @@ from ... import dandelion from .stemstream import do_stem_stream if TYPE_CHECKING: - from ordered_set import OrderedSet + from onionrblocks import Block from ...peer import Peer from ...dandelion.phase import DandelionPhase diff --git a/src/gossip/client/dandelionstem/stemstream.py b/src/gossip/client/dandelionstem/stemstream.py index eb5561e4..321275a8 100644 --- a/src/gossip/client/dandelionstem/stemstream.py +++ b/src/gossip/client/dandelionstem/stemstream.py @@ -1,5 +1,7 @@ from typing import TYPE_CHECKING +from ...constants import BLOCK_MAX_SIZE + if TYPE_CHECKING: from queue import Queue import socket @@ -20,6 +22,9 @@ async def do_stem_stream( # Primary client component that communicate's with gossip.server.acceptstem remaining_time = d_phase.remaining_time() bl: 'Block' = block_queue.get(block=True, timeout=remaining_time) + + block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE) + peer_socket.sendall(bl.id) - peer_socket.sendall(len(bl.raw)) + peer_socket.sendall(block_size.encode('utf-8')) peer_socket.sendall(bl.raw) diff --git a/src/gossip/client/storeblocks.py b/src/gossip/client/storeblocks.py index 1ef5471f..f1486f35 100644 --- a/src/gossip/client/storeblocks.py +++ b/src/gossip/client/storeblocks.py @@ -1,6 +1,7 @@ from typing import TYPE_CHECKING, Tuple from threading import Thread from queue import Queue +from queue import Empty import blockdb @@ -22,11 +23,11 @@ def store_blocks( try: new_queue.put( block_queue.get(timeout=dandelion_phase.remaining_time())) - except TimeoutError: + except Empty: pass for block_queue in block_queues: - Thread(target=_watch_queue, args=block_queue, daemon=True).start() + Thread(target=_watch_queue, args=[block_queue], daemon=True).start() while not dandelion_phase.is_stem_phase() \ and dandelion_phase.remaining_time() > 1: @@ -34,6 +35,6 @@ def store_blocks( blockdb.add_block_to_db( new_queue.get(timeout=dandelion_phase.remaining_time()) ) - except TimeoutError: + except Empty: pass diff --git a/src/gossip/dandelion/phase.py b/src/gossip/dandelion/phase.py index 998ff36c..0792061b 100644 --- a/src/gossip/dandelion/phase.py +++ b/src/gossip/dandelion/phase.py @@ -1,5 +1,6 @@ from time import time from hashlib import shake_128 +from secrets import randbits class DandelionPhase: @@ -8,7 +9,7 @@ class DandelionPhase: assert len(self.seed) == 32 self.epoch = int(time()) self.epoch_interval = epoch_interval_secs - self._is_stem = True + self._is_stem = bool(randbits(1)) self.phase_id = b'' @@ -21,7 +22,7 @@ class DandelionPhase: int.to_bytes(cur_time, 8, 'big')).digest(8) # Use first byte of phase id as random source for stem phase picking - if int.from_bytes(self.phase_id[0], 'big') % 2: + if self.phase_id[0] % 2: self._is_stem = True else: self._is_stem = False @@ -29,7 +30,8 @@ class DandelionPhase: def remaining_time(self) -> int: current_time = int(time()) - return self.epoch_interval - (current_time - self.epoch) + + return max(0, self.epoch_interval - (current_time - self.epoch)) def is_stem_phase(self) -> bool: diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index d7d5b576..4a9a42d2 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -85,6 +85,10 @@ def gossip_server( block_queues, reader, writer, inbound_dandelion_edge_count) + except asyncio.exceptions.TimeoutError: + logger.debug( + "Inbound edge timed out when steming blocks to us", + terminal=True) except Exception: logger.warn( f"Err getting\n{traceback.format_exc()}", diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index a9225c18..fb63ac56 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -12,7 +12,7 @@ from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM block_size_digits = len(str(BLOCK_MAX_SIZE)) -base_wait_timeout = 10 +base_wait_timeout = 30 if TYPE_CHECKING: from queue import Queue @@ -44,6 +44,9 @@ async def accept_stem_blocks( reader.read(block_size_digits), base_wait_timeout)).decode('utf-8') + if not block_size: + break + if not all(c in "0123456789" for c in block_size): raise ValueError("Invalid block size data (non 0-9 char)") block_size = int(block_size) @@ -53,6 +56,9 @@ async def accept_stem_blocks( raw_block: bytes = await wait_for( reader.read(block_size), base_wait_timeout * 6) + if not raw_block: + break + block_queue_to_use.put( Block(block_id, raw_block, auto_verify=True) )