Implemented dandelion stemout stream building

This commit is contained in:
Kevin F 2022-03-14 10:04:28 -05:00
parent 9bf16c5758
commit 6c2b1e49a2
3 changed files with 49 additions and 15 deletions

View File

@ -1,16 +1,16 @@
from queue import Queue from queue import Queue
from threading import Thread, Timer
from time import sleep from time import sleep
from secrets import choice from secrets import choice
import traceback import traceback
from asyncio import wait_for
from typing import TYPE_CHECKING, Tuple, List, Set from typing import TYPE_CHECKING, Coroutine, Tuple, List
from onionrthreads import add_delayed_thread from onionrthreads import add_delayed_thread
from blockdb import add_block_to_db from blockdb import add_block_to_db
import logger import logger
from ..constants import BLACKHOLE_EVADE_TIMER_SECS, MAX_OUTBOUND_DANDELION_EDGE from ..constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES
from ..commands import GossipCommands, command_to_byte from ..commands import GossipCommands, command_to_byte
from .. import dandelion from .. import dandelion
@ -33,11 +33,20 @@ async def _setup_edge(
peer: 'Peer' = choice(peer_set - exclude_set) peer: 'Peer' = choice(peer_set - exclude_set)
except IndexError: except IndexError:
raise NotEnoughEdges raise NotEnoughEdges
async def _get_sock():
return peer.get_socket()
# If peer is good or bad, exclude it no matter what
exclude_set.add(peer)
try: try:
s = peer.get_socket() s = await wait_for(_get_sock(), 12)
except TimeoutError:
logger.debug(f"{peer.transport_address} timed out when trying stemout")
except Exception: except Exception:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
exclude_set.add(peer) return
try: try:
s.sendall(command_to_byte(GossipCommands.PUT_BLOCKS)) s.sendall(command_to_byte(GossipCommands.PUT_BLOCKS))
@ -54,9 +63,7 @@ async def _setup_edge(
else: else:
# Return peer socket if it is in stem reception mode successfully # Return peer socket if it is in stem reception mode successfully
return s return s
finally:
# If peer is good or bad, exclude it no matter what
exclude_set.add(peer)
# If they won't accept stem blocks, close the socket # If they won't accept stem blocks, close the socket
s.close() s.close()
@ -73,7 +80,6 @@ async def stem_out(
peer_set: OrderedSet['Peer'], peer_set: OrderedSet['Peer'],
d_phase: 'DandelionPhase'): d_phase: 'DandelionPhase'):
# don't bother if there are no possible outbound edges # don't bother if there are no possible outbound edges
if not len(peer_set): if not len(peer_set):
sleep(1) sleep(1)
@ -87,22 +93,49 @@ async def stem_out(
BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue)) BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue))
peer_sockets: List['socket.socket'] = [] peer_sockets: List['socket.socket'] = []
stream_routines: List[Coroutine] = []
# Pick edges randomly # Pick edges randomly
# Using orderedset for the tried edges to ensure random pairing with queue # Using orderedset for the tried edges to ensure random pairing with queue
tried_edges: OrderedSet['Peer'] = OrderedSet() tried_edges: OrderedSet['Peer'] = OrderedSet()
while len(peer_sockets) < MAX_OUTBOUND_DANDELION_EDGE: while len(peer_sockets) < OUTBOUND_DANDELION_EDGES:
try: try:
peer_sockets.append(_setup_edge(peer_set, tried_edges)) # Get a socket for stem out (makes sure they accept)
peer_sockets.append(
await wait_for(_setup_edge(peer_set, tried_edges), 15))
except NotEnoughEdges: except NotEnoughEdges:
logger.debug("Not able to build enough peers for stemout") # No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE)
logger.warn("Not able to build enough peers for stemout.", terminal=True)
break break
finally: else:
# Ran out of time for stem phase
if not d_phase.is_stem_phase() or d_phase.remaining_time() < 5: if not d_phase.is_stem_phase() or d_phase.remaining_time() < 5:
logger.error( logger.error(
"Did not stem out any blocks in time, " + "Did not stem out any blocks in time, " +
"if this happens regularly you may be under attack", "if this happens regularly you may be under attack",
terminal=True) terminal=True)
list(map(lambda p: p.close(), peer_sockets))
peer_sockets.clear()
break
# If above loop ran out of time or NotEnoughEdges, loops below will not execute
for count, peer_socket in enumerate(peer_sockets):
stream_routines.append(
_do_stem_stream(peer_socket, block_queues[count], d_phase))
for routine in stream_routines:
try:
await wait_for(routine, d_phase.remaining_time() + 1)
except TimeoutError:
# This is what should happen if everything went well
pass
except Exception:
logger.warn(traceback.format_exc())
else:
# stream routine exited early
pass
# Clean up stem sockets
for sock in peer_sockets:
sock.close()

View File

@ -3,13 +3,13 @@ PEER_AMOUNT_TO_ASK = 3
TRANSPORT_SIZE_BYTES = 64 TRANSPORT_SIZE_BYTES = 64
BLOCK_MAX_SIZE = 1024 * 2000 BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_ID_SIZE = 128 BLOCK_ID_SIZE = 128
DANDELION_EPOCH_LENGTH = 60 DANDELION_EPOCH_LENGTH = 120
# Magic number i made up, not really specified in dandelion++ paper # Magic number i made up, not really specified in dandelion++ paper
MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris
# Dandelion subgraph is aprox 4-regular # Dandelion subgraph is aprox 4-regular
MAX_OUTBOUND_DANDELION_EDGE = 2 OUTBOUND_DANDELION_EDGES = 2
MAX_STEM_BLOCKS_PER_STREAM = 1000 MAX_STEM_BLOCKS_PER_STREAM = 1000
BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3 BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3

View File

@ -13,6 +13,7 @@ class TorPeer:
def get_socket(self) -> socks.socksocket: def get_socket(self) -> socks.socksocket:
s = socks.socksocket() s = socks.socksocket()
s.set_proxy(socks.SOCKS4, self.socks_host, self.socks_port, rdns=True) s.set_proxy(socks.SOCKS4, self.socks_host, self.socks_port, rdns=True)
s.settimeout(60)
s.connect((self.onion_address, 80)) s.connect((self.onion_address, 80))
return s return s