Compare commits

...

2 Commits

Author SHA1 Message Date
Kevin F df02bdb826 Work on normal gossip diffusion 2022-03-26 18:26:55 -05:00
Kevin F c215d4decd Work on normal gossip diffusion 2022-03-26 18:24:40 -05:00
8 changed files with 158 additions and 25 deletions

View File

@@ -13,6 +13,7 @@
* [ ] Restore webUI as a plugin
* [ ] Restore/reimplement mail plugin
* [ ] Restore/reimplement friends plugin
* [ ] Refresh test suite
## Store Plugin Release (9.1)

View File

@@ -1,16 +1,24 @@
from typing import Callable, Generator, List
from onionrblocks import Block
import db
from utils import identifyhome
block_db_path = identifyhome.identify_home() + 'blockdata'
from .dbpath import block_db_path
block_storage_observers: List[Callable] = []
def add_block_to_db(block: Block):
    db.set(block_db_path, block.id, block.raw)
    # Raises db.DuplicateKey if dupe
    db.set_if_new(block_db_path, block.id, block.raw)
    for func in block_storage_observers:
        func(block)
def get_blocks_by_type(block_type: str):
def get_blocks_by_type(block_type: str) -> Generator[Block]:
    block_db = db.get_db_obj(block_db_path, 'u')
    for block_hash in db.list_keys(block_db_path):
        block = Block(block_hash, block_db[block_hash], auto_verify=False)
@@ -18,7 +26,8 @@ def get_blocks_by_type(block_type: str):
        yield block
def get_blocks_after_timestamp(timestamp: int, block_type: str = ''):
def get_blocks_after_timestamp(
        timestamp: int, block_type: str = '') -> Generator[Block]:
    block_db = db.get_db_obj(block_db_path, 'u')
    for block_hash in db.list_keys(block_db_path):
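Taken together, the blockdb changes are easiest to see from the caller's side: add_block_to_db now refuses duplicate block IDs by raising db.DuplicateKey, and both getters are declared as generators, so callers can iterate lazily. A minimal consumer sketch under those assumptions (not code from this commit):

    import db
    from time import time
    from blockdb import add_block_to_db, get_blocks_after_timestamp

    def store_once(block) -> bool:
        """Store a block exactly once; observers fire only on first storage."""
        try:
            add_block_to_db(block)
            return True
        except db.DuplicateKey:
            return False  # already stored; observers were not re-notified

    # Lazily iterate blocks stored in the last hour
    for bl in get_blocks_after_timestamp(int(time()) - 3600):
        print(bl.id)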

View File

@@ -4,6 +4,8 @@ import os
timeout = 120
class DuplicateKey(ValueError): pass
def _do_timeout(func, *args):
    ts = 0
@@ -22,6 +24,18 @@ def _do_timeout(func, *args):
    return res
def set_if_new(db_path, key, value):
    def _set(key, value):
        with dbm.open(db_path, "c") as my_db:
            try:
                my_db[key]
            except KeyError:
                my_db[key] = value
            else:
                raise DuplicateKey
    _do_timeout(_set, key, value)
def set(db_path, key, value):
    """Set a value in the db, open+timeout so not good for rapid use"""
    def _set(key, value):
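The key test and the write in set_if_new happen inside a single dbm.open, so only the first write for a key succeeds; _do_timeout is just a retry wrapper around the dbm file lock. A usage sketch under those assumptions ("demo.db" is a scratch path, not from the diff):

    import db

    db.set_if_new("demo.db", b"block-id", b"payload")      # stored
    try:
        db.set_if_new("demo.db", b"block-id", b"payload")  # same key again
    except db.DuplicateKey:
        print("duplicate write rejected; first value kept")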

View File

@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING
from ...constants import BLOCK_MAX_SIZE
from ...constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN
if TYPE_CHECKING:
    from queue import Queue
@@ -23,7 +23,7 @@ async def do_stem_stream(
    remaining_time = d_phase.remaining_time()
    bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
    block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE)
    block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN)
    peer_socket.sendall(bl.id)
    peer_socket.sendall(block_size.encode('utf-8'))
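The one-token change above fixes a padding bug: str.zfill pads to the given total width, so passing BLOCK_MAX_SIZE produced a 2,048,000-character size field instead of a 7-character one. A quick check:

    BLOCK_MAX_SIZE = 1024 * 2000                   # 2048000 bytes
    BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))  # 7 digits

    assert len(str(512).zfill(BLOCK_MAX_SIZE)) == 2048000   # old behavior
    assert str(512).zfill(BLOCK_MAX_SIZE_LEN) == '0000512'  # new behavior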

View File

@@ -2,8 +2,9 @@ BOOTSTRAP_ATTEMPTS = 5
PEER_AMOUNT_TO_ASK = 3
TRANSPORT_SIZE_BYTES = 64
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
DANDELION_EPOCH_LENGTH = 120
DANDELION_EPOCH_LENGTH = 60
# Magic number i made up, not really specified in dandelion++ paper
MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris

View File

@@ -21,6 +21,7 @@ from filepaths import gossip_server_socket_file
from ..commands import GossipCommands
from ..peerset import gossip_peer_set
from .acceptstem import accept_stem_blocks
from .streamblocks import stream_blocks
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -46,9 +47,11 @@ def gossip_server():
        while True:
            try:
                cmd = await asyncio.wait_for(reader.read(1), 60)
                cmd = await asyncio.wait_for(reader.readexactly(1), 60)
            except asyncio.exceptions.CancelledError:
                break
            except asyncio.IncompleteReadError:
                break
            cmd = int.from_bytes(cmd, 'big')
            if cmd == b'' or cmd == 0:
@@ -60,23 +63,30 @@ def gossip_server():
                    pass
                case GossipCommands.ANNOUNCE:
                    async def _read_announce():
                        address = await reader.read(
                        address = await reader.readexactly(
                            constants.TRANSPORT_SIZE_BYTES)
                        onionrevents.event(
                            'announce_rec',
                            data={'address': address,
                                  'callback': connect_peer},
                            threaded=True)
                        writer.write(int(1).to_bytes(1, 'big'))
                        if address:
                            onionrevents.event(
                                'announce_rec',
                                data={'address': address,
                                      'callback': connect_peer},
                                threaded=True)
                            writer.write(int(1).to_bytes(1, 'big'))
                    await asyncio.wait_for(_read_announce(), 10)
                case GossipCommands.PEER_EXCHANGE:
                    for peer in gossip_peer_set:
                        writer.write(
                            peer.transport_address.encode(
                                'utf-8').removesuffix(b'.onion'))
                case GossipCommands.STREAM_BLOCKS:
                    try:
                        await stream_blocks(reader, writer)
                    except Exception:
                        logger.warn(
                            f"Err streaming blocks\n{traceback.format_exc()}",
                            terminal=True)
                case GossipCommands.PUT_BLOCKS:
                    # Create block queue & append stemmed blocks to it
                    # Pick block queue & append stemmed blocks to it
                    try:
                        await accept_stem_blocks(
                            reader, writer,
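The reader.read(n) → reader.readexactly(n) changes in this file (and below) share one rationale: read(n) may legally return fewer than n bytes, or b'' at EOF, while readexactly(n) returns exactly n bytes or raises asyncio.IncompleteReadError. A minimal sketch of the command-read pattern the server now uses:

    import asyncio

    async def read_command(reader: asyncio.StreamReader) -> int:
        try:
            # exactly one byte or IncompleteReadError, never a silent short read
            cmd = await asyncio.wait_for(reader.readexactly(1), 60)
        except asyncio.IncompleteReadError:
            return 0  # treat a half-closed peer as "no command"
        return int.from_bytes(cmd, 'big')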

View File

@@ -32,7 +32,7 @@ async def accept_stem_blocks(
    inbound_edge_count[0] += 1
    # Start getting the first block
    read_routine = reader.read(BLOCK_ID_SIZE)
    read_routine = reader.readexactly(BLOCK_ID_SIZE)
    stream_start_time = int(time())
    block_queue_to_use = secrets.choice(gossip_block_queues)
@@ -40,10 +40,12 @@
    for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
        block_id = (
            await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
        block_size = (await wait_for(
            reader.read(block_size_digits),
            base_wait_timeout)).decode('utf-8')
        if not block_id:
            break
        block_size = (await wait_for(
            reader.readexactly(block_size_digits),
            base_wait_timeout)).decode('utf-8')
        if not block_size:
            break
@@ -54,8 +56,7 @@
            raise ValueError("Max block size")
        raw_block: bytes = await wait_for(
            reader.read(block_size), base_wait_timeout * 6)
            reader.readexactly(block_size), base_wait_timeout * 6)
        if not raw_block:
            break
@@ -65,5 +66,5 @@
        # Regardless of stem phase, we add to queue
        # Client will decide if they are to be stemmed
        read_routine = reader.read(BLOCK_ID_SIZE)
        read_routine = reader.readexactly(BLOCK_ID_SIZE)
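Read together with the constants file, the loop expects each stemmed block as a fixed frame: a 128-byte block ID, a zero-padded size field of BLOCK_MAX_SIZE_LEN digits, then the raw block bytes. A hypothetical sender-side helper for the same framing (frame_block is not part of this commit):

    BLOCK_ID_SIZE = 128      # from the gossip constants
    BLOCK_MAX_SIZE_LEN = 7   # len(str(BLOCK_MAX_SIZE))

    def frame_block(block_id: bytes, raw: bytes) -> bytes:
        assert len(block_id) == BLOCK_ID_SIZE
        size_field = str(len(raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8')
        return block_id + size_field + raw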

View File

@@ -0,0 +1,97 @@
"""Onionr - Private P2P Communication.
Stream stored blocks to inbound edge peers that ask for them.
This doesn't apply to blocks in the gossip queue that are still
awaiting a decision to fluff or stem.
"""
from asyncio import IncompleteReadError, wait_for
import queue
import traceback
from typing import TYPE_CHECKING
from time import time
if TYPE_CHECKING:
    from asyncio import StreamWriter, StreamReader
    from onionrblocks import Block
from ..constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN
import logger
from blockdb import get_blocks_after_timestamp, block_storage_observers
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
async def stream_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
    """Stream blocks created since the given time offset to a peer."""
    time_offset = await wait_for(reader.read(8), 12)
    time_offset = time_offset.decode('utf-8')
    keep_writing = True
    # Reject anything that isn't a plain unsigned integer timestamp
    if not all(c in "0123456789" for c in time_offset):
        raise ValueError("Invalid time offset")
    time_offset = int(time_offset)
    if time_offset - time() > 5:
        raise ValueError(
            "Peer's specified time offset skewed too far into the future")
    newly_stored_blocks = queue.Queue()

    def _add_to_queue(bl):
        newly_stored_blocks.put_nowait(bl)
    block_storage_observers.append(
        _add_to_queue
    )

    async def _send_block(bl: 'Block'):
        writer.write(bl.id)
        await writer.drain()
        writer.write(
            str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
        await writer.drain()
        writer.write(bl.raw)
        await writer.drain()

    try:
        for block in get_blocks_after_timestamp(time_offset):
            if not keep_writing:
                break
            await _send_block(block)
            try:
                keep_writing = bool(
                    int.from_bytes(await reader.readexactly(1), 'big')
                )
            except IncompleteReadError:
                keep_writing = False
        while keep_writing:
            await _send_block(newly_stored_blocks.get())
            try:
                keep_writing = bool(
                    int.from_bytes(await reader.readexactly(1), 'big')
                )
            except IncompleteReadError:
                keep_writing = False
    except Exception:
        logger.warn(traceback.format_exc(), terminal=True)
    block_storage_observers.remove(_add_to_queue)
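stream_blocks leans on two mechanisms visible above: the block_storage_observers hook feeds freshly stored blocks into this connection's queue once the backlog is drained, and after every block the peer answers with one acknowledgement byte (1 = keep sending, 0 or EOF = stop). A hypothetical receiving peer under those assumptions (an all-zero offset field asks for every stored block):

    from asyncio import StreamReader, StreamWriter

    BLOCK_ID_SIZE = 128      # from the gossip constants
    BLOCK_MAX_SIZE_LEN = 7   # len(str(BLOCK_MAX_SIZE))

    async def receive_streamed_blocks(
            reader: StreamReader, writer: StreamWriter, want: int):
        writer.write(b"00000000")  # 8-byte time offset field read by the server
        await writer.drain()
        for i in range(want):
            block_id = await reader.readexactly(BLOCK_ID_SIZE)
            size = int((await reader.readexactly(BLOCK_MAX_SIZE_LEN)).decode())
            raw = await reader.readexactly(size)
            # ... verify and store (block_id, raw) here ...
            # ack byte: 1 asks for another block, 0 ends the stream
            writer.write((1 if i < want - 1 else 0).to_bytes(1, 'big'))
            await writer.drain()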