Compare commits
2 Commits
90176e43fb
...
8bd4a4c524
Author | SHA1 | Date | |
---|---|---|---|
|
8bd4a4c524 | ||
|
c663be30f1 |
@ -2,7 +2,7 @@ urllib3==1.26.7
|
|||||||
requests==2.28.1
|
requests==2.28.1
|
||||||
PyNaCl==1.5.0
|
PyNaCl==1.5.0
|
||||||
gevent==21.12.0
|
gevent==21.12.0
|
||||||
Flask==2.1.2
|
Flask==2.1.3
|
||||||
PySocks==1.7.1
|
PySocks==1.7.1
|
||||||
stem==1.8.0
|
stem==1.8.0
|
||||||
deadsimplekv==0.3.2
|
deadsimplekv==0.3.2
|
||||||
|
@ -18,7 +18,7 @@ deadsimplekv==0.3.2
|
|||||||
# via -r requirements.in
|
# via -r requirements.in
|
||||||
filenuke==0.0.0
|
filenuke==0.0.0
|
||||||
# via -r requirements.in
|
# via -r requirements.in
|
||||||
flask==2.1.2
|
flask==2.1.3
|
||||||
# via -r requirements.in
|
# via -r requirements.in
|
||||||
gevent==21.12.0
|
gevent==21.12.0
|
||||||
# via -r requirements.in
|
# via -r requirements.in
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
from queue import Empty
|
from collections import deque
|
||||||
|
from queue import Empty, Queue
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from secrets import choice
|
from secrets import choice
|
||||||
import traceback
|
import traceback
|
||||||
@ -123,6 +124,10 @@ async def stem_out(d_phase: 'DandelionPhase'):
|
|||||||
# if we have at least 1 peer,
|
# if we have at least 1 peer,
|
||||||
# do dandelion anyway in non strict mode
|
# do dandelion anyway in non strict mode
|
||||||
# Allow poorly connected networks to communicate faster
|
# Allow poorly connected networks to communicate faster
|
||||||
|
for block in gossip_block_queues[1].queue:
|
||||||
|
gossip_block_queues[0].put_nowait(block)
|
||||||
|
else:
|
||||||
|
gossip_block_queues[1].queue = deque()
|
||||||
break
|
break
|
||||||
sleep(1)
|
sleep(1)
|
||||||
else:
|
else:
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
from asyncio import sleep
|
from asyncio import sleep
|
||||||
|
from threading import Thread
|
||||||
from queue import Empty
|
from queue import Empty
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import logger
|
import logger
|
||||||
@ -20,22 +22,28 @@ async def do_stem_stream(
|
|||||||
remaining_time = d_phase.remaining_time()
|
remaining_time = d_phase.remaining_time()
|
||||||
my_phase_id = d_phase.phase_id
|
my_phase_id = d_phase.phase_id
|
||||||
|
|
||||||
with peer_socket:
|
|
||||||
while remaining_time > 5 and my_phase_id == d_phase.phase_id:
|
while remaining_time > 1 and my_phase_id == d_phase.phase_id:
|
||||||
# Primary client component that communicate's with gossip.server.acceptstem
|
# Primary client component that communicate's with gossip.server.acceptstem
|
||||||
remaining_time = d_phase.remaining_time()
|
remaining_time = d_phase.remaining_time()
|
||||||
while remaining_time:
|
while remaining_time:
|
||||||
|
try:
|
||||||
|
# queues can't block because we're in async
|
||||||
|
bl = block_queue.get(block=False)
|
||||||
|
except Empty:
|
||||||
|
remaining_time = d_phase.remaining_time()
|
||||||
|
await sleep(1)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
logger.info("Sending block over dandelion++", terminal=True)
|
||||||
|
|
||||||
|
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
|
||||||
|
def _send_it():
|
||||||
|
with peer_socket:
|
||||||
try:
|
try:
|
||||||
# queues can't block because we're in async
|
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8'))
|
||||||
bl = block_queue.get(block=False)
|
except AttributeError:
|
||||||
except Empty:
|
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
|
||||||
await sleep(1)
|
peer_socket.sendall(block_size.encode('utf-8'))
|
||||||
else:
|
peer_socket.sendall(bl.raw)
|
||||||
break
|
Thread(target=_send_it, daemon=True, name="stemout block").start()
|
||||||
logger.info("Sending block over dandelion++", terminal=True)
|
|
||||||
|
|
||||||
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
|
|
||||||
|
|
||||||
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
|
|
||||||
peer_socket.sendall(block_size.encode('utf-8'))
|
|
||||||
peer_socket.sendall(bl.raw)
|
|
||||||
|
@ -65,6 +65,9 @@ def stream_from_peers():
|
|||||||
def _stream_from_peer(peer: 'Peer'):
|
def _stream_from_peer(peer: 'Peer'):
|
||||||
try:
|
try:
|
||||||
sock = peer.get_socket(CONNECT_TIMEOUT)
|
sock = peer.get_socket(CONNECT_TIMEOUT)
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
need_socket_lock.release()
|
||||||
|
return
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warn(traceback.format_exc(), terminal=True)
|
logger.warn(traceback.format_exc(), terminal=True)
|
||||||
need_socket_lock.release()
|
need_socket_lock.release()
|
||||||
|
@ -36,8 +36,8 @@ async def accept_stem_blocks(
|
|||||||
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
|
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
|
||||||
read_routine = reader.readexactly(BLOCK_ID_SIZE)
|
read_routine = reader.readexactly(BLOCK_ID_SIZE)
|
||||||
logger.debug(f"Reading block id in stem server", terminal=True)
|
logger.debug(f"Reading block id in stem server", terminal=True)
|
||||||
block_id = (
|
block_id = await wait_for(read_routine, base_wait_timeout)
|
||||||
await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
|
block_id = block_id.decode('utf-8')
|
||||||
if not block_id:
|
if not block_id:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ def block_serialized():
|
|||||||
req_data = request.data
|
req_data = request.data
|
||||||
block_id = req_data[:BLOCK_ID_SIZE]
|
block_id = req_data[:BLOCK_ID_SIZE]
|
||||||
block_data = req_data[BLOCK_ID_SIZE:]
|
block_data = req_data[BLOCK_ID_SIZE:]
|
||||||
blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
|
#blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
|
||||||
#blockqueues.gossip_block_queues[stream_to_use].put(
|
blockqueues.gossip_block_queues[stream_to_use].put(
|
||||||
#Block(block_id, block_data, auto_verify=False), block=False)
|
Block(block_id, block_data, auto_verify=False), block=False)
|
||||||
return "ok"
|
return "ok"
|
||||||
|
Loading…
Reference in New Issue
Block a user