Turned dandelion stem into a module and corrected use of wait_for

This commit is contained in:
Kevin F 2022-03-20 12:52:58 -05:00
parent f740d475c4
commit e985966e7c

View File

@ -2,7 +2,6 @@ from queue import Queue
from time import sleep from time import sleep
from secrets import choice from secrets import choice
import traceback import traceback
from asyncio import wait_for
from typing import TYPE_CHECKING, Coroutine, Tuple, List from typing import TYPE_CHECKING, Coroutine, Tuple, List
@ -10,15 +9,17 @@ from onionrthreads import add_delayed_thread
from blockdb import add_block_to_db from blockdb import add_block_to_db
import logger import logger
from ..constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES from ...constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES
from ..commands import GossipCommands, command_to_byte from ...commands import GossipCommands, command_to_byte
from .. import dandelion from ... import dandelion
from .stemstream import do_stem_stream
if TYPE_CHECKING: if TYPE_CHECKING:
from ordered_set import OrderedSet from ordered_set import OrderedSet
from onionrblocks import Block from onionrblocks import Block
from ..peer import Peer from ...peer import Peer
from ..dandelion.phase import DandelionPhase from ...dandelion.phase import DandelionPhase
import socket import socket
@ -27,21 +28,18 @@ class StemConnectionDenied(ConnectionRefusedError): pass # noqa
async def _setup_edge( async def _setup_edge(
peer_set: OrderedSet['Peer'], exclude_set: OrderedSet['Peer']): peer_set: "OrderedSet[Peer]", exclude_set: "OrderedSet[Peer]"):
"""Negotiate stem connection with random peer, add to exclu set if fail""" """Negotiate stem connection with random peer, add to exclu set if fail"""
try: try:
peer: 'Peer' = choice(peer_set - exclude_set) peer: 'Peer' = choice(peer_set - exclude_set)
except IndexError: except IndexError:
raise NotEnoughEdges raise NotEnoughEdges
async def _get_sock():
return peer.get_socket()
# If peer is good or bad, exclude it no matter what # If peer is good or bad, exclude it no matter what
exclude_set.add(peer) exclude_set.add(peer)
try: try:
s = await wait_for(_get_sock(), 12) s = peer.get_socket(12)
except TimeoutError: except TimeoutError:
logger.debug(f"{peer.transport_address} timed out when trying stemout") logger.debug(f"{peer.transport_address} timed out when trying stemout")
except Exception: except Exception:
@ -50,6 +48,7 @@ async def _setup_edge(
try: try:
s.sendall(command_to_byte(GossipCommands.PUT_BLOCKS)) s.sendall(command_to_byte(GossipCommands.PUT_BLOCKS))
s.settimeout(10)
if s.recv(1) == dandelion.StemAcceptResult.DENY: if s.recv(1) == dandelion.StemAcceptResult.DENY:
raise StemConnectionDenied raise StemConnectionDenied
except StemConnectionDenied: except StemConnectionDenied:
@ -68,16 +67,9 @@ async def _setup_edge(
s.close() s.close()
async def _do_stem_stream(
peer_socket: 'socket.socket',
block_queue: Queue['Block'],
d_phase: 'DandelionPhase'):
return
async def stem_out( async def stem_out(
block_queues: Tuple[Queue['Block'], Queue['Block']], block_queues: Tuple["Queue[Block]", "Queue[Block]"],
peer_set: OrderedSet['Peer'], peer_set: "OrderedSet[Peer]",
d_phase: 'DandelionPhase'): d_phase: 'DandelionPhase'):
# don't bother if there are no possible outbound edges # don't bother if there are no possible outbound edges
@ -97,16 +89,16 @@ async def stem_out(
# Pick edges randomly # Pick edges randomly
# Using orderedset for the tried edges to ensure random pairing with queue # Using orderedset for the tried edges to ensure random pairing with queue
tried_edges: OrderedSet['Peer'] = OrderedSet() tried_edges: "OrderedSet[Peer]" = OrderedSet()
while len(peer_sockets) < OUTBOUND_DANDELION_EDGES: while len(peer_sockets) < OUTBOUND_DANDELION_EDGES:
try: try:
# Get a socket for stem out (makes sure they accept) # Get a socket for stem out (makes sure they accept)
peer_sockets.append( peer_sockets.append(await _setup_edge(peer_set, tried_edges))
await wait_for(_setup_edge(peer_set, tried_edges), 15))
except NotEnoughEdges: except NotEnoughEdges:
# No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE) # No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE)
logger.warn("Not able to build enough peers for stemout.", terminal=True) logger.warn("Not able to build enough peers for stemout.",
terminal=True)
break break
else: else:
# Ran out of time for stem phase # Ran out of time for stem phase
@ -122,20 +114,13 @@ async def stem_out(
for count, peer_socket in enumerate(peer_sockets): for count, peer_socket in enumerate(peer_sockets):
stream_routines.append( stream_routines.append(
_do_stem_stream(peer_socket, block_queues[count], d_phase)) do_stem_stream(peer_socket, block_queues[count], d_phase))
for routine in stream_routines: for routine in stream_routines:
try: try:
await wait_for(routine, d_phase.remaining_time() + 1) await routine
except TimeoutError:
# This is what should happen if everything went well
pass
except Exception: except Exception:
logger.warn(traceback.format_exc()) logger.warn(traceback.format_exc())
else: else:
# stream routine exited early # stream routine exited early
pass pass
# Clean up stem sockets
for sock in peer_sockets:
sock.close()