Block diffusal mostly done

This commit is contained in:
Kevin F 2022-04-04 00:48:30 -05:00
parent 9d2c2641f4
commit c7ba974264
4 changed files with 12 additions and 6 deletions

View File

@ -18,7 +18,7 @@ def add_block_to_db(block: Block):
func(block) func(block)
def get_blocks_by_type(block_type: str) -> Generator[Block]: def get_blocks_by_type(block_type: str) -> "Generator[Block]":
block_db = db.get_db_obj(block_db_path, 'u') block_db = db.get_db_obj(block_db_path, 'u')
for block_hash in db.list_keys(block_db_path): for block_hash in db.list_keys(block_db_path):
block = Block(block_hash, block_db[block_hash], auto_verify=False) block = Block(block_hash, block_db[block_hash], auto_verify=False)
@ -27,7 +27,7 @@ def get_blocks_by_type(block_type: str) -> Generator[Block]:
def get_blocks_after_timestamp( def get_blocks_after_timestamp(
timestamp: int, block_type: str = '') -> Generator[Block]: timestamp: int, block_type: str = '') -> "Generator[Block]":
block_db = db.get_db_obj(block_db_path, 'u') block_db = db.get_db_obj(block_db_path, 'u')
for block_hash in db.list_keys(block_db_path): for block_hash in db.list_keys(block_db_path):

View File

@ -4,12 +4,12 @@ Download blocks that are being diffused
doesn't apply for blocks in the gossip queue that are awaiting doesn't apply for blocks in the gossip queue that are awaiting
descision to fluff or stem descision to fluff or stem
""" """
from threading import Thread, Semaphore from threading import Thread, Semaphore
from random import SystemRandom from random import SystemRandom
from time import sleep from time import sleep
import traceback import traceback
from typing import TYPE_CHECKING, List
import blockdb import blockdb
@ -17,7 +17,6 @@ from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK
if TYPE_CHECKING: if TYPE_CHECKING:
from socket import socket from socket import socket
from typing import TYPE_CHECKING, List
from gossip.peer import Peer from gossip.peer import Peer
from ordered_set import OrderedSet from ordered_set import OrderedSet
@ -75,6 +74,8 @@ def stream_from_peers():
if blockdb.has_block(block_id): if blockdb.has_block(block_id):
sock.sendall(int(0).to_bytes(1, 'big')) sock.sendall(int(0).to_bytes(1, 'big'))
continue continue
sock.sendall(int(1).to_bytes(1, 'big'))
block_size = int(sock.recv(BLOCK_MAX_SIZE_LEN)) block_size = int(sock.recv(BLOCK_MAX_SIZE_LEN))
if block_size > BLOCK_MAX_SIZE or block_size <= 0: if block_size > BLOCK_MAX_SIZE or block_size <= 0:
logger.warn( logger.warn(
@ -88,8 +89,11 @@ def stream_from_peers():
block_id, block_data, auto_verify=True) block_id, block_data, auto_verify=True)
) )
except Exception: except Exception:
# They gave us a bad block, kill the stream
# Could be corruption or malice
sock.sendall(int(0).to_bytes(1, 'big')) sock.sendall(int(0).to_bytes(1, 'big'))
raise raise
# Tell them to keep streaming
sock.sendall(int(1).to_bytes(1, 'big')) sock.sendall(int(1).to_bytes(1, 'big'))
sock.close() sock.close()
@ -99,6 +103,7 @@ def stream_from_peers():
sock.close() sock.close()
need_socket_lock.release() need_socket_lock.release()
# spawn stream threads infinitely
while True: while True:
need_socket_lock.acquire() need_socket_lock.acquire()
available_set = gossip_peer_set - tried_peers available_set = gossip_peer_set - tried_peers

View File

@ -21,7 +21,7 @@ from filepaths import gossip_server_socket_file
from ..commands import GossipCommands from ..commands import GossipCommands
from ..peerset import gossip_peer_set from ..peerset import gossip_peer_set
from .acceptstem import accept_stem_blocks from .acceptstem import accept_stem_blocks
from .diffuseblocks import stream_blocks from .diffuseblocks import diffuse_blocks
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -80,7 +80,7 @@ def gossip_server():
'utf-8').removesuffix(b'.onion')) 'utf-8').removesuffix(b'.onion'))
case GossipCommands.STREAM_BLOCKS: case GossipCommands.STREAM_BLOCKS:
try: try:
await stream_blocks(reader, writer) await diffuse_blocks(reader, writer)
except Exception: except Exception:
logger.warn( logger.warn(
f"Err streaming blocks\n{traceback.format_exc()}", f"Err streaming blocks\n{traceback.format_exc()}",

View File

@ -63,6 +63,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
async def _send_block(bl: 'Block'): async def _send_block(bl: 'Block'):
writer.write(block.id) writer.write(block.id)
await writer.drain()
# we tell id above, they tell if they want the block # we tell id above, they tell if they want the block
if int.from_bytes(await reader.readexactly(1), 'big') == 0: if int.from_bytes(await reader.readexactly(1), 'big') == 0: