diff --git a/src/blockdb/__init__.py b/src/blockdb/__init__.py index de7f44c9..fa922428 100644 --- a/src/blockdb/__init__.py +++ b/src/blockdb/__init__.py @@ -18,7 +18,7 @@ def add_block_to_db(block: Block): func(block) -def get_blocks_by_type(block_type: str) -> Generator[Block]: +def get_blocks_by_type(block_type: str) -> "Generator[Block]": block_db = db.get_db_obj(block_db_path, 'u') for block_hash in db.list_keys(block_db_path): block = Block(block_hash, block_db[block_hash], auto_verify=False) @@ -27,7 +27,7 @@ def get_blocks_by_type(block_type: str) -> Generator[Block]: def get_blocks_after_timestamp( - timestamp: int, block_type: str = '') -> Generator[Block]: + timestamp: int, block_type: str = '') -> "Generator[Block]": block_db = db.get_db_obj(block_db_path, 'u') for block_hash in db.list_keys(block_db_path): diff --git a/src/gossip/client/streamblocks.py b/src/gossip/client/streamblocks.py index 4c310ddc..c71b7de8 100644 --- a/src/gossip/client/streamblocks.py +++ b/src/gossip/client/streamblocks.py @@ -4,12 +4,12 @@ Download blocks that are being diffused doesn't apply for blocks in the gossip queue that are awaiting descision to fluff or stem - """ from threading import Thread, Semaphore from random import SystemRandom from time import sleep import traceback +from typing import TYPE_CHECKING, List import blockdb @@ -17,7 +17,6 @@ from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK if TYPE_CHECKING: from socket import socket - from typing import TYPE_CHECKING, List from gossip.peer import Peer from ordered_set import OrderedSet @@ -75,6 +74,8 @@ def stream_from_peers(): if blockdb.has_block(block_id): sock.sendall(int(0).to_bytes(1, 'big')) continue + sock.sendall(int(1).to_bytes(1, 'big')) + block_size = int(sock.recv(BLOCK_MAX_SIZE_LEN)) if block_size > BLOCK_MAX_SIZE or block_size <= 0: logger.warn( @@ -88,8 +89,11 @@ def stream_from_peers(): block_id, block_data, auto_verify=True) ) except Exception: + # They gave us a bad block, kill the stream + # Could be corruption or malice sock.sendall(int(0).to_bytes(1, 'big')) raise + # Tell them to keep streaming sock.sendall(int(1).to_bytes(1, 'big')) sock.close() @@ -99,6 +103,7 @@ def stream_from_peers(): sock.close() need_socket_lock.release() + # spawn stream threads infinitely while True: need_socket_lock.acquire() available_set = gossip_peer_set - tried_peers diff --git a/src/gossip/server/__init__.py b/src/gossip/server/__init__.py index d67671f7..2ff0efe9 100644 --- a/src/gossip/server/__init__.py +++ b/src/gossip/server/__init__.py @@ -21,7 +21,7 @@ from filepaths import gossip_server_socket_file from ..commands import GossipCommands from ..peerset import gossip_peer_set from .acceptstem import accept_stem_blocks -from .diffuseblocks import stream_blocks +from .diffuseblocks import diffuse_blocks """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -80,7 +80,7 @@ def gossip_server(): 'utf-8').removesuffix(b'.onion')) case GossipCommands.STREAM_BLOCKS: try: - await stream_blocks(reader, writer) + await diffuse_blocks(reader, writer) except Exception: logger.warn( f"Err streaming blocks\n{traceback.format_exc()}", diff --git a/src/gossip/server/diffuseblocks.py b/src/gossip/server/diffuseblocks.py index 656c4006..25d88f1f 100644 --- a/src/gossip/server/diffuseblocks.py +++ b/src/gossip/server/diffuseblocks.py @@ -63,6 +63,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'): async def _send_block(bl: 'Block'): writer.write(block.id) + await writer.drain() # we tell id above, they tell if they want the block if int.from_bytes(await reader.readexactly(1), 'big') == 0: