diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 4860e62f..fc80a324 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -34,6 +34,7 @@ from .announce import do_announce from .dandelionstem import stem_out from .peerexchange import get_new_peers from ..peerset import gossip_peer_set +from .streamblocks import stream_from_peers """ This program is free software: you can redistribute it and/or modify diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py index 0932cd39..192f7280 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks.py @@ -52,7 +52,7 @@ def stream_from_peers(): # Create sockets for them # Spawn thread to stream from the socket - tried_peers: OrderedSet[Peer] = OrderedSet() + tried_peers: OrderedSet['Peer'] = OrderedSet() sys_rand = SystemRandom() @@ -60,10 +60,14 @@ def stream_from_peers(): offset = 0 - def _stream_from_peer(peer: Peer): - + def _stream_from_peer(peer: 'Peer'): try: sock = peer.get_socket(CONNECT_TIMEOUT) + except Exception: + logger.warn(traceback.format_exc(), terminal=True) + need_socket_lock.release() + return + try: sock.sendall( command_to_byte(GossipCommands.STREAM_BLOCKS) ) @@ -97,9 +101,8 @@ def stream_from_peers(): # Tell them to keep streaming sock.sendall(int(1).to_bytes(1, 'big')) - sock.close() except Exception: - logger.warn(traceback.format_exc()) + logger.warn(traceback.format_exc(), terminal=True) finally: sock.close() need_socket_lock.release() diff --git a/tests/gossip-unittests/test_client_stream.py b/tests/gossip-unittests/test_client_stream.py new file mode 100644 index 00000000..81b9b2b8 --- /dev/null +++ b/tests/gossip-unittests/test_client_stream.py @@ -0,0 +1,105 @@ +import os, uuid +from sqlite3 import Time +import socket +from queue import Queue +from time import sleep +import secrets + + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +from threading import Thread +import asyncio +import unittest +import sys +sys.path.append(".") +sys.path.append("src/") +from unittest.mock import patch + +from ordered_set import OrderedSet +from gossip import peerset + + +import onionrblocks + +import blockdb + +from gossip.peerset import gossip_peer_set +from gossip.client import stream_from_peers + + + +from filepaths import gossip_server_socket_file + + +BLOCK_MAX_SIZE = 1024 * 2000 +BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE)) +BLOCK_ID_SIZE = 128 +BLOCK_STREAM_OFFSET_DIGITS = 8 +MAX_PEERS = 10 +TRANSPORT_SIZE_BYTES = 64 + +server_file = TEST_DIR + 'test_serv.sock' + +def _server(): + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.bind(server_file) + s.listen(1) + conn, _ = s.accept() + with conn: + while True: + conn.recv(1) + conn.recv(BLOCK_STREAM_OFFSET_DIGITS) + for bl in test_blocks: + conn.sendall(bl.id) + conn.recv(1) + conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_MAX_SIZE_LEN)) + conn.sendall(bl.raw) + conn.recv(1) + +def gen_random_block(): + return onionrblocks.create_anonvdf_block(os.urandom(12), b'txt', 3600) + +test_blocks = [] + +test_block_count = 5 +for i in range(test_block_count): + test_blocks.append(gen_random_block()) + + +Thread(target=_server, daemon=True).start() + +class MockPeer: + def __init__(self): + self.transport_address = secrets.token_hex(16) + def __hash__(self): + return hash(self.transport_address) + + + def get_socket(self, timeout): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(server_file) + return s + + + +class OnionrGossipClientDiffuse(unittest.TestCase): + + + def test_client_stream(self): + gossip_peer_set.add(MockPeer()) + Thread(target=stream_from_peers, daemon=True).start() + + c = 0 + while c < 60: + c += 1 + if len(list(blockdb.get_blocks_after_timestamp(0))) == test_block_count: + break + sleep(1) + else: + raise TimeoutError("Did not stream blocks in time") + + +unittest.main() diff --git a/tests/gossip-unittests/test_peer_announce.py b/tests/gossip-unittests/test_peer_announce.py index 9c1964b6..7651d68f 100644 --- a/tests/gossip-unittests/test_peer_announce.py +++ b/tests/gossip-unittests/test_peer_announce.py @@ -1,4 +1,3 @@ -from ast import Assert import os, uuid from unittest.mock import Mock from time import sleep diff --git a/tests/gossip-unittests/test_peer_exchange.py b/tests/gossip-unittests/test_peer_exchange.py index 2c65a9ce..60871205 100644 --- a/tests/gossip-unittests/test_peer_exchange.py +++ b/tests/gossip-unittests/test_peer_exchange.py @@ -1,4 +1,3 @@ -from ast import Assert import os, uuid from unittest.mock import Mock from time import sleep