From dae99dc2f76d35271968cf9b77aab2e6eee2f483 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Mon, 28 Mar 2022 00:13:36 -0500 Subject: [PATCH] Work on normal gossip diffusion --- src/blockdb/dbpath.py | 4 ++ src/gossip/client/__init__.py | 3 ++ src/gossip/client/streamblocks.py | 39 +++++++++++++++++++ src/gossip/server/__init__.py | 2 +- .../{streamblocks.py => diffuseblocks.py} | 19 +++++---- 5 files changed, 58 insertions(+), 9 deletions(-) create mode 100644 src/blockdb/dbpath.py create mode 100644 src/gossip/client/streamblocks.py rename src/gossip/server/{streamblocks.py => diffuseblocks.py} (85%) diff --git a/src/blockdb/dbpath.py b/src/blockdb/dbpath.py new file mode 100644 index 00000000..0d2bef09 --- /dev/null +++ b/src/blockdb/dbpath.py @@ -0,0 +1,4 @@ +from utils import identifyhome + + +block_db_path = identifyhome.identify_home() + 'blockdata' \ No newline at end of file diff --git a/src/gossip/client/__init__.py b/src/gossip/client/__init__.py index 1ad89870..bf8e2b92 100644 --- a/src/gossip/client/__init__.py +++ b/src/gossip/client/__init__.py @@ -70,6 +70,9 @@ def gossip_client(): get_new_peers, 1200, initial_sleep=5) + # Start a new thread to stream blocks from peers + + dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH) while True: diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py new file mode 100644 index 00000000..68c760e4 --- /dev/null +++ b/src/gossip/client/streamblocks.py @@ -0,0 +1,39 @@ +"""Onionr - Private P2P Communication. + +Download blocks that are being diffused + +doesn't apply for blocks in the gossip queue that are awaiting +descision to fluff or stem + +""" +from random import SystemRandom + +if TYPE_CHECKING: + from socket import socket + from typing import TYPE_CHECKING, List + +from ..peerset import gossip_peer_set + +""" +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + + +def stream_from_peers(): + peer_stream_sockets: List[socket] = [] + + sys_rand = SystemRandom() + + while True: + peers = sys_rand.sample(gossip_peer_set, min(3, len(gossip_peer_set))) diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index bb0160b9..d67671f7 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -21,7 +21,7 @@ from filepaths import gossip_server_socket_file from ..commands import GossipCommands from ..peerset import gossip_peer_set from .acceptstem import accept_stem_blocks -from .streamblocks import stream_blocks +from .diffuseblocks import stream_blocks """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by diff --git a/src/gossip/server/streamblocks.py b/src/gossip/server/diffuseblocks.py similarity index 85% rename from src/gossip/server/streamblocks.py rename to src/gossip/server/diffuseblocks.py index 43f732d1..02fcf654 100644 --- a/src/gossip/server/streamblocks.py +++ b/src/gossip/server/diffuseblocks.py @@ -1,6 +1,6 @@ """Onionr - Private P2P Communication. -Stream blocks we can for inbound edge peers that ask for them +Diffuse blocks we can for inbound edge peers that ask for them doesn't apply for blocks in the gossip queue that are awaiting descision to fluff or stem @@ -37,7 +37,7 @@ along with this program. If not, see . """ -async def stream_blocks(reader: 'StreamReader', writer: 'StreamWriter'): +async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): """stream blocks to a peer created since an offset """ time_offset = await wait_for(reader.read(8), 12) @@ -54,6 +54,7 @@ async def stream_blocks(reader: 'StreamReader', writer: 'StreamWriter'): "Peer's specified time offset skewed too far into the future") newly_stored_blocks = queue.Queue() + def _add_to_queue(bl): newly_stored_blocks.put_nowait(bl) block_storage_observers.append( @@ -70,6 +71,7 @@ async def stream_blocks(reader: 'StreamReader', writer: 'StreamWriter'): await writer.drain() try: + # Send initial blocks from offset for block in get_blocks_after_timestamp(time_offset): if not keep_writing: break @@ -83,14 +85,15 @@ async def stream_blocks(reader: 'StreamReader', writer: 'StreamWriter'): except IncompleteReadError: keep_writing = False + # Diffuse blocks stored since we started this stream while keep_writing: await _send_block(newly_stored_blocks.get()) - try: - keep_writing = bool( - int.from_bytes(await reader.readexactly(1), 'big') - ) - except IncompleteReadError: - keep_writing = False + try: + keep_writing = bool( + int.from_bytes(await reader.readexactly(1), 'big') + ) + except IncompleteReadError: + keep_writing = False except Exception: logger.warn(traceback.format_exc(), terminal=True)