diff --git a/src/blockdb/__init__.py b/src/blockdb/__init__.py index bded0296..de7f44c9 100644 --- a/src/blockdb/__init__.py +++ b/src/blockdb/__init__.py @@ -38,3 +38,15 @@ def get_blocks_after_timestamp( yield block else: yield block + + +def has_block(block_hash): + return block_hash in db.list_keys() + + +def get_block(block_hash) -> Block: + return Block( + block_hash, + db.get_db_obj(block_db_path, 'u').get(block_hash), + auto_verify=False) + diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py index ed1fdd27..4c310ddc 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks.py @@ -11,6 +11,10 @@ from random import SystemRandom from time import sleep import traceback +import blockdb + +from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS + if TYPE_CHECKING: from socket import socket from typing import TYPE_CHECKING, List @@ -20,6 +24,7 @@ from ordered_set import OrderedSet import logger +import onionrblocks from ..peerset import gossip_peer_set from ..commands import GossipCommands, command_to_byte @@ -53,6 +58,7 @@ def stream_from_peers(): sys_rand = SystemRandom() need_socket_lock = Semaphore(MAX_STREAMS) + offset = 0 def _stream_from_peer(peer: Peer): @@ -61,8 +67,35 @@ def stream_from_peers(): sock.sendall( command_to_byte(GossipCommands.STREAM_BLOCKS) ) + sock.sendall( + str(offset).zfill(BLOCK_STREAM_OFFSET_DIGITS).encode('utf-8')) + + while True: + block_id = sock.recv(BLOCK_ID_SIZE) + if blockdb.has_block(block_id): + sock.sendall(int(0).to_bytes(1, 'big')) + continue + block_size = int(sock.recv(BLOCK_MAX_SIZE_LEN)) + if block_size > BLOCK_MAX_SIZE or block_size <= 0: + logger.warn( + f"Peer {peer.transport_address} " + + "reported block size out of range") + break + block_data = sock.recv(block_size) + try: + blockdb.add_block_to_db( + onionrblocks.Block( + block_id, block_data, auto_verify=True) + ) + except Exception: + sock.sendall(int(0).to_bytes(1, 'big')) + raise + sock.sendall(int(1).to_bytes(1, 'big')) + + sock.close() except Exception: logger.warn(traceback.format_exc()) + finally: sock.close() need_socket_lock.release() diff --git a/src/gossip/constants.py b/src/gossip/constants.py index 78be49c6..647fcc14 100644 --- a/src/gossip/constants.py +++ b/src/gossip/constants.py @@ -4,6 +4,7 @@ TRANSPORT_SIZE_BYTES = 64 BLOCK_MAX_SIZE = 1024 * 2000 BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE)) BLOCK_ID_SIZE = 128 +BLOCK_STREAM_OFFSET_DIGITS = 8 DANDELION_EPOCH_LENGTH = 60 # Magic number i made up, not really specified in dandelion++ paper diff --git a/src/gossip/server/diffuseblocks.py b/src/gossip/server/diffuseblocks.py index 02fcf654..656c4006 100644 --- a/src/gossip/server/diffuseblocks.py +++ b/src/gossip/server/diffuseblocks.py @@ -17,7 +17,7 @@ if TYPE_CHECKING: from asyncio import StreamWriter, StreamReader from onionrblocks import Block -from ..constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN +from ..constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS import logger from blockdb import get_blocks_after_timestamp, block_storage_observers @@ -40,7 +40,7 @@ along with this program. If not, see . async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): """stream blocks to a peer created since an offset """ - time_offset = await wait_for(reader.read(8), 12) + time_offset = await wait_for(reader.readexactly(BLOCK_STREAM_OFFSET_DIGITS), 12) time_offset = time_offset.decode('utf-8') keep_writing = True @@ -63,6 +63,11 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): async def _send_block(bl: 'Block'): writer.write(block.id) + + # we tell id above, they tell if they want the block + if int.from_bytes(await reader.readexactly(1), 'big') == 0: + return + await writer.drain() writer.write( str(len(block.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))