From 8bd4a4c524c2c83484ab8eafb14825cbc4e186d9 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Thu, 28 Jul 2022 16:10:36 -0500 Subject: [PATCH] Fixed stemout blocking and performance issues --- src/gossip/client/dandelionstem/__init__.py | 7 ++- src/gossip/client/dandelionstem/stemstream.py | 44 +++++++++++-------- src/gossip/client/streamblocks/streamfrom.py | 3 ++ src/gossip/server/acceptstem.py | 4 +- src/httpapi/addblock/__init__.py | 6 +-- 5 files changed, 40 insertions(+), 24 deletions(-) diff --git a/src/gossip/client/dandelionstem/__init__.py b/src/gossip/client/dandelionstem/__init__.py index 0313e950..d59b5235 100644 --- a/src/gossip/client/dandelionstem/__init__.py +++ b/src/gossip/client/dandelionstem/__init__.py @@ -1,4 +1,5 @@ -from queue import Empty +from collections import deque +from queue import Empty, Queue from time import sleep from secrets import choice import traceback @@ -123,6 +124,10 @@ async def stem_out(d_phase: 'DandelionPhase'): # if we have at least 1 peer, # do dandelion anyway in non strict mode # Allow poorly connected networks to communicate faster + for block in gossip_block_queues[1].queue: + gossip_block_queues[0].put_nowait(block) + else: + gossip_block_queues[1].queue = deque() break sleep(1) else: diff --git a/src/gossip/client/dandelionstem/stemstream.py b/src/gossip/client/dandelionstem/stemstream.py index 03d53f71..ef0a22d7 100644 --- a/src/gossip/client/dandelionstem/stemstream.py +++ b/src/gossip/client/dandelionstem/stemstream.py @@ -1,5 +1,7 @@ from asyncio import sleep +from threading import Thread from queue import Empty + from typing import TYPE_CHECKING import logger @@ -20,22 +22,28 @@ async def do_stem_stream( remaining_time = d_phase.remaining_time() my_phase_id = d_phase.phase_id - with peer_socket: - while remaining_time > 5 and my_phase_id == d_phase.phase_id: - # Primary client component that communicate's with gossip.server.acceptstem - remaining_time = d_phase.remaining_time() - while remaining_time: + + while remaining_time > 1 and my_phase_id == d_phase.phase_id: + # Primary client component that communicate's with gossip.server.acceptstem + remaining_time = d_phase.remaining_time() + while remaining_time: + try: + # queues can't block because we're in async + bl = block_queue.get(block=False) + except Empty: + remaining_time = d_phase.remaining_time() + await sleep(1) + else: + break + logger.info("Sending block over dandelion++", terminal=True) + + block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN) + def _send_it(): + with peer_socket: try: - # queues can't block because we're in async - bl = block_queue.get(block=False) - except Empty: - await sleep(1) - else: - break - logger.info("Sending block over dandelion++", terminal=True) - - block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN) - - peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE)) - peer_socket.sendall(block_size.encode('utf-8')) - peer_socket.sendall(bl.raw) + peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8')) + except AttributeError: + peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE)) + peer_socket.sendall(block_size.encode('utf-8')) + peer_socket.sendall(bl.raw) + Thread(target=_send_it, daemon=True, name="stemout block").start() diff --git a/src/gossip/client/streamblocks/streamfrom.py b/src/gossip/client/streamblocks/streamfrom.py index 6c5f5039..76abf4cb 100644 --- a/src/gossip/client/streamblocks/streamfrom.py +++ b/src/gossip/client/streamblocks/streamfrom.py @@ -65,6 +65,9 @@ def stream_from_peers(): def _stream_from_peer(peer: 'Peer'): try: sock = peer.get_socket(CONNECT_TIMEOUT) + except ConnectionRefusedError: + need_socket_lock.release() + return except Exception: logger.warn(traceback.format_exc(), terminal=True) need_socket_lock.release() diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index 06e4569e..42aa00f6 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -36,8 +36,8 @@ async def accept_stem_blocks( for _ in range(MAX_STEM_BLOCKS_PER_STREAM): read_routine = reader.readexactly(BLOCK_ID_SIZE) logger.debug(f"Reading block id in stem server", terminal=True) - block_id = ( - await wait_for(read_routine, base_wait_timeout)).decode('utf-8') + block_id = await wait_for(read_routine, base_wait_timeout) + block_id = block_id.decode('utf-8') if not block_id: break diff --git a/src/httpapi/addblock/__init__.py b/src/httpapi/addblock/__init__.py index 0eabefdc..98eb355d 100644 --- a/src/httpapi/addblock/__init__.py +++ b/src/httpapi/addblock/__init__.py @@ -39,7 +39,7 @@ def block_serialized(): req_data = request.data block_id = req_data[:BLOCK_ID_SIZE] block_data = req_data[BLOCK_ID_SIZE:] - blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False)) - #blockqueues.gossip_block_queues[stream_to_use].put( - #Block(block_id, block_data, auto_verify=False), block=False) + #blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False)) + blockqueues.gossip_block_queues[stream_to_use].put( + Block(block_id, block_data, auto_verify=False), block=False) return "ok"