diff --git a/src/gossip/__init__.py b/src/gossip/__init__.py index 2a0dda17..20d023f3 100644 --- a/src/gossip/__init__.py +++ b/src/gossip/__init__.py @@ -44,7 +44,7 @@ def start_gossip_threads(): # There is a unified set so gossip logic is not repeated add_onionr_thread( - gossip_server, 1, initial_sleep=0.2) + gossip_server, 1, 'gossip_server', initial_sleep=0.2) threading.Thread( target=start_gossip_client, daemon=True).start() diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index acad4e10..4dd1954f 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -34,7 +34,7 @@ from .announce import do_announce from .dandelionstem import stem_out from .peerexchange import get_new_peers from ..peerset import gossip_peer_set -from .streamblocks import stream_from_peers +from .streamblocks import stream_from_peers, stream_to_peer """ This program is free software: you can redistribute it and/or modify @@ -97,16 +97,24 @@ def start_gossip_client(): # transport plugin handles the new peer add_onionr_thread( get_new_peers, - 60, initial_sleep=120) + 60, 'get_new_peers', initial_sleep=120) # Start a new thread to stream blocks from peers # These blocks are being diffused and are stored in # the peer's block database add_onionr_thread( stream_from_peers, - 3, initial_sleep=10 + 3, 'stream_from_peers', initial_sleep=10 ) + # Start a thread to upload blocks, useful for when + # connectivity is poor or we are not allowing incoming + # connections on any transports + + add_onionr_thread( + stream_to_peer, + 10, 'stream_to_peer', initial_sleep=1) + # Blocks we receive or create through all means except # Diffusal are put into block queues, we decide to either # stem or diffuse a block from the queue based on the current diff --git a/src/gossip/client/streamblocks/__init__.py b/src/gossip/client/streamblocks/__init__.py new file mode 100644 index 00000000..5c154490 --- /dev/null +++ b/src/gossip/client/streamblocks/__init__.py @@ -0,0 +1,2 @@ +from .streamfrom import stream_from_peers +from .streamto import stream_to_peer \ No newline at end of file diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks/streamfrom.py similarity index 95% rename from src/gossip/client/streamblocks.py rename to src/gossip/client/streamblocks/streamfrom.py index f286287e..6c5f5039 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks/streamfrom.py @@ -14,7 +14,7 @@ from typing import TYPE_CHECKING, List import blockdb -from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS +from ...constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS if TYPE_CHECKING: from socket import socket @@ -25,8 +25,8 @@ from ordered_set import OrderedSet import logger import onionrblocks -from ..peerset import gossip_peer_set -from ..commands import GossipCommands, command_to_byte +from ...peerset import gossip_peer_set +from ...commands import GossipCommands, command_to_byte """ This program is free software: you can redistribute it and/or modify diff --git a/src/gossip/client/streamblocks/streamto.py b/src/gossip/client/streamblocks/streamto.py new file mode 100644 index 00000000..e1082dbc --- /dev/null +++ b/src/gossip/client/streamblocks/streamto.py @@ -0,0 +1,56 @@ +from secrets import SystemRandom +from time import time +from typing import List, TYPE_CHECKING + +if TYPE_CHECKING: + from onionrblocks import Block + +from gossip.commands import GossipCommands, command_to_byte +from blockdb import get_blocks_after_timestamp +from ...constants import BLOCK_ID_SIZE, BLOCK_SIZE_LEN + +from ...peerset import gossip_peer_set +from ...server import lastincoming + +SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM = 3 + +class SendTimestamp: + timestamp: int = 0 + +def stream_to_peer(): + if SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM > time() - lastincoming.last_incoming_timestamp: + SendTimestamp.timestamp = int(time()) + return + if not len(gossip_peer_set): + return + rand = SystemRandom() + peer = rand.choice(gossip_peer_set) + buffer: List['Block'] = [] + + def _do_upload(): + with peer.get_socket(30) as p: + p.sendall(command_to_byte(GossipCommands.PUT_BLOCK_DIFFUSE)) + + while len(buffer): + try: + block = buffer.pop() + except IndexError: + break + p.sendall(block.id.zfill(BLOCK_ID_SIZE)) + if int.from_bytes(p.recv(1), 'big') == 0: + continue + block_size = str(len(block.raw)).zfill(BLOCK_SIZE_LEN) + p.sendall(block_size.encode('utf-8')) + p.sendall(block.raw) + + # Buffer some blocks so we're not streaming too many to one peer + # and to efficiently avoid connecting without sending anything + buffer_max = 10 + for block in get_blocks_after_timestamp(SendTimestamp.timestamp): + buffer.append(block) + if len(buffer) > buffer_max: + _do_upload(buffer) + if len(buffer): + _do_upload() + + SendTimestamp.timestamp = int(time()) diff --git a/src/gossip/commands.py b/src/gossip/commands.py index 9cfde118..9882d047 100644 --- a/src/gossip/commands.py +++ b/src/gossip/commands.py @@ -6,6 +6,7 @@ class GossipCommands(IntEnum): PEER_EXCHANGE = auto() STREAM_BLOCKS = auto() PUT_BLOCKS = auto() + PUT_BLOCK_DIFFUSE = auto() def command_to_byte(cmd: GossipCommands): diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index 17486deb..9d8f38fd 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -1,9 +1,12 @@ import asyncio import traceback +from time import time from typing import TYPE_CHECKING from typing import Set, Tuple -from queue import Queue +from threading import Thread + +from onionrblocks import Block from gossip import constants from ..connectpeer import connect_peer @@ -18,10 +21,13 @@ if TYPE_CHECKING: from asyncio import StreamReader, StreamWriter from filepaths import gossip_server_socket_file +import blockdb +from blockdb import add_block_to_db from ..commands import GossipCommands from ..peerset import gossip_peer_set from .acceptstem import accept_stem_blocks from .diffuseblocks import diffuse_blocks +from .import lastincoming """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -52,6 +58,7 @@ def gossip_server(): break except asyncio.IncompleteReadError: break + lastincoming.last_incoming_timestamp = int(time()) cmd = int.from_bytes(cmd, 'big') if cmd == b'' or cmd == 0: @@ -107,6 +114,24 @@ def gossip_server(): # Subtract dandelion edge, make sure >=0 inbound_dandelion_edge_count[0] = \ max(inbound_dandelion_edge_count[0] - 1, 0) + case GossipCommands.PUT_BLOCK_DIFFUSE: + async def _get_block_diffused(): + block_id = await reader.readexactly(constants.BLOCK_ID_SIZE) + if blockdb.has_block(block_id): + writer.write(int(0).to_bytes(1, 'big')) + else: + + writer.write(int(1).to_bytes(1, 'big')) + await writer.drain() + block_size = int(await asyncio.wait_for(reader.readexactly(constants.BLOCK_SIZE_LEN), 30)) + block_data = await reader.readexactly(block_size) + + Thread( + target=add_block_to_db, + args=[ + Block(block_id, block_data, auto_verify=True)] + ).start() + await _get_block_diffused() break await writer.drain() diff --git a/src/gossip/server/lastincoming.py b/src/gossip/server/lastincoming.py new file mode 100644 index 00000000..74bed333 --- /dev/null +++ b/src/gossip/server/lastincoming.py @@ -0,0 +1 @@ +last_incoming_timestamp = 0 \ No newline at end of file diff --git a/src/onionrthreads/__init__.py b/src/onionrthreads/__init__.py index 4bb732cc..1090a40d 100644 --- a/src/onionrthreads/__init__.py +++ b/src/onionrthreads/__init__.py @@ -27,7 +27,8 @@ def _onionr_thread(func: Callable, def add_onionr_thread( func: Callable, - sleep_secs: int, *args, initial_sleep: int = 5, **kwargs): + sleep_secs: int, thread_name: str, + *args, initial_sleep: int = 5, **kwargs): """Spawn a new onionr thread that exits when the main thread does. Runs in an infinite loop with sleep between calls @@ -40,6 +41,7 @@ def add_onionr_thread( initial_sleep, *args), kwargs=kwargs, + name=thread_name, daemon=True).start() diff --git a/static-data/default-plugins/unixtransport/bootstrap.txt b/static-data/default-plugins/unixtransport/bootstrap.txt index 31712af4..9fcbdf5b 100644 --- a/static-data/default-plugins/unixtransport/bootstrap.txt +++ b/static-data/default-plugins/unixtransport/bootstrap.txt @@ -1 +1 @@ -/dev/shm/onionr3177415330/gossip-server.sock \ No newline at end of file +/dev/shm/onionr90775244/gossip-server.sock,/dev/shm/onionr1873728538/gossip-server.sock \ No newline at end of file diff --git a/static-data/default-plugins/unixtransport/unixbootstrap.py b/static-data/default-plugins/unixtransport/unixbootstrap.py index 29176117..12296231 100644 --- a/static-data/default-plugins/unixtransport/unixbootstrap.py +++ b/static-data/default-plugins/unixtransport/unixbootstrap.py @@ -58,6 +58,9 @@ def on_bootstrap(api, data=None): if address == gossip_server_socket_file or not address: continue + if not os.path.exists(address): + continue + # Tell the gossip logic that this peer is ready to connect # it will add it to data['peer_set'] if it responds to ping Thread( diff --git a/tests/gossip-unittests/test_server_put_block_diffuse.py b/tests/gossip-unittests/test_server_put_block_diffuse.py new file mode 100644 index 00000000..715ed3d4 --- /dev/null +++ b/tests/gossip-unittests/test_server_put_block_diffuse.py @@ -0,0 +1,58 @@ +import os, uuid +from queue import Empty +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +from time import sleep +from threading import Thread +import asyncio +import unittest +import sys +sys.path.append(".") +sys.path.append("src/") + +from ordered_set import OrderedSet +import onionrblocks + +import blockdb +from gossip.server import gossip_server +from gossip.blockqueues import gossip_block_queues +from filepaths import gossip_server_socket_file + + +BLOCK_MAX_SIZE = 1024 * 2000 +BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE)) +BLOCK_ID_SIZE = 128 +BLOCK_STREAM_OFFSET_DIGITS = 8 + + +class OnionrServerPutBLTestDiffuse(unittest.TestCase): + + + def test_put_block(self): + + Thread(target=gossip_server, daemon=True).start() + sleep(0.01) + + bl = onionrblocks.blockcreator.create_anonvdf_block( + b"my test block", b"txt", 2800) + + async def block_put_client(): + reader, writer = await asyncio.open_unix_connection( + gossip_server_socket_file) + writer.write(int(6).to_bytes(1, 'big')) + writer.write(bl.id) + + self.assertEqual(int.from_bytes(await reader.readexactly(1), 'big'), 1) + writer.write( + str(len(bl.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8')) + writer.write(bl.raw) + sleep(0.2) + self.assertIn(bl.id, list([block.id for block in blockdb.get_blocks_after_timestamp(0)])) + await writer.drain() + + + asyncio.run(block_put_client()) + +unittest.main()