diff --git a/src/gossip/__init__.py b/src/gossip/__init__.py index 20d023f3..34311e09 100644 --- a/src/gossip/__init__.py +++ b/src/gossip/__init__.py @@ -47,7 +47,7 @@ def start_gossip_threads(): gossip_server, 1, 'gossip_server', initial_sleep=0.2) threading.Thread( - target=start_gossip_client, daemon=True).start() + target=start_gossip_client, daemon=True, name="start_gossip_client").start() onionrplugins.events.event('gossip_start', data=None, threaded=True) for _ in range(BOOTSTRAP_ATTEMPTS): onionrplugins.events.event( diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 4dd1954f..23b982c9 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -22,6 +22,7 @@ if TYPE_CHECKING: from ordered_set import OrderedSet import logger +import config import onionrplugins from ..commands import GossipCommands from gossip.dandelion.phase import DandelionPhase @@ -60,9 +61,9 @@ def block_queue_processing(): while not len(gossip_peer_set): sleep(0.2) if dandelion_phase.remaining_time() <= 15: - logger.debug("Sleeping", terminal=True) + #logger.debug("Sleeping", terminal=True) sleep(dandelion_phase.remaining_time()) - if dandelion_phase.is_stem_phase(): + if dandelion_phase.is_stem_phase() and config.get('security.dandelion.enabled', True): logger.debug("Entering stem phase", terminal=True) try: # Stem out blocks for (roughly) remaining epoch time @@ -73,8 +74,9 @@ def block_queue_processing(): logger.error(traceback.format_exc(), terminal=True) pass else: - logger.debug("Entering fluff phase", terminal=True) + #logger.debug("Entering fluff phase", terminal=True) # Add block to primary block db, where the diffuser can read it + sleep(0.1) store_blocks(dandelion_phase) @@ -88,7 +90,7 @@ def start_gossip_client(): bl: Block - Thread(target=do_announce, daemon=True).start() + Thread(target=do_announce, daemon=True, name='do_announce').start() # Start a thread that runs every 1200 secs to # Ask peers for a subset for their peer set diff --git a/src/gossip/client/dandelionstem/__init__.py b/src/gossip/client/dandelionstem/__init__.py index d59b5235..74b2a4b7 100644 --- a/src/gossip/client/dandelionstem/__init__.py +++ b/src/gossip/client/dandelionstem/__init__.py @@ -83,7 +83,7 @@ async def stem_out(d_phase: 'DandelionPhase'): sleep(1) return not_enough_edges = False - strict_dandelion = config.get('security.strict_dandelion', True) + strict_dandelion = config.get('security.dandelion.strict', True) def blackhole_protection(q): for bl in q: diff --git a/src/gossip/client/dandelionstem/stemstream.py b/src/gossip/client/dandelionstem/stemstream.py index ef0a22d7..080e7e7f 100644 --- a/src/gossip/client/dandelionstem/stemstream.py +++ b/src/gossip/client/dandelionstem/stemstream.py @@ -39,11 +39,14 @@ async def do_stem_stream( block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN) def _send_it(): - with peer_socket: - try: - peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8')) - except AttributeError: - peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE)) - peer_socket.sendall(block_size.encode('utf-8')) - peer_socket.sendall(bl.raw) + try: + with peer_socket: + try: + peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8')) + except AttributeError: + peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE)) + peer_socket.sendall(block_size.encode('utf-8')) + peer_socket.sendall(bl.raw) + except OSError: + pass Thread(target=_send_it, daemon=True, name="stemout block").start() diff --git a/src/gossip/client/peerexchange.py b/src/gossip/client/peerexchange.py index 45e31ca0..fe59c9fc 100644 --- a/src/gossip/client/peerexchange.py +++ b/src/gossip/client/peerexchange.py @@ -78,7 +78,9 @@ def get_new_peers(): # Start threads to ask the peers for more peers threads = [] for peer in peers_we_ask: - t = Thread(target=_do_ask_peer, args=[peer], daemon=True) + t = Thread( + target=_do_ask_peer, + args=[peer], daemon=True, name='_do_ask_peer') t.start() threads.append(t) peers_we_ask.clear() diff --git a/src/gossip/client/streamblocks/streamfrom.py b/src/gossip/client/streamblocks/streamfrom.py index 76abf4cb..ef97bb0e 100644 --- a/src/gossip/client/streamblocks/streamfrom.py +++ b/src/gossip/client/streamblocks/streamfrom.py @@ -44,7 +44,7 @@ along with this program. If not, see . """ -MAX_STREAMS = 3 +MAX_STREAMS = 6 CONNECT_TIMEOUT = 12 MAX_TRIED_PEERS = 10_000 @@ -63,6 +63,8 @@ def stream_from_peers(): def _stream_from_peer(peer: 'Peer'): + stream_counter = 0 + stream_times = 100 try: sock = peer.get_socket(CONNECT_TIMEOUT) except ConnectionRefusedError: @@ -79,8 +81,10 @@ def stream_from_peers(): sock.sendall( str(offset).zfill(BLOCK_STREAM_OFFSET_DIGITS).encode('utf-8')) - while True: - logger.debug("Reading block id in stream", terminal=True) + while stream_times >= stream_counter: + stream_counter += 1 + logger.debug("Reading block id in stream with " + peer.transport_address, terminal=True) + sock.settimeout(5) block_id = sock.recv(BLOCK_ID_SIZE) if blockdb.has_block(block_id): sock.sendall(int(0).to_bytes(1, 'big')) @@ -89,12 +93,15 @@ def stream_from_peers(): logger.debug("Reading block size in stream", terminal=True) + sock.settimeout(5) block_size = int(sock.recv(BLOCK_SIZE_LEN)) if block_size > BLOCK_MAX_SIZE or block_size <= 0: logger.warn( f"Peer {peer.transport_address} " + "reported block size out of range") break + + sock.settimeout(5) block_data = sock.recv(block_size) logger.debug( @@ -122,14 +129,14 @@ def stream_from_peers(): # spawn stream threads infinitely while True: - need_socket_lock.acquire() + available_set = gossip_peer_set - tried_peers if not len(available_set) and len(tried_peers): try: - tried_peers.pop() + tried_peers.clear() except IndexError: pass - available_set = gossip_peer_set - tried_peers + available_set = gossip_peer_set.copy() peers = sys_rand.sample( available_set, min(MAX_STREAMS, len(available_set))) @@ -140,10 +147,12 @@ def stream_from_peers(): while len(peers): try: + need_socket_lock.acquire() Thread( target=_stream_from_peer, args=[peers.pop()], - daemon=True).start() + daemon=True, + name="_stream_from_peer").start() except IndexError: need_socket_lock.release() break diff --git a/src/gossip/client/streamblocks/streamto.py b/src/gossip/client/streamblocks/streamto.py index e1082dbc..2d0a2a18 100644 --- a/src/gossip/client/streamblocks/streamto.py +++ b/src/gossip/client/streamblocks/streamto.py @@ -2,8 +2,8 @@ from secrets import SystemRandom from time import time from typing import List, TYPE_CHECKING -if TYPE_CHECKING: - from onionrblocks import Block +#if TYPE_CHECKING: +from onionrblocks import Block from gossip.commands import GossipCommands, command_to_byte from blockdb import get_blocks_after_timestamp @@ -19,7 +19,7 @@ class SendTimestamp: def stream_to_peer(): if SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM > time() - lastincoming.last_incoming_timestamp: - SendTimestamp.timestamp = int(time()) + SendTimestamp.timestamp = int(time()) - 60 return if not len(gossip_peer_set): return @@ -28,6 +28,7 @@ def stream_to_peer(): buffer: List['Block'] = [] def _do_upload(): + print('uploading to', peer.transport_address) with peer.get_socket(30) as p: p.sendall(command_to_byte(GossipCommands.PUT_BLOCK_DIFFUSE)) @@ -47,10 +48,11 @@ def stream_to_peer(): # and to efficiently avoid connecting without sending anything buffer_max = 10 for block in get_blocks_after_timestamp(SendTimestamp.timestamp): + assert isinstance(block, Block) buffer.append(block) if len(buffer) > buffer_max: _do_upload(buffer) if len(buffer): _do_upload() - SendTimestamp.timestamp = int(time()) + SendTimestamp.timestamp = int(time()) - 60 diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 9d8f38fd..3e3a301d 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -133,8 +133,10 @@ def gossip_server(): ).start() await _get_block_diffused() break - - await writer.drain() + try: + await writer.drain() + except BrokenPipeError: + pass async def main(): diff --git a/src/gossip/server/diffuseblocks.py b/src/gossip/server/diffuseblocks.py index 4fb21336..28cafac8 100644 --- a/src/gossip/server/diffuseblocks.py +++ b/src/gossip/server/diffuseblocks.py @@ -100,6 +100,8 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): except IncompleteReadError: keep_writing = False break + except ConnectionResetError: + pass except Exception: logger.warn(traceback.format_exc(), terminal=True) diff --git a/src/onionrcommands/daemonlaunch/__init__.py b/src/onionrcommands/daemonlaunch/__init__.py index 4a90ce3b..a8ad51be 100755 --- a/src/onionrcommands/daemonlaunch/__init__.py +++ b/src/onionrcommands/daemonlaunch/__init__.py @@ -112,7 +112,10 @@ def daemon(): f"Onionr daemon is running under pid {os.getpid()}", terminal=True) events.event('init', threaded=False) events.event('daemon_start') - Thread(target=gossip.start_gossip_threads, daemon=True).start() + Thread( + target=gossip.start_gossip_threads, + daemon=True, + name='start_gossip_threads').start() try: apiservers.private_api.start() diff --git a/src/onionrcommands/daemonlaunch/quotes.py b/src/onionrcommands/daemonlaunch/quotes.py index ec80f83a..49f9607e 100644 --- a/src/onionrcommands/daemonlaunch/quotes.py +++ b/src/onionrcommands/daemonlaunch/quotes.py @@ -8,8 +8,7 @@ QUOTES = [ ""), ("Study after study has show that human behavior changes when we know we’re being watched.\nUnder observation, we act less free, which means we effectively *are* less free.", "Edward Snowdwen"), - ("A revolution without dancing is a revolution not worth having", - "V for Vendetta"), + ("Privacy is a fundamental right", ""), ("There can be no justice so long as laws are absolute. Even life itself is an exercise in exceptions", "Picard"), ("Openness and participation are antidotes to surveillance and control", diff --git a/src/onionrplugins/onionrevents.py b/src/onionrplugins/onionrevents.py index b8f723b6..723e983b 100755 --- a/src/onionrplugins/onionrevents.py +++ b/src/onionrplugins/onionrevents.py @@ -51,7 +51,7 @@ def event(event_name, data = {}, threaded = True): """Call an event on all plugins (if defined)""" if threaded: - thread = Thread(target = __event_caller, args = (event_name, data)) + thread = Thread(target = __event_caller, args = (event_name, data), name=f'{event_name} event', daemon=True) thread.start() return thread else: diff --git a/static-data/default-plugins/tor/announce.py b/static-data/default-plugins/tor/announce.py index edc96b5e..ad87d2a6 100644 --- a/static-data/default-plugins/tor/announce.py +++ b/static-data/default-plugins/tor/announce.py @@ -21,8 +21,6 @@ def on_announce_rec(api, data=None): if announced.removesuffix('.onion') == config.get( 'tor.transport_address', '').removesuffix('.onion'): - logger.warn( - "Received announcement for our own node, which shouldn't happen") return diff --git a/static-data/default-plugins/unixtransport/main.py b/static-data/default-plugins/unixtransport/main.py index b100105f..1da50611 100644 --- a/static-data/default-plugins/unixtransport/main.py +++ b/static-data/default-plugins/unixtransport/main.py @@ -9,6 +9,7 @@ from time import sleep import traceback from typing import Set, TYPE_CHECKING from threading import Thread +import shelve import stem from stem.control import Controller @@ -19,6 +20,7 @@ import config from filepaths import gossip_server_socket_file from gossip.peer import Peer +from gossip.peerset import gossip_peer_set locale.setlocale(locale.LC_ALL, '') sys.path.insert(0, os.path.dirname(os.path.realpath(__file__))) @@ -27,6 +29,7 @@ from unixpeer import UnixPeer from unixbootstrap import on_bootstrap from unixannounce import on_announce_rec +from unixfilepaths import peer_database_file #from shutdown import on_shutdown_event """ @@ -49,7 +52,11 @@ plugin_name = 'unixtransport' PLUGIN_VERSION = '0.0.0' - +def on_shutdown_event(api, data=None): + with shelve.open(peer_database_file, 'c') as db: + for peer in gossip_peer_set: + if isinstance(peer, UnixPeer): + db[peer.transport_address] = peer def on_init(api, data=None): logger.info( diff --git a/static-data/default-plugins/unixtransport/unixannounce.py b/static-data/default-plugins/unixtransport/unixannounce.py index d2721023..040caadf 100644 --- a/static-data/default-plugins/unixtransport/unixannounce.py +++ b/static-data/default-plugins/unixtransport/unixannounce.py @@ -16,11 +16,7 @@ def on_announce_rec(api, data=None): if not announced.endswith('.sock'): return - if announced == gossip_server_socket_file: - logger.warn( - "Received announcement for our unix node, which shouldn't happen", - terminal=True) return logger.info(f"Peer {announced} announced to us.", terminal=True) diff --git a/static-data/default_config.json b/static-data/default_config.json index 6f4cc0fb..3dad7441 100755 --- a/static-data/default_config.json +++ b/static-data/default_config.json @@ -50,6 +50,12 @@ "i_dont_want_privacy": false, "server": "" }, + "security": { + "dandelion": { + "strict": true, + "enabled": true + } + }, "timers": { "getBlocks": 10, "lookupBlocks": 25