Compare commits

...

3 Commits

Author SHA1 Message Date
Kevin F 8712a1c401 Changed rpc block wrapper to use multiprocessing 2023-01-12 21:51:27 -06:00
Kevin F 8511fb42b6 Added test for multiprocess wrapper 2023-01-12 21:51:03 -06:00
Kevin F 9eb2e5d413 Added multiprocess wrapper to do simple function calls in a seperate process 2023-01-12 21:42:22 -06:00
4 changed files with 103 additions and 5 deletions

31
src/utils/multiproc.py Normal file
View File

@ -0,0 +1,31 @@
from typing import Callable
import multiprocessing
import time
def _compute(q: multiprocessing.Queue, func: Callable, *args, **kwargs):
q.put(func(*args, **kwargs))
def subprocess_compute(func: Callable, wallclock_timeout: int, *args, **kwargs):
"""
Call func in a subprocess, and return the result. Set wallclock_timeout to <= 0 to disable
Raises TimeoutError if the function does not return in time
Raises ChildProcessError if the subprocess dies before returning
"""
q = multiprocessing.Queue()
p = multiprocessing.Process(
target=_compute, args=(q, func, *args), kwargs=kwargs, daemon=True)
wallclock_timeout = max(wallclock_timeout, 0)
p.start()
start = time.time()
while True:
try:
return q.get(timeout=1)
except multiprocessing.queues.Empty:
if not p.is_alive():
raise ChildProcessError("Process died before returning")
if wallclock_timeout:
if time.time() - start >= wallclock_timeout:
raise TimeoutError("Process did not return in time")

View File

@ -67,7 +67,7 @@ def _detect_cors_and_add_headers():
class OnionrRPC(object):
@cherrypy.expose
def queue_rpc(self):
def threaded_rpc(self):
if _detect_cors_and_add_headers():
return ''

View File

@ -5,10 +5,10 @@ from base64 import b85decode
from onionrblocks import Block
import onionrblocks
from jsonrpc import dispatcher
import ujson
from gossip.blockqueues import gossip_block_queues
from blockdb import get_blocks_after_timestamp
from utils import multiproc
@dispatcher.add_method
@ -19,10 +19,16 @@ def get_blocks(timestamp):
@dispatcher.add_method
def create_block(
block_data: 'base64', block_type: str, ttl: int, metadata: dict):
# TODO use a module from an old version to use multiprocessing to avoid blocking GIL
# Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC)
bl = onionrblocks.create_anonvdf_block(
base64.b64decode(block_data), block_type, ttl, **metadata)
bl = multiproc.subprocess_compute(
onionrblocks.create_anonvdf_block,
3600,
base64.b64decode(block_data),
block_type,
ttl,
**metadata
)
return base64.b85encode(bl.raw).decode('utf-8')
queue_to_use = randbits(1)

61
tests/test_multiproc.py Normal file
View File

@ -0,0 +1,61 @@
#!/usr/bin/env python3
import sys, os
import time
sys.path.append(".")
sys.path.append("src/")
import uuid
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest, json
from utils import identifyhome, createdirs
from onionrsetup import setup_config
from utils import multiproc
createdirs.create_dirs()
setup_config()
class TestMultiProc(unittest.TestCase):
def test_list_args(self):
answer = multiproc.subprocess_compute(sum, 10, [1, 3])
self.assertEqual(answer, 4)
def test_two_args(self):
def _add(a, b):
return a + b
answer = multiproc.subprocess_compute(_add, 10, 1, 3)
self.assertEqual(answer, 4)
def test_kwargs(self):
def _add(a=0, b=0):
return a + b
answer = multiproc.subprocess_compute(_add, 10, a=1, b=3)
self.assertEqual(answer, 4)
def test_exception(self):
def _fail():
raise Exception("This always fails")
with self.assertRaises(ChildProcessError):
multiproc.subprocess_compute(_fail, 10)
def test_delayed_exception(self):
def _fail():
time.sleep(3)
raise Exception("This always fails")
with self.assertRaises(ChildProcessError):
multiproc.subprocess_compute(_fail, 10)
def test_timeout(self):
def _sleep():
time.sleep(3)
with self.assertRaises(TimeoutError):
multiproc.subprocess_compute(_sleep, 1)
def test_timeout_disabled(self):
def _sleep():
time.sleep(3)
self.assertIsNone(multiproc.subprocess_compute(_sleep, -1))
self.assertIsNone(multiproc.subprocess_compute(_sleep, 0))
unittest.main()