Work on gossip tests and cleaned up api server some

This commit is contained in:
Kevin F 2022-05-01 00:45:26 -05:00
parent e6b61c5f59
commit 1b37264eb7
18 changed files with 136 additions and 112 deletions

View File

@ -13,6 +13,10 @@ for f in tests/*.py; do
python3 "$f" || close # if needed python3 "$f" || close # if needed
let "ran++" let "ran++"
done done
for f in tests/gossip-unittests/*.py; do
python3 "$f" || close # if needed
let "ran++"
done
echo "ran $ran unittests. Unittest Time: $SECONDS" echo "ran $ran unittests. Unittest Time: $SECONDS"
ran=0; ran=0;

View File

@ -6,4 +6,4 @@ Private is meant for controlling and accessing this node
from . import private from . import private
ClientAPI = private.PrivateAPI private_api = private.private_api

View File

@ -65,7 +65,7 @@ class PrivateAPI:
else: else:
self.host = httpapi.apiutils.setbindip.set_bind_IP( self.host = httpapi.apiutils.setbindip.set_bind_IP(
private_API_host_file) private_API_host_file)
logger.info('Running api on %s:%s' % (self.host, self.bindPort)) logger.info(f'Running API on {self.host}:{self.bindPort}', terminal=True)
self.httpServer = '' self.httpServer = ''
self.queueResponse = {} self.queueResponse = {}
@ -75,7 +75,6 @@ class PrivateAPI:
def start(self): def start(self):
"""Start client gevent API web server with flask client app.""" """Start client gevent API web server with flask client app."""
waitforsetvar.wait_for_set_var(self, "_too_many")
fd_handler = httpapi.fdsafehandler.FDSafeHandler fd_handler = httpapi.fdsafehandler.FDSafeHandler
self.httpServer = WSGIServer((self.host, self.bindPort), self.httpServer = WSGIServer((self.host, self.bindPort),
@ -116,3 +115,6 @@ class PrivateAPI:
decrypt=decrypt, decrypt=decrypt,
raw=raw, raw=raw,
headerOnly=headerOnly) headerOnly=headerOnly)
private_api = PrivateAPI()

View File

@ -41,16 +41,4 @@ def register_private_blueprints(private_api, app):
app.register_blueprint(private_sse_blueprint) app.register_blueprint(private_sse_blueprint)
app.register_blueprint(fileoffsetreader.offset_reader_api) app.register_blueprint(fileoffsetreader.offset_reader_api)
def _add_events_bp():
while True:
try:
private_api._too_many
break
except AttributeError:
sleep(0.2)
app.register_blueprint(
private_api._too_many.get_by_string('DaemonEventsBP').flask_bp)
Thread(target=_add_events_bp, daemon=True).start()
return app return app

View File

@ -68,7 +68,7 @@ def gossip_client():
# transport plugin handles the new peer # transport plugin handles the new peer
add_onionr_thread( add_onionr_thread(
get_new_peers, get_new_peers,
1200, initial_sleep=5) 120, initial_sleep=5)
# Start a new thread to stream blocks from peers # Start a new thread to stream blocks from peers
@ -76,8 +76,6 @@ def gossip_client():
dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH) dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH)
while True: while True:
sleep(5)
continue
while not len(gossip_peer_set): while not len(gossip_peer_set):
sleep(0.2) sleep(0.2)
if dandelion_phase.remaining_time() <= 10: if dandelion_phase.remaining_time() <= 10:

View File

@ -98,7 +98,7 @@ async def stem_out(d_phase: 'DandelionPhase'):
peer_sockets.append(await _setup_edge(gossip_peer_set, tried_edges)) peer_sockets.append(await _setup_edge(gossip_peer_set, tried_edges))
except NotEnoughEdges: except NotEnoughEdges:
# No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE) # No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE)
logger.warn("Not able to build enough peers for stemout.", logger.warn("Not able to build enough tunnels for stemout.",
terminal=True) terminal=True)
break break
else: else:

View File

@ -96,9 +96,13 @@ def gossip_server():
logger.debug( logger.debug(
"Inbound edge timed out when steming blocks to us", "Inbound edge timed out when steming blocks to us",
terminal=True) terminal=True)
except asyncio.exceptions.IncompleteReadError:
logger.debug(
"Inbound edge timed out (Incomplete Read) when steming blocks to us",
terminal=True)
except Exception: except Exception:
logger.warn( logger.warn(
f"Err getting\n{traceback.format_exc()}", f"Err acceptind stem blocks\n{traceback.format_exc()}",
terminal=True) terminal=True)
# Subtract dandelion edge, make sure >=0 # Subtract dandelion edge, make sure >=0
inbound_dandelion_edge_count[0] = \ inbound_dandelion_edge_count[0] = \

View File

@ -12,7 +12,7 @@ from ..blockqueues import gossip_block_queues
block_size_digits = len(str(BLOCK_MAX_SIZE)) block_size_digits = len(str(BLOCK_MAX_SIZE))
base_wait_timeout = 30 base_wait_timeout = 120
if TYPE_CHECKING: if TYPE_CHECKING:
from asyncio import StreamWriter, StreamReader from asyncio import StreamWriter, StreamReader

View File

@ -0,0 +1,44 @@
"""Onionr - Private P2P Communication.
Serialized APIs
"""
from asyncio.log import logger
import secrets
from flask import Blueprint, Response, request
from onionrblocks import Block
import logger
from gossip import blockqueues
from gossip.constants import BLOCK_ID_SIZE
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
blockapi = Blueprint('blockapi', __name__)
stream_to_use = secrets.randbits(1)
# Add a block that we generated (or received from a transport like LAN/sneakernet)
@blockapi.route('/addvdfblock', methods=['POST'])
def block_serialized():
req_data = request.data
block_id = req_data[:BLOCK_ID_SIZE]
block_data = req_data[BLOCK_ID_SIZE:]
blockqueues.gossip_block_queues[stream_to_use].put(
Block(block_id, block_data, auto_verify=False))
logger.info("Added block" + block_id, terminal=True)
return "ok"

View File

@ -28,10 +28,3 @@ def shutdown(client_api_inst):
except AttributeError: except AttributeError:
pass pass
return Response("bye") return Response("bye")
@shutdown_bp.route('/shutdownclean')
def shutdown_clean():
# good for calling from other clients
g.too_many.get_by_string("DeadSimpleKV").put('shutdown', True)
return Response("bye")

View File

@ -1,55 +0,0 @@
"""Onionr - Private P2P Communication.
Event driven interface to trigger events in communicator
"""
from typing import Callable
from flask import Blueprint, request, Response, abort
from werkzeug.exceptions import BadRequest
from gevent import spawn
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
class DaemonEventsBP:
def __init__(self):
"""Create DaemonEvents instance, intended to be a singleton.
Attributes:
listeners: callables that are called when a new event is added.
The callables name should match the event name
_too_many: TooManyObjects instance set by external code
"""
event_BP = Blueprint('event_BP', __name__)
self.listeners = set([])
self.flask_bp = event_BP
event_BP = self.flask_bp
@event_BP.route('/daemon-event/<name>', methods=['POST'])
def daemon_event_handler(name):
handler: Callable
try:
json_data = request.get_json(force=True)
except BadRequest:
json_data = {}
for handler in self.listeners:
if handler.__name__ == name:
return Response(handler(**json_data))
abort(404)
def register_listener(self, listener: Callable):
self.listeners.add(listener)

View File

@ -68,7 +68,6 @@ class PrivateEndpoints:
return Response("pong!") return Response("pong!")
@private_endpoints_bp.route('/shutdown') @private_endpoints_bp.route('/shutdown')
def shutdown(): def shutdown():
return apiutils.shutdown.shutdown(client_api) return apiutils.shutdown.shutdown(client_api)
@ -110,13 +109,3 @@ class PrivateEndpoints:
def get_os_system(): def get_os_system():
return Response(platform.system().lower()) return Response(platform.system().lower())
@private_endpoints_bp.route('/getgeneratingblocks')
def get_generating_blocks() -> Response:
return Response(
','.join(
g.too_many.get_by_string('DeadSimpleKV').get(
'generating_blocks'
))
)

View File

@ -71,12 +71,6 @@ class ClientAPISecurity:
f'Possible DNS rebinding attack by {request.host}') f'Possible DNS rebinding attack by {request.host}')
abort(403) abort(403)
# Add shared objects
try:
g.too_many = self.client_api._too_many
except KeyError:
g.too_many = None
# Static files for Onionr sites # Static files for Onionr sites
if request.path.startswith('/site/'): if request.path.startswith('/site/'):
return return

View File

@ -7,6 +7,7 @@ import queue
import sys import sys
import platform import platform
import signal import signal
from threading import Thread
from stem.connection import IncorrectPassword from stem.connection import IncorrectPassword
import toomanyobjs import toomanyobjs
@ -28,7 +29,6 @@ import onionrvalues
from onionrutils import cleanup from onionrutils import cleanup
from onionrcrypto import getourkeypair from onionrcrypto import getourkeypair
import runtests import runtests
from httpapi import daemoneventsapi
from .. import version from .. import version
from .killdaemon import kill_daemon # noqa from .killdaemon import kill_daemon # noqa
from .showlogo import show_logo from .showlogo import show_logo
@ -95,15 +95,11 @@ def daemon():
# Initialize the quasi-global variables # Initialize the quasi-global variables
setup_kv(shared_state.get(DeadSimpleKV)) setup_kv(shared_state.get(DeadSimpleKV))
shared_state.get(daemoneventsapi.DaemonEventsBP)
# Init run time tester # Init run time tester
# (ensures Onionr is running right, for testing purposes) # (ensures Onionr is running right, for testing purposes)
# Run time tests are not normally run # Run time tests are not normally run
shared_state.get(runtests.OnionrRunTestManager) shared_state.get(runtests.OnionrRunTestManager)
# initialize clientAPI but dont start it yet
shared_state.get(apiservers.ClientAPI)
shared_state.share_object() # share the parent object to the threads shared_state.share_object() # share the parent object to the threads
@ -117,10 +113,10 @@ def daemon():
events.event('init', threaded=False) events.event('init', threaded=False)
events.event('daemon_start') events.event('daemon_start')
gossip.start_gossip_threads() Thread(target=gossip.start_gossip_threads, daemon=True).start()
try: try:
shared_state.get(apiservers.ClientAPI).start() apiservers.private_api.start()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
@ -149,8 +145,9 @@ def start(override: bool = False):
except psutil.NoSuchProcess: except psutil.NoSuchProcess:
proc = "" proc = ""
if not proc.startswith("python"): if not proc.startswith("python"):
logger.info( logger.warn(
f"Detected stale run file, deleting {filepaths.lock_file}", terminal=True) f"Detected stale run file, deleting {filepaths.lock_file}",
terminal=True)
try: try:
os.remove(filepaths.lock_file) os.remove(filepaths.lock_file)
except FileNotFoundError: except FileNotFoundError:

View File

@ -43,4 +43,3 @@ def on_bootstrap(api, data):
args=[TorPeer(socks_address, socks_port, address)], args=[TorPeer(socks_address, socks_port, address)],
daemon=True).start() daemon=True).start()

View File

@ -0,0 +1,67 @@
import os, uuid
from queue import Empty
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:5], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
from time import sleep
from threading import Thread
import asyncio
import unittest
import sys
sys.path.append(".")
sys.path.append("src/")
from ordered_set import OrderedSet
import onionrblocks
import blockdb
from gossip.server import gossip_server
from gossip.blockqueues import gossip_block_queues
from filepaths import gossip_server_socket_file
BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
BLOCK_ID_SIZE = 128
BLOCK_STREAM_OFFSET_DIGITS = 8
class OnionrServerPutBlocksTest(unittest.TestCase):
def test_put_blocks(self):
Thread(target=gossip_server, daemon=True).start()
sleep(0.01)
blocks = []
for _ in range(10):
bl = onionrblocks.blockcreator.create_anonvdf_block(
b"my test block" + os.urandom(16), b"txt", 2800)
blockdb.add_block_to_db(bl)
blocks.append(bl)
async def blocks_put_client():
reader, writer = await asyncio.open_unix_connection(
gossip_server_socket_file)
writer.write(int(5).to_bytes(1, 'big'))
await writer.drain()
for bl in blocks:
writer.write(bl.id)
writer.write(
str(len(bl.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
writer.write(bl.raw)
await writer.drain()
sleep(0.03)
try:
self.assertEqual(gossip_block_queues[0].get_nowait().raw, bl.raw)
except Empty:
self.assertEqual(gossip_block_queues[1].get_nowait().raw, bl.raw)
asyncio.run(blocks_put_client())
unittest.main()

View File

@ -1,6 +1,6 @@
import os, uuid import os, uuid
from queue import Empty from queue import Empty
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR) print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR os.environ["ONIONR_HOME"] = TEST_DIR

View File

@ -1,5 +1,5 @@
import os, uuid import os, uuid
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR) print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR os.environ["ONIONR_HOME"] = TEST_DIR