From d11d12b67f8fceb73f713d45a7ccea5d119c17d7 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Mon, 11 Jul 2022 10:27:13 -0500 Subject: [PATCH] async bug fixes --- src/gossip/client/__init__.py | 6 ++---- src/gossip/client/announce.py | 10 ++++++---- src/gossip/server/__init__.py | 5 +---- src/gossip/server/diffuseblocks.py | 8 +++----- static-data/default-plugins/tor/announce.py | 10 ++++++---- static-data/default-plugins/unixtransport/announce.py | 3 +++ static-data/default-plugins/unixtransport/bootstrap.py | 1 + .../default-plugins/unixtransport/bootstrap.txt | 2 +- static-data/default-plugins/unixtransport/main.py | 2 ++ 9 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 69b1553c..acad4e10 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -87,10 +87,8 @@ def start_gossip_client(): """ bl: Block - def _start_announce(): - sleep(60) - do_announce() - Thread(target=_start_announce, daemon=True).start() + + Thread(target=do_announce, daemon=True).start() # Start a thread that runs every 1200 secs to # Ask peers for a subset for their peer set diff --git a/src/gossip/client/announce.py b/src/gossip/client/announce.py index 1d02b0ad..b4cc58ca 100644 --- a/src/gossip/client/announce.py +++ b/src/gossip/client/announce.py @@ -14,11 +14,15 @@ from ..peerset import gossip_peer_set def do_announce(): "Announce with N peers of each identified transport" + per_transport = 4 + peer_types = {} + count_for_peer = 0 def _announce(announce_peer: 'Peer', our_transport_address: str): + assert our_transport_address try: our_transport_address = our_transport_address.encode('utf-8') + b"\n" except AttributeError: - pass + our_transport_address = our_transport_address + b'\n' sock = announce_peer.get_socket(12) sock.sendall(command_to_byte(GossipCommands.ANNOUNCE)) sock.sendall(our_transport_address) @@ -30,9 +34,7 @@ def do_announce(): while not len(gossip_peer_set): sleep(1) - per_transport = 3 - peer_types = {} - count_for_peer = 0 + for peer in gossip_peer_set: try: count_for_peer = peer_types[peer.__class__] diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 2525a1c5..17486deb 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -1,5 +1,4 @@ import asyncio -from audioop import add import traceback from typing import TYPE_CHECKING from typing import Set, Tuple @@ -60,7 +59,6 @@ def gossip_server(): match GossipCommands(cmd): case GossipCommands.PING: writer.write(b'PONG') - break case GossipCommands.ANNOUNCE: async def _read_announce(): address = await reader.readuntil(b'\n') @@ -72,6 +70,7 @@ def gossip_server(): 'callback': connect_peer}, threaded=True) writer.write(int(1).to_bytes(1, 'big')) + await writer.drain() await asyncio.wait_for(_read_announce(), 10) case GossipCommands.PEER_EXCHANGE: @@ -111,14 +110,12 @@ def gossip_server(): break await writer.drain() - writer.close() async def main(): server = await asyncio.start_unix_server( peer_connected, gossip_server_socket_file ) - async with server: await server.serve_forever() diff --git a/src/gossip/server/diffuseblocks.py b/src/gossip/server/diffuseblocks.py index ccf43985..93b35383 100644 --- a/src/gossip/server/diffuseblocks.py +++ b/src/gossip/server/diffuseblocks.py @@ -6,9 +6,8 @@ doesn't apply for blocks in the gossip queue that are awaiting descision to fluff or stem """ -from asyncio import IncompleteReadError, wait_for +from asyncio import IncompleteReadError, wait_for, Queue -import queue import traceback from typing import TYPE_CHECKING from time import time @@ -54,7 +53,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): raise ValueError( "Peer's specified time offset skewed too far into the future") - newly_stored_blocks = queue.Queue() + newly_stored_blocks = Queue() def _add_to_queue(bl): newly_stored_blocks.put_nowait(bl) @@ -70,7 +69,6 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): if int.from_bytes(await reader.readexactly(1), 'big') == 0: return - await writer.drain() # write block size writer.write( str(len(block.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8')) @@ -95,7 +93,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): # Diffuse blocks stored since we started this stream while keep_writing: - await _send_block(newly_stored_blocks.get()) + await _send_block(await newly_stored_blocks.get()) try: keep_writing = bool( int.from_bytes(await reader.readexactly(1), 'big') diff --git a/static-data/default-plugins/tor/announce.py b/static-data/default-plugins/tor/announce.py index fdea5859..edc96b5e 100644 --- a/static-data/default-plugins/tor/announce.py +++ b/static-data/default-plugins/tor/announce.py @@ -1,19 +1,23 @@ import config import logger +from gossip.peerset import gossip_peer_set from getsocks import get_socks from torpeer import TorPeer +MAX_TOR_PEERS = 20 def on_announce_rec(api, data=None): - socks_address, socks_port = get_socks()[0] - announced: str = data['address'] try: announced = announced.decode('utf-8') except AttributeError: pass announced = announced.strip() + if not announced.endswith('.onion'): + return + socks_address, socks_port = get_socks()[0] + if announced.removesuffix('.onion') == config.get( 'tor.transport_address', '').removesuffix('.onion'): @@ -21,8 +25,6 @@ def on_announce_rec(api, data=None): "Received announcement for our own node, which shouldn't happen") return - if not announced.endswith('.onion'): - announced += '.onion' logger.info(f"Peer {announced} announced to us.", terminal=True) diff --git a/static-data/default-plugins/unixtransport/announce.py b/static-data/default-plugins/unixtransport/announce.py index 951d6271..d2721023 100644 --- a/static-data/default-plugins/unixtransport/announce.py +++ b/static-data/default-plugins/unixtransport/announce.py @@ -13,6 +13,9 @@ def on_announce_rec(api, data=None): except AttributeError: pass announced = announced.strip() + if not announced.endswith('.sock'): + return + if announced == gossip_server_socket_file: logger.warn( diff --git a/static-data/default-plugins/unixtransport/bootstrap.py b/static-data/default-plugins/unixtransport/bootstrap.py index 1305998e..a5c3b13e 100644 --- a/static-data/default-plugins/unixtransport/bootstrap.py +++ b/static-data/default-plugins/unixtransport/bootstrap.py @@ -65,4 +65,5 @@ def on_bootstrap(api, data): target=callback_func, args=[UnixPeer(address)], daemon=True).start() + sleep(1) diff --git a/static-data/default-plugins/unixtransport/bootstrap.txt b/static-data/default-plugins/unixtransport/bootstrap.txt index 9f935d1d..66f498a4 100644 --- a/static-data/default-plugins/unixtransport/bootstrap.txt +++ b/static-data/default-plugins/unixtransport/bootstrap.txt @@ -1 +1 @@ -/dev/shm/onionr655223043/gossip-server.sock \ No newline at end of file +/dev/shm/onionr442130151/gossip-server.sock \ No newline at end of file diff --git a/static-data/default-plugins/unixtransport/main.py b/static-data/default-plugins/unixtransport/main.py index a363a4d5..49fbbcc7 100644 --- a/static-data/default-plugins/unixtransport/main.py +++ b/static-data/default-plugins/unixtransport/main.py @@ -61,6 +61,8 @@ def on_init(api, data=None): def on_get_our_transport(api, data=None): callback_func = data['callback'] for_peer = data['peer'] + if for_peer.transport_address == gossip_server_socket_file: + return if data['peer'].__class__ == UnixPeer: callback_func(for_peer, gossip_server_socket_file)