From 6c2b1e49a22f8c00a19b8e90632db72577defb21 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Mon, 14 Mar 2022 10:04:28 -0500 Subject: [PATCH] Implemented dandelion stemout stream building --- src/gossip/client/dandelionstem.py | 59 +++++++++++++++++----- src/gossip/constants.py | 4 +- static-data/default-plugins/tor/torpeer.py | 1 + 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/src/gossip/client/dandelionstem.py b/src/gossip/client/dandelionstem.py index 7012f14e..6a97d1f0 100644 --- a/src/gossip/client/dandelionstem.py +++ b/src/gossip/client/dandelionstem.py @@ -1,16 +1,16 @@ from queue import Queue -from threading import Thread, Timer from time import sleep from secrets import choice import traceback +from asyncio import wait_for -from typing import TYPE_CHECKING, Tuple, List, Set +from typing import TYPE_CHECKING, Coroutine, Tuple, List from onionrthreads import add_delayed_thread from blockdb import add_block_to_db import logger -from ..constants import BLACKHOLE_EVADE_TIMER_SECS, MAX_OUTBOUND_DANDELION_EDGE +from ..constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES from ..commands import GossipCommands, command_to_byte from .. import dandelion @@ -33,11 +33,20 @@ async def _setup_edge( peer: 'Peer' = choice(peer_set - exclude_set) except IndexError: raise NotEnoughEdges + + async def _get_sock(): + return peer.get_socket() + + # If peer is good or bad, exclude it no matter what + exclude_set.add(peer) + try: - s = peer.get_socket() + s = await wait_for(_get_sock(), 12) + except TimeoutError: + logger.debug(f"{peer.transport_address} timed out when trying stemout") except Exception: logger.debug(traceback.format_exc()) - exclude_set.add(peer) + return try: s.sendall(command_to_byte(GossipCommands.PUT_BLOCKS)) @@ -54,9 +63,7 @@ async def _setup_edge( else: # Return peer socket if it is in stem reception mode successfully return s - finally: - # If peer is good or bad, exclude it no matter what - exclude_set.add(peer) + # If they won't accept stem blocks, close the socket s.close() @@ -73,7 +80,6 @@ async def stem_out( peer_set: OrderedSet['Peer'], d_phase: 'DandelionPhase'): - # don't bother if there are no possible outbound edges if not len(peer_set): sleep(1) @@ -87,22 +93,49 @@ async def stem_out( BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue)) peer_sockets: List['socket.socket'] = [] + stream_routines: List[Coroutine] = [] # Pick edges randomly # Using orderedset for the tried edges to ensure random pairing with queue tried_edges: OrderedSet['Peer'] = OrderedSet() - while len(peer_sockets) < MAX_OUTBOUND_DANDELION_EDGE: + while len(peer_sockets) < OUTBOUND_DANDELION_EDGES: try: - peer_sockets.append(_setup_edge(peer_set, tried_edges)) + # Get a socket for stem out (makes sure they accept) + peer_sockets.append( + await wait_for(_setup_edge(peer_set, tried_edges), 15)) except NotEnoughEdges: - logger.debug("Not able to build enough peers for stemout") + # No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE) + logger.warn("Not able to build enough peers for stemout.", terminal=True) break - finally: + else: + # Ran out of time for stem phase if not d_phase.is_stem_phase() or d_phase.remaining_time() < 5: logger.error( "Did not stem out any blocks in time, " + "if this happens regularly you may be under attack", terminal=True) + list(map(lambda p: p.close(), peer_sockets)) + peer_sockets.clear() + break + # If above loop ran out of time or NotEnoughEdges, loops below will not execute + for count, peer_socket in enumerate(peer_sockets): + stream_routines.append( + _do_stem_stream(peer_socket, block_queues[count], d_phase)) + for routine in stream_routines: + try: + await wait_for(routine, d_phase.remaining_time() + 1) + except TimeoutError: + # This is what should happen if everything went well + pass + except Exception: + logger.warn(traceback.format_exc()) + else: + # stream routine exited early + pass + + # Clean up stem sockets + for sock in peer_sockets: + sock.close() diff --git a/src/gossip/constants.py b/src/gossip/constants.py index dbe32cda..2716156a 100644 --- a/src/gossip/constants.py +++ b/src/gossip/constants.py @@ -3,13 +3,13 @@ PEER_AMOUNT_TO_ASK = 3 TRANSPORT_SIZE_BYTES = 64 BLOCK_MAX_SIZE = 1024 * 2000 BLOCK_ID_SIZE = 128 -DANDELION_EPOCH_LENGTH = 60 +DANDELION_EPOCH_LENGTH = 120 # Magic number i made up, not really specified in dandelion++ paper MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris # Dandelion subgraph is aprox 4-regular -MAX_OUTBOUND_DANDELION_EDGE = 2 +OUTBOUND_DANDELION_EDGES = 2 MAX_STEM_BLOCKS_PER_STREAM = 1000 BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3 \ No newline at end of file diff --git a/static-data/default-plugins/tor/torpeer.py b/static-data/default-plugins/tor/torpeer.py index 6a355fea..8c1fd6ed 100644 --- a/static-data/default-plugins/tor/torpeer.py +++ b/static-data/default-plugins/tor/torpeer.py @@ -13,6 +13,7 @@ class TorPeer: def get_socket(self) -> socks.socksocket: s = socks.socksocket() s.set_proxy(socks.SOCKS4, self.socks_host, self.socks_port, rdns=True) + s.settimeout(60) s.connect((self.onion_address, 80)) return s