2022-03-02 13:29:59 +00:00
|
|
|
from queue import Queue
|
2022-03-11 17:15:18 +00:00
|
|
|
from time import sleep
|
2022-03-13 01:28:18 +00:00
|
|
|
from secrets import choice
|
|
|
|
import traceback
|
2022-03-02 13:29:59 +00:00
|
|
|
|
2022-03-14 15:04:28 +00:00
|
|
|
from typing import TYPE_CHECKING, Coroutine, Tuple, List
|
2022-03-13 01:28:18 +00:00
|
|
|
|
|
|
|
from onionrthreads import add_delayed_thread
|
|
|
|
from blockdb import add_block_to_db
|
|
|
|
import logger
|
|
|
|
|
2022-03-20 17:52:58 +00:00
|
|
|
from ...constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES
|
|
|
|
from ...commands import GossipCommands, command_to_byte
|
|
|
|
from ... import dandelion
|
|
|
|
|
|
|
|
from .stemstream import do_stem_stream
|
2022-03-02 13:29:59 +00:00
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
2022-03-11 17:15:18 +00:00
|
|
|
from ordered_set import OrderedSet
|
2022-03-02 13:29:59 +00:00
|
|
|
from onionrblocks import Block
|
2022-03-20 17:52:58 +00:00
|
|
|
from ...peer import Peer
|
|
|
|
from ...dandelion.phase import DandelionPhase
|
2022-03-13 06:35:22 +00:00
|
|
|
import socket
|
2022-03-02 13:29:59 +00:00
|
|
|
|
2022-03-13 01:28:18 +00:00
|
|
|
|
|
|
|
class NotEnoughEdges(ValueError):
    """Raised when no untried peer is left to use as an outbound edge."""
|
2022-03-13 06:35:22 +00:00
|
|
|
class StemConnectionDenied(ConnectionRefusedError):
    """Raised when a peer answers a stem connection request with DENY."""
|
2022-03-13 01:28:18 +00:00
|
|
|
|
|
|
|
|
2022-03-13 06:35:22 +00:00
|
|
|
async def _setup_edge(
        peer_set: "OrderedSet[Peer]", exclude_set: "OrderedSet[Peer]"):
    """Negotiate a stem connection with a randomly chosen, untried peer.

    The chosen peer is added to ``exclude_set`` whether or not the
    negotiation succeeds, so the same peer is never picked twice.

    Args:
        peer_set: Peers that may be used as outbound edges.
        exclude_set: Peers that were already tried; mutated in place.

    Returns:
        The peer's socket when the peer did not deny the stem request,
        otherwise ``None`` (could not connect, timed out, was denied, or
        an error occurred while negotiating).

    Raises:
        NotEnoughEdges: If no peer outside ``exclude_set`` is available.
    """
    try:
        peer: 'Peer' = choice(peer_set - exclude_set)
    except IndexError:
        # choice() raises IndexError on an empty sequence: nobody left to try
        raise NotEnoughEdges

    # If peer is good or bad, exclude it no matter what
    exclude_set.add(peer)

    try:
        # NOTE(review): 12 is presumably a connect timeout in seconds;
        # confirm against Peer.get_socket
        s = peer.get_socket(12)
    except TimeoutError:
        logger.debug(f"{peer.transport_address} timed out when trying stemout")
        # No socket was created, so there is nothing to negotiate or close
        return None
    except Exception:
        logger.debug(traceback.format_exc())
        return None

    try:
        s.sendall(command_to_byte(GossipCommands.PUT_BLOCKS))
        s.settimeout(10)
        if s.recv(1) == dandelion.StemAcceptResult.DENY:
            raise StemConnectionDenied
    except StemConnectionDenied:
        logger.debug(
            "Stem connection denied (peer has too many) " +
            f"{peer.transport_address}")
    except Exception:
        logger.warn(
            "Error asking peer to establish stem connection" +
            traceback.format_exc(), terminal=True)
    else:
        # Return peer socket if it is in stem reception mode successfully
        return s

    # If they won't accept stem blocks, close the socket
    s.close()
    return None
|
|
|
|
|
|
|
|
|
|
|
|
async def stem_out(
        block_queues: Tuple["Queue[Block]", "Queue[Block]"],
        peer_set: "OrderedSet[Peer]",
        d_phase: 'DandelionPhase'):
    """Stem queued blocks out to randomly chosen outbound edges.

    Steps:
        1. If ``peer_set`` is empty, sleep one second and return.
        2. For every block queue, hand a snapshot of its current content
           to ``add_delayed_thread`` together with
           ``BLACKHOLE_EVADE_TIMER_SECS`` so the blocks get passed to
           ``add_block_to_db`` (black hole attack evasion).
        3. Negotiate up to ``OUTBOUND_DANDELION_EDGES`` stem sockets.
        4. Run ``do_stem_stream`` for each socket with the block queue of
           the same index, awaiting the streams one after another.

    Args:
        block_queues: Queues of blocks to stem, one per outbound edge.
        peer_set: Peers that may be used as outbound edges.
        d_phase: Dandelion phase tracker, used to stop building edges
            once the stem phase is over or has under 5 remaining (units
            as returned by ``remaining_time()``).
    """
    # OrderedSet is only imported under TYPE_CHECKING at module level, but an
    # instance is created below, so it has to be imported for real here
    from ordered_set import OrderedSet

    # don't bother if there are no possible outbound edges
    if not len(peer_set):
        # NOTE(review): time.sleep blocks the running event loop; confirm
        # that is acceptable for how this coroutine is driven
        sleep(1)
        return

    # Hand a snapshot (shallow copy) of each block queue to a delayed thread
    # that adds the blocks to the db after a while, to evade black hole
    # attacks
    for block_q in block_queues:
        add_delayed_thread(
            lambda q: set(map(add_block_to_db, q)),
            BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue))

    peer_sockets: List['socket.socket'] = []
    stream_routines: List[Coroutine] = []

    # Pick edges randomly
    # Using orderedset for the tried edges to ensure random pairing with queue
    tried_edges: "OrderedSet[Peer]" = OrderedSet()

    while len(peer_sockets) < OUTBOUND_DANDELION_EDGES:
        try:
            # Get a socket for stem out (makes sure they accept)
            edge_socket = await _setup_edge(peer_set, tried_edges)
        except NotEnoughEdges:
            # No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE)
            logger.warn("Not able to build enough peers for stemout.",
                        terminal=True)
            break

        # _setup_edge returns None when the peer could not be used; such a
        # result must not count as an edge or be streamed to / closed later
        if edge_socket is not None:
            peer_sockets.append(edge_socket)

        # Ran out of time for stem phase
        if not d_phase.is_stem_phase() or d_phase.remaining_time() < 5:
            logger.error(
                "Did not stem out any blocks in time, " +
                "if this happens regularly you may be under attack",
                terminal=True)
            for open_socket in peer_sockets:
                open_socket.close()
            peer_sockets.clear()
            break

    # If the loop above ran out of time, peer_sockets is empty and the loops
    # below do nothing; after NotEnoughEdges only the edges that were already
    # established are used

    for count, peer_socket in enumerate(peer_sockets):
        stream_routines.append(
            do_stem_stream(peer_socket, block_queues[count], d_phase))

    for routine in stream_routines:
        try:
            await routine
        except Exception:
            logger.warn(traceback.format_exc())
        else:
            # stream routine exited early
            pass
|