Compare commits

..

2 Commits

Author SHA1 Message Date
Kevin F
39c01fdbc5 Improved RPC, added threaded RPC with result fetcher 2023-01-12 00:23:28 -06:00
Kevin F
7bcef03592 Added setunixsocket and settcpsocket helper commands to RPC plugin 2023-01-11 16:51:11 -06:00
3 changed files with 98 additions and 3 deletions

View File

@ -0,0 +1,41 @@
"""
"""
import threading
import os
import time
import traceback
import collections
from typing import Union
import ujson
import jsonrpc
from logger import log as logging
rpc_results = collections.deque(maxlen=10000)
def get_results(id) -> Union[str, None]:
final = None
for result in rpc_results:
if str(result['id']) == str(id):
final = result
break
else:
return None
rpc_results.remove(final)
return ujson.dumps(final)
def _exec_rpc(rpc_json_str):
json_resp = jsonrpc.JSONRPCResponseManager.handle(rpc_json_str, jsonrpc.dispatcher)
data = json_resp.data
rpc_results.append(data)
def threaded_rpc(rpc_json_str):
threading.Thread(
target=_exec_rpc,
args=(rpc_json_str,),
daemon=True,
name="JSON RPC").start()

View File

@ -52,18 +52,47 @@ from onionrplugins import plugin_apis
from rpc import blocks, pluginrpcmethods from rpc import blocks, pluginrpcmethods
from rpc.addmodule import add_module_to_api from rpc.addmodule import add_module_to_api
import longrpc
plugin_apis['rpc.add_module_to_api'] = add_module_to_api plugin_apis['rpc.add_module_to_api'] = add_module_to_api
def _detect_cors_and_add_headers():
cherrypy.response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
cherrypy.response.headers['Access-Control-Allow-Origin'] = '*'
cherrypy.response.headers['Access-Control-Allow-Methods'] = 'POST'
if cherrypy.request.method == 'OPTIONS':
return True
return False
class OnionrRPC(object): class OnionrRPC(object):
@cherrypy.expose
def queue_rpc(self):
if _detect_cors_and_add_headers():
return ''
rpc_request_json: str = cherrypy.request.body.read().decode('utf-8')
longrpc.threaded_rpc(rpc_request_json)
return 'ok'
@cherrypy.expose
def get_rpc_result(self, id=0):
if _detect_cors_and_add_headers():
return ''
results = longrpc.get_results(id)
if not results:
return '"no result"'
return results
@cherrypy.expose @cherrypy.expose
def rpc(self): def rpc(self):
# Basic RPC, intended for small amounts of work
# Use /queue_rpc for large workloads like creating blocks
# and getting results with /get_rpc_result?id=<id>
# Dispatcher is dictionary {<method_name>: callable} # Dispatcher is dictionary {<method_name>: callable}
if cherrypy.request.method == 'OPTIONS': if _detect_cors_and_add_headers():
cherrypy.response.headers['Access-Control-Allow-Origin'] = '*'
cherrypy.response.headers['Access-Control-Allow-Methods'] = 'POST'
return '' return ''
data = cherrypy.request.body.read().decode('utf-8') data = cherrypy.request.body.read().decode('utf-8')
@ -89,7 +118,10 @@ def on_beforecmdparsing(api, data=None):
def on_afterinit(api, data=None): def on_afterinit(api, data=None):
def ping(): def ping():
return "pong" return "pong"
def always_fails():
raise Exception("This always fails")
dispatcher['ping'] = ping dispatcher['ping'] = ping
dispatcher['always_fails'] = always_fails
pluginrpcmethods.add_plugin_rpc_methods() pluginrpcmethods.add_plugin_rpc_methods()
@ -97,6 +129,27 @@ def _gen_random_loopback():
return f'127.{randbelow(256)}.{randbelow(256)}.{randbelow(256)}' return f'127.{randbelow(256)}.{randbelow(256)}.{randbelow(256)}'
def on_setunixsocket_cmd(api, data=None):
config.set('rpc.use_sock_file', True, savefile=True)
logging.info('Set RPC to use unix socket')
def on_settcpsocket_cmd(api, data=None):
config.set('rpc.use_sock_file', False, savefile=True)
address = input('Enter address to bind to (default: random loopback): ').strip()
if not address:
address = _gen_random_loopback()
port = input('Enter port to bind to (default: 0 (random/picked by OS)): ').strip()
if not port:
port = 0
port = int(port)
config.set('rpc.bind_host', address, savefile=True)
config.set('rpc.bind_port', port, savefile=True)
logging.info(
'Set RPC to use TCP socket http://' +
f'{config.get("rpc.bind_host")}:{config.get("rpc.bind_port")}')
def on_init(api, data=None): def on_init(api, data=None):
bind_config = {} bind_config = {}
if config.get('rpc.use_sock_file', True, save=True): if config.get('rpc.use_sock_file', True, save=True):

View File

@ -19,6 +19,7 @@ def get_blocks(timestamp):
@dispatcher.add_method @dispatcher.add_method
def create_block( def create_block(
block_data: 'base64', block_type: str, ttl: int, metadata: dict): block_data: 'base64', block_type: str, ttl: int, metadata: dict):
# TODO use a module from an old version to use multiprocessing to avoid blocking GIL
# Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC) # Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC)
bl = onionrblocks.create_anonvdf_block( bl = onionrblocks.create_anonvdf_block(
base64.b64decode(block_data), block_type, ttl, **metadata) base64.b64decode(block_data), block_type, ttl, **metadata)