gossip fixes
This commit is contained in:
parent
d2b5298bc6
commit
84c13ade51
@ -14,10 +14,6 @@ def add_block_to_db(block: Block):
|
|||||||
# Raises db.DuplicateKey if dupe
|
# Raises db.DuplicateKey if dupe
|
||||||
db.set_if_new(block_db_path, block.id, block.raw)
|
db.set_if_new(block_db_path, block.id, block.raw)
|
||||||
|
|
||||||
for func in block_storage_observers:
|
|
||||||
func(block)
|
|
||||||
|
|
||||||
|
|
||||||
def get_blocks_by_type(block_type: str) -> "Generator[Block]":
|
def get_blocks_by_type(block_type: str) -> "Generator[Block]":
|
||||||
block_db = db.get_db_obj(block_db_path, 'u')
|
block_db = db.get_db_obj(block_db_path, 'u')
|
||||||
for block_hash in db.list_keys(block_db_path):
|
for block_hash in db.list_keys(block_db_path):
|
||||||
|
@ -84,7 +84,7 @@ async def stem_out(d_phase: 'DandelionPhase'):
|
|||||||
|
|
||||||
def blackhole_protection(q):
|
def blackhole_protection(q):
|
||||||
for bl in q:
|
for bl in q:
|
||||||
add_block_to_db(q)
|
add_block_to_db(bl)
|
||||||
|
|
||||||
|
|
||||||
# Spawn threads with deep copied block queue to add to db after time
|
# Spawn threads with deep copied block queue to add to db after time
|
||||||
@ -122,7 +122,7 @@ async def stem_out(d_phase: 'DandelionPhase'):
|
|||||||
logger.error(
|
logger.error(
|
||||||
"Did not stem out any blocks in time, " +
|
"Did not stem out any blocks in time, " +
|
||||||
"if this happens regularly you may be under attack",
|
"if this happens regularly you may be under attack",
|
||||||
terminal=True)
|
terminal=False)
|
||||||
for s in peer_sockets:
|
for s in peer_sockets:
|
||||||
if s:
|
if s:
|
||||||
s.close()
|
s.close()
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
from asyncio import sleep
|
||||||
|
from queue import Empty
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import logger
|
import logger
|
||||||
@ -22,7 +24,14 @@ async def do_stem_stream(
|
|||||||
while remaining_time > 5 and my_phase_id == d_phase.phase_id:
|
while remaining_time > 5 and my_phase_id == d_phase.phase_id:
|
||||||
# Primary client component that communicate's with gossip.server.acceptstem
|
# Primary client component that communicate's with gossip.server.acceptstem
|
||||||
remaining_time = d_phase.remaining_time()
|
remaining_time = d_phase.remaining_time()
|
||||||
bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
|
while remaining_time:
|
||||||
|
try:
|
||||||
|
# queues can't block because we're in async
|
||||||
|
bl = block_queue.get(block=False)
|
||||||
|
except Empty:
|
||||||
|
await sleep(1)
|
||||||
|
else:
|
||||||
|
break
|
||||||
logger.info("Sending block over dandelion++", terminal=True)
|
logger.info("Sending block over dandelion++", terminal=True)
|
||||||
|
|
||||||
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
|
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
|
||||||
|
@ -5,6 +5,7 @@ Download blocks that are being diffused
|
|||||||
doesn't apply for blocks in the gossip queue that are awaiting
|
doesn't apply for blocks in the gossip queue that are awaiting
|
||||||
descision to fluff or stem
|
descision to fluff or stem
|
||||||
"""
|
"""
|
||||||
|
from ast import Index
|
||||||
from threading import Thread, Semaphore
|
from threading import Thread, Semaphore
|
||||||
from random import SystemRandom
|
from random import SystemRandom
|
||||||
from time import sleep
|
from time import sleep
|
||||||
@ -120,6 +121,12 @@ def stream_from_peers():
|
|||||||
while True:
|
while True:
|
||||||
need_socket_lock.acquire()
|
need_socket_lock.acquire()
|
||||||
available_set = gossip_peer_set - tried_peers
|
available_set = gossip_peer_set - tried_peers
|
||||||
|
if not len(available_set) and len(tried_peers):
|
||||||
|
try:
|
||||||
|
tried_peers.pop()
|
||||||
|
except IndexError:
|
||||||
|
pass
|
||||||
|
available_set = gossip_peer_set - tried_peers
|
||||||
peers = sys_rand.sample(
|
peers = sys_rand.sample(
|
||||||
available_set,
|
available_set,
|
||||||
min(MAX_STREAMS, len(available_set)))
|
min(MAX_STREAMS, len(available_set)))
|
||||||
@ -137,3 +144,4 @@ def stream_from_peers():
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
need_socket_lock.release()
|
need_socket_lock.release()
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -14,4 +14,4 @@ MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris
|
|||||||
OUTBOUND_DANDELION_EDGES = 2
|
OUTBOUND_DANDELION_EDGES = 2
|
||||||
|
|
||||||
MAX_STEM_BLOCKS_PER_STREAM = 1000
|
MAX_STEM_BLOCKS_PER_STREAM = 1000
|
||||||
BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3
|
BLACKHOLE_EVADE_TIMER_SECS = 10
|
@ -20,6 +20,7 @@ from ..constants import BLOCK_MAX_SIZE, BLOCK_SIZE_LEN
|
|||||||
from ..constants import BLOCK_STREAM_OFFSET_DIGITS
|
from ..constants import BLOCK_STREAM_OFFSET_DIGITS
|
||||||
|
|
||||||
import logger
|
import logger
|
||||||
|
import blockdb
|
||||||
from blockdb import get_blocks_after_timestamp, block_storage_observers
|
from blockdb import get_blocks_after_timestamp, block_storage_observers
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
@ -53,13 +54,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
|||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Peer's specified time offset skewed too far into the future")
|
"Peer's specified time offset skewed too far into the future")
|
||||||
|
|
||||||
newly_stored_blocks = Queue()
|
|
||||||
|
|
||||||
def _add_to_queue(bl):
|
|
||||||
newly_stored_blocks.put_nowait(bl)
|
|
||||||
block_storage_observers.append(
|
|
||||||
_add_to_queue
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _send_block(block: 'Block'):
|
async def _send_block(block: 'Block'):
|
||||||
writer.write(block.id)
|
writer.write(block.id)
|
||||||
@ -90,17 +85,20 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
|||||||
)
|
)
|
||||||
except IncompleteReadError:
|
except IncompleteReadError:
|
||||||
keep_writing = False
|
keep_writing = False
|
||||||
|
time_offset = time()
|
||||||
|
|
||||||
# Diffuse blocks stored since we started this stream
|
# Diffuse blocks stored since we started this stream
|
||||||
while keep_writing:
|
while keep_writing:
|
||||||
await _send_block(await newly_stored_blocks.get())
|
bls = blockdb.get_blocks_after_timestamp(time_offset)
|
||||||
try:
|
for bl in bls:
|
||||||
keep_writing = bool(
|
await _send_block(bl)
|
||||||
int.from_bytes(await reader.readexactly(1), 'big')
|
try:
|
||||||
)
|
keep_writing = bool(
|
||||||
except IncompleteReadError:
|
int.from_bytes(await reader.readexactly(1), 'big')
|
||||||
keep_writing = False
|
)
|
||||||
|
except IncompleteReadError:
|
||||||
|
keep_writing = False
|
||||||
|
break
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warn(traceback.format_exc(), terminal=True)
|
logger.warn(traceback.format_exc(), terminal=True)
|
||||||
|
|
||||||
block_storage_observers.remove(_add_to_queue)
|
|
||||||
|
@ -8,6 +8,7 @@ import secrets
|
|||||||
from flask import Blueprint, Response, request
|
from flask import Blueprint, Response, request
|
||||||
|
|
||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
|
import blockdb
|
||||||
|
|
||||||
import logger
|
import logger
|
||||||
from gossip import blockqueues
|
from gossip import blockqueues
|
||||||
@ -38,6 +39,7 @@ def block_serialized():
|
|||||||
req_data = request.data
|
req_data = request.data
|
||||||
block_id = req_data[:BLOCK_ID_SIZE]
|
block_id = req_data[:BLOCK_ID_SIZE]
|
||||||
block_data = req_data[BLOCK_ID_SIZE:]
|
block_data = req_data[BLOCK_ID_SIZE:]
|
||||||
blockqueues.gossip_block_queues[stream_to_use].put(
|
blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
|
||||||
Block(block_id, block_data, auto_verify=False))
|
#blockqueues.gossip_block_queues[stream_to_use].put(
|
||||||
|
#Block(block_id, block_data, auto_verify=False), block=False)
|
||||||
return "ok"
|
return "ok"
|
||||||
|
Loading…
Reference in New Issue
Block a user