Substantial bug fixes and performance improvements

This commit is contained in:
Kevin F 2022-07-30 15:41:11 -05:00
parent 8bd4a4c524
commit f220e398f1
16 changed files with 69 additions and 38 deletions

View File

@ -47,7 +47,7 @@ def start_gossip_threads():
gossip_server, 1, 'gossip_server', initial_sleep=0.2) gossip_server, 1, 'gossip_server', initial_sleep=0.2)
threading.Thread( threading.Thread(
target=start_gossip_client, daemon=True).start() target=start_gossip_client, daemon=True, name="start_gossip_client").start()
onionrplugins.events.event('gossip_start', data=None, threaded=True) onionrplugins.events.event('gossip_start', data=None, threaded=True)
for _ in range(BOOTSTRAP_ATTEMPTS): for _ in range(BOOTSTRAP_ATTEMPTS):
onionrplugins.events.event( onionrplugins.events.event(

View File

@ -22,6 +22,7 @@ if TYPE_CHECKING:
from ordered_set import OrderedSet from ordered_set import OrderedSet
import logger import logger
import config
import onionrplugins import onionrplugins
from ..commands import GossipCommands from ..commands import GossipCommands
from gossip.dandelion.phase import DandelionPhase from gossip.dandelion.phase import DandelionPhase
@ -60,9 +61,9 @@ def block_queue_processing():
while not len(gossip_peer_set): while not len(gossip_peer_set):
sleep(0.2) sleep(0.2)
if dandelion_phase.remaining_time() <= 15: if dandelion_phase.remaining_time() <= 15:
logger.debug("Sleeping", terminal=True) #logger.debug("Sleeping", terminal=True)
sleep(dandelion_phase.remaining_time()) sleep(dandelion_phase.remaining_time())
if dandelion_phase.is_stem_phase(): if dandelion_phase.is_stem_phase() and config.get('security.dandelion.enabled', True):
logger.debug("Entering stem phase", terminal=True) logger.debug("Entering stem phase", terminal=True)
try: try:
# Stem out blocks for (roughly) remaining epoch time # Stem out blocks for (roughly) remaining epoch time
@ -73,8 +74,9 @@ def block_queue_processing():
logger.error(traceback.format_exc(), terminal=True) logger.error(traceback.format_exc(), terminal=True)
pass pass
else: else:
logger.debug("Entering fluff phase", terminal=True) #logger.debug("Entering fluff phase", terminal=True)
# Add block to primary block db, where the diffuser can read it # Add block to primary block db, where the diffuser can read it
sleep(0.1)
store_blocks(dandelion_phase) store_blocks(dandelion_phase)
@ -88,7 +90,7 @@ def start_gossip_client():
bl: Block bl: Block
Thread(target=do_announce, daemon=True).start() Thread(target=do_announce, daemon=True, name='do_announce').start()
# Start a thread that runs every 1200 secs to # Start a thread that runs every 1200 secs to
# Ask peers for a subset for their peer set # Ask peers for a subset for their peer set

View File

@ -83,7 +83,7 @@ async def stem_out(d_phase: 'DandelionPhase'):
sleep(1) sleep(1)
return return
not_enough_edges = False not_enough_edges = False
strict_dandelion = config.get('security.strict_dandelion', True) strict_dandelion = config.get('security.dandelion.strict', True)
def blackhole_protection(q): def blackhole_protection(q):
for bl in q: for bl in q:

View File

@ -39,11 +39,14 @@ async def do_stem_stream(
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN) block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
def _send_it(): def _send_it():
with peer_socket: try:
try: with peer_socket:
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8')) try:
except AttributeError: peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE).encode('utf-8'))
peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE)) except AttributeError:
peer_socket.sendall(block_size.encode('utf-8')) peer_socket.sendall(bl.id.zfill(BLOCK_ID_SIZE))
peer_socket.sendall(bl.raw) peer_socket.sendall(block_size.encode('utf-8'))
peer_socket.sendall(bl.raw)
except OSError:
pass
Thread(target=_send_it, daemon=True, name="stemout block").start() Thread(target=_send_it, daemon=True, name="stemout block").start()

View File

@ -78,7 +78,9 @@ def get_new_peers():
# Start threads to ask the peers for more peers # Start threads to ask the peers for more peers
threads = [] threads = []
for peer in peers_we_ask: for peer in peers_we_ask:
t = Thread(target=_do_ask_peer, args=[peer], daemon=True) t = Thread(
target=_do_ask_peer,
args=[peer], daemon=True, name='_do_ask_peer')
t.start() t.start()
threads.append(t) threads.append(t)
peers_we_ask.clear() peers_we_ask.clear()

View File

@ -44,7 +44,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
MAX_STREAMS = 3 MAX_STREAMS = 6
CONNECT_TIMEOUT = 12 CONNECT_TIMEOUT = 12
MAX_TRIED_PEERS = 10_000 MAX_TRIED_PEERS = 10_000
@ -63,6 +63,8 @@ def stream_from_peers():
def _stream_from_peer(peer: 'Peer'): def _stream_from_peer(peer: 'Peer'):
stream_counter = 0
stream_times = 100
try: try:
sock = peer.get_socket(CONNECT_TIMEOUT) sock = peer.get_socket(CONNECT_TIMEOUT)
except ConnectionRefusedError: except ConnectionRefusedError:
@ -79,8 +81,10 @@ def stream_from_peers():
sock.sendall( sock.sendall(
str(offset).zfill(BLOCK_STREAM_OFFSET_DIGITS).encode('utf-8')) str(offset).zfill(BLOCK_STREAM_OFFSET_DIGITS).encode('utf-8'))
while True: while stream_times >= stream_counter:
logger.debug("Reading block id in stream", terminal=True) stream_counter += 1
logger.debug("Reading block id in stream with " + peer.transport_address, terminal=True)
sock.settimeout(5)
block_id = sock.recv(BLOCK_ID_SIZE) block_id = sock.recv(BLOCK_ID_SIZE)
if blockdb.has_block(block_id): if blockdb.has_block(block_id):
sock.sendall(int(0).to_bytes(1, 'big')) sock.sendall(int(0).to_bytes(1, 'big'))
@ -89,12 +93,15 @@ def stream_from_peers():
logger.debug("Reading block size in stream", terminal=True) logger.debug("Reading block size in stream", terminal=True)
sock.settimeout(5)
block_size = int(sock.recv(BLOCK_SIZE_LEN)) block_size = int(sock.recv(BLOCK_SIZE_LEN))
if block_size > BLOCK_MAX_SIZE or block_size <= 0: if block_size > BLOCK_MAX_SIZE or block_size <= 0:
logger.warn( logger.warn(
f"Peer {peer.transport_address} " + f"Peer {peer.transport_address} " +
"reported block size out of range") "reported block size out of range")
break break
sock.settimeout(5)
block_data = sock.recv(block_size) block_data = sock.recv(block_size)
logger.debug( logger.debug(
@ -122,14 +129,14 @@ def stream_from_peers():
# spawn stream threads infinitely # spawn stream threads infinitely
while True: while True:
need_socket_lock.acquire()
available_set = gossip_peer_set - tried_peers available_set = gossip_peer_set - tried_peers
if not len(available_set) and len(tried_peers): if not len(available_set) and len(tried_peers):
try: try:
tried_peers.pop() tried_peers.clear()
except IndexError: except IndexError:
pass pass
available_set = gossip_peer_set - tried_peers available_set = gossip_peer_set.copy()
peers = sys_rand.sample( peers = sys_rand.sample(
available_set, available_set,
min(MAX_STREAMS, len(available_set))) min(MAX_STREAMS, len(available_set)))
@ -140,10 +147,12 @@ def stream_from_peers():
while len(peers): while len(peers):
try: try:
need_socket_lock.acquire()
Thread( Thread(
target=_stream_from_peer, target=_stream_from_peer,
args=[peers.pop()], args=[peers.pop()],
daemon=True).start() daemon=True,
name="_stream_from_peer").start()
except IndexError: except IndexError:
need_socket_lock.release() need_socket_lock.release()
break break

View File

@ -2,8 +2,8 @@ from secrets import SystemRandom
from time import time from time import time
from typing import List, TYPE_CHECKING from typing import List, TYPE_CHECKING
if TYPE_CHECKING: #if TYPE_CHECKING:
from onionrblocks import Block from onionrblocks import Block
from gossip.commands import GossipCommands, command_to_byte from gossip.commands import GossipCommands, command_to_byte
from blockdb import get_blocks_after_timestamp from blockdb import get_blocks_after_timestamp
@ -19,7 +19,7 @@ class SendTimestamp:
def stream_to_peer(): def stream_to_peer():
if SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM > time() - lastincoming.last_incoming_timestamp: if SECS_ELAPSED_NO_INCOMING_BEFORE_STREAM > time() - lastincoming.last_incoming_timestamp:
SendTimestamp.timestamp = int(time()) SendTimestamp.timestamp = int(time()) - 60
return return
if not len(gossip_peer_set): if not len(gossip_peer_set):
return return
@ -28,6 +28,7 @@ def stream_to_peer():
buffer: List['Block'] = [] buffer: List['Block'] = []
def _do_upload(): def _do_upload():
print('uploading to', peer.transport_address)
with peer.get_socket(30) as p: with peer.get_socket(30) as p:
p.sendall(command_to_byte(GossipCommands.PUT_BLOCK_DIFFUSE)) p.sendall(command_to_byte(GossipCommands.PUT_BLOCK_DIFFUSE))
@ -47,10 +48,11 @@ def stream_to_peer():
# and to efficiently avoid connecting without sending anything # and to efficiently avoid connecting without sending anything
buffer_max = 10 buffer_max = 10
for block in get_blocks_after_timestamp(SendTimestamp.timestamp): for block in get_blocks_after_timestamp(SendTimestamp.timestamp):
assert isinstance(block, Block)
buffer.append(block) buffer.append(block)
if len(buffer) > buffer_max: if len(buffer) > buffer_max:
_do_upload(buffer) _do_upload(buffer)
if len(buffer): if len(buffer):
_do_upload() _do_upload()
SendTimestamp.timestamp = int(time()) SendTimestamp.timestamp = int(time()) - 60

View File

@ -133,8 +133,10 @@ def gossip_server():
).start() ).start()
await _get_block_diffused() await _get_block_diffused()
break break
try:
await writer.drain() await writer.drain()
except BrokenPipeError:
pass
async def main(): async def main():

View File

@ -100,6 +100,8 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
except IncompleteReadError: except IncompleteReadError:
keep_writing = False keep_writing = False
break break
except ConnectionResetError:
pass
except Exception: except Exception:
logger.warn(traceback.format_exc(), terminal=True) logger.warn(traceback.format_exc(), terminal=True)

View File

@ -112,7 +112,10 @@ def daemon():
f"Onionr daemon is running under pid {os.getpid()}", terminal=True) f"Onionr daemon is running under pid {os.getpid()}", terminal=True)
events.event('init', threaded=False) events.event('init', threaded=False)
events.event('daemon_start') events.event('daemon_start')
Thread(target=gossip.start_gossip_threads, daemon=True).start() Thread(
target=gossip.start_gossip_threads,
daemon=True,
name='start_gossip_threads').start()
try: try:
apiservers.private_api.start() apiservers.private_api.start()

View File

@ -8,8 +8,7 @@ QUOTES = [
""), ""),
("Study after study has shown that human behavior changes when we know we're being watched.\nUnder observation, we act less free, which means we effectively *are* less free.", ("Study after study has shown that human behavior changes when we know we're being watched.\nUnder observation, we act less free, which means we effectively *are* less free.",
"Edward Snowden"), "Edward Snowden"),
("A revolution without dancing is a revolution not worth having", ("Privacy is a fundamental right", ""),
"V for Vendetta"),
("There can be no justice so long as laws are absolute. Even life itself is an exercise in exceptions", ("There can be no justice so long as laws are absolute. Even life itself is an exercise in exceptions",
"Picard"), "Picard"),
("Openness and participation are antidotes to surveillance and control", ("Openness and participation are antidotes to surveillance and control",

View File

@ -51,7 +51,7 @@ def event(event_name, data = {}, threaded = True):
"""Call an event on all plugins (if defined)""" """Call an event on all plugins (if defined)"""
if threaded: if threaded:
thread = Thread(target = __event_caller, args = (event_name, data)) thread = Thread(target = __event_caller, args = (event_name, data), name=f'{event_name} event', daemon=True)
thread.start() thread.start()
return thread return thread
else: else:

View File

@ -21,8 +21,6 @@ def on_announce_rec(api, data=None):
if announced.removesuffix('.onion') == config.get( if announced.removesuffix('.onion') == config.get(
'tor.transport_address', '').removesuffix('.onion'): 'tor.transport_address', '').removesuffix('.onion'):
logger.warn(
"Received announcement for our own node, which shouldn't happen")
return return

View File

@ -9,6 +9,7 @@ from time import sleep
import traceback import traceback
from typing import Set, TYPE_CHECKING from typing import Set, TYPE_CHECKING
from threading import Thread from threading import Thread
import shelve
import stem import stem
from stem.control import Controller from stem.control import Controller
@ -19,6 +20,7 @@ import config
from filepaths import gossip_server_socket_file from filepaths import gossip_server_socket_file
from gossip.peer import Peer from gossip.peer import Peer
from gossip.peerset import gossip_peer_set
locale.setlocale(locale.LC_ALL, '') locale.setlocale(locale.LC_ALL, '')
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
@ -27,6 +29,7 @@ from unixpeer import UnixPeer
from unixbootstrap import on_bootstrap from unixbootstrap import on_bootstrap
from unixannounce import on_announce_rec from unixannounce import on_announce_rec
from unixfilepaths import peer_database_file
#from shutdown import on_shutdown_event #from shutdown import on_shutdown_event
""" """
@ -49,7 +52,11 @@ plugin_name = 'unixtransport'
PLUGIN_VERSION = '0.0.0' PLUGIN_VERSION = '0.0.0'
def on_shutdown_event(api, data=None):
with shelve.open(peer_database_file, 'c') as db:
for peer in gossip_peer_set:
if isinstance(peer, UnixPeer):
db[peer.transport_address] = peer
def on_init(api, data=None): def on_init(api, data=None):
logger.info( logger.info(

View File

@ -16,11 +16,7 @@ def on_announce_rec(api, data=None):
if not announced.endswith('.sock'): if not announced.endswith('.sock'):
return return
if announced == gossip_server_socket_file: if announced == gossip_server_socket_file:
logger.warn(
"Received announcement for our unix node, which shouldn't happen",
terminal=True)
return return
logger.info(f"Peer {announced} announced to us.", terminal=True) logger.info(f"Peer {announced} announced to us.", terminal=True)

View File

@ -50,6 +50,12 @@
"i_dont_want_privacy": false, "i_dont_want_privacy": false,
"server": "" "server": ""
}, },
"security": {
"dandelion": {
"strict": true,
"enabled": true
}
},
"timers": { "timers": {
"getBlocks": 10, "getBlocks": 10,
"lookupBlocks": 25 "lookupBlocks": 25