Compare commits

...

2 Commits

Author SHA1 Message Date
Kevin F
8bd4a4c524 Fixed stemout blocking and performance issues 2022-07-28 16:10:36 -05:00
Kevin F
c663be30f1 bump flask 2022-07-28 16:09:31 -05:00
7 changed files with 42 additions and 26 deletions

View File

@ -2,7 +2,7 @@ urllib3==1.26.7
requests==2.28.1
PyNaCl==1.5.0
gevent==21.12.0
Flask==2.1.2
Flask==2.1.3
PySocks==1.7.1
stem==1.8.0
deadsimplekv==0.3.2

View File

@ -18,7 +18,7 @@ deadsimplekv==0.3.2
# via -r requirements.in
filenuke==0.0.0
# via -r requirements.in
flask==2.1.2
flask==2.1.3
# via -r requirements.in
gevent==21.12.0
# via -r requirements.in

View File

@ -1,4 +1,5 @@
from queue import Empty
from collections import deque
from queue import Empty, Queue
from time import sleep
from secrets import choice
import traceback
@ -123,6 +124,10 @@ async def stem_out(d_phase: 'DandelionPhase'):
# if we have at least 1 peer,
# do dandelion anyway in non strict mode
# Allow poorly connected networks to communicate faster
for block in gossip_block_queues[1].queue:
gossip_block_queues[0].put_nowait(block)
else:
gossip_block_queues[1].queue = deque()
break
sleep(1)
else:

View File

@ -1,5 +1,7 @@
from asyncio import sleep
from threading import Thread
from queue import Empty
from typing import TYPE_CHECKING
import logger
@ -20,8 +22,8 @@ async def do_stem_stream(
remaining_time = d_phase.remaining_time()
my_phase_id = d_phase.phase_id
with peer_socket:
while remaining_time > 5 and my_phase_id == d_phase.phase_id:
while remaining_time > 1 and my_phase_id == d_phase.phase_id:
# Primary client component that communicate's with gossip.server.acceptstem
remaining_time = d_phase.remaining_time()
while remaining_time:
@ -29,13 +31,19 @@ async def do_stem_stream(
# queues can't block because we're in async
bl = block_queue.get(block=False)
except Empty:
remaining_time = d_phase.remaining_time()
await sleep(1)
else:
break
logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
def _send_it():
with peer_socket:
try:
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8'))
except AttributeError:
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
peer_socket.sendall(block_size.encode('utf-8'))
peer_socket.sendall(bl.raw)
Thread(target=_send_it, daemon=True, name="stemout block").start()

View File

@ -65,6 +65,9 @@ def stream_from_peers():
def _stream_from_peer(peer: 'Peer'):
try:
sock = peer.get_socket(CONNECT_TIMEOUT)
except ConnectionRefusedError:
need_socket_lock.release()
return
except Exception:
logger.warn(traceback.format_exc(), terminal=True)
need_socket_lock.release()

View File

@ -36,8 +36,8 @@ async def accept_stem_blocks(
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
read_routine = reader.readexactly(BLOCK_ID_SIZE)
logger.debug(f"Reading block id in stem server", terminal=True)
block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
block_id = await wait_for(read_routine, base_wait_timeout)
block_id = block_id.decode('utf-8')
if not block_id:
break

View File

@ -39,7 +39,7 @@ def block_serialized():
req_data = request.data
block_id = req_data[:BLOCK_ID_SIZE]
block_data = req_data[BLOCK_ID_SIZE:]
blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
#blockqueues.gossip_block_queues[stream_to_use].put(
#Block(block_id, block_data, auto_verify=False), block=False)
#blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
blockqueues.gossip_block_queues[stream_to_use].put(
Block(block_id, block_data, auto_verify=False), block=False)
return "ok"