Fixed stemout blocking and performance issues

This commit is contained in:
Kevin F 2022-07-28 16:10:36 -05:00
parent c663be30f1
commit 8bd4a4c524
5 changed files with 40 additions and 24 deletions

View File

@ -1,4 +1,5 @@
from queue import Empty from collections import deque
from queue import Empty, Queue
from time import sleep from time import sleep
from secrets import choice from secrets import choice
import traceback import traceback
@ -123,6 +124,10 @@ async def stem_out(d_phase: 'DandelionPhase'):
# if we have at least 1 peer, # if we have at least 1 peer,
# do dandelion anyway in non strict mode # do dandelion anyway in non strict mode
# Allow poorly connected networks to communicate faster # Allow poorly connected networks to communicate faster
for block in gossip_block_queues[1].queue:
gossip_block_queues[0].put_nowait(block)
else:
gossip_block_queues[1].queue = deque()
break break
sleep(1) sleep(1)
else: else:

View File

@ -1,5 +1,7 @@
from asyncio import sleep from asyncio import sleep
from threading import Thread
from queue import Empty from queue import Empty
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import logger import logger
@ -20,22 +22,28 @@ async def do_stem_stream(
remaining_time = d_phase.remaining_time() remaining_time = d_phase.remaining_time()
my_phase_id = d_phase.phase_id my_phase_id = d_phase.phase_id
with peer_socket:
while remaining_time > 5 and my_phase_id == d_phase.phase_id: while remaining_time > 1 and my_phase_id == d_phase.phase_id:
# Primary client component that communicate's with gossip.server.acceptstem # Primary client component that communicate's with gossip.server.acceptstem
remaining_time = d_phase.remaining_time() remaining_time = d_phase.remaining_time()
while remaining_time: while remaining_time:
try:
# queues can't block because we're in async
bl = block_queue.get(block=False)
except Empty:
remaining_time = d_phase.remaining_time()
await sleep(1)
else:
break
logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
def _send_it():
with peer_socket:
try: try:
# queues can't block because we're in async peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8'))
bl = block_queue.get(block=False) except AttributeError:
except Empty: peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
await sleep(1) peer_socket.sendall(block_size.encode('utf-8'))
else: peer_socket.sendall(bl.raw)
break Thread(target=_send_it, daemon=True, name="stemout block").start()
logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
peer_socket.sendall(block_size.encode('utf-8'))
peer_socket.sendall(bl.raw)

View File

@ -65,6 +65,9 @@ def stream_from_peers():
def _stream_from_peer(peer: 'Peer'): def _stream_from_peer(peer: 'Peer'):
try: try:
sock = peer.get_socket(CONNECT_TIMEOUT) sock = peer.get_socket(CONNECT_TIMEOUT)
except ConnectionRefusedError:
need_socket_lock.release()
return
except Exception: except Exception:
logger.warn(traceback.format_exc(), terminal=True) logger.warn(traceback.format_exc(), terminal=True)
need_socket_lock.release() need_socket_lock.release()

View File

@ -36,8 +36,8 @@ async def accept_stem_blocks(
for _ in range(MAX_STEM_BLOCKS_PER_STREAM): for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
read_routine = reader.readexactly(BLOCK_ID_SIZE) read_routine = reader.readexactly(BLOCK_ID_SIZE)
logger.debug(f"Reading block id in stem server", terminal=True) logger.debug(f"Reading block id in stem server", terminal=True)
block_id = ( block_id = await wait_for(read_routine, base_wait_timeout)
await wait_for(read_routine, base_wait_timeout)).decode('utf-8') block_id = block_id.decode('utf-8')
if not block_id: if not block_id:
break break

View File

@ -39,7 +39,7 @@ def block_serialized():
req_data = request.data req_data = request.data
block_id = req_data[:BLOCK_ID_SIZE] block_id = req_data[:BLOCK_ID_SIZE]
block_data = req_data[BLOCK_ID_SIZE:] block_data = req_data[BLOCK_ID_SIZE:]
blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False)) #blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
#blockqueues.gossip_block_queues[stream_to_use].put( blockqueues.gossip_block_queues[stream_to_use].put(
#Block(block_id, block_data, auto_verify=False), block=False) Block(block_id, block_data, auto_verify=False), block=False)
return "ok" return "ok"