From 3a26d053feb63436f6c57157964f324317a7f4f4 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Mon, 9 May 2022 12:38:03 -0500 Subject: [PATCH] Gossip client tests, fixed delayed threads --- src/gossip/__init__.py | 4 +- src/gossip/client/__init__.py | 57 ++++---- src/gossip/client/streamblocks.py | 1 + src/gossip/server/acceptstem.py | 2 + src/httpapi/apiutils/setbindip.py | 2 +- src/onionrthreads/__init__.py | 8 +- static-data/bootstrap-nodes.txt | 1 - static-data/default-plugins/example/main.py | 2 +- static-data/default-plugins/tor/bootstrap.txt | 1 - .../test_dandelion_client_block_choice.py | 88 ++++++++++++ .../dandelion/test_dandelion_phase.py | 70 ++++++++++ .../test_client_stream_have.py | 129 ++++++++++++++++++ ...nt_stream.py => test_client_stream_mny.py} | 12 +- 13 files changed, 344 insertions(+), 33 deletions(-) delete mode 100755 static-data/bootstrap-nodes.txt create mode 100644 tests/gossip-unittests/dandelion/test_dandelion_client_block_choice.py create mode 100644 tests/gossip-unittests/dandelion/test_dandelion_phase.py create mode 100644 tests/gossip-unittests/test_client_stream_have.py rename tests/gossip-unittests/{test_client_stream.py => test_client_stream_mny.py} (90%) diff --git a/src/gossip/__init__.py b/src/gossip/__init__.py index 2506dcb0..2a0dda17 100644 --- a/src/gossip/__init__.py +++ b/src/gossip/__init__.py @@ -14,7 +14,7 @@ import onionrplugins import logger from .connectpeer import connect_peer -from .client import gossip_client +from .client import start_gossip_client from .server import gossip_server from .constants import BOOTSTRAP_ATTEMPTS from .peerset import gossip_peer_set @@ -47,7 +47,7 @@ def start_gossip_threads(): gossip_server, 1, initial_sleep=0.2) threading.Thread( - target=gossip_client, daemon=True).start() + target=start_gossip_client, daemon=True).start() onionrplugins.events.event('gossip_start', data=None, threaded=True) for _ in range(BOOTSTRAP_ATTEMPTS): onionrplugins.events.event( diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index fc80a324..a2f50315 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -13,8 +13,6 @@ from queue import Queue from onionrblocks import Block -from gossip.client.storeblocks import store_blocks - from ..constants import DANDELION_EPOCH_LENGTH from ..connectpeer import connect_peer @@ -30,6 +28,7 @@ from onionrthreads import add_onionr_thread from blockdb import add_block_to_db +from .storeblocks import store_blocks from .announce import do_announce from .dandelionstem import stem_out from .peerexchange import get_new_peers @@ -51,8 +50,34 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ +dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH) -def gossip_client(): + +def block_queue_processing(): + + + while not len(gossip_peer_set): + sleep(0.2) + if dandelion_phase.remaining_time() <= 15: + logger.debug("Sleeping", terminal=True) + sleep(dandelion_phase.remaining_time()) + if dandelion_phase.is_stem_phase(): + logger.debug("Entering stem phase", terminal=True) + try: + # Stem out blocks for (roughly) remaining epoch time + asyncio.run(stem_out(dandelion_phase)) + except TimeoutError: + pass + except Exception: + logger.error(traceback.format_exc(), terminal=True) + pass + else: + logger.debug("Entering fluff phase", terminal=True) + # Add block to primary block db, where the diffuser can read it + store_blocks(dandelion_phase) + + +def start_gossip_client(): """ Gossip client does the following: @@ -72,26 +97,10 @@ def gossip_client(): 120, initial_sleep=5) # Start a new thread to stream blocks from peers - - - dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH) + add_onionr_thread( + stream_from_peers, + 3, initial_sleep=10 + ) while True: - while not len(gossip_peer_set): - sleep(0.2) - if dandelion_phase.remaining_time() <= 10: - sleep(dandelion_phase.remaining_time()) - if dandelion_phase.is_stem_phase(): - logger.debug("Entering stem phase", terminal=True) - try: - # Stem out blocks for (roughly) remaining epoch time - asyncio.run(stem_out(dandelion_phase)) - except TimeoutError: - continue - except Exception: - logger.error(traceback.format_exc(), terminal=True) - continue - else: - logger.debug("Entering fluff phase", terminal=True) - # Add block to primary block db, where the diffuser can read it - store_blocks(dandelion_phase) + block_queue_processing() diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py index 1e2b0170..84ce88a6 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks.py @@ -47,6 +47,7 @@ MAX_STREAMS = 3 CONNECT_TIMEOUT = 12 MAX_TRIED_PEERS = 10_000 + def stream_from_peers(): # Pick N peers to stream from # Create sockets for them diff --git a/src/gossip/server/acceptstem.py b/src/gossip/server/acceptstem.py index 0e50b73f..5de949cf 100644 --- a/src/gossip/server/acceptstem.py +++ b/src/gossip/server/acceptstem.py @@ -5,6 +5,7 @@ from asyncio import wait_for from onionrblocks import Block +import logger from ..dandelion import StemAcceptResult from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM @@ -57,6 +58,7 @@ async def accept_stem_blocks( if not raw_block: break + logger.debug("Got a stem block, put into queue", terminal=True) block_queue_to_use.put( Block(block_id, raw_block, auto_verify=True) ) diff --git a/src/httpapi/apiutils/setbindip.py b/src/httpapi/apiutils/setbindip.py index 0a1d4e18..adcffa59 100644 --- a/src/httpapi/apiutils/setbindip.py +++ b/src/httpapi/apiutils/setbindip.py @@ -24,7 +24,7 @@ def set_bind_IP(filePath=''): hostOctets = ['127'] + hostOctets # Convert the localhost address to a normal string address data = '.'.join(hostOctets) - + # Try to bind IP. Some platforms like Mac block non normal 127.x.x.x s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: diff --git a/src/onionrthreads/__init__.py b/src/onionrthreads/__init__.py index 3365a136..4bb732cc 100644 --- a/src/onionrthreads/__init__.py +++ b/src/onionrthreads/__init__.py @@ -45,6 +45,10 @@ def add_onionr_thread( def add_delayed_thread(func: Callable, sleep_secs: int, *args, **kwargs): assert sleep_secs > 0 - t = Thread(target=func, args=args, kwargs=kwargs, daemon=True) - sleep(sleep_secs) + def _do_delay_thread(): + t = Thread(target=func, args=args, kwargs=kwargs, daemon=True) + sleep(sleep_secs) + t.start() + t = Thread(target=_do_delay_thread, daemon=True) t.start() + return t diff --git a/static-data/bootstrap-nodes.txt b/static-data/bootstrap-nodes.txt deleted file mode 100755 index 70563cb8..00000000 --- a/static-data/bootstrap-nodes.txt +++ /dev/null @@ -1 +0,0 @@ -kj7oltega2clc6a2g2z5argxwrdz5b2yb4dcztfuuetup5pozr4ogxqd.onion,ilv54tw2ppfhuylzscdp3kutjoohvnmhhkcv65c7mfczkexeahsqvqid.onion \ No newline at end of file diff --git a/static-data/default-plugins/example/main.py b/static-data/default-plugins/example/main.py index 98df8439..8f5bc917 100644 --- a/static-data/default-plugins/example/main.py +++ b/static-data/default-plugins/example/main.py @@ -42,7 +42,7 @@ PLUGIN_VERSION = '0.0.0' def on_blocktest_cmd(api, data=None): - bl = onionrblocks.create_anonvdf_block(b"test", b"txt", 3600) + bl = onionrblocks.create_anonvdf_block(input("Enter a message:").encode('utf-8'), b"txt", 3600) logger.info( local_command( '/addvdfblock', diff --git a/static-data/default-plugins/tor/bootstrap.txt b/static-data/default-plugins/tor/bootstrap.txt index d908f82c..e69de29b 100644 --- a/static-data/default-plugins/tor/bootstrap.txt +++ b/static-data/default-plugins/tor/bootstrap.txt @@ -1 +0,0 @@ -aai7opy5q6innjpb2zgviexvbenhpne7lggnh4lumudojwga2m4wbaqd \ No newline at end of file diff --git a/tests/gossip-unittests/dandelion/test_dandelion_client_block_choice.py b/tests/gossip-unittests/dandelion/test_dandelion_client_block_choice.py new file mode 100644 index 00000000..33e01de8 --- /dev/null +++ b/tests/gossip-unittests/dandelion/test_dandelion_client_block_choice.py @@ -0,0 +1,88 @@ +import os, uuid +from sqlite3 import Time +import socket +from queue import Queue +from time import sleep +import secrets + + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +from threading import Thread +import asyncio +import unittest +import sys +sys.path.append(".") +sys.path.append("src/") +from unittest.mock import patch + +import onionrblocks + + +from filepaths import gossip_server_socket_file +from gossip.client import block_queue_processing +from gossip import client +from gossip.blockqueues import gossip_block_queues +from gossip.peerset import gossip_peer_set + + +BLOCK_MAX_SIZE = 1024 * 2000 +BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE)) +BLOCK_ID_SIZE = 128 +BLOCK_STREAM_OFFSET_DIGITS = 8 +MAX_PEERS = 10 +TRANSPORT_SIZE_BYTES = 64 + +server_file = TEST_DIR + 'test_serv.sock' + + +def gen_random_block(): + return onionrblocks.create_anonvdf_block(os.urandom(12), b'txt', 3600) + + +test_blocks = [] + +test_thread = [] + +test_block_count = 5 +for i in range(test_block_count): + test_blocks.append(gen_random_block()) + + +class MockPeer: + def __init__(self): + self.transport_address = secrets.token_hex(16) + def __hash__(self): + return hash(self.transport_address) + + + def get_socket(self, timeout): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(server_file) + return s + +class MockPhase: + def __init__(self): + return + def remaining_time(self): + return 120 + def is_stem_phase(self): + return False + + +class OnionrGossipClientBlockChoice(unittest.TestCase): + + + @patch('gossip.client.dandelionstem.stem_out') + @patch('gossip.client.store_blocks') + def test_client_block_processing_fluff_phase(self, mock_store_blocks, mock_stem_out): + gossip_peer_set.add(MockPeer()) + + client.dandelion_phase = MockPhase() + block_queue_processing() + self.assertTrue(mock_store_blocks.called) + + +unittest.main() diff --git a/tests/gossip-unittests/dandelion/test_dandelion_phase.py b/tests/gossip-unittests/dandelion/test_dandelion_phase.py new file mode 100644 index 00000000..1d44f1d5 --- /dev/null +++ b/tests/gossip-unittests/dandelion/test_dandelion_phase.py @@ -0,0 +1,70 @@ +from time import sleep, time +import sys +from hashlib import shake_128 +import secrets +import asyncio +import unittest +sys.path.append(".") +sys.path.append("src/") +from gossip.dandelion import phase + + +class FakePhase: + def __init__(self, seed, epoch_interval_secs: int): + self.seed = seed + assert len(self.seed) == 32 + self.epoch = int(time()) + self.epoch_interval = epoch_interval_secs + self._is_stem = bool(secrets.randbits(1)) + self.phase_id = b'' + + + def _update_stem_phase(self, cur_time): + self.epoch = cur_time + # Hash the self.seed with the time stamp to produce 8 pseudorandom bytes + # Produce an len(8) byte string for time as well for year 2038 problem + self.phase_id = shake_128( + self.seed + + int.to_bytes(cur_time, 8, 'big')).digest(8) + + # Use first byte of phase id as random source for stem phase picking + if self.phase_id[0] % 2: + self._is_stem = True + else: + self._is_stem = False + + + def remaining_time(self) -> int: + current_time = int(time()) + + return max(0, self.epoch_interval - (current_time - self.epoch)) + + def is_stem_phase(self) -> bool: + current_time = int(time()) + if current_time - self.epoch >= self.epoch_interval: + self._update_stem_phase(current_time) + return self._is_stem + + +class OnionrGossipTestDandelionPhase(unittest.TestCase): + + def test_dandelion_phase(self): + epoch = 3 # seconds + seed = phase.seed + self.assertTrue(len(seed) == 32) + p = phase.DandelionPhase(epoch) + fake_p = FakePhase(phase.seed, epoch) + + assert p.phase_id == fake_p.phase_id + + def test_dandelion_phase_both(self): + epoch = 3 # seconds + seed = phase.seed + self.assertTrue(len(seed) == 32) + p = phase.DandelionPhase(epoch) + fake_p = FakePhase(phase.seed, epoch) + + assert p.phase_id == fake_p.phase_id + + +unittest.main() diff --git a/tests/gossip-unittests/test_client_stream_have.py b/tests/gossip-unittests/test_client_stream_have.py new file mode 100644 index 00000000..8432a0cd --- /dev/null +++ b/tests/gossip-unittests/test_client_stream_have.py @@ -0,0 +1,129 @@ +import os, uuid +from sqlite3 import Time +import socket +from queue import Queue +from time import sleep +import secrets + + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +from threading import Thread +import asyncio +import unittest +import sys +sys.path.append(".") +sys.path.append("src/") +from unittest.mock import patch + +from ordered_set import OrderedSet +from gossip import peerset + + +import onionrblocks + +import blockdb + +from gossip.peerset import gossip_peer_set +from gossip.client import stream_from_peers + + + +from filepaths import gossip_server_socket_file + + +BLOCK_MAX_SIZE = 1024 * 2000 +BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE)) +BLOCK_ID_SIZE = 128 +BLOCK_STREAM_OFFSET_DIGITS = 8 +MAX_PEERS = 10 +TRANSPORT_SIZE_BYTES = 64 + +server_file = TEST_DIR + 'test_serv.sock' + + +def gen_random_block(): + return onionrblocks.create_anonvdf_block(os.urandom(12), b'txt', 3600) + + +test_blocks = [] + +test_thread = [] + +test_block_count = 5 +for i in range(test_block_count): + test_blocks.append(gen_random_block()) + +already_have_blocks = [] + +duplicated_block = [False] + +for bl in secrets.SystemRandom().sample(test_blocks, 3): + blockdb.add_block_to_db(bl) + already_have_blocks.append(bl.id) + + +def _server(): + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.bind(server_file) + s.listen(1) + conn, _ = s.accept() + with conn: + conn.recv(1) + conn.recv(BLOCK_STREAM_OFFSET_DIGITS) + for bl in test_blocks: + conn.sendall(bl.id) + choice = conn.recv(1) + if choice == int(1).to_bytes(1, 'big') \ + and bl.id in already_have_blocks: + duplicated_block[0] = True + print('duplicated block') + elif choice == int(0).to_bytes(1, 'big') and bl.id in already_have_blocks: + print('skipping block correctly') + continue + + conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_MAX_SIZE_LEN)) + conn.sendall(bl.raw) + conn.recv(1) + + +Thread(target=_server, daemon=True).start() + +class MockPeer: + def __init__(self): + self.transport_address = secrets.token_hex(16) + def __hash__(self): + return hash(self.transport_address) + + + def get_socket(self, timeout): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(server_file) + return s + + +class OnionrGossipClientDiffuse(unittest.TestCase): + + + def test_client_stream_have(self): + + gossip_peer_set.add(MockPeer()) + + t = Thread(target=stream_from_peers, daemon=True) + test_thread.append(t) + test_thread[0].start() + + c = 0 + while c < 10: + c += 1 + if len(list(blockdb.get_blocks_after_timestamp(0))) == test_block_count: + break + sleep(1) + else: + raise TimeoutError("Did not stream blocks in time") + + self.assertFalse(duplicated_block[0]) + +unittest.main() diff --git a/tests/gossip-unittests/test_client_stream.py b/tests/gossip-unittests/test_client_stream_mny.py similarity index 90% rename from tests/gossip-unittests/test_client_stream.py rename to tests/gossip-unittests/test_client_stream_mny.py index 81b9b2b8..dabbff13 100644 --- a/tests/gossip-unittests/test_client_stream.py +++ b/tests/gossip-unittests/test_client_stream_mny.py @@ -64,6 +64,8 @@ def gen_random_block(): test_blocks = [] +test_thread = [] + test_block_count = 5 for i in range(test_block_count): test_blocks.append(gen_random_block()) @@ -89,8 +91,16 @@ class OnionrGossipClientDiffuse(unittest.TestCase): def test_client_stream(self): + try: + os.remove(blockdb.block_db_path) + except FileNotFoundError: + pass + gossip_peer_set.add(MockPeer()) - Thread(target=stream_from_peers, daemon=True).start() + + t = Thread(target=stream_from_peers, daemon=True) + test_thread.append(t) + test_thread[0].start() c = 0 while c < 60: