From c215d4decdc8f539ffd774578acac7511a4a943d Mon Sep 17 00:00:00 2001
From: Kevin F
Date: Sat, 26 Mar 2022 18:24:40 -0500
Subject: [PATCH] Work on normal gossip diffusion

---
 ROADMAP.md                                    |  1 +
 src/blockdb/__init__.py                       | 19 ++++++++----
 src/db/__init__.py                            | 14 +++++++++
 src/gossip/client/dandelionstem/stemstream.py |  4 +--
 src/gossip/constants.py                       |  3 +-
 src/gossip/server/__init__.py                 | 30 ++++++++++++-------
 src/gossip/server/acceptstem.py               | 15 +++++-----
 7 files changed, 61 insertions(+), 25 deletions(-)

diff --git a/ROADMAP.md b/ROADMAP.md
index 94cd3af8..e8200ba5 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -13,6 +13,7 @@
 * [ ] Restore webUI as a plugin
 * [ ] Restore/reimplement mail plugin
 * [ ] Restore/reimplement friends plugin
+* [ ] Refresh test suite
 
 ## Store Plugin Release (9.1)
 
diff --git a/src/blockdb/__init__.py b/src/blockdb/__init__.py
index 92fca8c0..bded0296 100644
--- a/src/blockdb/__init__.py
+++ b/src/blockdb/__init__.py
@@ -1,16 +1,24 @@
+from typing import Callable, Generator, List
+
 from onionrblocks import Block
 
 import db
-from utils import identifyhome
 
-block_db_path = identifyhome.identify_home() + 'blockdata'
+from .dbpath import block_db_path
+
+
+block_storage_observers: List[Callable] = []
 
 
 def add_block_to_db(block: Block):
-    db.set(block_db_path, block.id, block.raw)
+    # Raises db.DuplicateKey if dupe
+    db.set_if_new(block_db_path, block.id, block.raw)
+
+    for func in block_storage_observers:
+        func(block)
 
 
-def get_blocks_by_type(block_type: str):
+def get_blocks_by_type(block_type: str) -> Generator[Block]:
     block_db = db.get_db_obj(block_db_path, 'u')
     for block_hash in db.list_keys(block_db_path):
         block = Block(block_hash, block_db[block_hash], auto_verify=False)
@@ -18,7 +26,8 @@ def get_blocks_by_type(block_type: str):
             yield block
 
 
-def get_blocks_after_timestamp(timestamp: int, block_type: str = ''):
+def get_blocks_after_timestamp(
+        timestamp: int, block_type: str = '') -> Generator[Block]:
     block_db = db.get_db_obj(block_db_path, 'u')
 
     for block_hash in db.list_keys(block_db_path):
diff --git a/src/db/__init__.py b/src/db/__init__.py
index 9758b628..439a36f6 100644
--- a/src/db/__init__.py
+++ b/src/db/__init__.py
@@ -4,6 +4,8 @@
 import os
 
 timeout = 120
+class DuplicateKey(ValueError): pass
+
 
 def _do_timeout(func, *args):
     ts = 0
@@ -22,6 +24,18 @@
     return res
 
 
+def set_if_new(db_path, key, value):
+    def _set(key, value):
+        with dbm.open(db_path, "c") as my_db:
+            try:
+                my_db[key]
+            except KeyError:
+                my_db[key] = value
+            else:
+                raise DuplicateKey
+    _do_timeout(_set, key, value)
+
+
 def set(db_path, key, value):
     """Set a value in the db, open+timeout so not good for rapid use"""
     def _set(key, value):
diff --git a/src/gossip/client/dandelionstem/stemstream.py b/src/gossip/client/dandelionstem/stemstream.py
index 321275a8..00143f2a 100644
--- a/src/gossip/client/dandelionstem/stemstream.py
+++ b/src/gossip/client/dandelionstem/stemstream.py
@@ -1,6 +1,6 @@
 from typing import TYPE_CHECKING
 
-from ...constants import BLOCK_MAX_SIZE
+from ...constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN
 
 if TYPE_CHECKING:
     from queue import Queue
@@ -23,7 +23,7 @@ async def do_stem_stream(
         remaining_time = d_phase.remaining_time()
         bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
 
-        block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE)
+        block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN)
 
         peer_socket.sendall(bl.id)
         peer_socket.sendall(block_size.encode('utf-8'))
diff --git a/src/gossip/constants.py b/src/gossip/constants.py
index 2716156a..78be49c6 100644
--- a/src/gossip/constants.py
+++ b/src/gossip/constants.py
@@ -2,8 +2,9 @@ BOOTSTRAP_ATTEMPTS = 5
 PEER_AMOUNT_TO_ASK = 3
 TRANSPORT_SIZE_BYTES = 64
 BLOCK_MAX_SIZE = 1024 * 2000
+BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
 BLOCK_ID_SIZE = 128
-DANDELION_EPOCH_LENGTH = 120
+DANDELION_EPOCH_LENGTH = 60
 
 # Magic number i made up, not really specified in dandelion++ paper
 MAX_INBOUND_DANDELION_EDGE = 50  # Mainly picked to avoid slowloris
diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py
index b9d715b8..bb0160b9 100644
--- a/src/gossip/server/__init__.py
+++ b/src/gossip/server/__init__.py
@@ -21,6 +21,7 @@ from filepaths import gossip_server_socket_file
 from ..commands import GossipCommands
 from ..peerset import gossip_peer_set
 from .acceptstem import accept_stem_blocks
+from .streamblocks import stream_blocks
 """
 This program is free software: you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published by
@@ -46,9 +47,11 @@ def gossip_server():
 
         while True:
             try:
-                cmd = await asyncio.wait_for(reader.read(1), 60)
+                cmd = await asyncio.wait_for(reader.readexactly(1), 60)
             except asyncio.exceptions.CancelledError:
                 break
+            except asyncio.IncompleteReadError:
+                break
 
             cmd = int.from_bytes(cmd, 'big')
             if cmd == b'' or cmd == 0:
@@ -60,23 +63,30 @@ def gossip_server():
                     pass
                 case GossipCommands.ANNOUNCE:
                     async def _read_announce():
-                        address = await reader.read(
+                        address = await reader.readexactly(
                             constants.TRANSPORT_SIZE_BYTES)
-                        onionrevents.event(
-                            'announce_rec',
-                            data={'address': address,
-                                  'callback': connect_peer},
-                            threaded=True)
-                        writer.write(int(1).to_bytes(1, 'big'))
+                        if address:
+                            onionrevents.event(
+                                'announce_rec',
+                                data={'address': address,
+                                      'callback': connect_peer},
+                                threaded=True)
+                            writer.write(int(1).to_bytes(1, 'big'))
                     await asyncio.wait_for(_read_announce(), 10)
                 case GossipCommands.PEER_EXCHANGE:
                     for peer in gossip_peer_set:
                         writer.write(
                             peer.transport_address.encode(
                                 'utf-8').removesuffix(b'.onion'))
+                case GossipCommands.STREAM_BLOCKS:
+                    try:
+                        await stream_blocks(reader, writer)
+                    except Exception:
+                        logger.warn(
+                            f"Err streaming blocks\n{traceback.format_exc()}",
+                            terminal=True)
                 case GossipCommands.PUT_BLOCKS:
-                    # Create block queue & append stemmed blocks to it
-
+                    # Pick block queue & append stemmed blocks to it
                     try:
                         await accept_stem_blocks(
                             reader, writer,
diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py
index c2c5852f..e77a52e2 100644
--- a/src/gossip/server/acceptstem.py
+++ b/src/gossip/server/acceptstem.py
@@ -32,7 +32,7 @@ async def accept_stem_blocks(
     inbound_edge_count[0] += 1
 
     # Start getting the first block
-    read_routine = reader.read(BLOCK_ID_SIZE)
+    read_routine = reader.readexactly(BLOCK_ID_SIZE)
     stream_start_time = int(time())
 
     block_queue_to_use = secrets.choice(gossip_block_queues)
@@ -40,10 +40,12 @@
     for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
         block_id = (
            await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
-        block_size = (await wait_for(
-            reader.read(block_size_digits),
-            base_wait_timeout)).decode('utf-8')
+        if not block_id:
+            break
 
+        block_size = (await wait_for(
+            reader.readexactly(block_size_digits),
+            base_wait_timeout)).decode('utf-8')
         if not block_size:
             break
 
@@ -54,8 +56,7 @@
             raise ValueError("Max block size")
 
         raw_block: bytes = await wait_for(
-            reader.read(block_size), base_wait_timeout * 6)
-
+            reader.readexactly(block_size), base_wait_timeout * 6)
         if not raw_block:
             break
 
@@ -65,5 +66,5 @@
 
         # Regardless of stem phase, we add to queue
        # Client will decide if they are to be stemmed
-        read_routine = reader.read(BLOCK_ID_SIZE)
+        read_routine = reader.readexactly(BLOCK_ID_SIZE)
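
Illustrative sketch (not part of the patch): the stem stream framed above sends a raw 128-byte block id, then the block length as a zero-padded decimal string of BLOCK_MAX_SIZE_LEN digits, then the raw block bytes. A minimal receiver for that framing could look like the following, assuming the constants from src/gossip/constants.py and a plain asyncio StreamReader; read_stem_frame is a hypothetical name, not a function in the codebase.

    import asyncio

    BLOCK_MAX_SIZE = 1024 * 2000
    BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))  # 7 digits for 2048000
    BLOCK_ID_SIZE = 128


    async def read_stem_frame(reader: asyncio.StreamReader):
        # 128-byte block id, written verbatim by do_stem_stream()
        block_id = await reader.readexactly(BLOCK_ID_SIZE)
        # Length prefix, e.g. b'0001024' for a 1024-byte block
        size_field = await reader.readexactly(BLOCK_MAX_SIZE_LEN)
        size = int(size_field.decode('utf-8'))
        if size > BLOCK_MAX_SIZE:
            raise ValueError("Max block size")
        # The block payload itself
        raw_block = await reader.readexactly(size)
        return block_id, raw_block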