gossip bug and performance fixes

Kevin F 2022-06-26 00:34:49 -05:00
parent b9fa446cb0
commit b25e376349
22 changed files with 86 additions and 51 deletions

View File

@@ -1,9 +1,9 @@
from queue import Empty, Queue
from queue import Empty
from time import sleep
from secrets import choice
import traceback
from typing import TYPE_CHECKING, Coroutine, Tuple, List
from typing import TYPE_CHECKING, Coroutine, List
from ordered_set import OrderedSet
@@ -20,8 +20,6 @@ from ...peerset import gossip_peer_set
from .stemstream import do_stem_stream
if TYPE_CHECKING:
from onionrblocks import Block
from ...peer import Peer
from ...dandelion.phase import DandelionPhase
import socket
@@ -56,7 +54,8 @@ async def _setup_edge(
if s.recv(1) == dandelion.StemAcceptResult.DENY:
raise StemConnectionDenied
except TimeoutError:
logger.debug("Peer timed out when establishing stem connection", terminal=True)
logger.debug(
"Peer timed out when establishing stem connection", terminal=True)
logger.debug(traceback.format_exc())
except StemConnectionDenied:
logger.debug(
@@ -83,12 +82,15 @@ async def stem_out(d_phase: 'DandelionPhase'):
return
not_enough_edges = False
def blackhole_protection(q):
for bl in q:
add_block_to_db(bl)
# Spawn delayed threads with a snapshot of each block queue, re-adding
# the blocks to the db later as protection against black hole attacks
for block_q in gossip_block_queues:
add_delayed_thread(
lambda q: set(map(add_block_to_db, q)),
BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue))
add_delayed_thread(
blackhole_protection, BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue))
peer_sockets: List['socket.socket'] = []
stream_routines: List[Coroutine] = []
@@ -98,15 +100,22 @@ async def stem_out(d_phase: 'DandelionPhase'):
tried_edges: "OrderedSet[Peer]" = OrderedSet()
while len(peer_sockets) < OUTBOUND_DANDELION_EDGES or not_enough_edges:
if gossip_block_queues[0].qsize() == 0 and \
gossip_block_queues[1].qsize() == 0:
sleep(1)
continue
try:
# Get a socket for stem out (makes sure they accept)
peer_sockets.append(await _setup_edge(gossip_peer_set, tried_edges))
peer_sockets.append(
await _setup_edge(gossip_peer_set, tried_edges))
except NotEnoughEdges:
# No possible edges at this point (edges < OUTBOUND_DANDELION_EDGES)
logger.warn("Making too few edges for stemout " +
"this is bad for anonymity if frequent.",
terminal=True)
not_enough_edges = True
sleep(1)
else:
# Ran out of time for stem phase
if not d_phase.is_stem_phase() or d_phase.remaining_time() < 5:
@@ -119,7 +128,8 @@ async def stem_out(d_phase: 'DandelionPhase'):
s.close()
peer_sockets.clear()
break
# If above loop ran out of time or NotEnoughEdges, loops below will not execute
# If above loop ran out of time or NotEnoughEdges,
# loops below will not execute
for count, peer_socket in enumerate(peer_sockets):
stream_routines.append(

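For context, the black-hole protection above reduces to: snapshot each outgoing block queue and re-add its contents to the database after a delay, so blocks survive even if every stem edge silently drops them. A minimal sketch of the pattern, using threading.Timer as a stand-in for add_delayed_thread (the timer constant is an assumption):

from queue import Queue
from threading import Timer

BLACKHOLE_EVADE_TIMER_SECS = 120  # assumption: stands in for the real constant

def schedule_blackhole_protection(block_q: Queue, add_block_to_db) -> None:
    # Snapshot now: the live queue is drained by the stem streams, and
    # iterating a deque while it mutates raises RuntimeError
    snapshot = list(block_q.queue)

    def readd_blocks():
        for bl in snapshot:
            add_block_to_db(bl)  # safe if the db deduplicates by block id

    Timer(BLACKHOLE_EVADE_TIMER_SECS, readd_blocks).start()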
View File

@@ -1,6 +1,7 @@
from typing import TYPE_CHECKING
from ...constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN
import logger
from ...constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_SIZE_LEN
if TYPE_CHECKING:
from queue import Queue
@@ -22,9 +23,10 @@ async def do_stem_stream(
# Primary client component that communicates with gossip.server.acceptstem
remaining_time = d_phase.remaining_time()
bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
peer_socket.sendall(bl.id)
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
peer_socket.sendall(block_size.encode('utf-8'))
peer_socket.sendall(bl.raw)
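The stem frame is now fully fixed-width: a block id left-padded to BLOCK_ID_SIZE bytes, a BLOCK_SIZE_LEN-digit ASCII length, then the raw block. A minimal sketch of the send side under those constants, assuming an already-connected socket:

import socket

BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))  # 7
BLOCK_ID_SIZE = 128

def send_stem_block(sock: socket.socket, block_id: bytes, raw: bytes) -> None:
    # Fixed-width fields let the receiver use exact-size reads
    sock.sendall(block_id.zfill(BLOCK_ID_SIZE))
    sock.sendall(str(len(raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8'))
    sock.sendall(raw)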

View File

@@ -49,6 +49,7 @@ def _ask_peer(peer):
'address': peer,
'callback': connectpeer.connect_peer
}
logger.info("Got new peer from exchange " + peer.decode('utf-8'), terminal=True)
onionrevents.event('announce_rec', data=connect_data, threaded=True)
s.close()

View File

@@ -13,7 +13,7 @@ from typing import TYPE_CHECKING, List
import blockdb
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS
if TYPE_CHECKING:
from socket import socket
@@ -76,13 +76,16 @@ def stream_from_peers():
str(offset).zfill(BLOCK_STREAM_OFFSET_DIGITS).encode('utf-8'))
while True:
logger.debug("Reading block id in stream", terminal=True)
block_id = sock.recv(BLOCK_ID_SIZE)
if blockdb.has_block(block_id):
sock.sendall(int(0).to_bytes(1, 'big'))
continue
sock.sendall(int(1).to_bytes(1, 'big'))
block_size = int(sock.recv(BLOCK_MAX_SIZE_LEN))
logger.debug("Reading block size in stream", terminal=True)
block_size = int(sock.recv(BLOCK_SIZE_LEN))
if block_size > BLOCK_MAX_SIZE or block_size <= 0:
logger.warn(
f"Peer {peer.transport_address} " +
@@ -90,6 +93,9 @@ def stream_from_peers():
break
block_data = sock.recv(block_size)
logger.debug(
"We got a block from stream, assuming it is valid",
terminal=True)
try:
blockdb.add_block_to_db(
onionrblocks.Block(

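The receive loop mirrors that framing, answering each id with a one-byte want/skip flag before the sender commits to transmitting the body. A sketch under the same assumptions (has_block and add_block are hypothetical stand-ins for the blockdb calls; production code would also loop on recv until each field is complete):

import socket

BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128

def read_block_stream(sock: socket.socket, has_block, add_block) -> None:
    while True:
        block_id = sock.recv(BLOCK_ID_SIZE)
        if not block_id:
            break  # peer closed the stream
        if has_block(block_id):
            sock.sendall((0).to_bytes(1, 'big'))  # skip: already stored
            continue
        sock.sendall((1).to_bytes(1, 'big'))  # request the block body
        block_size = int(sock.recv(BLOCK_SIZE_LEN))
        if block_size > BLOCK_MAX_SIZE or block_size <= 0:
            break  # refuse absurd sizes before allocating anything
        add_block(block_id, sock.recv(block_size))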
View File

@@ -2,7 +2,7 @@ BOOTSTRAP_ATTEMPTS = 5
PEER_AMOUNT_TO_ASK = 3
TRANSPORT_SIZE_BYTES = 64
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
DANDELION_EPOCH_LENGTH = 60
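With BLOCK_MAX_SIZE = 1024 * 2000 = 2048000 bytes, BLOCK_SIZE_LEN works out to 7, so every size field on the wire is a seven-character zero-padded decimal:

BLOCK_MAX_SIZE = 1024 * 2000               # 2048000 bytes
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))  # 7
assert str(1500).zfill(BLOCK_SIZE_LEN) == '0001500'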

View File

@@ -92,16 +92,18 @@ def gossip_server():
reader, writer,
inbound_dandelion_edge_count)
except asyncio.exceptions.TimeoutError:
pass
logger.debug(
"Inbound edge timed out when steming blocks to us",
terminal=True)
except asyncio.exceptions.IncompleteReadError:
pass
logger.debug(
"Inbound edge timed out (Incomplete Read) when steming blocks to us",
terminal=True)
except Exception:
logger.warn(
f"Err acceptind stem blocks\n{traceback.format_exc()}",
f"Err accepting stem blocks\n{traceback.format_exc()}",
terminal=True)
# Subtract dandelion edge, make sure >=0
inbound_dandelion_edge_count[0] = \

View File

@@ -7,12 +7,11 @@ from onionrblocks import Block
import logger
from ..dandelion import StemAcceptResult
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE
from ..constants import BLOCK_ID_SIZE, BLOCK_SIZE_LEN, BLOCK_MAX_SIZE
from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM
from ..blockqueues import gossip_block_queues
block_size_digits = len(str(BLOCK_MAX_SIZE))
base_wait_timeout = 120
if TYPE_CHECKING:
@@ -28,21 +27,23 @@ async def accept_stem_blocks(
writer.write(StemAcceptResult.DENY)
return
writer.write(StemAcceptResult.ALLOW)
await writer.drain()
inbound_edge_count[0] += 1
# Start getting the first block
read_routine = reader.readexactly(BLOCK_ID_SIZE)
block_queue_to_use = secrets.choice(gossip_block_queues)
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
read_routine = reader.readexactly(BLOCK_ID_SIZE)
logger.debug(f"Reading block id in stem server", terminal=True)
block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
if not block_id:
break
logger.debug(f"Reading block size in stem server", terminal=True)
block_size = (await wait_for(
reader.readexactly(block_size_digits),
reader.readexactly(BLOCK_SIZE_LEN),
base_wait_timeout)).decode('utf-8')
if not block_size:
break
@@ -53,6 +54,8 @@ async def accept_stem_blocks(
if block_size > BLOCK_MAX_SIZE:
raise ValueError("Max block size")
logger.debug(f"Reading block of size {block_size} in stem server", terminal=True)
raw_block: bytes = await wait_for(
reader.readexactly(block_size), base_wait_timeout * 6)
if not raw_block:
@@ -66,5 +69,3 @@ async def accept_stem_blocks(
# Regardless of stem phase, we add to queue
# Client will decide if they are to be stemmed
read_routine = reader.readexactly(BLOCK_ID_SIZE)
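The shape of the server-side exchange, sketched with plain asyncio: write the one-byte ALLOW, drain so the accept byte is not left sitting in the write buffer while the client waits for permission, then repeat exact-size reads for id, size, and body, giving larger bodies a longer timeout. The marker byte, timeout, and block cap here are assumptions:

import asyncio

BLOCK_ID_SIZE = 128
BLOCK_SIZE_LEN = 7       # assumption: len(str(BLOCK_MAX_SIZE))
BASE_WAIT_TIMEOUT = 120  # assumption: mirrors base_wait_timeout
ALLOW = b'\x01'          # hypothetical one-byte accept marker

async def accept_stem_stream(reader: asyncio.StreamReader,
                             writer: asyncio.StreamWriter,
                             max_blocks: int = 1000):
    writer.write(ALLOW)
    await writer.drain()  # flush before the client starts sending
    for _ in range(max_blocks):
        block_id = await asyncio.wait_for(
            reader.readexactly(BLOCK_ID_SIZE), BASE_WAIT_TIMEOUT)
        size = int(await asyncio.wait_for(
            reader.readexactly(BLOCK_SIZE_LEN), BASE_WAIT_TIMEOUT))
        # Bigger blocks get proportionally more time to arrive
        raw = await asyncio.wait_for(
            reader.readexactly(size), BASE_WAIT_TIMEOUT * 6)
        yield block_id, raw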

View File

@@ -17,7 +17,8 @@ if TYPE_CHECKING:
from asyncio import StreamWriter, StreamReader
from onionrblocks import Block
from ..constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS
from ..constants import BLOCK_MAX_SIZE, BLOCK_SIZE_LEN
from ..constants import BLOCK_STREAM_OFFSET_DIGITS
import logger
from blockdb import get_blocks_after_timestamp, block_storage_observers
@@ -72,7 +73,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
await writer.drain()
# write block size
writer.write(
str(len(block.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
str(len(block.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8'))
await writer.drain()
writer.write(block.raw)
await writer.drain()
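The diffuse (fluff) direction reuses the renamed size field and, as the tests below exercise, waits for a one-byte keep-going acknowledgement from the receiver after each block. A sketch of one round under those assumptions:

import asyncio

BLOCK_SIZE_LEN = 7  # assumption: len(str(BLOCK_MAX_SIZE))

async def diffuse_one(reader: asyncio.StreamReader,
                      writer: asyncio.StreamWriter,
                      block_id: bytes, raw: bytes) -> bool:
    writer.write(block_id)
    await writer.drain()
    writer.write(str(len(raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8'))
    await writer.drain()
    writer.write(raw)
    await writer.drain()
    # Receiver answers 0x01 to keep streaming, anything else stops it
    return (await reader.readexactly(1)) == b'\x01'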

View File

@@ -30,11 +30,11 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
blockapi_blueprint = Blueprint('blockapi', __name__)
stream_to_use = secrets.randbits(1)
# Add a block that we generated (or received from a transport like LAN/sneakernet)
@blockapi_blueprint.route('/addvdfblock', methods=['POST'])
def block_serialized():
stream_to_use = secrets.randbits(1)
req_data = request.data
block_id = req_data[:BLOCK_ID_SIZE]
block_data = req_data[BLOCK_ID_SIZE:]
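Moving stream_to_use into the handler is the substance of this hunk: at module level, secrets.randbits(1) runs once at import and pins every uploaded block to the same queue, while evaluating it per request spreads uploads across both stem queues. A sketch of the corrected shape (the two-queue setup is an assumption):

import secrets
from queue import Queue

gossip_block_queues = (Queue(), Queue())  # assumption: the two stem queues

def enqueue_block(block) -> None:
    # Chosen per call, so uploads spread across both queues instead of all
    # landing in whichever queue import-time randomness happened to pick
    stream_to_use = secrets.randbits(1)
    gossip_block_queues[stream_to_use].put(block)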

View File

@@ -5,10 +5,12 @@ Default example plugin for devs or to test blocks
import sys
import os
import locale
from time import sleep
import traceback
from typing import Set, TYPE_CHECKING
from threading import Thread, local
import blockdb
from gossip.peerset import gossip_peer_set
import logger
@@ -39,10 +41,8 @@ plugin_name = 'example'
PLUGIN_VERSION = '0.0.0'
def on_blocktest_cmd(api, data=None):
bl = onionrblocks.create_anonvdf_block(input("Enter a message:").encode('utf-8'), b"txt", 3600)
bl = onionrblocks.create_anonvdf_block(
input("Enter a message:").encode('utf-8'), b"tst", 3600)
logger.info(
local_command(
'/addvdfblock',
@@ -51,6 +51,18 @@ def on_blocktest_cmd(api, data=None):
terminal=True)
def on_printtest_cmd(api, data=None):
while True:
try:
print(list(blockdb.get_blocks_by_type("tst"))[0].data)
except IndexError:
pass
try:
sleep(1)
except KeyboardInterrupt:
break
def on_init(api, data=None):
logger.info(

View File

@@ -1 +1 @@
b3u6g7syd6ddicwoxe7ydelqnldyr6g5skvvmjzgh6duwjk6jhv2ixqd
jjvq7itovbt6gcttj5p25zgalht3zraqfvgzojhdqdd2rarriehrfnyd,m4vae3qvhnpz65jscbulv5j7lymhnbfv3hdhfwk7qztknldbgn3b3oqd

View File

@@ -29,7 +29,7 @@ from gossip.peerset import gossip_peer_set
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10

View File

@@ -33,7 +33,7 @@ from gossip.client.announce import do_announce
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10

View File

@@ -28,7 +28,7 @@ from gossip.peerset import gossip_peer_set
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10

View File

@@ -35,7 +35,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10
@@ -84,7 +84,7 @@ def _server():
print('skipping block correctly')
continue
conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_MAX_SIZE_LEN))
conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_SIZE_LEN))
conn.sendall(bl.raw)
conn.recv(1)

View File

@@ -35,7 +35,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10
@@ -55,7 +55,7 @@ def _server():
for bl in test_blocks:
conn.sendall(bl.id)
conn.recv(1)
conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_MAX_SIZE_LEN))
conn.sendall(str(len(bl.raw)).encode('utf-8').zfill(BLOCK_SIZE_LEN))
conn.sendall(bl.raw)
conn.recv(1)

View File

@@ -31,7 +31,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10

View File

@@ -32,7 +32,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
MAX_PEERS = 10

View File

@@ -22,7 +22,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
@@ -51,7 +51,7 @@ class OnionrServerPutBlocksTest(unittest.TestCase):
for bl in blocks:
writer.write(bl.id)
writer.write(
str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
str(len(bl.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8'))
writer.write(bl.raw)
await writer.drain()

View File

@@ -22,7 +22,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
@@ -46,7 +46,7 @@ class OnionrServerPutBlockTest(unittest.TestCase):
writer.write(bl.id)
writer.write(
str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
str(len(bl.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8'))
writer.write(bl.raw)
await writer.drain()

View File

@@ -19,7 +19,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
@@ -63,7 +63,7 @@ class OnionrDiffuseMany(unittest.TestCase):
# check block size
self.assertEqual(
len(bl.raw),
int((await reader.readexactly(BLOCK_MAX_SIZE_LEN)).decode('utf-8')))
int((await reader.readexactly(BLOCK_SIZE_LEN)).decode('utf-8')))
self.assertEqual(bl.raw, await reader.readexactly(len(bl.raw)))
writer.write(int(1).to_bytes(1, 'big'))

View File

@@ -19,7 +19,7 @@ from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
@@ -53,7 +53,7 @@ class OnionrServerDiffuseTest(unittest.TestCase):
# check block size
self.assertEqual(
len(bl.raw),
int((await reader.readexactly(BLOCK_MAX_SIZE_LEN)).decode('utf-8')))
int((await reader.readexactly(BLOCK_SIZE_LEN)).decode('utf-8')))
self.assertEqual(bl.raw, await reader.readexactly(len(bl.raw)))