Compare commits

...

3 Commits

7 changed files with 121 additions and 10 deletions

View File

@ -41,7 +41,7 @@ def get_blocks_after_timestamp(
def has_block(block_hash): def has_block(block_hash):
return block_hash in db.list_keys() return block_hash in db.list_keys(block_db_path)
def get_block(block_hash) -> Block: def get_block(block_hash) -> Block:

View File

@ -24,7 +24,7 @@ def _do_timeout(func, *args):
return res return res
def set_if_new(db_path, key, value): def set_if_new(db_path, key, value) -> bool:
def _set(key, value): def _set(key, value):
with dbm.open(db_path, "c") as my_db: with dbm.open(db_path, "c") as my_db:
try: try:
@ -33,7 +33,11 @@ def set_if_new(db_path, key, value):
my_db[key] = value my_db[key] = value
else: else:
raise DuplicateKey raise DuplicateKey
_do_timeout(_set, key, value) try:
_do_timeout(_set, key, value)
except DuplicateKey:
return False
return True
def set(db_path, key, value): def set(db_path, key, value):

View File

@ -34,6 +34,7 @@ from .announce import do_announce
from .dandelionstem import stem_out from .dandelionstem import stem_out
from .peerexchange import get_new_peers from .peerexchange import get_new_peers
from ..peerset import gossip_peer_set from ..peerset import gossip_peer_set
from .streamblocks import stream_from_peers
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify

View File

@ -52,7 +52,7 @@ def stream_from_peers():
# Create sockets for them # Create sockets for them
# Spawn thread to stream from the socket # Spawn thread to stream from the socket
tried_peers: OrderedSet[Peer] = OrderedSet() tried_peers: OrderedSet['Peer'] = OrderedSet()
sys_rand = SystemRandom() sys_rand = SystemRandom()
@ -60,10 +60,14 @@ def stream_from_peers():
offset = 0 offset = 0
def _stream_from_peer(peer: Peer): def _stream_from_peer(peer: 'Peer'):
try: try:
sock = peer.get_socket(CONNECT_TIMEOUT) sock = peer.get_socket(CONNECT_TIMEOUT)
except Exception:
logger.warn(traceback.format_exc(), terminal=True)
need_socket_lock.release()
return
try:
sock.sendall( sock.sendall(
command_to_byte(GossipCommands.STREAM_BLOCKS) command_to_byte(GossipCommands.STREAM_BLOCKS)
) )
@ -97,9 +101,8 @@ def stream_from_peers():
# Tell them to keep streaming # Tell them to keep streaming
sock.sendall(int(1).to_bytes(1, 'big')) sock.sendall(int(1).to_bytes(1, 'big'))
sock.close()
except Exception: except Exception:
logger.warn(traceback.format_exc()) logger.warn(traceback.format_exc(), terminal=True)
finally: finally:
sock.close() sock.close()
need_socket_lock.release() need_socket_lock.release()

View File

@ -0,0 +1,105 @@
import os, uuid
from sqlite3 import Time
import socket
from queue import Queue
from time import sleep
import secrets
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
from threading import Thread
import asyncio
import unittest
import sys
sys.path.append(".")
sys.path.append("src/")
from unittest.mock import patch
from ordered_set import OrderedSet
from gossip import peerset
import onionrblocks
import blockdb
from gossip.peerset import gossip_peer_set
from gossip.client import stream_from_peers
from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10
TRANSPORT_SIZE_BYTES = 64
server_file = TEST_DIR + 'test_serv.sock'
def _server():
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind(server_file)
s.listen(1)
conn, _ = s.accept()
with conn:
while True:
conn.recv(1)
conn.recv(BLOCK_STREAM_OFFSET_DIGITS)
for bl in test_blocks:
conn.sendall(bl.id)
conn.recv(1)
conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_MAX_SIZE_LEN))
conn.sendall(bl.raw)
conn.recv(1)
def gen_random_block():
return onionrblocks.create_anonvdf_block(os.urandom(12), b'txt', 3600)
test_blocks = []
test_block_count = 5
for i in range(test_block_count):
test_blocks.append(gen_random_block())
Thread(target=_server, daemon=True).start()
class MockPeer:
def __init__(self):
self.transport_address = secrets.token_hex(16)
def __hash__(self):
return hash(self.transport_address)
def get_socket(self, timeout):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(server_file)
return s
class OnionrGossipClientDiffuse(unittest.TestCase):
def test_client_stream(self):
gossip_peer_set.add(MockPeer())
Thread(target=stream_from_peers, daemon=True).start()
c = 0
while c < 60:
c += 1
if len(list(blockdb.get_blocks_after_timestamp(0))) == test_block_count:
break
sleep(1)
else:
raise TimeoutError("Did not stream blocks in time")
unittest.main()

View File

@ -1,4 +1,3 @@
from ast import Assert
import os, uuid import os, uuid
from unittest.mock import Mock from unittest.mock import Mock
from time import sleep from time import sleep

View File

@ -1,4 +1,3 @@
from ast import Assert
import os, uuid import os, uuid
from unittest.mock import Mock from unittest.mock import Mock
from time import sleep from time import sleep