From 17b268d9e473ac495a2b48ee427610147a64c990 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Wed, 2 Mar 2022 07:29:59 -0600 Subject: [PATCH] lots of work on gossip --- src/__init__.py | 7 ++- src/gossip/__init__.py | 13 ++-- src/gossip/client/__init__.py | 25 ++++---- src/gossip/client/dandelionstem.py | 20 ++++++ src/gossip/client/peerexchange.py | 64 ++++++++++++++++++++ src/gossip/constants.py | 5 ++ src/gossip/graph.py | 2 + src/gossip/phase.py | 5 ++ src/gossip/{server.py => server/__init__.py} | 26 ++++++-- src/gossip/server/acceptstem.py | 48 +++++++++++++++ static-data/default-plugins/tor/main.py | 2 +- static-data/default-plugins/tor/torpeer.py | 2 + 12 files changed, 191 insertions(+), 28 deletions(-) create mode 100644 src/gossip/client/dandelionstem.py create mode 100644 src/gossip/constants.py create mode 100644 src/gossip/graph.py rename src/gossip/{server.py => server/__init__.py} (70%) create mode 100644 src/gossip/server/acceptstem.py diff --git a/src/__init__.py b/src/__init__.py index bf5baf9e..f9aa7843 100755 --- a/src/__init__.py +++ b/src/__init__.py @@ -8,9 +8,10 @@ Run with 'help' for usage. # Enable pyjion if we can because why not pyjion_enabled = False try: - import pyjion - pyjion.enable() - pyjion_enabled = True + pass + #import pyjion + #pyjion.enable() + #pyjion_enabled = True except ImportError: pass diff --git a/src/gossip/__init__.py b/src/gossip/__init__.py index adfed514..4be451ac 100644 --- a/src/gossip/__init__.py +++ b/src/gossip/__init__.py @@ -1,6 +1,6 @@ import threading from time import sleep -from typing import TYPE_CHECKING, Set +from typing import TYPE_CHECKING, Set, List from os import urandom import queue @@ -17,6 +17,7 @@ from .connectpeer import connect_peer from .client import gossip_client from .server import gossip_server from .commands import GossipCommands +from .constants import BOOTSTRAP_ATTEMPTS """ Onionr uses a flavor of Dandelion++ epidemic routing @@ -39,19 +40,19 @@ In stem phase, server disables diffusion def start_gossip_threads( - peer_set: Set['Peer'], block_queue: queue.Queue['Block']): + peer_set: Set['Peer'], block_queues: List[queue.Queue['Block']]): # Peer set is largely handled by the transport plugins # There is a unified set so gossip logic is not repeated seed = urandom(32) add_onionr_thread( - gossip_server, 1, peer_set, block_queue, seed, initial_sleep=0.2) + gossip_server, 1, peer_set, block_queues, seed, initial_sleep=0.2) threading.Thread( target=gossip_client, - args=[peer_set, block_queue, seed], daemon=True).start() + args=[peer_set, block_queues, seed], daemon=True).start() onionrplugins.events.event('gossip_start', data=peer_set, threaded=True) - for _ in range(2): + for _ in range(BOOTSTRAP_ATTEMPTS): onionrplugins.events.event( 'bootstrap', data={'peer_set': peer_set, 'callback': connect_peer}, threaded=False) @@ -59,5 +60,3 @@ def start_gossip_threads( if len(peer_set): return logger.error("Could not connect to any peers :(", terminal=True) - - diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 883dd5c3..c75c582c 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -8,6 +8,7 @@ from typing import Set from time import sleep from queue import Queue +from ..connectpeer import connect_peer if TYPE_CHECKING: from onionrblocks import Block @@ -20,7 +21,9 @@ from gossip.phase import DandelionPhase from onionrthreads import add_onionr_thread from .announce import do_announce -#from .peerexchange import get_new_peers +from .dandelionstem import stem_out +from .peerexchange import get_new_peers + """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -48,25 +51,25 @@ def gossip_client( Stream new blocks """ - def _trigger_new_peers_event(new_peer_set: Set['Peer']): - onionrplugins.events.event( - 'new_peer', - data={'peers': peer_set, 'new_peers': new_peer_set}) + do_announce(peer_set) - add_onionr_thread(do_announce, 3600, peer_set, initial_sleep=10) - """ add_onionr_thread( get_new_peers, - 1200, peer_set, _trigger_new_peers_event, initial_sleep=5) - """ - + 1200, peer_set, initial_sleep=5) dandelion_phase = DandelionPhase(dandelion_seed, 30) + while True: while not len(peer_set): sleep(0.2) if dandelion_phase.is_stem_phase(): - pass + try: + # Stem out blocks for (roughly) remaining epoch time + stem_out( + block_queue, peer_set, dandelion_phase) + except TimeoutError: + continue + continue else: pass diff --git a/src/gossip/client/dandelionstem.py b/src/gossip/client/dandelionstem.py new file mode 100644 index 00000000..244f1c50 --- /dev/null +++ b/src/gossip/client/dandelionstem.py @@ -0,0 +1,20 @@ +from queue import Queue + + +from typing import TYPE_CHECKING, Set + +if TYPE_CHECKING: + from onionrblocks import Block + from ..peer import Peer + from ..phase import DandelionPhase + +def stem_out( + block_queue: Queue['Block'], + peer_set: Set['Block'], + d_phase: DandelionPhase): + block = block_queue.get(block=True, timeout=time_remaining_secs) + raw_block = block.raw + block_size = len(block.raw) + block_id = block.id + del block + diff --git a/src/gossip/client/peerexchange.py b/src/gossip/client/peerexchange.py index e69de29b..571e4bf6 100644 --- a/src/gossip/client/peerexchange.py +++ b/src/gossip/client/peerexchange.py @@ -0,0 +1,64 @@ +from threading import Thread +from time import sleep +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from socket import socket + +from onionrplugins import onionrevents + +from ..peer import Peer +from ..commands import GossipCommands, command_to_byte +from ..constants import PEER_AMOUNT_TO_ASK, TRANSPORT_SIZE_BYTES +from .. import connectpeer + +MAX_PEERS = 10 + + +def _ask_peer(peer, peer_set): + s: 'socket' = peer.get_socket() + s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE)) + # Get 10 max peers + for _ in range(MAX_PEERS): + peer = s.recv(TRANSPORT_SIZE_BYTES) + if not peer: + break + connect_data = { + 'address': peer, + 'callback': connectpeer.connect_peer, + 'peer_set': peer_set + } + onionrevents.event('announce_rec', data=connect_data, threaded=True) + s.close() + + +def get_new_peers(peer_set): + while not len(peer_set): + sleep(0.5) + + # Deep copy the peer list + peer_list: Peer = list(peer_set) + peers_we_ask: Peer = [] + asked_count = 0 + + while asked_count < PEER_AMOUNT_TO_ASK: + try: + peers_we_ask.append(peer_list.pop()) + except IndexError: + break + asked_count += 1 + + if not len(peers_we_ask): + raise ValueError("No peers present in pool during get_new_peers") + peer_list.clear() # Clear the deep copy so it doesn't occupy memory + + # Start threads to ask the peers for more peers + threads = [] + for peer in peers_we_ask: + t = Thread(target=_ask_peer, args=[peer, peer_set], daemon=True) + t.start() + threads.append(t) + peers_we_ask.clear() + # Wait for the threads to finish because this function is on a timer + for thread in threads: + thread.join() + diff --git a/src/gossip/constants.py b/src/gossip/constants.py new file mode 100644 index 00000000..c19efba4 --- /dev/null +++ b/src/gossip/constants.py @@ -0,0 +1,5 @@ +BOOTSTRAP_ATTEMPTS = 5 +PEER_AMOUNT_TO_ASK = 3 +TRANSPORT_SIZE_BYTES = 64 +BLOCK_MAX_SIZE = 1024 * 2000 +BLOCK_ID_SIZE = 128 \ No newline at end of file diff --git a/src/gossip/graph.py b/src/gossip/graph.py new file mode 100644 index 00000000..fcbdc8d0 --- /dev/null +++ b/src/gossip/graph.py @@ -0,0 +1,2 @@ +class DandelionGraph: + def \ No newline at end of file diff --git a/src/gossip/phase.py b/src/gossip/phase.py index 4280c16d..81344152 100644 --- a/src/gossip/phase.py +++ b/src/gossip/phase.py @@ -26,6 +26,11 @@ class DandelionPhase: self._is_stem = False + def remaining_time(self) -> int: + current_time = int(time()) + return self.epoch_interval - (current_time - self.epoch) + + def is_stem_phase(self) -> bool: current_time = int(time()) if current_time - self.epoch >= self.epoch_interval: diff --git a/src/gossip/server.py b/src/gossip/server/__init__.py similarity index 70% rename from src/gossip/server.py rename to src/gossip/server/__init__.py index c7271516..2e3e0681 100644 --- a/src/gossip/server.py +++ b/src/gossip/server/__init__.py @@ -1,18 +1,22 @@ import asyncio from typing import TYPE_CHECKING -from typing import Set +from typing import Set, List from queue import Queue -from .connectpeer import connect_peer + +from gossip import constants +from ..connectpeer import connect_peer from onionrplugins import onionrevents if TYPE_CHECKING: from onionrblocks import Block from peer import Peer + from asyncio import StreamReader, StreamWriter from filepaths import gossip_server_socket_file -from .commands import GossipCommands +from ..commands import GossipCommands +from .acceptstem import accept_stem_blocks """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -31,10 +35,11 @@ along with this program. If not, see . def gossip_server( peer_set: Set['Peer'], - block_queue: Queue['Block'], + block_queues: List[Queue['Block']], dandelion_seed: bytes): - async def peer_connected(reader, writer): + async def peer_connected( + reader: 'StreamReader', writer: 'StreamWriter'): while True: try: cmd = await asyncio.wait_for(reader.read(1), 60) @@ -51,7 +56,8 @@ def gossip_server( pass case GossipCommands.ANNOUNCE: async def _read_announce(): - address = await reader.read(56) + address = await reader.read( + constants.TRANSPORT_SIZE_BYTES) onionrevents.event( 'announce_rec', data={'peer_set': peer_set, @@ -60,6 +66,14 @@ def gossip_server( threaded=True) writer.write(int(1).to_bytes(1, 'big')) await asyncio.wait_for(_read_announce(), 10) + case GossipCommands.PEER_EXCHANGE: + for peer in peer_set: + writer.write( + peer.transport_address.encode( + 'utf-8').removesuffix(b'.onion')) + case GossipCommands.PUT_BLOCKS: + # Create block queue & append stemmed blocks to it + await accept_stem_blocks(block_queues, reader, writer) break await writer.drain() diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py new file mode 100644 index 00000000..d1024d4c --- /dev/null +++ b/src/gossip/server/acceptstem.py @@ -0,0 +1,48 @@ +from typing import TYPE_CHECKING +from typing import List +from queue import Queue +from time import time +from asyncio import wait_for + +from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE + + +block_size_digits = len(str(BLOCK_MAX_SIZE)) +base_wait_timeout = 10 + +if TYPE_CHECKING: + from onionrblocks import Block + from asyncio import StreamWriter, StreamReader + + +async def accept_stem_blocks( + block_queues: List[Queue['Block']], + reader: 'StreamReader', + writer: 'StreamWriter'): + + # Start getting the first block + read_routine = reader.read(BLOCK_ID_SIZE) + stream_start_time = int(time()) + max_accept_blocks = 1000 + + q = Queue() + block_queues.append(q) + + for _ in range(max_accept_blocks): + block_id = await wait_for(read_routine, base_wait_timeout) + block_size = int( + await wait_for( + reader.read(block_size_digits), + base_wait_timeout)).decode('utf-8') + + + if not all(c in "0123456789" for c in block_size): + raise ValueError("Invalid block size data (non 0-9 char)") + if block_size > BLOCK_MAX_SIZE: + raise ValueError("Max block size") + + + + + + diff --git a/static-data/default-plugins/tor/main.py b/static-data/default-plugins/tor/main.py index fe0a365c..64eb1124 100644 --- a/static-data/default-plugins/tor/main.py +++ b/static-data/default-plugins/tor/main.py @@ -78,7 +78,7 @@ def on_gossip_start(api, data: Set[Peer] = None): controller.authenticate() logger.info( "Tor socks is listening on " + - f"{controller.get_listeners('SOCKS')}", terminal=True) + f"{controller.get_listeners('SOCKS')[0]}", terminal=True) key = config.get('tor.key') new_address = '' if not key: diff --git a/static-data/default-plugins/tor/torpeer.py b/static-data/default-plugins/tor/torpeer.py index f0134a49..6a355fea 100644 --- a/static-data/default-plugins/tor/torpeer.py +++ b/static-data/default-plugins/tor/torpeer.py @@ -4,6 +4,8 @@ import socks class TorPeer: def __init__(self, socks_host, socks_port, onion_address): + if not onion_address or onion_address == '.onion': + raise ValueError("Invalid transport address") self.transport_address = self.onion_address = onion_address self.socks_host = socks_host self.socks_port = socks_port