From 4c546773871db12fc43fe54905afcc6932ebecea Mon Sep 17 00:00:00 2001 From: Kevin F Date: Fri, 11 Mar 2022 11:15:18 -0600 Subject: [PATCH] Use tuple of queues for block queues instead of list for clarity that queues cannot be added --- src/gossip/__init__.py | 6 ++++-- src/gossip/client/__init__.py | 7 +++--- src/gossip/client/dandelionstem.py | 24 ++++++++++----------- src/gossip/client/storeblocks.py | 4 ++-- src/gossip/server/__init__.py | 7 +++--- src/gossip/server/acceptstem.py | 17 +++++---------- src/onionrcommands/daemonlaunch/__init__.py | 11 +++++----- 7 files changed, 36 insertions(+), 40 deletions(-) diff --git a/src/gossip/__init__.py b/src/gossip/__init__.py index 4be451ac..38207bf6 100644 --- a/src/gossip/__init__.py +++ b/src/gossip/__init__.py @@ -1,10 +1,11 @@ import threading from time import sleep -from typing import TYPE_CHECKING, Set, List +from typing import TYPE_CHECKING, Set, Tuple from os import urandom import queue if TYPE_CHECKING: + from ordered_set import OrderedSet from onionrblocks import Block from .peer import Peer @@ -40,7 +41,8 @@ In stem phase, server disables diffusion def start_gossip_threads( - peer_set: Set['Peer'], block_queues: List[queue.Queue['Block']]): + peer_set: OrderedSet['Peer'], + block_queues: Tuple[queue.Queue['Block']]): # Peer set is largely handled by the transport plugins # There is a unified set so gossip logic is not repeated seed = urandom(32) diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index c2b6b96a..58f59473 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -4,7 +4,7 @@ Dandelion ++ Gossip client logic """ import traceback from typing import TYPE_CHECKING -from typing import Set, List +from typing import Set, Tuple from time import sleep from queue import Queue @@ -19,6 +19,7 @@ from ..connectpeer import connect_peer if TYPE_CHECKING: from ..peer import Peer + from ordered_set import OrderedSet import logger import onionrplugins @@ -49,8 +50,8 @@ along with this program. If not, see . def gossip_client( - peer_set: Set['Peer'], - block_queues: List[Queue['Block']], + peer_set: OrderedSet['Peer'], + block_queues: Tuple[Queue['Block']], dandelion_seed: bytes): """ Gossip client does the following: diff --git a/src/gossip/client/dandelionstem.py b/src/gossip/client/dandelionstem.py index 7df89ebe..7100e72a 100644 --- a/src/gossip/client/dandelionstem.py +++ b/src/gossip/client/dandelionstem.py @@ -1,23 +1,23 @@ from queue import Queue +from time import sleep - -from typing import TYPE_CHECKING, Set +from typing import TYPE_CHECKING, Tuple if TYPE_CHECKING: + from ordered_set import OrderedSet from onionrblocks import Block from ..peer import Peer from ..dandelion.phase import DandelionPhase def stem_out( - block_queue: Queue['Block'], - peer_set: Set['Block'], + block_queues: Tuple[Queue['Block']], + peer_set: OrderedSet['Peer'], d_phase: 'DandelionPhase'): - # Deep copy the block queues so that everything gets - # stemmed out if we run out of time in epoch - # Also spawn a thread with block set to add to db after time for black hole attack - block = block_queue.get(block=True, timeout=d_phase.remaining_time) - raw_block = block.raw - block_size = len(block.raw) - block_id = block.id - del block + + # Spawn a thread with block set to add to db after time for black hole attack + + if not len(peer_set): + sleep(1) + return + diff --git a/src/gossip/client/storeblocks.py b/src/gossip/client/storeblocks.py index ac4265ca..a6511117 100644 --- a/src/gossip/client/storeblocks.py +++ b/src/gossip/client/storeblocks.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, List +from typing import TYPE_CHECKING, Tuple from threading import Thread from queue import Queue @@ -10,7 +10,7 @@ if TYPE_CHECKING: def store_blocks( - block_queues: List[Queue['Block']], + block_queues: Tuple[Queue['Block']], dandelion_phase: 'DandelionPhase'): new_queue: Queue['Block'] = Queue() diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 98106836..092ec34b 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -1,7 +1,7 @@ import asyncio import traceback from typing import TYPE_CHECKING -from typing import Set, List +from typing import Set, Tuple from queue import Queue @@ -14,6 +14,7 @@ import logger if TYPE_CHECKING: from onionrblocks import Block from peer import Peer + from ordered_set import OrderedSet from asyncio import StreamReader, StreamWriter from filepaths import gossip_server_socket_file @@ -38,8 +39,8 @@ inbound_dandelion_edge_count = [0] def gossip_server( - peer_set: Set['Peer'], - block_queues: List[Queue['Block']], + peer_set: OrderedSet['Peer'], + block_queues: Tuple[Queue['Block']], dandelion_seed: bytes): async def peer_connected( diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index 0cd2e7bb..09b80e02 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from typing import List -from queue import Queue +from typing import List, Tuple +import secrets from time import time from asyncio import wait_for @@ -15,12 +15,12 @@ block_size_digits = len(str(BLOCK_MAX_SIZE)) base_wait_timeout = 10 if TYPE_CHECKING: - + from queue import Queue from asyncio import StreamWriter, StreamReader async def accept_stem_blocks( - block_queues: List[Queue['Block']], + block_queues: Tuple[Queue['Block']], reader: 'StreamReader', writer: 'StreamWriter', inbound_edge_count: List[int]): @@ -35,9 +35,6 @@ async def accept_stem_blocks( read_routine = reader.read(BLOCK_ID_SIZE) stream_start_time = int(time()) - q = Queue() - appended_queue = False - for _ in range(MAX_STEM_BLOCKS_PER_STREAM): block_id = ( await wait_for(read_routine, base_wait_timeout)).decode('utf-8') @@ -54,15 +51,11 @@ async def accept_stem_blocks( raw_block: bytes = await wait_for( reader.read(block_size), base_wait_timeout * 6) - q.put( + secrets.choice(block_queues).put( Block(block_id, raw_block, auto_verify=True) ) # Regardless of stem phase, we add to queue # Client will decide if they are to be stemmed - if not appended_queue: - if len(block_queues) < MAX_INBOUND_DANDELION_EDGE: - block_queues.append(q) - appended_queue = True read_routine = reader.read(BLOCK_ID_SIZE) diff --git a/src/onionrcommands/daemonlaunch/__init__.py b/src/onionrcommands/daemonlaunch/__init__.py index 4b617379..fc8857f8 100755 --- a/src/onionrcommands/daemonlaunch/__init__.py +++ b/src/onionrcommands/daemonlaunch/__init__.py @@ -7,14 +7,13 @@ import queue import sys import platform import signal -from threading import Thread from stem.connection import IncorrectPassword -import stem import toomanyobjs import filenuke from deadsimplekv import DeadSimpleKV import psutil +from ordered_set import OrderedSet import config @@ -31,7 +30,6 @@ from onionrcrypto import getourkeypair import runtests from httpapi import daemoneventsapi from .. import version -from utils.bettersleep import better_sleep from .killdaemon import kill_daemon # noqa from .showlogo import show_logo import gossip @@ -119,12 +117,13 @@ def daemon(): events.event('init', threaded=False) events.event('daemon_start') - shared_state.get(apiservers.ClientAPI).gossip_peer_set = set() - shared_state.get(apiservers.ClientAPI).gossip_block_queue = queue.Queue() + shared_state.get(apiservers.ClientAPI).gossip_peer_set = OrderedSet() + shared_state.get(apiservers.ClientAPI).gossip_block_queues = \ + (queue.Queue(), queue.Queue()) gossip.start_gossip_threads( shared_state.get(apiservers.ClientAPI).gossip_peer_set, - shared_state.get(apiservers.ClientAPI).gossip_block_queue) + shared_state.get(apiservers.ClientAPI).gossip_block_queues) try: shared_state.get(apiservers.ClientAPI).start()