diff --git a/src/apiservers/private/__init__.py b/src/apiservers/private/__init__.py index 0cc2b09d..e42bc435 100644 --- a/src/apiservers/private/__init__.py +++ b/src/apiservers/private/__init__.py @@ -53,10 +53,6 @@ class PrivateAPI: self.startTime = epoch.get_epoch() app = flask.Flask(__name__) - self.gossip_block_queue: 'queue.Queue' = None - self.gossip_peer_set: Set['Peer'] = None - - bind_port = int(config.get('client.client.port', 59496)) self.bindPort = bind_port diff --git a/src/gossip/__init__.py b/src/gossip/__init__.py index 6ebcfad6..2506dcb0 100644 --- a/src/gossip/__init__.py +++ b/src/gossip/__init__.py @@ -1,7 +1,6 @@ import threading from time import sleep from typing import TYPE_CHECKING, Set, Tuple -from os import urandom if TYPE_CHECKING: from ordered_set import OrderedSet @@ -17,8 +16,8 @@ import logger from .connectpeer import connect_peer from .client import gossip_client from .server import gossip_server -from .commands import GossipCommands from .constants import BOOTSTRAP_ATTEMPTS +from .peerset import gossip_peer_set """ Onionr uses a flavor of Dandelion++ epidemic routing @@ -40,25 +39,21 @@ In stem phase, server disables diffusion """ -def start_gossip_threads( - peer_set: "OrderedSet[Peer]", - block_queues: Tuple["Queue[Block]"]): +def start_gossip_threads(): # Peer set is largely handled by the transport plugins # There is a unified set so gossip logic is not repeated - seed = urandom(32) add_onionr_thread( - gossip_server, 1, peer_set, block_queues, seed, initial_sleep=0.2) + gossip_server, 1, initial_sleep=0.2) threading.Thread( - target=gossip_client, - args=[peer_set, block_queues, seed], daemon=True).start() - onionrplugins.events.event('gossip_start', data=peer_set, threaded=True) + target=gossip_client, daemon=True).start() + onionrplugins.events.event('gossip_start', data=None, threaded=True) for _ in range(BOOTSTRAP_ATTEMPTS): onionrplugins.events.event( - 'bootstrap', data={'peer_set': peer_set, 'callback': connect_peer}, + 'bootstrap', data={'callback': connect_peer}, threaded=False) sleep(60) - if len(peer_set): + if len(gossip_peer_set): return logger.error("Could not connect to any peers :(", terminal=True) diff --git a/src/gossip/blockqueues.py b/src/gossip/blockqueues.py new file mode 100644 index 00000000..93d0b2c4 --- /dev/null +++ b/src/gossip/blockqueues.py @@ -0,0 +1,30 @@ +from queue import Queue +"""Onionr - Private P2P Communication. + +block_queues where all received or created blocks are placed + +Blocks are placed here before being sent to the network, the reason they are in +2 queues is for dandelion++ implementation. + +The queues are drained randomly to incoming or outgoing edges depending on +dandelion++ phase +""" +from typing import Tuple, TYPE_CHECKING +if TYPE_CHECKING: + from onionrblocks import Block +""" +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +gossip_block_queues: Tuple["Queue[Block]", "Queue[Block]"] = (Queue(), Queue()) diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 8ab5a920..1ad89870 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -33,6 +33,7 @@ from blockdb import add_block_to_db from .announce import do_announce from .dandelionstem import stem_out from .peerexchange import get_new_peers +from ..peerset import gossip_peer_set """ This program is free software: you can redistribute it and/or modify @@ -50,10 +51,7 @@ along with this program. If not, see . """ -def gossip_client( - peer_set: "OrderedSet[Peer]", - block_queues: Tuple["Queue[Block]", "Queue[Block]"], - dandelion_seed: bytes): +def gossip_client(): """ Gossip client does the following: @@ -61,7 +59,7 @@ def gossip_client( Stream new blocks """ bl: Block - do_announce(peer_set) + do_announce() # Start a thread that runs every 1200 secs to # Ask peers for a subset for their peer set @@ -70,12 +68,12 @@ def gossip_client( # transport plugin handles the new peer add_onionr_thread( get_new_peers, - 1200, peer_set, initial_sleep=5) + 1200, initial_sleep=5) - dandelion_phase = DandelionPhase(dandelion_seed, DANDELION_EPOCH_LENGTH) + dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH) while True: - while not len(peer_set): + while not len(gossip_peer_set): sleep(0.2) if dandelion_phase.remaining_time() <= 10: sleep(dandelion_phase.remaining_time()) @@ -83,8 +81,7 @@ def gossip_client( logger.debug("Entering stem phase", terminal=True) try: # Stem out blocks for (roughly) remaining epoch time - asyncio.run(stem_out( - block_queues, peer_set, dandelion_phase)) + asyncio.run(stem_out()) except TimeoutError: continue except Exception: @@ -93,4 +90,4 @@ def gossip_client( else: logger.debug("Entering fluff phase", terminal=True) # Add block to primary block db, where the diffuser can read it - store_blocks(block_queues, dandelion_phase) + store_blocks(dandelion_phase) diff --git a/src/gossip/client/announce.py b/src/gossip/client/announce.py index fd41af6c..a0d953b1 100644 --- a/src/gossip/client/announce.py +++ b/src/gossip/client/announce.py @@ -6,11 +6,13 @@ if TYPE_CHECKING: from .. import Peer import logger -from ..commands import GossipCommands, command_to_byte import onionrplugins +from ..commands import GossipCommands, command_to_byte +from ..peerset import gossip_peer_set -def do_announce(peer_set): + +def do_announce(): "Announce with N peers of each identified transport" def _announce(announce_peer: 'Peer', our_transport_address: str): try: @@ -25,13 +27,13 @@ def do_announce(peer_set): f"Could not announce with {announce_peer.transport_address}") sock.close() - while not len(peer_set): + while not len(gossip_peer_set): sleep(1) per_transport = 3 peer_types = {} count_for_peer = 0 - for peer in peer_set: + for peer in gossip_peer_set: try: count_for_peer = peer_types[peer.__class__] except KeyError: diff --git a/src/gossip/client/dandelionstem/__init__.py b/src/gossip/client/dandelionstem/__init__.py index e970642b..5e538a2f 100644 --- a/src/gossip/client/dandelionstem/__init__.py +++ b/src/gossip/client/dandelionstem/__init__.py @@ -14,6 +14,8 @@ import logger from ...constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES from ...commands import GossipCommands, command_to_byte from ... import dandelion +from ...blockqueues import gossip_block_queues +from ...peerset import gossip_peer_set from .stemstream import do_stem_stream @@ -69,19 +71,16 @@ async def _setup_edge( s.close() -async def stem_out( - block_queues: Tuple["Queue[Block]", "Queue[Block]"], - peer_set: "OrderedSet[Peer]", - d_phase: 'DandelionPhase'): +async def stem_out(d_phase: 'DandelionPhase'): # don't bother if there are no possible outbound edges - if not len(peer_set): + if not len(gossip_peer_set): sleep(1) return # Spawn threads with deep copied block queue to add to db after time # for black hole attack - for block_q in block_queues: + for block_q in gossip_block_queues: add_delayed_thread( lambda q: set(map(add_block_to_db, q)), BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue)) @@ -96,7 +95,7 @@ async def stem_out( while len(peer_sockets) < OUTBOUND_DANDELION_EDGES: try: # Get a socket for stem out (makes sure they accept) - peer_sockets.append(await _setup_edge(peer_set, tried_edges)) + peer_sockets.append(await _setup_edge(gossip_peer_set, tried_edges)) except NotEnoughEdges: # No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE) logger.warn("Not able to build enough peers for stemout.", @@ -116,7 +115,7 @@ async def stem_out( for count, peer_socket in enumerate(peer_sockets): stream_routines.append( - do_stem_stream(peer_socket, block_queues[count], d_phase)) + do_stem_stream(peer_socket, gossip_block_queues[count], d_phase)) for routine in stream_routines: try: diff --git a/src/gossip/client/peerexchange.py b/src/gossip/client/peerexchange.py index b5ea7996..b8682fa1 100644 --- a/src/gossip/client/peerexchange.py +++ b/src/gossip/client/peerexchange.py @@ -10,11 +10,12 @@ from ..peer import Peer from ..commands import GossipCommands, command_to_byte from ..constants import PEER_AMOUNT_TO_ASK, TRANSPORT_SIZE_BYTES from .. import connectpeer +from ..peerset import gossip_peer_set MAX_PEERS = 10 -def _ask_peer(peer, peer_set): +def _ask_peer(peer): s: 'socket' = peer.get_socket(12) s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE)) # Get 10 max peers @@ -24,19 +25,18 @@ def _ask_peer(peer, peer_set): break connect_data = { 'address': peer, - 'callback': connectpeer.connect_peer, - 'peer_set': peer_set + 'callback': connectpeer.connect_peer } onionrevents.event('announce_rec', data=connect_data, threaded=True) s.close() -def get_new_peers(peer_set): - while not len(peer_set): +def get_new_peers(): + while not len(gossip_peer_set): sleep(0.5) # Deep copy the peer list - peer_list: Peer = list(peer_set) + peer_list: Peer = list(gossip_peer_set) peers_we_ask: Peer = [] asked_count = 0 @@ -54,7 +54,7 @@ def get_new_peers(peer_set): # Start threads to ask the peers for more peers threads = [] for peer in peers_we_ask: - t = Thread(target=_ask_peer, args=[peer, peer_set], daemon=True) + t = Thread(target=_ask_peer, args=[peer, gossip_peer_set], daemon=True) t.start() threads.append(t) peers_we_ask.clear() diff --git a/src/gossip/client/storeblocks.py b/src/gossip/client/storeblocks.py index f1486f35..095b9d4c 100644 --- a/src/gossip/client/storeblocks.py +++ b/src/gossip/client/storeblocks.py @@ -9,10 +9,10 @@ if TYPE_CHECKING: from onionrblocks import Block from ..dandelion.phase import DandelionPhase +from ..blockqueues import gossip_block_queues -def store_blocks( - block_queues: Tuple["Queue[Block]", "Queue[Block]"], - dandelion_phase: 'DandelionPhase'): + +def store_blocks(dandelion_phase: 'DandelionPhase'): new_queue: "Queue[Block]" = Queue() @@ -26,7 +26,7 @@ def store_blocks( except Empty: pass - for block_queue in block_queues: + for block_queue in gossip_block_queues: Thread(target=_watch_queue, args=[block_queue], daemon=True).start() while not dandelion_phase.is_stem_phase() \ diff --git a/src/gossip/connectpeer.py b/src/gossip/connectpeer.py index 0c027db3..c744250c 100644 --- a/src/gossip/connectpeer.py +++ b/src/gossip/connectpeer.py @@ -1,9 +1,11 @@ from gossip.commands import GossipCommands, command_to_byte +from .peerset import gossip_peer_set + import logger -def connect_peer(peer_set, peer): - if peer in peer_set: +def connect_peer(peer): + if peer in gossip_peer_set: return try: s = peer.get_socket(12) @@ -12,5 +14,5 @@ def connect_peer(peer_set, peer): else: s.sendall(command_to_byte(GossipCommands.CLOSE)) s.close() - peer_set.add(peer) + gossip_peer_set.add(peer) logger.info(f"connected to {peer.transport_address}") diff --git a/src/gossip/dandelion/phase.py b/src/gossip/dandelion/phase.py index 0792061b..0efbecce 100644 --- a/src/gossip/dandelion/phase.py +++ b/src/gossip/dandelion/phase.py @@ -1,12 +1,14 @@ from time import time from hashlib import shake_128 from secrets import randbits +import secrets + +seed = secrets.token_bytes(32) class DandelionPhase: - def __init__(self, seed: bytes, epoch_interval_secs: int): - self.seed = seed # Seed intended to be from good random source like urandom - assert len(self.seed) == 32 + def __init__(self, epoch_interval_secs: int): + assert len(seed) == 32 self.epoch = int(time()) self.epoch_interval = epoch_interval_secs self._is_stem = bool(randbits(1)) @@ -18,7 +20,7 @@ class DandelionPhase: # Hash the seed with the time stamp to produce 8 pseudorandom bytes # Produce an len(8) byte string for time as well for year 2038 problem self.phase_id = shake_128( - self.seed + + seed + int.to_bytes(cur_time, 8, 'big')).digest(8) # Use first byte of phase id as random source for stem phase picking diff --git a/src/gossip/peerset.py b/src/gossip/peerset.py new file mode 100644 index 00000000..cbc9a7e6 --- /dev/null +++ b/src/gossip/peerset.py @@ -0,0 +1,25 @@ +"""Onionr - Private P2P Communication. + +singleton set of gossip peers +""" +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .peer import Peer +from ordered_set import OrderedSet +""" +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +gossip_peer_set: OrderedSet['Peer'] = OrderedSet() diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 4a9a42d2..b9d715b8 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -19,6 +19,7 @@ if TYPE_CHECKING: from filepaths import gossip_server_socket_file from ..commands import GossipCommands +from ..peerset import gossip_peer_set from .acceptstem import accept_stem_blocks """ This program is free software: you can redistribute it and/or modify @@ -38,10 +39,7 @@ along with this program. If not, see . inbound_dandelion_edge_count = [0] -def gossip_server( - peer_set: "OrderedSet[Peer]", - block_queues: Tuple["Queue[Block]", "Queue[Block]"], - dandelion_seed: bytes): +def gossip_server(): async def peer_connected( reader: 'StreamReader', writer: 'StreamWriter'): @@ -66,14 +64,13 @@ def gossip_server( constants.TRANSPORT_SIZE_BYTES) onionrevents.event( 'announce_rec', - data={'peer_set': peer_set, - 'address': address, + data={'address': address, 'callback': connect_peer}, threaded=True) writer.write(int(1).to_bytes(1, 'big')) await asyncio.wait_for(_read_announce(), 10) case GossipCommands.PEER_EXCHANGE: - for peer in peer_set: + for peer in gossip_peer_set: writer.write( peer.transport_address.encode( 'utf-8').removesuffix(b'.onion')) @@ -82,7 +79,6 @@ def gossip_server( try: await accept_stem_blocks( - block_queues, reader, writer, inbound_dandelion_edge_count) except asyncio.exceptions.TimeoutError: diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index fb63ac56..c2c5852f 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -9,6 +9,7 @@ from onionrblocks import Block from ..dandelion import DandelionPhase, StemAcceptResult from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM +from ..blockqueues import gossip_block_queues block_size_digits = len(str(BLOCK_MAX_SIZE)) @@ -20,7 +21,6 @@ if TYPE_CHECKING: async def accept_stem_blocks( - block_queues: Tuple["Queue[Block]", "Queue[Block]"], reader: 'StreamReader', writer: 'StreamWriter', inbound_edge_count: List[int]): @@ -35,7 +35,7 @@ async def accept_stem_blocks( read_routine = reader.read(BLOCK_ID_SIZE) stream_start_time = int(time()) - block_queue_to_use = secrets.choice(block_queues) + block_queue_to_use = secrets.choice(gossip_block_queues) for _ in range(MAX_STEM_BLOCKS_PER_STREAM): block_id = ( diff --git a/src/onionrcommands/daemonlaunch/__init__.py b/src/onionrcommands/daemonlaunch/__init__.py index 8c4910f6..7dffd4db 100755 --- a/src/onionrcommands/daemonlaunch/__init__.py +++ b/src/onionrcommands/daemonlaunch/__init__.py @@ -117,13 +117,7 @@ def daemon(): events.event('init', threaded=False) events.event('daemon_start') - shared_state.get(apiservers.ClientAPI).gossip_peer_set = OrderedSet() - shared_state.get(apiservers.ClientAPI).gossip_block_queues = \ - (queue.Queue(), queue.Queue()) - - gossip.start_gossip_threads( - shared_state.get(apiservers.ClientAPI).gossip_peer_set, - shared_state.get(apiservers.ClientAPI).gossip_block_queues) + gossip.start_gossip_threads() try: shared_state.get(apiservers.ClientAPI).start() diff --git a/static-data/default-plugins/tor/announce.py b/static-data/default-plugins/tor/announce.py index 63b0b980..fe483ab3 100644 --- a/static-data/default-plugins/tor/announce.py +++ b/static-data/default-plugins/tor/announce.py @@ -3,6 +3,7 @@ import logger from getsocks import get_socks from torpeer import TorPeer +from gossip.peerset import gossip_peer_set def on_announce_rec(api, data=None): @@ -21,5 +22,5 @@ def on_announce_rec(api, data=None): announced += '.onion' data['callback']( - data['peer_set'], + gossip_peer_set, TorPeer(socks_address, socks_port, announced)) diff --git a/static-data/default-plugins/tor/bootstrap.py b/static-data/default-plugins/tor/bootstrap.py index ad2a6737..181b1140 100644 --- a/static-data/default-plugins/tor/bootstrap.py +++ b/static-data/default-plugins/tor/bootstrap.py @@ -3,6 +3,7 @@ from time import sleep import os from gossip.peer import Peer +from gossip.peerset import gossip_peer_set import logger import config from getsocks import get_socks @@ -39,7 +40,7 @@ def on_bootstrap(api, data): # it will add it to data['peer_set'] if it responds to ping Thread( target=data['callback'], - args=[data['peer_set'], TorPeer(socks_address, socks_port, address)], + args=[TorPeer(socks_address, socks_port, address)], daemon=True).start() diff --git a/static-data/default-plugins/tor/bootstrap.txt b/static-data/default-plugins/tor/bootstrap.txt new file mode 100644 index 00000000..e69de29b diff --git a/static-data/default-plugins/tor/main.py b/static-data/default-plugins/tor/main.py index 64eb1124..511497ce 100644 --- a/static-data/default-plugins/tor/main.py +++ b/static-data/default-plugins/tor/main.py @@ -1,7 +1,6 @@ """Onionr - Private P2P Communication. -This default plugin handles "flow" messages -(global chatroom style communication) +Tor transport plugin """ import sys import os