From 996bff267b2d4e061fe30b3c493d1ac98d264bd9 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Wed, 30 Mar 2022 01:13:42 -0500 Subject: [PATCH] Work on normal gossip diffusion --- src/gossip/client/streamblocks.py | 53 +++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py index 68c760e4..ed1fdd27 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks.py @@ -6,13 +6,22 @@ doesn't apply for blocks in the gossip queue that are awaiting descision to fluff or stem """ +from threading import Thread, Semaphore from random import SystemRandom +from time import sleep +import traceback if TYPE_CHECKING: from socket import socket from typing import TYPE_CHECKING, List + from gossip.peer import Peer + +from ordered_set import OrderedSet + +import logger from ..peerset import gossip_peer_set +from ..commands import GossipCommands, command_to_byte """ This program is free software: you can redistribute it and/or modify @@ -30,10 +39,50 @@ along with this program. If not, see . """ +MAX_STREAMS = 3 +CONNECT_TIMEOUT = 12 +MAX_TRIED_PEERS = 10_000 + def stream_from_peers(): - peer_stream_sockets: List[socket] = [] + # Pick N peers to stream from + # Create sockets for them + # Spawn thread to stream from the socket + + tried_peers: OrderedSet[Peer] = OrderedSet() sys_rand = SystemRandom() + need_socket_lock = Semaphore(MAX_STREAMS) + + def _stream_from_peer(peer: Peer): + + try: + sock = peer.get_socket(CONNECT_TIMEOUT) + sock.sendall( + command_to_byte(GossipCommands.STREAM_BLOCKS) + ) + except Exception: + logger.warn(traceback.format_exc()) + sock.close() + need_socket_lock.release() + while True: - peers = sys_rand.sample(gossip_peer_set, min(3, len(gossip_peer_set))) + need_socket_lock.acquire() + available_set = gossip_peer_set - tried_peers + peers = sys_rand.sample( + available_set, + min(MAX_STREAMS, len(available_set))) + + tried_peers.update(peers) + if len(tried_peers) >= MAX_TRIED_PEERS: + tried_peers.pop() + + while len(peers): + try: + Thread( + target=_stream_from_peer, + args=[peers.pop()], + daemon=True).start() + except IndexError: + need_socket_lock.release() + break