Compare commits

...

3 Commits

15 changed files with 93 additions and 83 deletions

View File

@ -14,10 +14,6 @@ def add_block_to_db(block: Block):
# Raises db.DuplicateKey if dupe # Raises db.DuplicateKey if dupe
db.set_if_new(block_db_path, block.id, block.raw) db.set_if_new(block_db_path, block.id, block.raw)
for func in block_storage_observers:
func(block)
def get_blocks_by_type(block_type: str) -> "Generator[Block]": def get_blocks_by_type(block_type: str) -> "Generator[Block]":
block_db = db.get_db_obj(block_db_path, 'u') block_db = db.get_db_obj(block_db_path, 'u')
for block_hash in db.list_keys(block_db_path): for block_hash in db.list_keys(block_db_path):

View File

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Coroutine, List
from ordered_set import OrderedSet from ordered_set import OrderedSet
import config
from onionrthreads import add_delayed_thread from onionrthreads import add_delayed_thread
from blockdb import add_block_to_db from blockdb import add_block_to_db
import logger import logger
@ -81,10 +82,11 @@ async def stem_out(d_phase: 'DandelionPhase'):
sleep(1) sleep(1)
return return
not_enough_edges = False not_enough_edges = False
strict_dandelion = config.get('security.strict_dandelion', True)
def blackhole_protection(q): def blackhole_protection(q):
for bl in q: for bl in q:
add_block_to_db(q) add_block_to_db(bl)
# Spawn threads with deep copied block queue to add to db after time # Spawn threads with deep copied block queue to add to db after time
@ -114,7 +116,14 @@ async def stem_out(d_phase: 'DandelionPhase'):
# "Making too few edges for stemout " + # "Making too few edges for stemout " +
# "this is bad for anonymity if frequent.", # "this is bad for anonymity if frequent.",
# terminal=True) # terminal=True)
if strict_dandelion:
not_enough_edges = True not_enough_edges = True
else:
if peer_sockets:
# if we have at least 1 peer,
# do dandelion anyway in non strict mode
# Allow poorly connected networks to communicate faster
break
sleep(1) sleep(1)
else: else:
# Ran out of time for stem phase # Ran out of time for stem phase
@ -122,7 +131,7 @@ async def stem_out(d_phase: 'DandelionPhase'):
logger.error( logger.error(
"Did not stem out any blocks in time, " + "Did not stem out any blocks in time, " +
"if this happens regularly you may be under attack", "if this happens regularly you may be under attack",
terminal=True) terminal=False)
for s in peer_sockets: for s in peer_sockets:
if s: if s:
s.close() s.close()

View File

@ -1,3 +1,5 @@
from asyncio import sleep
from queue import Empty
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import logger import logger
@ -22,7 +24,14 @@ async def do_stem_stream(
while remaining_time > 5 and my_phase_id == d_phase.phase_id: while remaining_time > 5 and my_phase_id == d_phase.phase_id:
# Primary client component that communicate's with gossip.server.acceptstem # Primary client component that communicate's with gossip.server.acceptstem
remaining_time = d_phase.remaining_time() remaining_time = d_phase.remaining_time()
bl: 'Block' = block_queue.get(block=True, timeout=remaining_time) while remaining_time:
try:
# queues can't block because we're in async
bl = block_queue.get(block=False)
except Empty:
await sleep(1)
else:
break
logger.info("Sending block over dandelion++", terminal=True) logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN) block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)

View File

@ -5,6 +5,7 @@ Download blocks that are being diffused
doesn't apply for blocks in the gossip queue that are awaiting doesn't apply for blocks in the gossip queue that are awaiting
descision to fluff or stem descision to fluff or stem
""" """
from ast import Index
from threading import Thread, Semaphore from threading import Thread, Semaphore
from random import SystemRandom from random import SystemRandom
from time import sleep from time import sleep
@ -120,6 +121,12 @@ def stream_from_peers():
while True: while True:
need_socket_lock.acquire() need_socket_lock.acquire()
available_set = gossip_peer_set - tried_peers available_set = gossip_peer_set - tried_peers
if not len(available_set) and len(tried_peers):
try:
tried_peers.pop()
except IndexError:
pass
available_set = gossip_peer_set - tried_peers
peers = sys_rand.sample( peers = sys_rand.sample(
available_set, available_set,
min(MAX_STREAMS, len(available_set))) min(MAX_STREAMS, len(available_set)))
@ -137,3 +144,4 @@ def stream_from_peers():
except IndexError: except IndexError:
need_socket_lock.release() need_socket_lock.release()
break break

View File

@ -14,4 +14,4 @@ MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris
OUTBOUND_DANDELION_EDGES = 2 OUTBOUND_DANDELION_EDGES = 2
MAX_STEM_BLOCKS_PER_STREAM = 1000 MAX_STEM_BLOCKS_PER_STREAM = 1000
BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3 BLACKHOLE_EVADE_TIMER_SECS = 10

View File

@ -20,6 +20,7 @@ from ..constants import BLOCK_MAX_SIZE, BLOCK_SIZE_LEN
from ..constants import BLOCK_STREAM_OFFSET_DIGITS from ..constants import BLOCK_STREAM_OFFSET_DIGITS
import logger import logger
import blockdb
from blockdb import get_blocks_after_timestamp, block_storage_observers from blockdb import get_blocks_after_timestamp, block_storage_observers
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
@ -53,13 +54,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
raise ValueError( raise ValueError(
"Peer's specified time offset skewed too far into the future") "Peer's specified time offset skewed too far into the future")
newly_stored_blocks = Queue()
def _add_to_queue(bl):
newly_stored_blocks.put_nowait(bl)
block_storage_observers.append(
_add_to_queue
)
async def _send_block(block: 'Block'): async def _send_block(block: 'Block'):
writer.write(block.id) writer.write(block.id)
@ -90,17 +85,20 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
) )
except IncompleteReadError: except IncompleteReadError:
keep_writing = False keep_writing = False
time_offset = time()
# Diffuse blocks stored since we started this stream # Diffuse blocks stored since we started this stream
while keep_writing: while keep_writing:
await _send_block(await newly_stored_blocks.get()) bls = blockdb.get_blocks_after_timestamp(time_offset)
for bl in bls:
await _send_block(bl)
try: try:
keep_writing = bool( keep_writing = bool(
int.from_bytes(await reader.readexactly(1), 'big') int.from_bytes(await reader.readexactly(1), 'big')
) )
except IncompleteReadError: except IncompleteReadError:
keep_writing = False keep_writing = False
break
except Exception: except Exception:
logger.warn(traceback.format_exc(), terminal=True) logger.warn(traceback.format_exc(), terminal=True)
block_storage_observers.remove(_add_to_queue)

View File

@ -8,6 +8,7 @@ import secrets
from flask import Blueprint, Response, request from flask import Blueprint, Response, request
from onionrblocks import Block from onionrblocks import Block
import blockdb
import logger import logger
from gossip import blockqueues from gossip import blockqueues
@ -38,6 +39,7 @@ def block_serialized():
req_data = request.data req_data = request.data
block_id = req_data[:BLOCK_ID_SIZE] block_id = req_data[:BLOCK_ID_SIZE]
block_data = req_data[BLOCK_ID_SIZE:] block_data = req_data[BLOCK_ID_SIZE:]
blockqueues.gossip_block_queues[stream_to_use].put( blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
Block(block_id, block_data, auto_verify=False)) #blockqueues.gossip_block_queues[stream_to_use].put(
#Block(block_id, block_data, auto_verify=False), block=False)
return "ok" return "ok"

View File

@ -112,7 +112,6 @@ def daemon():
f"Onionr daemon is running under pid {os.getpid()}", terminal=True) f"Onionr daemon is running under pid {os.getpid()}", terminal=True)
events.event('init', threaded=False) events.event('init', threaded=False)
events.event('daemon_start') events.event('daemon_start')
Thread(target=gossip.start_gossip_threads, daemon=True).start() Thread(target=gossip.start_gossip_threads, daemon=True).start()
try: try:

View File

@ -1,9 +1,9 @@
''' """
Onionr - Private P2P Communication Onionr - Private P2P Communication.
This file deals with management of modules/plugins. <anagement of modules/plugins.
''' """
''' """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or the Free Software Foundation, either version 3 of the License, or
@ -16,7 +16,7 @@
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' """
import os, re, importlib import os, re, importlib
import traceback import traceback
@ -32,9 +32,9 @@ _instances = dict()
config.reload() config.reload()
def reload(stop_event = True): def reload(stop_event = True):
''' """
Reloads all the plugins Reloads all the plugins
''' """
check() check()
@ -93,9 +93,9 @@ def enable(name, start_event = True):
def disable(name, stop_event = True): def disable(name, stop_event = True):
''' """
Disables a plugin Disables a plugin
''' """
check() check()
@ -111,9 +111,9 @@ def disable(name, stop_event = True):
stop(name) stop(name)
def start(name): def start(name):
''' """
Starts the plugin Starts the plugin
''' """
check() check()
@ -135,9 +135,9 @@ def start(name):
return None return None
def stop(name): def stop(name):
''' """
Stops the plugin Stops the plugin
''' """
check() check()
@ -183,12 +183,11 @@ def import_module_from_file(full_path_to_module):
return module return module
def get_plugin(name): def get_plugin(name):
''' """
Returns the instance of a module Returns the instance of a module
''' """
check() check()
if str(name).lower() in _instances: if str(name).lower() in _instances:
return _instances[str(name).lower()] return _instances[str(name).lower()]
else: else:
@ -196,23 +195,23 @@ def get_plugin(name):
return get_plugin(name) return get_plugin(name)
def get_plugins(): def get_plugins():
''' """
Returns a list of plugins (deprecated) Returns a list of plugins (deprecated)
''' """
return _instances return _instances
def exists(name): def exists(name):
''' """
Return value indicates whether or not the plugin exists Return value indicates whether or not the plugin exists
''' """
return os.path.isdir(get_plugins_folder(str(name).lower())) return os.path.isdir(get_plugins_folder(str(name).lower()))
def get_enabled_plugins(): def get_enabled_plugins():
''' """
Returns a list of the enabled plugins Returns a list of the enabled plugins
''' """
check() check()
config.reload() config.reload()
@ -220,16 +219,16 @@ def get_enabled_plugins():
return list(config.get('plugins.enabled', list())) return list(config.get('plugins.enabled', list()))
def is_enabled(name): def is_enabled(name):
''' """
Return value indicates whether or not the plugin is enabled Return value indicates whether or not the plugin is enabled
''' """
return name in get_enabled_plugins() return name in get_enabled_plugins()
def get_plugins_folder(name = None, absolute = True): def get_plugins_folder(name = None, absolute = True):
''' """
Returns the path to the plugins folder Returns the path to the plugins folder
''' """
path = '' path = ''
@ -246,16 +245,16 @@ def get_plugins_folder(name = None, absolute = True):
return path + '/' return path + '/'
def get_plugin_data_folder(name, absolute = True): def get_plugin_data_folder(name, absolute = True):
''' """
Returns the location of a plugin's data folder Returns the location of a plugin's data folder
''' """
return get_plugins_folder(name, absolute) return get_plugins_folder(name, absolute)
def check(): def check():
''' """
Checks to make sure files exist Checks to make sure files exist
''' """
if not config.is_set('plugins'): if not config.is_set('plugins'):
logger.debug('Generating plugin configuration data...') logger.debug('Generating plugin configuration data...')

View File

@ -40,11 +40,10 @@ def __event_caller(event_name, data = {}):
if plugin in disabled: continue if plugin in disabled: continue
try: try:
call(plugins.get_plugin(plugin), event_name, data, get_pluginapi(data)) call(plugins.get_plugin(plugin), event_name, data, get_pluginapi(data))
except ModuleNotFoundError as e: except ModuleNotFoundError as _:
logger.warn('Disabling nonexistant plugin "%s"...' % plugin, terminal=True) logger.warn('Disabling nonexistant plugin "%s"...' % plugin, terminal=True)
plugins.disable(plugin, stop_event = False) plugins.disable(plugin, stop_event = False)
except Exception as e: except Exception as _:
logger.error('Event "%s" failed for plugin "%s".' % (event_name, plugin), terminal=True) logger.error('Event "%s" failed for plugin "%s".' % (event_name, plugin), terminal=True)
logger.error('\n' + traceback.format_exc(), terminal=True) logger.error('\n' + traceback.format_exc(), terminal=True)

View File

@ -1 +1 @@
/dev/shm/onionr442130151/gossip-server.sock /dev/shm/onionr3177415330/gossip-server.sock

View File

@ -25,8 +25,8 @@ sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
# import after path insert # import after path insert
from unixpeer import UnixPeer from unixpeer import UnixPeer
from bootstrap import on_bootstrap from unixbootstrap import on_bootstrap
from announce import on_announce_rec from unixannounce import on_announce_rec
#from shutdown import on_shutdown_event #from shutdown import on_shutdown_event
""" """
@ -57,7 +57,6 @@ def on_init(api, data=None):
logger.info( logger.info(
f"Peers can connect to {gossip_server_socket_file}", terminal=True) f"Peers can connect to {gossip_server_socket_file}", terminal=True)
def on_get_our_transport(api, data=None): def on_get_our_transport(api, data=None):
callback_func = data['callback'] callback_func = data['callback']
for_peer = data['peer'] for_peer = data['peer']
@ -65,9 +64,3 @@ def on_get_our_transport(api, data=None):
return return
if data['peer'].__class__ == UnixPeer: if data['peer'].__class__ == UnixPeer:
callback_func(for_peer, gossip_server_socket_file) callback_func(for_peer, gossip_server_socket_file)
def on_gossip_start(api, data: Set[Peer] = None):
# We don't do gossip logic
return

View File

@ -34,8 +34,7 @@ def load_existing_peers(callback: Callable):
daemon=True).start() daemon=True).start()
def on_bootstrap(api, data): def on_bootstrap(api, data=None):
callback_func = data['callback'] callback_func = data['callback']
try: try:

View File

@ -12,7 +12,6 @@ class UnixPeer:
def get_socket(self, connect_timeout) -> socket: def get_socket(self, connect_timeout) -> socket:
s = socket(AF_UNIX, SOCK_STREAM) s = socket(AF_UNIX, SOCK_STREAM)
#s.settimeout(connect_timeout) #s.settimeout(connect_timeout)
s.connect(self.transport_address) s.connect(self.transport_address)