Compare commits

..

No commits in common. "8bd4a4c524c2c83484ab8eafb14825cbc4e186d9" and "90176e43fbebfd88d01dcb3271676613aeb2d00c" have entirely different histories.

7 changed files with 26 additions and 42 deletions

View File

@ -2,7 +2,7 @@ urllib3==1.26.7
requests==2.28.1
PyNaCl==1.5.0
gevent==21.12.0
Flask==2.1.3
Flask==2.1.2
PySocks==1.7.1
stem==1.8.0
deadsimplekv==0.3.2

View File

@ -18,7 +18,7 @@ deadsimplekv==0.3.2
# via -r requirements.in
filenuke==0.0.0
# via -r requirements.in
flask==2.1.3
flask==2.1.2
# via -r requirements.in
gevent==21.12.0
# via -r requirements.in

View File

@ -1,5 +1,4 @@
from collections import deque
from queue import Empty, Queue
from queue import Empty
from time import sleep
from secrets import choice
import traceback
@ -124,10 +123,6 @@ async def stem_out(d_phase: 'DandelionPhase'):
# if we have at least 1 peer,
# do dandelion anyway in non strict mode
# Allow poorly connected networks to communicate faster
for block in gossip_block_queues[1].queue:
gossip_block_queues[0].put_nowait(block)
else:
gossip_block_queues[1].queue = deque()
break
sleep(1)
else:

View File

@ -1,7 +1,5 @@
from asyncio import sleep
from threading import Thread
from queue import Empty
from typing import TYPE_CHECKING
import logger
@ -22,28 +20,22 @@ async def do_stem_stream(
remaining_time = d_phase.remaining_time()
my_phase_id = d_phase.phase_id
while remaining_time > 1 and my_phase_id == d_phase.phase_id:
# Primary client component that communicate's with gossip.server.acceptstem
remaining_time = d_phase.remaining_time()
while remaining_time:
try:
# queues can't block because we're in async
bl = block_queue.get(block=False)
except Empty:
remaining_time = d_phase.remaining_time()
await sleep(1)
else:
break
logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
def _send_it():
with peer_socket:
with peer_socket:
while remaining_time > 5 and my_phase_id == d_phase.phase_id:
# Primary client component that communicate's with gossip.server.acceptstem
remaining_time = d_phase.remaining_time()
while remaining_time:
try:
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8'))
except AttributeError:
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
peer_socket.sendall(block_size.encode('utf-8'))
peer_socket.sendall(bl.raw)
Thread(target=_send_it, daemon=True, name="stemout block").start()
# queues can't block because we're in async
bl = block_queue.get(block=False)
except Empty:
await sleep(1)
else:
break
logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
peer_socket.sendall(block_size.encode('utf-8'))
peer_socket.sendall(bl.raw)

View File

@ -65,9 +65,6 @@ def stream_from_peers():
def _stream_from_peer(peer: 'Peer'):
try:
sock = peer.get_socket(CONNECT_TIMEOUT)
except ConnectionRefusedError:
need_socket_lock.release()
return
except Exception:
logger.warn(traceback.format_exc(), terminal=True)
need_socket_lock.release()

View File

@ -36,8 +36,8 @@ async def accept_stem_blocks(
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
read_routine = reader.readexactly(BLOCK_ID_SIZE)
logger.debug(f"Reading block id in stem server", terminal=True)
block_id = await wait_for(read_routine, base_wait_timeout)
block_id = block_id.decode('utf-8')
block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
if not block_id:
break

View File

@ -39,7 +39,7 @@ def block_serialized():
req_data = request.data
block_id = req_data[:BLOCK_ID_SIZE]
block_data = req_data[BLOCK_ID_SIZE:]
#blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
blockqueues.gossip_block_queues[stream_to_use].put(
Block(block_id, block_data, auto_verify=False), block=False)
blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
#blockqueues.gossip_block_queues[stream_to_use].put(
#Block(block_id, block_data, auto_verify=False), block=False)
return "ok"