Compare commits
3 Commits
d11d12b67f
...
e7daaf576f
Author | SHA1 | Date | |
---|---|---|---|
|
e7daaf576f | ||
|
84c13ade51 | ||
|
d2b5298bc6 |
@ -14,10 +14,6 @@ def add_block_to_db(block: Block):
|
||||
# Raises db.DuplicateKey if dupe
|
||||
db.set_if_new(block_db_path, block.id, block.raw)
|
||||
|
||||
for func in block_storage_observers:
|
||||
func(block)
|
||||
|
||||
|
||||
def get_blocks_by_type(block_type: str) -> "Generator[Block]":
|
||||
block_db = db.get_db_obj(block_db_path, 'u')
|
||||
for block_hash in db.list_keys(block_db_path):
|
||||
|
@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Coroutine, List
|
||||
|
||||
from ordered_set import OrderedSet
|
||||
|
||||
import config
|
||||
from onionrthreads import add_delayed_thread
|
||||
from blockdb import add_block_to_db
|
||||
import logger
|
||||
@ -81,10 +82,11 @@ async def stem_out(d_phase: 'DandelionPhase'):
|
||||
sleep(1)
|
||||
return
|
||||
not_enough_edges = False
|
||||
strict_dandelion = config.get('security.strict_dandelion', True)
|
||||
|
||||
def blackhole_protection(q):
|
||||
for bl in q:
|
||||
add_block_to_db(q)
|
||||
add_block_to_db(bl)
|
||||
|
||||
|
||||
# Spawn threads with deep copied block queue to add to db after time
|
||||
@ -114,7 +116,14 @@ async def stem_out(d_phase: 'DandelionPhase'):
|
||||
# "Making too few edges for stemout " +
|
||||
# "this is bad for anonymity if frequent.",
|
||||
# terminal=True)
|
||||
if strict_dandelion:
|
||||
not_enough_edges = True
|
||||
else:
|
||||
if peer_sockets:
|
||||
# if we have at least 1 peer,
|
||||
# do dandelion anyway in non strict mode
|
||||
# Allow poorly connected networks to communicate faster
|
||||
break
|
||||
sleep(1)
|
||||
else:
|
||||
# Ran out of time for stem phase
|
||||
@ -122,7 +131,7 @@ async def stem_out(d_phase: 'DandelionPhase'):
|
||||
logger.error(
|
||||
"Did not stem out any blocks in time, " +
|
||||
"if this happens regularly you may be under attack",
|
||||
terminal=True)
|
||||
terminal=False)
|
||||
for s in peer_sockets:
|
||||
if s:
|
||||
s.close()
|
||||
|
@ -1,3 +1,5 @@
|
||||
from asyncio import sleep
|
||||
from queue import Empty
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import logger
|
||||
@ -22,7 +24,14 @@ async def do_stem_stream(
|
||||
while remaining_time > 5 and my_phase_id == d_phase.phase_id:
|
||||
# Primary client component that communicate's with gossip.server.acceptstem
|
||||
remaining_time = d_phase.remaining_time()
|
||||
bl: 'Block' = block_queue.get(block=True, timeout=remaining_time)
|
||||
while remaining_time:
|
||||
try:
|
||||
# queues can't block because we're in async
|
||||
bl = block_queue.get(block=False)
|
||||
except Empty:
|
||||
await sleep(1)
|
||||
else:
|
||||
break
|
||||
logger.info("Sending block over dandelion++", terminal=True)
|
||||
|
||||
block_size = str(len(bl.raw)).zfill(BLOCK_SIZE_LEN)
|
||||
|
@ -5,6 +5,7 @@ Download blocks that are being diffused
|
||||
doesn't apply for blocks in the gossip queue that are awaiting
|
||||
descision to fluff or stem
|
||||
"""
|
||||
from ast import Index
|
||||
from threading import Thread, Semaphore
|
||||
from random import SystemRandom
|
||||
from time import sleep
|
||||
@ -120,6 +121,12 @@ def stream_from_peers():
|
||||
while True:
|
||||
need_socket_lock.acquire()
|
||||
available_set = gossip_peer_set - tried_peers
|
||||
if not len(available_set) and len(tried_peers):
|
||||
try:
|
||||
tried_peers.pop()
|
||||
except IndexError:
|
||||
pass
|
||||
available_set = gossip_peer_set - tried_peers
|
||||
peers = sys_rand.sample(
|
||||
available_set,
|
||||
min(MAX_STREAMS, len(available_set)))
|
||||
@ -137,3 +144,4 @@ def stream_from_peers():
|
||||
except IndexError:
|
||||
need_socket_lock.release()
|
||||
break
|
||||
|
||||
|
@ -14,4 +14,4 @@ MAX_INBOUND_DANDELION_EDGE = 50 # Mainly picked to avoid slowloris
|
||||
OUTBOUND_DANDELION_EDGES = 2
|
||||
|
||||
MAX_STEM_BLOCKS_PER_STREAM = 1000
|
||||
BLACKHOLE_EVADE_TIMER_SECS = DANDELION_EPOCH_LENGTH * 3
|
||||
BLACKHOLE_EVADE_TIMER_SECS = 10
|
@ -20,6 +20,7 @@ from ..constants import BLOCK_MAX_SIZE, BLOCK_SIZE_LEN
|
||||
from ..constants import BLOCK_STREAM_OFFSET_DIGITS
|
||||
|
||||
import logger
|
||||
import blockdb
|
||||
from blockdb import get_blocks_after_timestamp, block_storage_observers
|
||||
"""
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
@ -53,13 +54,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
||||
raise ValueError(
|
||||
"Peer's specified time offset skewed too far into the future")
|
||||
|
||||
newly_stored_blocks = Queue()
|
||||
|
||||
def _add_to_queue(bl):
|
||||
newly_stored_blocks.put_nowait(bl)
|
||||
block_storage_observers.append(
|
||||
_add_to_queue
|
||||
)
|
||||
|
||||
async def _send_block(block: 'Block'):
|
||||
writer.write(block.id)
|
||||
@ -90,17 +85,20 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
||||
)
|
||||
except IncompleteReadError:
|
||||
keep_writing = False
|
||||
time_offset = time()
|
||||
|
||||
# Diffuse blocks stored since we started this stream
|
||||
while keep_writing:
|
||||
await _send_block(await newly_stored_blocks.get())
|
||||
bls = blockdb.get_blocks_after_timestamp(time_offset)
|
||||
for bl in bls:
|
||||
await _send_block(bl)
|
||||
try:
|
||||
keep_writing = bool(
|
||||
int.from_bytes(await reader.readexactly(1), 'big')
|
||||
)
|
||||
except IncompleteReadError:
|
||||
keep_writing = False
|
||||
break
|
||||
except Exception:
|
||||
logger.warn(traceback.format_exc(), terminal=True)
|
||||
|
||||
block_storage_observers.remove(_add_to_queue)
|
||||
|
@ -8,6 +8,7 @@ import secrets
|
||||
from flask import Blueprint, Response, request
|
||||
|
||||
from onionrblocks import Block
|
||||
import blockdb
|
||||
|
||||
import logger
|
||||
from gossip import blockqueues
|
||||
@ -38,6 +39,7 @@ def block_serialized():
|
||||
req_data = request.data
|
||||
block_id = req_data[:BLOCK_ID_SIZE]
|
||||
block_data = req_data[BLOCK_ID_SIZE:]
|
||||
blockqueues.gossip_block_queues[stream_to_use].put(
|
||||
Block(block_id, block_data, auto_verify=False))
|
||||
blockdb.add_block_to_db(Block(block_id, block_data, auto_verify=False))
|
||||
#blockqueues.gossip_block_queues[stream_to_use].put(
|
||||
#Block(block_id, block_data, auto_verify=False), block=False)
|
||||
return "ok"
|
||||
|
@ -112,7 +112,6 @@ def daemon():
|
||||
f"Onionr daemon is running under pid {os.getpid()}", terminal=True)
|
||||
events.event('init', threaded=False)
|
||||
events.event('daemon_start')
|
||||
|
||||
Thread(target=gossip.start_gossip_threads, daemon=True).start()
|
||||
|
||||
try:
|
||||
|
@ -1,9 +1,9 @@
|
||||
'''
|
||||
Onionr - Private P2P Communication
|
||||
"""
|
||||
Onionr - Private P2P Communication.
|
||||
|
||||
This file deals with management of modules/plugins.
|
||||
'''
|
||||
'''
|
||||
<anagement of modules/plugins.
|
||||
"""
|
||||
"""
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
@ -16,7 +16,7 @@
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
"""
|
||||
import os, re, importlib
|
||||
import traceback
|
||||
|
||||
@ -32,9 +32,9 @@ _instances = dict()
|
||||
config.reload()
|
||||
|
||||
def reload(stop_event = True):
|
||||
'''
|
||||
"""
|
||||
Reloads all the plugins
|
||||
'''
|
||||
"""
|
||||
|
||||
check()
|
||||
|
||||
@ -93,9 +93,9 @@ def enable(name, start_event = True):
|
||||
|
||||
|
||||
def disable(name, stop_event = True):
|
||||
'''
|
||||
"""
|
||||
Disables a plugin
|
||||
'''
|
||||
"""
|
||||
|
||||
check()
|
||||
|
||||
@ -111,9 +111,9 @@ def disable(name, stop_event = True):
|
||||
stop(name)
|
||||
|
||||
def start(name):
|
||||
'''
|
||||
"""
|
||||
Starts the plugin
|
||||
'''
|
||||
"""
|
||||
|
||||
check()
|
||||
|
||||
@ -135,9 +135,9 @@ def start(name):
|
||||
return None
|
||||
|
||||
def stop(name):
|
||||
'''
|
||||
"""
|
||||
Stops the plugin
|
||||
'''
|
||||
"""
|
||||
|
||||
check()
|
||||
|
||||
@ -183,12 +183,11 @@ def import_module_from_file(full_path_to_module):
|
||||
return module
|
||||
|
||||
def get_plugin(name):
|
||||
'''
|
||||
"""
|
||||
Returns the instance of a module
|
||||
'''
|
||||
"""
|
||||
|
||||
check()
|
||||
|
||||
if str(name).lower() in _instances:
|
||||
return _instances[str(name).lower()]
|
||||
else:
|
||||
@ -196,23 +195,23 @@ def get_plugin(name):
|
||||
return get_plugin(name)
|
||||
|
||||
def get_plugins():
|
||||
'''
|
||||
"""
|
||||
Returns a list of plugins (deprecated)
|
||||
'''
|
||||
"""
|
||||
|
||||
return _instances
|
||||
|
||||
def exists(name):
|
||||
'''
|
||||
"""
|
||||
Return value indicates whether or not the plugin exists
|
||||
'''
|
||||
"""
|
||||
|
||||
return os.path.isdir(get_plugins_folder(str(name).lower()))
|
||||
|
||||
def get_enabled_plugins():
|
||||
'''
|
||||
"""
|
||||
Returns a list of the enabled plugins
|
||||
'''
|
||||
"""
|
||||
|
||||
check()
|
||||
config.reload()
|
||||
@ -220,16 +219,16 @@ def get_enabled_plugins():
|
||||
return list(config.get('plugins.enabled', list()))
|
||||
|
||||
def is_enabled(name):
|
||||
'''
|
||||
"""
|
||||
Return value indicates whether or not the plugin is enabled
|
||||
'''
|
||||
"""
|
||||
|
||||
return name in get_enabled_plugins()
|
||||
|
||||
def get_plugins_folder(name = None, absolute = True):
|
||||
'''
|
||||
"""
|
||||
Returns the path to the plugins folder
|
||||
'''
|
||||
"""
|
||||
|
||||
path = ''
|
||||
|
||||
@ -246,16 +245,16 @@ def get_plugins_folder(name = None, absolute = True):
|
||||
return path + '/'
|
||||
|
||||
def get_plugin_data_folder(name, absolute = True):
|
||||
'''
|
||||
"""
|
||||
Returns the location of a plugin's data folder
|
||||
'''
|
||||
"""
|
||||
|
||||
return get_plugins_folder(name, absolute)
|
||||
|
||||
def check():
|
||||
'''
|
||||
"""
|
||||
Checks to make sure files exist
|
||||
'''
|
||||
"""
|
||||
|
||||
if not config.is_set('plugins'):
|
||||
logger.debug('Generating plugin configuration data...')
|
||||
|
@ -40,11 +40,10 @@ def __event_caller(event_name, data = {}):
|
||||
if plugin in disabled: continue
|
||||
try:
|
||||
call(plugins.get_plugin(plugin), event_name, data, get_pluginapi(data))
|
||||
except ModuleNotFoundError as e:
|
||||
except ModuleNotFoundError as _:
|
||||
logger.warn('Disabling nonexistant plugin "%s"...' % plugin, terminal=True)
|
||||
plugins.disable(plugin, stop_event = False)
|
||||
except Exception as e:
|
||||
|
||||
except Exception as _:
|
||||
logger.error('Event "%s" failed for plugin "%s".' % (event_name, plugin), terminal=True)
|
||||
logger.error('\n' + traceback.format_exc(), terminal=True)
|
||||
|
||||
|
@ -1 +1 @@
|
||||
/dev/shm/onionr442130151/gossip-server.sock
|
||||
/dev/shm/onionr3177415330/gossip-server.sock
|
@ -25,8 +25,8 @@ sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
|
||||
# import after path insert
|
||||
from unixpeer import UnixPeer
|
||||
|
||||
from bootstrap import on_bootstrap
|
||||
from announce import on_announce_rec
|
||||
from unixbootstrap import on_bootstrap
|
||||
from unixannounce import on_announce_rec
|
||||
#from shutdown import on_shutdown_event
|
||||
|
||||
"""
|
||||
@ -57,7 +57,6 @@ def on_init(api, data=None):
|
||||
logger.info(
|
||||
f"Peers can connect to {gossip_server_socket_file}", terminal=True)
|
||||
|
||||
|
||||
def on_get_our_transport(api, data=None):
|
||||
callback_func = data['callback']
|
||||
for_peer = data['peer']
|
||||
@ -65,9 +64,3 @@ def on_get_our_transport(api, data=None):
|
||||
return
|
||||
if data['peer'].__class__ == UnixPeer:
|
||||
callback_func(for_peer, gossip_server_socket_file)
|
||||
|
||||
|
||||
def on_gossip_start(api, data: Set[Peer] = None):
|
||||
# We don't do gossip logic
|
||||
return
|
||||
|
||||
|
@ -34,8 +34,7 @@ def load_existing_peers(callback: Callable):
|
||||
daemon=True).start()
|
||||
|
||||
|
||||
def on_bootstrap(api, data):
|
||||
|
||||
def on_bootstrap(api, data=None):
|
||||
callback_func = data['callback']
|
||||
|
||||
try:
|
@ -12,7 +12,6 @@ class UnixPeer:
|
||||
|
||||
|
||||
def get_socket(self, connect_timeout) -> socket:
|
||||
|
||||
s = socket(AF_UNIX, SOCK_STREAM)
|
||||
#s.settimeout(connect_timeout)
|
||||
s.connect(self.transport_address)
|
||||
|
Loading…
Reference in New Issue
Block a user