diff --git a/src/gossip/client/announce.py b/src/gossip/client/announce.py index 3b54e889..8aaab390 100644 --- a/src/gossip/client/announce.py +++ b/src/gossip/client/announce.py @@ -1,9 +1,11 @@ +from time import sleep from typing import TYPE_CHECKING if TYPE_CHECKING: from .. import Peer +import logger from ..commands import GossipCommands, command_to_byte import onionrplugins @@ -11,27 +13,38 @@ import onionrplugins def do_announce(peer_set): "Announce with N peers of each identified transport" def _announce(announce_peer: 'Peer', our_transport_address: str): + try: + our_transport_address = our_transport_address.encode('utf-8') + except AttributeError: + pass sock = announce_peer.get_socket() sock.send( command_to_byte(GossipCommands.ANNOUNCE) + our_transport_address) - if sock.dup + if int.from_bytes(sock.recv(1), 'big') != 1: + logger.warn( + f"Could not announce with {announce_peer.transport_address}") + sock.close() + while not len(peer_set): + sleep(1) per_transport = 3 peer_types = {} count_for_peer = 0 for peer in peer_set: try: - count_for_peer = peer_types[peer.__name__] + count_for_peer = peer_types[peer.__class__] except KeyError: - peer_types[peer.__name__] = 0 - continue + count_for_peer = peer_types[peer.__class__] = 0 if count_for_peer == per_transport: continue + # Plugin for the transport associated with the peer will call _announce + # with the peer and *our* transport address onionrplugins.events.event( 'get_our_transport', - data={'callback': _announce, 'peer': peer}) + data={'callback': _announce, 'peer': peer}, + threaded=True) - peer_types[peer.__name__] += 1 + peer_types[peer.__class__] += 1 diff --git a/src/gossip/server.py b/src/gossip/server.py index d01a989c..acf48a6b 100644 --- a/src/gossip/server.py +++ b/src/gossip/server.py @@ -4,6 +4,8 @@ from typing import Set from queue import Queue +from onionrplugins import onionrevents + if TYPE_CHECKING: from onionrblocks import Block from peer import Peer @@ -46,6 +48,15 @@ def gossip_server( writer.write(b'PONG') case GossipCommands.CLOSE: writer.close() + case GossipCommands.ANNOUNCE: + async def _read_announce(): + address = await reader.read(56) + onionrevents.event( + 'announce_rec', + data={'peer_set': peer_set, 'address': address}, + threaded=False) + writer.write(int(1).to_bytes(1, 'big')) + await asyncio.wait_for(_read_announce(), 10) await writer.drain() diff --git a/static-data/default-plugins/tor/main.py b/static-data/default-plugins/tor/main.py index a1131561..441517be 100644 --- a/static-data/default-plugins/tor/main.py +++ b/static-data/default-plugins/tor/main.py @@ -63,6 +63,17 @@ def on_init(api, data=None): f"Tor Transport Plugin v{PLUGIN_VERSION} enabled", terminal=True) +def on_get_our_transport(api, data=None): + callback_func = data['callback'] + for_peer = data['peer'] + if data['peer'].__class__ == TorPeer: + callback_func(for_peer, config.get('tor.transport_address')) + + +def on_announce_rec(api, data=None): + print("got announce rec event") + + def on_bootstrap(api, data: Set[Peer] = None): bootstrap_nodes: Set[str] peers = data @@ -89,7 +100,7 @@ def on_bootstrap(api, data: Set[Peer] = None): tor_peer = TorPeer(socks_address, socks_port, transport_address) try: tor_peer.get_socket() - except Exception as e: + except Exception: logger.warn( f"Could not connnect to Tor peer {transport_address} " + "see logs for more info",