Work on stemout

This commit is contained in:
Kevin F 2022-03-12 19:28:18 -06:00
parent 4c54677387
commit 19159ffa06
8 changed files with 68 additions and 14 deletions

View File

@@ -6,7 +6,7 @@ from utils import identifyhome
block_db_path = identifyhome.identify_home() + 'blockdata'
def store_vdf_block(block: Block):
def add_block_to_db(block: Block):
    """Persist a block's raw bytes into the block database, keyed by its id."""
    # db.set(path, key, value): stores block.raw under block.id at block_db_path
    db.set(block_db_path, block.id, block.raw)

View File

@@ -26,7 +26,7 @@ import onionrplugins
from ..commands import GossipCommands
from gossip.dandelion.phase import DandelionPhase
from onionrthreads import add_onionr_thread
from blockdb import store_vdf_block
from blockdb import add_block_to_db
from .announce import do_announce
@@ -51,7 +51,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
def gossip_client(
peer_set: OrderedSet['Peer'],
block_queues: Tuple[Queue['Block']],
block_queues: Tuple[Queue['Block'], Queue['Block']],
dandelion_seed: bytes):
"""
Gossip client does the following:

View File

@@ -1,7 +1,16 @@
from queue import Queue
from threading import Timer
from time import sleep
from secrets import choice
import traceback
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Tuple, List, Set
from onionrthreads import add_delayed_thread
from blockdb import add_block_to_db
import logger
from ..constants import BLACKHOLE_EVADE_TIMER_SECS, MAX_OUTBOUND_DANDELION_EDGE
if TYPE_CHECKING:
from ordered_set import OrderedSet
@@ -9,15 +18,45 @@ if TYPE_CHECKING:
from ..peer import Peer
from ..dandelion.phase import DandelionPhase
class NotEnoughEdges(ValueError):
    """Raised when there are no untried peers left to build a stem edge."""
def _setup_edge(
        peer_set: OrderedSet['Peer'], exclude_set: OrderedSet['Peer']):
    """Negotiate stem connection with random peer, add to exclude set if fail"""
    # Pick a random candidate from the peers not yet tried; `choice` here is
    # secrets.choice (imported at the top of this file), so selection is
    # cryptographically random over the OrderedSet difference.
    try:
        peer: 'Peer' = choice(peer_set - exclude_set)
    except IndexError:
        # The set difference was empty: every peer has already been tried.
        raise NotEnoughEdges
    try:
        s = peer.get_socket()
    except Exception:
        # Socket negotiation failed; log the traceback and mark this peer
        # so it will not be picked again by a later call.
        logger.debug(traceback.format_exc())
        exclude_set.add(peer)
    # NOTE(review): on success `s` is never used or returned — this looks
    # like work-in-progress; confirm what the caller expects to receive.
def stem_out(
block_queues: Tuple[Queue['Block']],
block_queues: Tuple[Queue['Block'], Queue['Block']],
peer_set: OrderedSet['Peer'],
d_phase: 'DandelionPhase'):
# Spawn a thread with block set to add to db after time for black hole attack
# Spawn threads with deep copied block queue to add to db after time
# for black hole attack
for block_q in block_queues:
add_delayed_thread(
lambda q: set(map(add_block_to_db, q)),
BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue))
# don't bother if there are no possible outbound edges
if not len(peer_set):
sleep(1)
return
# Pick edges randomly
# Using orderedset for the tried edges to ensure random pairing with queue
tried_edges: OrderedSet['Peer'] = OrderedSet()

View File

@@ -10,7 +10,7 @@ if TYPE_CHECKING:
def store_blocks(
block_queues: Tuple[Queue['Block']],
block_queues: Tuple[Queue['Block'], Queue['Block']],
dandelion_phase: 'DandelionPhase'):
new_queue: Queue['Block'] = Queue()
@@ -31,7 +31,7 @@ def store_blocks(
while not dandelion_phase.is_stem_phase() \
and dandelion_phase.remaining_time() > 1:
try:
blockdb.store_vdf_block(
blockdb.add_block_to_db(
new_queue.get(timeout=dandelion_phase.remaining_time())
)
except TimeoutError:

View File

@@ -4,5 +4,12 @@ TRANSPORT_SIZE_BYTES = 64
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_ID_SIZE = 128
DANDELION_EPOCH_LENGTH = 60
MAX_INBOUND_DANDELION_EDGE = 2
MAX_STEM_BLOCKS_PER_STREAM = 1000
# Magic number I made up, not really specified in the Dandelion++ paper
MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris
# Dandelion subgraph is approximately 4-regular
MAX_OUTBOUND_DANDELION_EDGE = 2
MAX_STEM_BLOCKS_PER_STREAM = 1000
BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3

View File

@@ -40,7 +40,7 @@ inbound_dandelion_edge_count = [0]
def gossip_server(
peer_set: OrderedSet['Peer'],
block_queues: Tuple[Queue['Block']],
block_queues: Tuple[Queue['Block'], Queue['Block']],
dandelion_seed: bytes):
async def peer_connected(

View File

@@ -20,7 +20,7 @@ if TYPE_CHECKING:
async def accept_stem_blocks(
block_queues: Tuple[Queue['Block']],
block_queues: Tuple[Queue['Block'], Queue['Block']],
reader: 'StreamReader',
writer: 'StreamWriter',
inbound_edge_count: List[int]):
@@ -35,6 +35,8 @@ async def accept_stem_blocks(
read_routine = reader.read(BLOCK_ID_SIZE)
stream_start_time = int(time())
block_queue_to_use = secrets.choice(block_queues)
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
@@ -51,7 +53,7 @@ async def accept_stem_blocks(
raw_block: bytes = await wait_for(
reader.read(block_size), base_wait_timeout * 6)
secrets.choice(block_queues).put(
block_queue_to_use.put(
Block(block_id, raw_block, auto_verify=True)
)
# Regardless of stem phase, we add to queue

View File

@@ -4,7 +4,6 @@ from typing import Iterable
import traceback
from threading import Thread
from uuid import uuid4
from time import sleep
import logger
@@ -42,3 +41,10 @@ def add_onionr_thread(
*args),
kwargs=kwargs,
daemon=True).start()
def add_delayed_thread(func: Callable, sleep_secs: int, *args, **kwargs):
    """Schedule func(*args, **kwargs) on a daemon thread after sleep_secs.

    Returns immediately; the delay happens inside the spawned thread, so the
    caller is never blocked. (The previous version slept in the caller
    BEFORE starting the thread, blocking the caller for the full delay.)

    Raises:
        ValueError: if sleep_secs is not positive.
    """
    # Raise instead of assert: asserts are stripped under `python -O`.
    if sleep_secs <= 0:
        raise ValueError("sleep_secs must be positive")

    def _run_after_delay():
        # Delay inside the worker thread, then invoke the target.
        sleep(sleep_secs)
        func(*args, **kwargs)

    Thread(target=_run_after_delay, daemon=True).start()