diff --git a/src/blockdb/__init__.py b/src/blockdb/__init__.py index 369a7f76..92fca8c0 100644 --- a/src/blockdb/__init__.py +++ b/src/blockdb/__init__.py @@ -6,7 +6,7 @@ from utils import identifyhome block_db_path = identifyhome.identify_home() + 'blockdata' -def store_vdf_block(block: Block): +def add_block_to_db(block: Block): db.set(block_db_path, block.id, block.raw) diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 58f59473..fe887daa 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -26,7 +26,7 @@ import onionrplugins from ..commands import GossipCommands from gossip.dandelion.phase import DandelionPhase from onionrthreads import add_onionr_thread -from blockdb import store_vdf_block +from blockdb import add_block_to_db from .announce import do_announce @@ -51,7 +51,7 @@ along with this program. If not, see . def gossip_client( peer_set: OrderedSet['Peer'], - block_queues: Tuple[Queue['Block']], + block_queues: Tuple[Queue['Block'], Queue['Block']], dandelion_seed: bytes): """ Gossip client does the following: diff --git a/src/gossip/client/dandelionstem.py b/src/gossip/client/dandelionstem.py index 7100e72a..e03c9bab 100644 --- a/src/gossip/client/dandelionstem.py +++ b/src/gossip/client/dandelionstem.py @@ -1,7 +1,16 @@ from queue import Queue +from threading import Timer from time import sleep +from secrets import choice +import traceback -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Tuple, List, Set + +from onionrthreads import add_delayed_thread +from blockdb import add_block_to_db +import logger + +from ..constants import BLACKHOLE_EVADE_TIMER_SECS, MAX_OUTBOUND_DANDELION_EDGE if TYPE_CHECKING: from ordered_set import OrderedSet @@ -9,15 +18,45 @@ if TYPE_CHECKING: from ..peer import Peer from ..dandelion.phase import DandelionPhase + +class NotEnoughEdges(ValueError): pass # noqa + + +def _setup_edge( + peer_set: OrderedSet['Peer'], exclude_set: OrderedSet['Peer']): + """Negotiate stem connection with random peer, add to exclude set if fail""" + try: + peer: 'Peer' = choice(peer_set - exclude_set) + except IndexError: + raise NotEnoughEdges + try: + s = peer.get_socket() + except Exception: + logger.debug(traceback.format_exc()) + exclude_set.add(peer) + + + def stem_out( - block_queues: Tuple[Queue['Block']], + block_queues: Tuple[Queue['Block'], Queue['Block']], peer_set: OrderedSet['Peer'], d_phase: 'DandelionPhase'): - # Spawn a thread with block set to add to db after time for black hole attack + # Spawn threads with deep copied block queue to add to db after time + # for black hole attack + for block_q in block_queues: + add_delayed_thread( + lambda q: set(map(add_block_to_db, q)), + BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue)) + # don't bother if there are no possible outbound edges if not len(peer_set): sleep(1) return + # Pick edges randomly + # Using orderedset for the tried edges to ensure random pairing with queue + tried_edges: OrderedSet['Peer'] = OrderedSet() + + diff --git a/src/gossip/client/storeblocks.py b/src/gossip/client/storeblocks.py index a6511117..44def179 100644 --- a/src/gossip/client/storeblocks.py +++ b/src/gossip/client/storeblocks.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: def store_blocks( - block_queues: Tuple[Queue['Block']], + block_queues: Tuple[Queue['Block'], Queue['Block']], dandelion_phase: 'DandelionPhase'): new_queue: Queue['Block'] = Queue() @@ -31,7 +31,7 @@ def store_blocks( while not dandelion_phase.is_stem_phase() \ and dandelion_phase.remaining_time() > 1: try: - blockdb.store_vdf_block( + blockdb.add_block_to_db( new_queue.get(timeout=dandelion_phase.remaining_time()) ) except TimeoutError: diff --git a/src/gossip/constants.py b/src/gossip/constants.py index 656a48b7..3ce02139 100644 --- a/src/gossip/constants.py +++ b/src/gossip/constants.py @@ -4,5 +4,12 @@ TRANSPORT_SIZE_BYTES = 64 BLOCK_MAX_SIZE = 1024 * 2000 BLOCK_ID_SIZE = 128 DANDELION_EPOCH_LENGTH = 60 -MAX_INBOUND_DANDELION_EDGE = 2 -MAX_STEM_BLOCKS_PER_STREAM = 1000 \ No newline at end of file + +# Magic number i made up, not really specified in dandelion++ paper +MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowlorisstor browser dvm 16 + +# Dandelion subgraph is aprox 4-regular +MAX_OUTBOUND_DANDELION_EDGE = 2 + +MAX_STEM_BLOCKS_PER_STREAM = 1000 +BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3 \ No newline at end of file diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 092ec34b..f0881010 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -40,7 +40,7 @@ inbound_dandelion_edge_count = [0] def gossip_server( peer_set: OrderedSet['Peer'], - block_queues: Tuple[Queue['Block']], + block_queues: Tuple[Queue['Block'], Queue['Block']], dandelion_seed: bytes): async def peer_connected( diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index 09b80e02..fd528bee 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -20,7 +20,7 @@ if TYPE_CHECKING: async def accept_stem_blocks( - block_queues: Tuple[Queue['Block']], + block_queues: Tuple[Queue['Block'], Queue['Block']], reader: 'StreamReader', writer: 'StreamWriter', inbound_edge_count: List[int]): @@ -35,6 +35,8 @@ async def accept_stem_blocks( read_routine = reader.read(BLOCK_ID_SIZE) stream_start_time = int(time()) + block_queue_to_use = secrets.choice(block_queues) + for _ in range(MAX_STEM_BLOCKS_PER_STREAM): block_id = ( await wait_for(read_routine, base_wait_timeout)).decode('utf-8') @@ -51,7 +53,7 @@ async def accept_stem_blocks( raw_block: bytes = await wait_for( reader.read(block_size), base_wait_timeout * 6) - secrets.choice(block_queues).put( + block_queue_to_use.put( Block(block_id, raw_block, auto_verify=True) ) # Regardless of stem phase, we add to queue diff --git a/src/onionrthreads/__init__.py b/src/onionrthreads/__init__.py index c8cddee1..3365a136 100644 --- a/src/onionrthreads/__init__.py +++ b/src/onionrthreads/__init__.py @@ -4,7 +4,6 @@ from typing import Iterable import traceback from threading import Thread from uuid import uuid4 - from time import sleep import logger @@ -42,3 +41,10 @@ def add_onionr_thread( *args), kwargs=kwargs, daemon=True).start() + + +def add_delayed_thread(func: Callable, sleep_secs: int, *args, **kwargs): + assert sleep_secs > 0 + t = Thread(target=func, args=args, kwargs=kwargs, daemon=True) + sleep(sleep_secs) + t.start()