Work on normal gossip diffusion

This commit is contained in:
Kevin F 2022-03-26 18:24:40 -05:00
parent b07c176f5c
commit c215d4decd
7 changed files with 61 additions and 25 deletions

View File

@ -13,6 +13,7 @@
* [ ] Restore webUI as a plugin * [ ] Restore webUI as a plugin
* [ ] Restore/reimplement mail plugin * [ ] Restore/reimplement mail plugin
* [ ] Restore/reimplement friends plugin * [ ] Restore/reimplement friends plugin
* [ ] Refresh test suite
## Store Plugin Release (9.1) ## Store Plugin Release (9.1)

View File

@ -1,16 +1,24 @@
from typing import Callable, Generator, List
from onionrblocks import Block from onionrblocks import Block
import db import db
from utils import identifyhome
block_db_path = identifyhome.identify_home() + 'blockdata' from .dbpath import block_db_path
block_storage_observers: List[Callable] = []
def add_block_to_db(block: Block): def add_block_to_db(block: Block):
db.set(block_db_path, block.id, block.raw) # Raises db.DuplicateKey if dupe
db.set_if_new(block_db_path, block.id, block.raw)
for func in block_storage_observers:
func(block)
def get_blocks_by_type(block_type: str): def get_blocks_by_type(block_type: str) -> Generator[Block]:
block_db = db.get_db_obj(block_db_path, 'u') block_db = db.get_db_obj(block_db_path, 'u')
for block_hash in db.list_keys(block_db_path): for block_hash in db.list_keys(block_db_path):
block = Block(block_hash, block_db[block_hash], auto_verify=False) block = Block(block_hash, block_db[block_hash], auto_verify=False)
@ -18,7 +26,8 @@ def get_blocks_by_type(block_type: str):
yield block yield block
def get_blocks_after_timestamp(timestamp: int, block_type: str = ''): def get_blocks_after_timestamp(
timestamp: int, block_type: str = '') -> Generator[Block]:
block_db = db.get_db_obj(block_db_path, 'u') block_db = db.get_db_obj(block_db_path, 'u')
for block_hash in db.list_keys(block_db_path): for block_hash in db.list_keys(block_db_path):

View File

@ -4,6 +4,8 @@ import os
timeout = 120 timeout = 120
class DuplicateKey(ValueError): pass
def _do_timeout(func, *args): def _do_timeout(func, *args):
ts = 0 ts = 0
@ -22,6 +24,18 @@ def _do_timeout(func, *args):
return res return res
def set_if_new(db_path, key, value):
def _set(key, value):
with dbm.open(db_path, "c") as my_db:
try:
my_db[key]
except KeyError:
my_db[key] = value
else:
raise DuplicateKey
_do_timeout(_set, key, value)
def set(db_path, key, value): def set(db_path, key, value):
"""Set a value in the db, open+timeout so not good for rapid use""" """Set a value in the db, open+timeout so not good for rapid use"""
def _set(key, value): def _set(key, value):

View File

@ -1,6 +1,6 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from ...constants import BLOCK_MAX_SIZE from ...constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN
if TYPE_CHECKING: if TYPE_CHECKING:
from queue import Queue from queue import Queue
@ -23,7 +23,7 @@ async def do_stem_stream(
remaining_time = d_phase.remaining_time() remaining_time = d_phase.remaining_time()
bl: 'Block' = block_queue.get(block=True, timeout=remaining_time) bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE) block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN)
peer_socket.sendall(bl.id) peer_socket.sendall(bl.id)
peer_socket.sendall(block_size.encode('utf-8')) peer_socket.sendall(block_size.encode('utf-8'))

View File

@ -2,8 +2,9 @@ BOOTSTRAP_ATTEMPTS = 5
PEER_AMOUNT_TO_ASK = 3 PEER_AMOUNT_TO_ASK = 3
TRANSPORT_SIZE_BYTES = 64 TRANSPORT_SIZE_BYTES = 64
BLOCK_MAX_SIZE = 1024 * 2000 BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128 BLOCK_ID_SIZE = 128
DANDELION_EPOCH_LENGTH = 120 DANDELION_EPOCH_LENGTH = 60
# Magic number i made up, not really specified in dandelion++ paper # Magic number i made up, not really specified in dandelion++ paper
MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris

View File

@ -21,6 +21,7 @@ from filepaths import gossip_server_socket_file
from ..commands import GossipCommands from ..commands import GossipCommands
from ..peerset import gossip_peer_set from ..peerset import gossip_peer_set
from .acceptstem import accept_stem_blocks from .acceptstem import accept_stem_blocks
from .streamblocks import stream_blocks
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -46,9 +47,11 @@ def gossip_server():
while True: while True:
try: try:
cmd = await asyncio.wait_for(reader.read(1), 60) cmd = await asyncio.wait_for(reader.readexactly(1), 60)
except asyncio.exceptions.CancelledError: except asyncio.exceptions.CancelledError:
break break
except asyncio.IncompleteReadError:
break
cmd = int.from_bytes(cmd, 'big') cmd = int.from_bytes(cmd, 'big')
if cmd == b'' or cmd == 0: if cmd == b'' or cmd == 0:
@ -60,8 +63,9 @@ def gossip_server():
pass pass
case GossipCommands.ANNOUNCE: case GossipCommands.ANNOUNCE:
async def _read_announce(): async def _read_announce():
address = await reader.read( address = await reader.readexactly(
constants.TRANSPORT_SIZE_BYTES) constants.TRANSPORT_SIZE_BYTES)
if address:
onionrevents.event( onionrevents.event(
'announce_rec', 'announce_rec',
data={'address': address, data={'address': address,
@ -74,9 +78,15 @@ def gossip_server():
writer.write( writer.write(
peer.transport_address.encode( peer.transport_address.encode(
'utf-8').removesuffix(b'.onion')) 'utf-8').removesuffix(b'.onion'))
case GossipCommands.STREAM_BLOCKS:
try:
await stream_blocks(reader, writer)
except Exception:
logger.warn(
f"Err streaming blocks\n{traceback.format_exc()}",
terminal=True)
case GossipCommands.PUT_BLOCKS: case GossipCommands.PUT_BLOCKS:
# Create block queue & append stemmed blocks to it # Pick block queue & append stemmed blocks to it
try: try:
await accept_stem_blocks( await accept_stem_blocks(
reader, writer, reader, writer,

View File

@ -32,7 +32,7 @@ async def accept_stem_blocks(
inbound_edge_count[0] += 1 inbound_edge_count[0] += 1
# Start getting the first block # Start getting the first block
read_routine = reader.read(BLOCK_ID_SIZE) read_routine = reader.readexactly(BLOCK_ID_SIZE)
stream_start_time = int(time()) stream_start_time = int(time())
block_queue_to_use = secrets.choice(gossip_block_queues) block_queue_to_use = secrets.choice(gossip_block_queues)
@ -40,10 +40,12 @@ async def accept_stem_blocks(
for _ in range(MAX_STEM_BLOCKS_PER_STREAM): for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
block_id = ( block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8') await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
block_size = (await wait_for( if not block_id:
reader.read(block_size_digits), break
base_wait_timeout)).decode('utf-8')
block_size = (await wait_for(
reader.readexactly(block_size_digits),
base_wait_timeout)).decode('utf-8')
if not block_size: if not block_size:
break break
@ -54,8 +56,7 @@ async def accept_stem_blocks(
raise ValueError("Max block size") raise ValueError("Max block size")
raw_block: bytes = await wait_for( raw_block: bytes = await wait_for(
reader.read(block_size), base_wait_timeout * 6) reader.readexactly(block_size), base_wait_timeout * 6)
if not raw_block: if not raw_block:
break break
@ -65,5 +66,5 @@ async def accept_stem_blocks(
# Regardless of stem phase, we add to queue # Regardless of stem phase, we add to queue
# Client will decide if they are to be stemmed # Client will decide if they are to be stemmed
read_routine = reader.read(BLOCK_ID_SIZE) read_routine = reader.readexactly(BLOCK_ID_SIZE)