From 9d17c7bd645b4c13c9218343a8158ca51dd97b76 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Fri, 4 Mar 2022 18:05:12 -0600 Subject: [PATCH] Implemented server dandelion++ stem portion --- src/gossip/client/__init__.py | 6 ++-- src/gossip/client/dandelionstem.py | 6 ++-- src/gossip/constants.py | 5 ++- src/gossip/dandelion/__init__.py | 7 ++++ src/gossip/{ => dandelion}/phase.py | 0 src/gossip/graph.py | 2 -- src/gossip/server/__init__.py | 19 ++++++++++- src/gossip/server/acceptstem.py | 52 ++++++++++++++++++++--------- 8 files changed, 73 insertions(+), 24 deletions(-) create mode 100644 src/gossip/dandelion/__init__.py rename src/gossip/{ => dandelion}/phase.py (100%) delete mode 100644 src/gossip/graph.py diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index c75c582c..10e7fdf5 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -8,6 +8,8 @@ from typing import Set from time import sleep from queue import Queue + +from ..constants import DANDELION_EPOCH_LENGTH from ..connectpeer import connect_peer if TYPE_CHECKING: @@ -17,7 +19,7 @@ if TYPE_CHECKING: import logger import onionrplugins from ..commands import GossipCommands -from gossip.phase import DandelionPhase +from gossip.dandelion.phase import DandelionPhase from onionrthreads import add_onionr_thread from .announce import do_announce @@ -57,7 +59,7 @@ def gossip_client( get_new_peers, 1200, peer_set, initial_sleep=5) - dandelion_phase = DandelionPhase(dandelion_seed, 30) + dandelion_phase = DandelionPhase(dandelion_seed, DANDELION_EPOCH_LENGTH) while True: while not len(peer_set): diff --git a/src/gossip/client/dandelionstem.py b/src/gossip/client/dandelionstem.py index 244f1c50..861e3e2e 100644 --- a/src/gossip/client/dandelionstem.py +++ b/src/gossip/client/dandelionstem.py @@ -6,13 +6,13 @@ from typing import TYPE_CHECKING, Set if TYPE_CHECKING: from onionrblocks import Block from ..peer import Peer - from ..phase import DandelionPhase + from ..dandelion.phase import DandelionPhase def stem_out( block_queue: Queue['Block'], peer_set: Set['Block'], - d_phase: DandelionPhase): - block = block_queue.get(block=True, timeout=time_remaining_secs) + d_phase: 'DandelionPhase'): + block = block_queue.get(block=True, timeout=5) raw_block = block.raw block_size = len(block.raw) block_id = block.id diff --git a/src/gossip/constants.py b/src/gossip/constants.py index c19efba4..656a48b7 100644 --- a/src/gossip/constants.py +++ b/src/gossip/constants.py @@ -2,4 +2,7 @@ BOOTSTRAP_ATTEMPTS = 5 PEER_AMOUNT_TO_ASK = 3 TRANSPORT_SIZE_BYTES = 64 BLOCK_MAX_SIZE = 1024 * 2000 -BLOCK_ID_SIZE = 128 \ No newline at end of file +BLOCK_ID_SIZE = 128 +DANDELION_EPOCH_LENGTH = 60 +MAX_INBOUND_DANDELION_EDGE = 2 +MAX_STEM_BLOCKS_PER_STREAM = 1000 \ No newline at end of file diff --git a/src/gossip/dandelion/__init__.py b/src/gossip/dandelion/__init__.py new file mode 100644 index 00000000..0e64ad6e --- /dev/null +++ b/src/gossip/dandelion/__init__.py @@ -0,0 +1,7 @@ +from enum import Enum + +from .phase import DandelionPhase + +class StemAcceptResult: + DENY = int(0).to_bytes(1, 'big') + ALLOW = int(1).to_bytes(1, 'big') diff --git a/src/gossip/phase.py b/src/gossip/dandelion/phase.py similarity index 100% rename from src/gossip/phase.py rename to src/gossip/dandelion/phase.py diff --git a/src/gossip/graph.py b/src/gossip/graph.py deleted file mode 100644 index fcbdc8d0..00000000 --- a/src/gossip/graph.py +++ /dev/null @@ -1,2 +0,0 @@ -class DandelionGraph: - def \ No newline at end of file diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 2e3e0681..98106836 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -1,4 +1,5 @@ import asyncio +import traceback from typing import TYPE_CHECKING from typing import Set, List @@ -8,6 +9,7 @@ from gossip import constants from ..connectpeer import connect_peer from onionrplugins import onionrevents +import logger if TYPE_CHECKING: from onionrblocks import Block @@ -32,6 +34,8 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ +inbound_dandelion_edge_count = [0] + def gossip_server( peer_set: Set['Peer'], @@ -40,6 +44,7 @@ def gossip_server( async def peer_connected( reader: 'StreamReader', writer: 'StreamWriter'): + while True: try: cmd = await asyncio.wait_for(reader.read(1), 60) @@ -73,7 +78,19 @@ def gossip_server( 'utf-8').removesuffix(b'.onion')) case GossipCommands.PUT_BLOCKS: # Create block queue & append stemmed blocks to it - await accept_stem_blocks(block_queues, reader, writer) + + try: + await accept_stem_blocks( + block_queues, + reader, writer, + inbound_dandelion_edge_count) + except Exception: + logger.warn( + f"Err getting\n{traceback.format_exc()}", + terminal=True) + # Subtract dandelion edge, make sure >=0 + inbound_dandelion_edge_count[0] = \ + max(inbound_dandelion_edge_count[0] - 1, 0) break await writer.drain() diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index d1024d4c..49f1c682 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -4,45 +4,67 @@ from queue import Queue from time import time from asyncio import wait_for +from onionrblocks import Block + +from ..dandelion import DandelionPhase, StemAcceptResult from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE +from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM block_size_digits = len(str(BLOCK_MAX_SIZE)) base_wait_timeout = 10 if TYPE_CHECKING: - from onionrblocks import Block + from asyncio import StreamWriter, StreamReader async def accept_stem_blocks( block_queues: List[Queue['Block']], reader: 'StreamReader', - writer: 'StreamWriter'): + writer: 'StreamWriter', + inbound_edge_count: List[int]): + + if inbound_edge_count[0] >= MAX_INBOUND_DANDELION_EDGE: + writer.write(StemAcceptResult.DENY) + return + writer.write(StemAcceptResult.ALLOW) + inbound_edge_count[0] += 1 # Start getting the first block read_routine = reader.read(BLOCK_ID_SIZE) stream_start_time = int(time()) - max_accept_blocks = 1000 q = Queue() - block_queues.append(q) - - for _ in range(max_accept_blocks): - block_id = await wait_for(read_routine, base_wait_timeout) - block_size = int( - await wait_for( - reader.read(block_size_digits), - base_wait_timeout)).decode('utf-8') + appended_queue = False + #block_queues.append(q) + for _ in range(MAX_STEM_BLOCKS_PER_STREAM): + block_id = ( + await wait_for(read_routine, base_wait_timeout)).decode('utf-8') + block_size = (await wait_for( + reader.read(block_size_digits), + base_wait_timeout)).decode('utf-8') if not all(c in "0123456789" for c in block_size): raise ValueError("Invalid block size data (non 0-9 char)") + block_size = int(block_size) if block_size > BLOCK_MAX_SIZE: raise ValueError("Max block size") - - - - + + raw_block: bytes = await wait_for( + reader.read(block_size), base_wait_timeout * 6) + + q.put( + Block(block_id, raw_block, auto_verify=True) + ) + # Regardless of stem phase, we add to queue + # Client will decide if they are to be stemmed + + if not appended_queue: + if len(block_queues) < MAX_INBOUND_DANDELION_EDGE: + block_queues.append(q) + appended_queue = True + read_routine = reader.read(BLOCK_ID_SIZE)