From 237cdde4e581474c1a7914d1ac8eb291b1f8e5fc Mon Sep 17 00:00:00 2001 From: Kevin F Date: Wed, 20 Apr 2022 00:28:29 -0500 Subject: [PATCH] Added peer exchange test (passing) --- src/gossip/client/__init__.py | 4 +++- src/gossip/client/announce.py | 2 +- src/gossip/client/peerexchange.py | 8 ++++++-- src/gossip/client/streamblocks.py | 5 +++-- src/gossip/connectpeer.py | 2 +- src/gossip/server/__init__.py | 11 ++++++----- src/gossip/server/diffuseblocks.py | 1 + static-data/default-plugins/tor/announce.py | 9 ++++----- static-data/default-plugins/tor/main.py | 2 +- static-data/default-plugins/tor/torpeer.py | 4 ++++ 10 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index bf8e2b92..ea089e0a 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -76,6 +76,8 @@ def gossip_client(): dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH) while True: + sleep(5) + continue while not len(gossip_peer_set): sleep(0.2) if dandelion_phase.remaining_time() <= 10: @@ -84,7 +86,7 @@ def gossip_client(): logger.debug("Entering stem phase", terminal=True) try: # Stem out blocks for (roughly) remaining epoch time - asyncio.run(stem_out()) + asyncio.run(stem_out(dandelion_phase)) except TimeoutError: continue except Exception: diff --git a/src/gossip/client/announce.py b/src/gossip/client/announce.py index a0d953b1..c21c6c3c 100644 --- a/src/gossip/client/announce.py +++ b/src/gossip/client/announce.py @@ -16,7 +16,7 @@ def do_announce(): "Announce with N peers of each identified transport" def _announce(announce_peer: 'Peer', our_transport_address: str): try: - our_transport_address = our_transport_address.encode('utf-8') + our_transport_address = our_transport_address.encode('utf-8') + b"\n" except AttributeError: pass sock = announce_peer.get_socket(12) diff --git a/src/gossip/client/peerexchange.py b/src/gossip/client/peerexchange.py index b8682fa1..b26a2431 100644 --- a/src/gossip/client/peerexchange.py +++ b/src/gossip/client/peerexchange.py @@ -20,7 +20,11 @@ def _ask_peer(peer): s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE)) # Get 10 max peers for _ in range(MAX_PEERS): - peer = s.recv(TRANSPORT_SIZE_BYTES) + peer = b'' + c = b'' + while c != b'\n': + c = s.recv(1) + peer += c if not peer: break connect_data = { @@ -54,7 +58,7 @@ def get_new_peers(): # Start threads to ask the peers for more peers threads = [] for peer in peers_we_ask: - t = Thread(target=_ask_peer, args=[peer, gossip_peer_set], daemon=True) + t = Thread(target=_ask_peer, args=[peer], daemon=True) t.start() threads.append(t) peers_we_ask.clear() diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py index c71b7de8..0932cd39 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks.py @@ -59,6 +59,7 @@ def stream_from_peers(): need_socket_lock = Semaphore(MAX_STREAMS) offset = 0 + def _stream_from_peer(peer: Peer): try: @@ -83,11 +84,11 @@ def stream_from_peers(): "reported block size out of range") break block_data = sock.recv(block_size) + try: blockdb.add_block_to_db( onionrblocks.Block( - block_id, block_data, auto_verify=True) - ) + block_id, block_data, auto_verify=True)) except Exception: # They gave us a bad block, kill the stream # Could be corruption or malice diff --git a/src/gossip/connectpeer.py b/src/gossip/connectpeer.py index c744250c..8bf555fd 100644 --- a/src/gossip/connectpeer.py +++ b/src/gossip/connectpeer.py @@ -8,7 +8,7 @@ def connect_peer(peer): if peer in gossip_peer_set: return try: - s = peer.get_socket(12) + s = peer.get_socket(15) except Exception: logger.warn(f"Could not connect to {peer.transport_address}") else: diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 401d5eb2..5f15ac30 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -1,4 +1,5 @@ import asyncio +from audioop import add import traceback from typing import TYPE_CHECKING from typing import Set, Tuple @@ -63,8 +64,8 @@ def gossip_server(): pass case GossipCommands.ANNOUNCE: async def _read_announce(): - address = await reader.readexactly( - constants.TRANSPORT_SIZE_BYTES) + address = await reader.readuntil(b'\n') + if address: onionrevents.event( 'announce_rec', @@ -74,10 +75,10 @@ def gossip_server(): writer.write(int(1).to_bytes(1, 'big')) await asyncio.wait_for(_read_announce(), 10) case GossipCommands.PEER_EXCHANGE: + for peer in gossip_peer_set: - writer.write( - peer.transport_address.encode( - 'utf-8').removesuffix(b'.onion')) + writer.write(peer.transport_address.encode('utf-8') + b'\n') + await writer.drain() case GossipCommands.STREAM_BLOCKS: try: await diffuse_blocks(reader, writer) diff --git a/src/gossip/server/diffuseblocks.py b/src/gossip/server/diffuseblocks.py index d0044b5a..cb34f4bb 100644 --- a/src/gossip/server/diffuseblocks.py +++ b/src/gossip/server/diffuseblocks.py @@ -70,6 +70,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): return await writer.drain() + # write block size writer.write( str(len(block.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8')) await writer.drain() diff --git a/static-data/default-plugins/tor/announce.py b/static-data/default-plugins/tor/announce.py index fe483ab3..6911517a 100644 --- a/static-data/default-plugins/tor/announce.py +++ b/static-data/default-plugins/tor/announce.py @@ -3,7 +3,6 @@ import logger from getsocks import get_socks from torpeer import TorPeer -from gossip.peerset import gossip_peer_set def on_announce_rec(api, data=None): @@ -16,11 +15,11 @@ def on_announce_rec(api, data=None): pass if announced == config.get('tor.transport_address'): - logger.warn("Recieved announcement for our own node, which shouldnt happen") + logger.warn( + "Received announcement for our own node, which shouldn't happen") return + announced = announced.strip() announced += '.onion' - data['callback']( - gossip_peer_set, - TorPeer(socks_address, socks_port, announced)) + data['callback'](TorPeer(socks_address, socks_port, announced)) diff --git a/static-data/default-plugins/tor/main.py b/static-data/default-plugins/tor/main.py index 511497ce..bba3a47e 100644 --- a/static-data/default-plugins/tor/main.py +++ b/static-data/default-plugins/tor/main.py @@ -92,7 +92,7 @@ def on_gossip_start(api, data: Set[Peer] = None): try: add_onion_resp = controller.create_ephemeral_hidden_service( {'80': f'unix:{gossip_server_socket_file}'}, - key_content=key, key_type='ED25519-V3', detached=True) + key_content=key, key_type='ED25519-V3', detached=True, await_publication=True) except stem.ProtocolError: logger.error( "Could not start Tor transport. Try restarting Onionr", diff --git a/static-data/default-plugins/tor/torpeer.py b/static-data/default-plugins/tor/torpeer.py index 5d53855f..f3b290ae 100644 --- a/static-data/default-plugins/tor/torpeer.py +++ b/static-data/default-plugins/tor/torpeer.py @@ -6,11 +6,15 @@ class TorPeer: def __init__(self, socks_host, socks_port, onion_address): if not onion_address or onion_address == '.onion': raise ValueError("Invalid transport address") + if not onion_address.endswith('.onion'): + self.onion_address = onion_address.strip() + '.onion' + self.transport_address = self.onion_address = onion_address self.socks_host = socks_host self.socks_port = socks_port def get_socket(self, connect_timeout) -> socks.socksocket: + s = socks.socksocket() s.set_proxy(socks.SOCKS4, self.socks_host, self.socks_port, rdns=True) s.settimeout(connect_timeout)