Compare commits
3 Commits
4edbde82cc
...
50f0cfa6f4
Author | SHA1 | Date | |
---|---|---|---|
|
50f0cfa6f4 | ||
|
91df7507f4 | ||
|
c065be0145 |
@ -41,7 +41,7 @@ def get_blocks_after_timestamp(
|
||||
|
||||
|
||||
def has_block(block_hash):
|
||||
return block_hash in db.list_keys()
|
||||
return block_hash in db.list_keys(block_db_path)
|
||||
|
||||
|
||||
def get_block(block_hash) -> Block:
|
||||
|
@ -24,7 +24,7 @@ def _do_timeout(func, *args):
|
||||
return res
|
||||
|
||||
|
||||
def set_if_new(db_path, key, value):
|
||||
def set_if_new(db_path, key, value) -> bool:
|
||||
def _set(key, value):
|
||||
with dbm.open(db_path, "c") as my_db:
|
||||
try:
|
||||
@ -33,7 +33,11 @@ def set_if_new(db_path, key, value):
|
||||
my_db[key] = value
|
||||
else:
|
||||
raise DuplicateKey
|
||||
_do_timeout(_set, key, value)
|
||||
try:
|
||||
_do_timeout(_set, key, value)
|
||||
except DuplicateKey:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def set(db_path, key, value):
|
||||
|
@ -34,6 +34,7 @@ from .announce import do_announce
|
||||
from .dandelionstem import stem_out
|
||||
from .peerexchange import get_new_peers
|
||||
from ..peerset import gossip_peer_set
|
||||
from .streamblocks import stream_from_peers
|
||||
|
||||
"""
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
|
@ -52,7 +52,7 @@ def stream_from_peers():
|
||||
# Create sockets for them
|
||||
# Spawn thread to stream from the socket
|
||||
|
||||
tried_peers: OrderedSet[Peer] = OrderedSet()
|
||||
tried_peers: OrderedSet['Peer'] = OrderedSet()
|
||||
|
||||
sys_rand = SystemRandom()
|
||||
|
||||
@ -60,10 +60,14 @@ def stream_from_peers():
|
||||
offset = 0
|
||||
|
||||
|
||||
def _stream_from_peer(peer: Peer):
|
||||
|
||||
def _stream_from_peer(peer: 'Peer'):
|
||||
try:
|
||||
sock = peer.get_socket(CONNECT_TIMEOUT)
|
||||
except Exception:
|
||||
logger.warn(traceback.format_exc(), terminal=True)
|
||||
need_socket_lock.release()
|
||||
return
|
||||
try:
|
||||
sock.sendall(
|
||||
command_to_byte(GossipCommands.STREAM_BLOCKS)
|
||||
)
|
||||
@ -97,9 +101,8 @@ def stream_from_peers():
|
||||
# Tell them to keep streaming
|
||||
sock.sendall(int(1).to_bytes(1, 'big'))
|
||||
|
||||
sock.close()
|
||||
except Exception:
|
||||
logger.warn(traceback.format_exc())
|
||||
logger.warn(traceback.format_exc(), terminal=True)
|
||||
finally:
|
||||
sock.close()
|
||||
need_socket_lock.release()
|
||||
|
105
tests/gossip-unittests/test_client_stream.py
Normal file
105
tests/gossip-unittests/test_client_stream.py
Normal file
@ -0,0 +1,105 @@
|
||||
import os, uuid
|
||||
from sqlite3 import Time
|
||||
import socket
|
||||
from queue import Queue
|
||||
from time import sleep
|
||||
import secrets
|
||||
|
||||
|
||||
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
|
||||
from threading import Thread
|
||||
import asyncio
|
||||
import unittest
|
||||
import sys
|
||||
sys.path.append(".")
|
||||
sys.path.append("src/")
|
||||
from unittest.mock import patch
|
||||
|
||||
from ordered_set import OrderedSet
|
||||
from gossip import peerset
|
||||
|
||||
|
||||
import onionrblocks
|
||||
|
||||
import blockdb
|
||||
|
||||
from gossip.peerset import gossip_peer_set
|
||||
from gossip.client import stream_from_peers
|
||||
|
||||
|
||||
|
||||
from filepaths import gossip_server_socket_file
|
||||
|
||||
|
||||
BLOCK_MAX_SIZE = 1024 * 2000
|
||||
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
|
||||
BLOCK_ID_SIZE = 128
|
||||
BLOCK_STREAM_OFFSET_DIGITS = 8
|
||||
MAX_PEERS = 10
|
||||
TRANSPORT_SIZE_BYTES = 64
|
||||
|
||||
server_file = TEST_DIR + 'test_serv.sock'
|
||||
|
||||
def _server():
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
|
||||
s.bind(server_file)
|
||||
s.listen(1)
|
||||
conn, _ = s.accept()
|
||||
with conn:
|
||||
while True:
|
||||
conn.recv(1)
|
||||
conn.recv(BLOCK_STREAM_OFFSET_DIGITS)
|
||||
for bl in test_blocks:
|
||||
conn.sendall(bl.id)
|
||||
conn.recv(1)
|
||||
conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_MAX_SIZE_LEN))
|
||||
conn.sendall(bl.raw)
|
||||
conn.recv(1)
|
||||
|
||||
def gen_random_block():
|
||||
return onionrblocks.create_anonvdf_block(os.urandom(12), b'txt', 3600)
|
||||
|
||||
test_blocks = []
|
||||
|
||||
test_block_count = 5
|
||||
for i in range(test_block_count):
|
||||
test_blocks.append(gen_random_block())
|
||||
|
||||
|
||||
Thread(target=_server, daemon=True).start()
|
||||
|
||||
class MockPeer:
|
||||
def __init__(self):
|
||||
self.transport_address = secrets.token_hex(16)
|
||||
def __hash__(self):
|
||||
return hash(self.transport_address)
|
||||
|
||||
|
||||
def get_socket(self, timeout):
|
||||
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
s.connect(server_file)
|
||||
return s
|
||||
|
||||
|
||||
|
||||
class OnionrGossipClientDiffuse(unittest.TestCase):
|
||||
|
||||
|
||||
def test_client_stream(self):
|
||||
gossip_peer_set.add(MockPeer())
|
||||
Thread(target=stream_from_peers, daemon=True).start()
|
||||
|
||||
c = 0
|
||||
while c < 60:
|
||||
c += 1
|
||||
if len(list(blockdb.get_blocks_after_timestamp(0))) == test_block_count:
|
||||
break
|
||||
sleep(1)
|
||||
else:
|
||||
raise TimeoutError("Did not stream blocks in time")
|
||||
|
||||
|
||||
unittest.main()
|
@ -1,4 +1,3 @@
|
||||
from ast import Assert
|
||||
import os, uuid
|
||||
from unittest.mock import Mock
|
||||
from time import sleep
|
||||
|
@ -1,4 +1,3 @@
|
||||
from ast import Assert
|
||||
import os, uuid
|
||||
from unittest.mock import Mock
|
||||
from time import sleep
|
||||
|
Loading…
Reference in New Issue
Block a user