Misc bug fixes for gossip

This commit is contained in:
Kevin F 2022-03-20 18:05:44 -05:00
parent 5b5e5ef764
commit 9c44069248
7 changed files with 36 additions and 11 deletions

View File

@ -6,6 +6,7 @@ import traceback
from typing import TYPE_CHECKING
from typing import Set, Tuple
from time import sleep
import asyncio
from queue import Queue
@ -79,13 +80,17 @@ def gossip_client(
if dandelion_phase.remaining_time() <= 10:
sleep(dandelion_phase.remaining_time())
if dandelion_phase.is_stem_phase():
logger.debug("Entering stem phase", terminal=True)
try:
# Stem out blocks for (roughly) remaining epoch time
await stem_out(
block_queues, peer_set, dandelion_phase)
asyncio.run(stem_out(
block_queues, peer_set, dandelion_phase))
except TimeoutError:
continue
except Exception:
logger.error(traceback.format_exc(), terminal=True)
continue
else:
logger.debug("Entering fluff phase", terminal=True)
# Add block to primary block db, where the diffuser can read it
store_blocks(block_queues, dandelion_phase)

View File

@ -5,6 +5,8 @@ import traceback
from typing import TYPE_CHECKING, Coroutine, Tuple, List
from ordered_set import OrderedSet
from onionrthreads import add_delayed_thread
from blockdb import add_block_to_db
import logger
@ -16,7 +18,7 @@ from ... import dandelion
from .stemstream import do_stem_stream
if TYPE_CHECKING:
from ordered_set import OrderedSet
from onionrblocks import Block
from ...peer import Peer
from ...dandelion.phase import DandelionPhase

View File

@ -1,5 +1,7 @@
from typing import TYPE_CHECKING
from ...constants import BLOCK_MAX_SIZE
if TYPE_CHECKING:
from queue import Queue
import socket
@ -20,6 +22,9 @@ async def do_stem_stream(
# Primary client component that communicate's with gossip.server.acceptstem
remaining_time = d_phase.remaining_time()
bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE)
peer_socket.sendall(bl.id)
peer_socket.sendall(len(bl.raw))
peer_socket.sendall(block_size.encode('utf-8'))
peer_socket.sendall(bl.raw)

View File

@ -1,6 +1,7 @@
from typing import TYPE_CHECKING, Tuple
from threading import Thread
from queue import Queue
from queue import Empty
import blockdb
@ -22,11 +23,11 @@ def store_blocks(
try:
new_queue.put(
block_queue.get(timeout=dandelion_phase.remaining_time()))
except TimeoutError:
except Empty:
pass
for block_queue in block_queues:
Thread(target=_watch_queue, args=block_queue, daemon=True).start()
Thread(target=_watch_queue, args=[block_queue], daemon=True).start()
while not dandelion_phase.is_stem_phase() \
and dandelion_phase.remaining_time() > 1:
@ -34,6 +35,6 @@ def store_blocks(
blockdb.add_block_to_db(
new_queue.get(timeout=dandelion_phase.remaining_time())
)
except TimeoutError:
except Empty:
pass

View File

@ -1,5 +1,6 @@
from time import time
from hashlib import shake_128
from secrets import randbits
class DandelionPhase:
@ -8,7 +9,7 @@ class DandelionPhase:
assert len(self.seed) == 32
self.epoch = int(time())
self.epoch_interval = epoch_interval_secs
self._is_stem = True
self._is_stem = bool(randbits(1))
self.phase_id = b''
@ -21,7 +22,7 @@ class DandelionPhase:
int.to_bytes(cur_time, 8, 'big')).digest(8)
# Use first byte of phase id as random source for stem phase picking
if int.from_bytes(self.phase_id[0], 'big') % 2:
if self.phase_id[0] % 2:
self._is_stem = True
else:
self._is_stem = False
@ -29,7 +30,8 @@ class DandelionPhase:
def remaining_time(self) -> int:
current_time = int(time())
return self.epoch_interval - (current_time - self.epoch)
return max(0, self.epoch_interval - (current_time - self.epoch))
def is_stem_phase(self) -> bool:

View File

@ -85,6 +85,10 @@ def gossip_server(
block_queues,
reader, writer,
inbound_dandelion_edge_count)
except asyncio.exceptions.TimeoutError:
logger.debug(
"Inbound edge timed out when steming blocks to us",
terminal=True)
except Exception:
logger.warn(
f"Err getting\n{traceback.format_exc()}",

View File

@ -12,7 +12,7 @@ from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM
block_size_digits = len(str(BLOCK_MAX_SIZE))
base_wait_timeout = 10
base_wait_timeout = 30
if TYPE_CHECKING:
from queue import Queue
@ -44,6 +44,9 @@ async def accept_stem_blocks(
reader.read(block_size_digits),
base_wait_timeout)).decode('utf-8')
if not block_size:
break
if not all(c in "0123456789" for c in block_size):
raise ValueError("Invalid block size data (non 0-9 char)")
block_size = int(block_size)
@ -53,6 +56,9 @@ async def accept_stem_blocks(
raw_block: bytes = await wait_for(
reader.read(block_size), base_wait_timeout * 6)
if not raw_block:
break
block_queue_to_use.put(
Block(block_id, raw_block, auto_verify=True)
)