From d6b1c98cbd7aa890f282207c8cf0200439c0848b Mon Sep 17 00:00:00 2001 From: Kevin F Date: Tue, 22 Feb 2022 14:34:19 -0600 Subject: [PATCH] Ping loop while brainstorming. --- src/gossip/client.py | 36 ++++++++++++++++++++-- src/gossip/commands.py | 5 +-- src/gossip/server.py | 13 ++++++-- static-data/default-plugins/tor/main.py | 2 +- static-data/default-plugins/tor/torpeer.py | 2 +- 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/src/gossip/client.py b/src/gossip/client.py index a26f5a4c..1185fec3 100644 --- a/src/gossip/client.py +++ b/src/gossip/client.py @@ -2,8 +2,10 @@ Dandelion ++ Gossip client logic """ +import traceback from typing import TYPE_CHECKING from typing import Set +from time import sleep from queue import Queue @@ -11,7 +13,9 @@ if TYPE_CHECKING: from onionrblocks import Block from .peer import Peer +import logger import onionrplugins +from .commands import GossipCommands """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -28,9 +32,37 @@ along with this program. If not, see . """ - def gossip_client( peer_set: Set['Peer'], block_queue: Queue['Block'], dandelion_seed: bytes): - return + """ + Gossip client does the following: + + Stem new blocks we created or downloaded *during stem phase* + Stream new blocks + """ + + remove_peers = [] + + while True: + remove_peers.clear() + for peer in peer_set: + try: + sock = peer.get_socket() + except Exception: + logger.warn("Lost connection to " + peer.transport_address) + logger.warn(traceback.format_exc()) + remove_peers.append(peer) + break + sock.sendall(int(GossipCommands.PING).to_bytes(1, 'big')) + if sock.recv(10) == b"PONG": + print("Got ping at peer") + while len(remove_peers): + try: + peer_set.remove(remove_peers.pop()) + except KeyError: + pass + + sleep(30) + return diff --git a/src/gossip/commands.py b/src/gossip/commands.py index b7e8a986..e2b8e839 100644 --- a/src/gossip/commands.py +++ b/src/gossip/commands.py @@ -1,9 +1,10 @@ -from enum import Enum, auto +from enum import IntEnum, auto -class GossipCommands(Enum): +class GossipCommands(IntEnum): PING = 1 ANNOUNCE = auto() PEER_EXCHANGE = auto() STREAM_BLOCKS = auto() PUT_BLOCKS = auto() + CLOSE = auto() diff --git a/src/gossip/server.py b/src/gossip/server.py index bcbb8c50..d01a989c 100644 --- a/src/gossip/server.py +++ b/src/gossip/server.py @@ -33,10 +33,19 @@ def gossip_server( async def peer_connected(reader, writer): while True: - cmd = asyncio.wait_for(await reader.read(1), 30) - match cmd: + try: + cmd = await asyncio.wait_for(reader.read(1), 60) + except asyncio.exceptions.CancelledError: + writer.close() + + cmd = int.from_bytes(cmd, 'big') + if cmd == b'' or cmd == 0: + continue + match GossipCommands(cmd): case GossipCommands.PING: writer.write(b'PONG') + case GossipCommands.CLOSE: + writer.close() await writer.drain() diff --git a/static-data/default-plugins/tor/main.py b/static-data/default-plugins/tor/main.py index a29bb35a..a1131561 100644 --- a/static-data/default-plugins/tor/main.py +++ b/static-data/default-plugins/tor/main.py @@ -77,7 +77,7 @@ def on_bootstrap(api, data: Set[Peer] = None): sleep(0.1) socks_address, socks_port = get_socks()[0] - sleep(10) + sleep(5) for transport_address in bootstrap_nodes: config.reload() diff --git a/static-data/default-plugins/tor/torpeer.py b/static-data/default-plugins/tor/torpeer.py index 37726a35..e76377b5 100644 --- a/static-data/default-plugins/tor/torpeer.py +++ b/static-data/default-plugins/tor/torpeer.py @@ -4,7 +4,7 @@ import socks class TorPeer: def __init__(self, socks_host, socks_port, onion_address): - self.onion_address = onion_address + self.transport_address = self.onion_address = onion_address self.socks_host = socks_host self.socks_port = socks_port