Compare commits

...

3 Commits

15 changed files with 93 additions and 83 deletions

View File

@ -14,10 +14,6 @@ def add_block_to_db(block: Block):
# Raises db.DuplicateKey if dupe
db.set_if_new(block_db_path, block.id, block.raw)
for func in block_storage_observers:
func(block)
def get_blocks_by_type(block_type: str) -> "Generator[Block]":
block_db = db.get_db_obj(block_db_path, 'u')
for block_hash in db.list_keys(block_db_path):

View File

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Coroutine, List
from ordered_set import OrderedSet
import config
from onionrthreads import add_delayed_thread
from blockdb import add_block_to_db
import logger
@ -81,10 +82,11 @@ async def stem_out(d_phase: 'DandelionPhase'):
sleep(1)
return
not_enough_edges = False
strict_dandelion = config.get('security.strict_dandelion', True)
def blackhole_protection(q):
for bl in q:
add_block_to_db(q)
add_block_to_db(bl)
# Spawn threads with deep copied block queue to add to db after time
@ -114,7 +116,14 @@ async def stem_out(d_phase: 'DandelionPhase'):
# "Making too few edges for stemout " +
# "this is bad for anonymity if frequent.",
# terminal=True)
if strict_dandelion:
not_enough_edges = True
else:
if peer_sockets:
# if we have at least 1 peer,
# do dandelion anyway in non strict mode
# Allow poorly connected networks to communicate faster
break
sleep(1)
else:
# Ran out of time for stem phase
@ -122,7 +131,7 @@ async def stem_out(d_phase: 'DandelionPhase'):
logger.error(
"Did not stem out any blocks in time, " +
"if this happens regularly you may be under attack",
terminal=True)
terminal=False)
for s in peer_sockets:
if s:
s.close()

View File

@ -1,3 +1,5 @@
from asyncio import sleep
from queue import Empty
from typing import TYPE_CHECKING
import logger
@ -22,7 +24,14 @@ async def do_stem_stream(
while remaining_time > 5 and my_phase_id == d_phase.phase_id:
# Primary client component that communicates with gossip.server.acceptstem
remaining_time = d_phase.remaining_time()
bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
while remaining_time:
try:
# queues can't block because we're in async
bl = block_queue.get(block=False)
except Empty:
await sleep(1)
else:
break
logger.info("Sending block over dandelion++", terminal=True)
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)

View File

@ -5,6 +5,7 @@ Download blocks that are being diffused
doesn't apply for blocks in the gossip queue that are awaiting
decision to fluff or stem
"""
from ast import Index
from threading import Thread, Semaphore
from random import SystemRandom
from time import sleep
@ -120,6 +121,12 @@ def stream_from_peers():
while True:
need_socket_lock.acquire()
available_set = gossip_peer_set - tried_peers
if not len(available_set) and len(tried_peers):
try:
tried_peers.pop()
except IndexError:
pass
available_set = gossip_peer_set - tried_peers
peers = sys_rand.sample(
available_set,
min(MAX_STREAMS, len(available_set)))
@ -137,3 +144,4 @@ def stream_from_peers():
except IndexError:
need_socket_lock.release()
break

View File

@ -14,4 +14,4 @@ MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris
OUTBOUND_DANDELION_EDGES = 2
MAX_STEM_BLOCKS_PER_STREAM = 1000
BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3
BLACKHOLE_EVADE_TIMER_SECS = 10

View File

@ -20,6 +20,7 @@ from ..constants import BLOCK_MAX_SIZE, BLOCK_SIZE_LEN
from ..constants import BLOCK_STREAM_OFFSET_DIGITS
import logger
import blockdb
from blockdb import get_blocks_after_timestamp, block_storage_observers
"""
This program is free software: you can redistribute it and/or modify
@ -53,13 +54,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
raise ValueError(
"Peer's specified time offset skewed too far into the future")
newly_stored_blocks = Queue()
def _add_to_queue(bl):
newly_stored_blocks.put_nowait(bl)
block_storage_observers.append(
_add_to_queue
)
async def _send_block(block: 'Block'):
writer.write(block.id)
@ -90,17 +85,20 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
)
except IncompleteReadError:
keep_writing = False
time_offset = time()
# Diffuse blocks stored since we started this stream
while keep_writing:
await _send_block(await newly_stored_blocks.get())
bls = blockdb.get_blocks_after_timestamp(time_offset)
for bl in bls:
await _send_block(bl)
try:
keep_writing = bool(
int.from_bytes(await reader.readexactly(1), 'big')
)
except IncompleteReadError:
keep_writing = False
break
except Exception:
logger.warn(traceback.format_exc(), terminal=True)
block_storage_observers.remove(_add_to_queue)

View File

@ -8,6 +8,7 @@ import secrets
from flask import Blueprint, Response, request
from onionrblocks import Block
import blockdb
import logger
from gossip import blockqueues
@ -38,6 +39,7 @@ def block_serialized():
req_data = request.data
block_id = req_data[:BLOCK_ID_SIZE]
block_data = req_data[BLOCK_ID_SIZE:]
blockqueues.gossip_block_queues[stream_to_use].put(
Block(block_id, block_data, auto_verify=False))
blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
#blockqueues.gossip_block_queues[stream_to_use].put(
#Block(block_id, block_data, auto_verify=False), block=False)
return "ok"

View File

@ -112,7 +112,6 @@ def daemon():
f"Onionr daemon is running under pid {os.getpid()}", terminal=True)
events.event('init', threaded=False)
events.event('daemon_start')
Thread(target=gossip.start_gossip_threads, daemon=True).start()
try:

View File

@ -1,9 +1,9 @@
'''
Onionr - Private P2P Communication
"""
Onionr - Private P2P Communication.
This file deals with management of modules/plugins.
'''
'''
Management of modules/plugins.
"""
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
@ -16,7 +16,7 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
"""
import os, re, importlib
import traceback
@ -32,9 +32,9 @@ _instances = dict()
config.reload()
def reload(stop_event = True):
'''
"""
Reloads all the plugins
'''
"""
check()
@ -93,9 +93,9 @@ def enable(name, start_event = True):
def disable(name, stop_event = True):
'''
"""
Disables a plugin
'''
"""
check()
@ -111,9 +111,9 @@ def disable(name, stop_event = True):
stop(name)
def start(name):
'''
"""
Starts the plugin
'''
"""
check()
@ -135,9 +135,9 @@ def start(name):
return None
def stop(name):
'''
"""
Stops the plugin
'''
"""
check()
@ -183,12 +183,11 @@ def import_module_from_file(full_path_to_module):
return module
def get_plugin(name):
'''
"""
Returns the instance of a module
'''
"""
check()
if str(name).lower() in _instances:
return _instances[str(name).lower()]
else:
@ -196,23 +195,23 @@ def get_plugin(name):
return get_plugin(name)
def get_plugins():
'''
"""
Returns a list of plugins (deprecated)
'''
"""
return _instances
def exists(name):
'''
"""
Return value indicates whether or not the plugin exists
'''
"""
return os.path.isdir(get_plugins_folder(str(name).lower()))
def get_enabled_plugins():
'''
"""
Returns a list of the enabled plugins
'''
"""
check()
config.reload()
@ -220,16 +219,16 @@ def get_enabled_plugins():
return list(config.get('plugins.enabled', list()))
def is_enabled(name):
'''
"""
Return value indicates whether or not the plugin is enabled
'''
"""
return name in get_enabled_plugins()
def get_plugins_folder(name = None, absolute = True):
'''
"""
Returns the path to the plugins folder
'''
"""
path = ''
@ -246,16 +245,16 @@ def get_plugins_folder(name = None, absolute = True):
return path + '/'
def get_plugin_data_folder(name, absolute = True):
'''
"""
Returns the location of a plugin's data folder
'''
"""
return get_plugins_folder(name, absolute)
def check():
'''
"""
Checks to make sure files exist
'''
"""
if not config.is_set('plugins'):
logger.debug('Generating plugin configuration data...')

View File

@ -40,11 +40,10 @@ def __event_caller(event_name, data = {}):
if plugin in disabled: continue
try:
call(plugins.get_plugin(plugin), event_name, data, get_pluginapi(data))
except ModuleNotFoundError as e:
except ModuleNotFoundError as _:
logger.warn('Disabling nonexistant plugin "%s"...' % plugin, terminal=True)
plugins.disable(plugin, stop_event = False)
except Exception as e:
except Exception as _:
logger.error('Event "%s" failed for plugin "%s".' % (event_name, plugin), terminal=True)
logger.error('\n' + traceback.format_exc(), terminal=True)

View File

@ -1 +1 @@
/dev/shm/onionr442130151/gossip-server.sock
/dev/shm/onionr3177415330/gossip-server.sock

View File

@ -25,8 +25,8 @@ sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
# import after path insert
from unixpeer import UnixPeer
from bootstrap import on_bootstrap
from announce import on_announce_rec
from unixbootstrap import on_bootstrap
from unixannounce import on_announce_rec
#from shutdown import on_shutdown_event
"""
@ -57,7 +57,6 @@ def on_init(api, data=None):
logger.info(
f"Peers can connect to {gossip_server_socket_file}", terminal=True)
def on_get_our_transport(api, data=None):
callback_func = data['callback']
for_peer = data['peer']
@ -65,9 +64,3 @@ def on_get_our_transport(api, data=None):
return
if data['peer'].__class__ == UnixPeer:
callback_func(for_peer, gossip_server_socket_file)
def on_gossip_start(api, data: Set[Peer] = None):
# We don't do gossip logic
return

View File

@ -34,8 +34,7 @@ def load_existing_peers(callback: Callable):
daemon=True).start()
def on_bootstrap(api, data):
def on_bootstrap(api, data=None):
callback_func = data['callback']
try:

View File

@ -12,7 +12,6 @@ class UnixPeer:
def get_socket(self, connect_timeout) -> socket:
s = socket(AF_UNIX, SOCK_STREAM)
#s.settimeout(connect_timeout)
s.connect(self.transport_address)