Implemented non-dandelion uploading to improve network performance

This commit is contained in:
Kevin F 2022-07-26 12:45:48 -05:00
parent 9b6d1f5dbd
commit 90176e43fb
12 changed files with 166 additions and 10 deletions

View File

@ -44,7 +44,7 @@ def start_gossip_threads():
# There is a unified set so gossip logic is not repeated # There is a unified set so gossip logic is not repeated
add_onionr_thread( add_onionr_thread(
gossip_server, 1, initial_sleep=0.2) gossip_server, 1, 'gossip_server', initial_sleep=0.2)
threading.Thread( threading.Thread(
target=start_gossip_client, daemon=True).start() target=start_gossip_client, daemon=True).start()

View File

@ -34,7 +34,7 @@ from .announce import do_announce
from .dandelionstem import stem_out from .dandelionstem import stem_out
from .peerexchange import get_new_peers from .peerexchange import get_new_peers
from ..peerset import gossip_peer_set from ..peerset import gossip_peer_set
from .streamblocks import stream_from_peers from .streamblocks import stream_from_peers, stream_to_peer
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
@ -97,16 +97,24 @@ def start_gossip_client():
# transport plugin handles the new peer # transport plugin handles the new peer
add_onionr_thread( add_onionr_thread(
get_new_peers, get_new_peers,
60, initial_sleep=120) 60, 'get_new_peers', initial_sleep=120)
# Start a new thread to stream blocks from peers # Start a new thread to stream blocks from peers
# These blocks are being diffused and are stored in # These blocks are being diffused and are stored in
# the peer's block database # the peer's block database
add_onionr_thread( add_onionr_thread(
stream_from_peers, stream_from_peers,
3, initial_sleep=10 3, 'stream_from_peers', initial_sleep=10
) )
# Start a thread to upload blocks, useful for when
# connectivity is poor or we are not allowing incoming
# connections on any transports
add_onionr_thread(
stream_to_peer,
10, 'stream_to_peer', initial_sleep=1)
# Blocks we receive or create through all means except # Blocks we receive or create through all means except
# Diffusal are put into block queues, we decide to either # Diffusal are put into block queues, we decide to either
# stem or diffuse a block from the queue based on the current # stem or diffuse a block from the queue based on the current

View File

@ -0,0 +1,2 @@
from .streamfrom import stream_from_peers
from .streamto import stream_to_peer

View File

@ -14,7 +14,7 @@ from typing import TYPE_CHECKING, List
import blockdb import blockdb
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS from ...constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS
if TYPE_CHECKING: if TYPE_CHECKING:
from socket import socket from socket import socket
@ -25,8 +25,8 @@ from ordered_set import OrderedSet
import logger import logger
import onionrblocks import onionrblocks
from ..peerset import gossip_peer_set from ...peerset import gossip_peer_set
from ..commands import GossipCommands, command_to_byte from ...commands import GossipCommands, command_to_byte
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify

View File

@ -0,0 +1,56 @@
from secrets import SystemRandom
from time import time
from typing import List, TYPE_CHECKING
if TYPE_CHECKING:
from onionrblocks import Block
from gossip.commands import GossipCommands, command_to_byte
from blockdb import get_blocks_after_timestamp
from ...constants import BLOCK_ID_SIZE, BLOCK_SIZE_LEN
from ...peerset import gossip_peer_set
from ...server import lastincoming
# Minimum seconds with no inbound gossip connection before we actively
# push blocks out to a peer (see stream_to_peer below).
SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM = 3


class SendTimestamp:
    # Unix timestamp of the last completed (or deliberately skipped)
    # outbound stream run. Class-level mutable state shared across
    # successive stream_to_peer() calls so we only send newer blocks.
    timestamp: int = 0
def stream_to_peer():
    """Push recently stored blocks to one randomly chosen gossip peer.

    Useful when connectivity is poor or we accept no inbound transport
    connections. If an inbound gossip connection was seen within the last
    SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM seconds, we skip the run (peers
    can pull from us instead) but still advance the send cursor.

    No parameters, no return value; intended to run on a periodic
    onionr thread.
    """
    if SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM > \
            time() - lastincoming.last_incoming_timestamp:
        # Recent inbound activity: assume peers can reach us; mark
        # everything up to now as handled and bail out.
        SendTimestamp.timestamp = int(time())
        return
    if not len(gossip_peer_set):
        # No known peers to stream to
        return
    rand = SystemRandom()
    peer = rand.choice(gossip_peer_set)

    buffer: List['Block'] = []

    def _do_upload():
        # Drain `buffer` into the chosen peer over a fresh socket.
        # Protocol: send the PUT_BLOCK_DIFFUSE command once, then for each
        # block offer its id; the peer replies with one byte:
        # 0 = already has it (skip), nonzero = send size then raw bytes.
        with peer.get_socket(30) as p:
            p.sendall(command_to_byte(GossipCommands.PUT_BLOCK_DIFFUSE))
            while len(buffer):
                try:
                    block = buffer.pop()
                except IndexError:
                    break
                p.sendall(block.id.zfill(BLOCK_ID_SIZE))
                if int.from_bytes(p.recv(1), 'big') == 0:
                    # Peer already has this block
                    continue
                block_size = str(len(block.raw)).zfill(BLOCK_SIZE_LEN)
                p.sendall(block_size.encode('utf-8'))
                p.sendall(block.raw)

    # Buffer some blocks so we're not streaming too many to one peer
    # and to efficiently avoid connecting without sending anything
    buffer_max = 10
    for block in get_blocks_after_timestamp(SendTimestamp.timestamp):
        buffer.append(block)
        if len(buffer) > buffer_max:
            # Bug fix: the original called _do_upload(buffer) here, but
            # _do_upload() takes no arguments, raising TypeError whenever
            # more than buffer_max blocks had accumulated.
            _do_upload()
    if len(buffer):
        _do_upload()
    SendTimestamp.timestamp = int(time())

View File

@ -6,6 +6,7 @@ class GossipCommands(IntEnum):
PEER_EXCHANGE = auto() PEER_EXCHANGE = auto()
STREAM_BLOCKS = auto() STREAM_BLOCKS = auto()
PUT_BLOCKS = auto() PUT_BLOCKS = auto()
PUT_BLOCK_DIFFUSE = auto()
def command_to_byte(cmd: GossipCommands): def command_to_byte(cmd: GossipCommands):

View File

@ -1,9 +1,12 @@
import asyncio import asyncio
import traceback import traceback
from time import time
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Set, Tuple from typing import Set, Tuple
from queue import Queue from threading import Thread
from onionrblocks import Block
from gossip import constants from gossip import constants
from ..connectpeer import connect_peer from ..connectpeer import connect_peer
@ -18,10 +21,13 @@ if TYPE_CHECKING:
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
from filepaths import gossip_server_socket_file from filepaths import gossip_server_socket_file
import blockdb
from blockdb import add_block_to_db
from ..commands import GossipCommands from ..commands import GossipCommands
from ..peerset import gossip_peer_set from ..peerset import gossip_peer_set
from .acceptstem import accept_stem_blocks from .acceptstem import accept_stem_blocks
from .diffuseblocks import diffuse_blocks from .diffuseblocks import diffuse_blocks
from .import lastincoming
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -52,6 +58,7 @@ def gossip_server():
break break
except asyncio.IncompleteReadError: except asyncio.IncompleteReadError:
break break
lastincoming.last_incoming_timestamp = int(time())
cmd = int.from_bytes(cmd, 'big') cmd = int.from_bytes(cmd, 'big')
if cmd == b'' or cmd == 0: if cmd == b'' or cmd == 0:
@ -107,6 +114,24 @@ def gossip_server():
# Subtract dandelion edge, make sure >=0 # Subtract dandelion edge, make sure >=0
inbound_dandelion_edge_count[0] = \ inbound_dandelion_edge_count[0] = \
max(inbound_dandelion_edge_count[0] - 1, 0) max(inbound_dandelion_edge_count[0] - 1, 0)
case GossipCommands.PUT_BLOCK_DIFFUSE:
async def _get_block_diffused():
block_id = await reader.readexactly(constants.BLOCK_ID_SIZE)
if blockdb.has_block(block_id):
writer.write(int(0).to_bytes(1, 'big'))
else:
writer.write(int(1).to_bytes(1, 'big'))
await writer.drain()
block_size = int(await asyncio.wait_for(reader.readexactly(constants.BLOCK_SIZE_LEN), 30))
block_data = await reader.readexactly(block_size)
Thread(
target=add_block_to_db,
args=[
Block(block_id, block_data, auto_verify=True)]
).start()
await _get_block_diffused()
break break
await writer.drain() await writer.drain()

View File

@ -0,0 +1 @@
# Unix timestamp (int) of the most recent inbound gossip connection.
# Written by gossip_server each time it reads a command from a client;
# read by stream_to_peer to decide whether outbound block streaming
# is needed. Module-level shared state; 0 means no connection seen yet.
last_incoming_timestamp = 0

View File

@ -27,7 +27,8 @@ def _onionr_thread(func: Callable,
def add_onionr_thread( def add_onionr_thread(
func: Callable, func: Callable,
sleep_secs: int, *args, initial_sleep: int = 5, **kwargs): sleep_secs: int, thread_name: str,
*args, initial_sleep: int = 5, **kwargs):
"""Spawn a new onionr thread that exits when the main thread does. """Spawn a new onionr thread that exits when the main thread does.
Runs in an infinite loop with sleep between calls Runs in an infinite loop with sleep between calls
@ -40,6 +41,7 @@ def add_onionr_thread(
initial_sleep, initial_sleep,
*args), *args),
kwargs=kwargs, kwargs=kwargs,
name=thread_name,
daemon=True).start() daemon=True).start()

View File

@ -1 +1 @@
/dev/shm/onionr3177415330/gossip-server.sock /dev/shm/onionr90775244/gossip-server.sock,/dev/shm/onionr1873728538/gossip-server.sock

View File

@ -58,6 +58,9 @@ def on_bootstrap(api, data=None):
if address == gossip_server_socket_file or not address: if address == gossip_server_socket_file or not address:
continue continue
if not os.path.exists(address):
continue
# Tell the gossip logic that this peer is ready to connect # Tell the gossip logic that this peer is ready to connect
# it will add it to data['peer_set'] if it responds to ping # it will add it to data['peer_set'] if it responds to ping
Thread( Thread(

View File

@ -0,0 +1,58 @@
import os, uuid
from queue import Empty
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
from time import sleep
from threading import Thread
import asyncio
import unittest
import sys
sys.path.append(".")
sys.path.append("src/")
from ordered_set import OrderedSet
import onionrblocks
import blockdb
from gossip.server import gossip_server
from gossip.blockqueues import gossip_block_queues
from filepaths import gossip_server_socket_file
# Wire-format constants duplicated locally for the test.
# NOTE(review): these presumably mirror the values in gossip's constants
# module — TODO confirm they stay in sync with the server's values.
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))  # digits used to left-pad block sizes
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
class OnionrServerPutBLTestDiffuse(unittest.TestCase):
    """Integration test for the gossip server's PUT_BLOCK_DIFFUSE command."""

    def test_put_block(self):
        # Run the gossip server in the background on its unix socket
        Thread(target=gossip_server, daemon=True).start()
        sleep(0.01)  # give the server a moment to start listening

        bl = onionrblocks.blockcreator.create_anonvdf_block(
            b"my test block", b"txt", 2800)

        async def block_put_client():
            reader, writer = await asyncio.open_unix_connection(
                gossip_server_socket_file)
            # 6 is assumed to be GossipCommands.PUT_BLOCK_DIFFUSE's enum
            # value — TODO confirm against gossip.commands ordering
            writer.write(int(6).to_bytes(1, 'big'))
            # Offer the block id; the server replies 1 when it does not
            # yet have the block and wants the data
            writer.write(bl.id)
            self.assertEqual(
                int.from_bytes(await reader.readexactly(1), 'big'), 1)
            # Send zero-padded size, then the raw block bytes
            writer.write(
                str(len(bl.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8'))
            writer.write(bl.raw)
            sleep(0.2)  # allow the server's add_block_to_db thread to finish
            self.assertIn(
                bl.id,
                list([block.id for block in
                      blockdb.get_blocks_after_timestamp(0)]))
            await writer.drain()
        asyncio.run(block_put_client())
unittest.main()