Compare commits
3 Commits
39c01fdbc5
...
8712a1c401
Author | SHA1 | Date | |
---|---|---|---|
|
8712a1c401 | ||
|
8511fb42b6 | ||
|
9eb2e5d413 |
31
src/utils/multiproc.py
Normal file
31
src/utils/multiproc.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
from typing import Callable
|
||||||
|
import multiprocessing
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
def _compute(q: multiprocessing.Queue, func: Callable, *args, **kwargs):
|
||||||
|
q.put(func(*args, **kwargs))
|
||||||
|
|
||||||
|
def subprocess_compute(func: Callable, wallclock_timeout: int, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Call func in a subprocess, and return the result. Set wallclock_timeout to <= 0 to disable
|
||||||
|
Raises TimeoutError if the function does not return in time
|
||||||
|
Raises ChildProcessError if the subprocess dies before returning
|
||||||
|
"""
|
||||||
|
q = multiprocessing.Queue()
|
||||||
|
p = multiprocessing.Process(
|
||||||
|
target=_compute, args=(q, func, *args), kwargs=kwargs, daemon=True)
|
||||||
|
wallclock_timeout = max(wallclock_timeout, 0)
|
||||||
|
|
||||||
|
p.start()
|
||||||
|
start = time.time()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
return q.get(timeout=1)
|
||||||
|
except multiprocessing.queues.Empty:
|
||||||
|
if not p.is_alive():
|
||||||
|
raise ChildProcessError("Process died before returning")
|
||||||
|
if wallclock_timeout:
|
||||||
|
if time.time() - start >= wallclock_timeout:
|
||||||
|
raise TimeoutError("Process did not return in time")
|
||||||
|
|
@ -67,7 +67,7 @@ def _detect_cors_and_add_headers():
|
|||||||
|
|
||||||
class OnionrRPC(object):
|
class OnionrRPC(object):
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
def queue_rpc(self):
|
def threaded_rpc(self):
|
||||||
if _detect_cors_and_add_headers():
|
if _detect_cors_and_add_headers():
|
||||||
return ''
|
return ''
|
||||||
|
|
||||||
|
@ -5,10 +5,10 @@ from base64 import b85decode
|
|||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
import onionrblocks
|
import onionrblocks
|
||||||
from jsonrpc import dispatcher
|
from jsonrpc import dispatcher
|
||||||
import ujson
|
|
||||||
|
|
||||||
from gossip.blockqueues import gossip_block_queues
|
from gossip.blockqueues import gossip_block_queues
|
||||||
from blockdb import get_blocks_after_timestamp
|
from blockdb import get_blocks_after_timestamp
|
||||||
|
from utils import multiproc
|
||||||
|
|
||||||
|
|
||||||
@dispatcher.add_method
|
@dispatcher.add_method
|
||||||
@ -19,10 +19,16 @@ def get_blocks(timestamp):
|
|||||||
@dispatcher.add_method
|
@dispatcher.add_method
|
||||||
def create_block(
|
def create_block(
|
||||||
block_data: 'base64', block_type: str, ttl: int, metadata: dict):
|
block_data: 'base64', block_type: str, ttl: int, metadata: dict):
|
||||||
# TODO use a module from an old version to use multiprocessing to avoid blocking GIL
|
|
||||||
# Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC)
|
# Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC)
|
||||||
bl = onionrblocks.create_anonvdf_block(
|
bl = multiproc.subprocess_compute(
|
||||||
base64.b64decode(block_data), block_type, ttl, **metadata)
|
onionrblocks.create_anonvdf_block,
|
||||||
|
3600,
|
||||||
|
base64.b64decode(block_data),
|
||||||
|
block_type,
|
||||||
|
ttl,
|
||||||
|
**metadata
|
||||||
|
)
|
||||||
|
|
||||||
return base64.b85encode(bl.raw).decode('utf-8')
|
return base64.b85encode(bl.raw).decode('utf-8')
|
||||||
|
|
||||||
queue_to_use = randbits(1)
|
queue_to_use = randbits(1)
|
61
tests/test_multiproc.py
Normal file
61
tests/test_multiproc.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import sys, os
|
||||||
|
import time
|
||||||
|
sys.path.append(".")
|
||||||
|
sys.path.append("src/")
|
||||||
|
import uuid
|
||||||
|
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||||
|
print("Test directory:", TEST_DIR)
|
||||||
|
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||||
|
import unittest, json
|
||||||
|
|
||||||
|
from utils import identifyhome, createdirs
|
||||||
|
from onionrsetup import setup_config
|
||||||
|
from utils import multiproc
|
||||||
|
createdirs.create_dirs()
|
||||||
|
setup_config()
|
||||||
|
|
||||||
|
class TestMultiProc(unittest.TestCase):
|
||||||
|
def test_list_args(self):
|
||||||
|
answer = multiproc.subprocess_compute(sum, 10, [1, 3])
|
||||||
|
self.assertEqual(answer, 4)
|
||||||
|
|
||||||
|
def test_two_args(self):
|
||||||
|
def _add(a, b):
|
||||||
|
return a + b
|
||||||
|
answer = multiproc.subprocess_compute(_add, 10, 1, 3)
|
||||||
|
self.assertEqual(answer, 4)
|
||||||
|
|
||||||
|
def test_kwargs(self):
|
||||||
|
def _add(a=0, b=0):
|
||||||
|
return a + b
|
||||||
|
answer = multiproc.subprocess_compute(_add, 10, a=1, b=3)
|
||||||
|
self.assertEqual(answer, 4)
|
||||||
|
|
||||||
|
def test_exception(self):
|
||||||
|
def _fail():
|
||||||
|
raise Exception("This always fails")
|
||||||
|
with self.assertRaises(ChildProcessError):
|
||||||
|
multiproc.subprocess_compute(_fail, 10)
|
||||||
|
|
||||||
|
def test_delayed_exception(self):
|
||||||
|
def _fail():
|
||||||
|
time.sleep(3)
|
||||||
|
raise Exception("This always fails")
|
||||||
|
with self.assertRaises(ChildProcessError):
|
||||||
|
multiproc.subprocess_compute(_fail, 10)
|
||||||
|
|
||||||
|
def test_timeout(self):
|
||||||
|
def _sleep():
|
||||||
|
time.sleep(3)
|
||||||
|
with self.assertRaises(TimeoutError):
|
||||||
|
multiproc.subprocess_compute(_sleep, 1)
|
||||||
|
|
||||||
|
def test_timeout_disabled(self):
|
||||||
|
def _sleep():
|
||||||
|
time.sleep(3)
|
||||||
|
self.assertIsNone(multiproc.subprocess_compute(_sleep, -1))
|
||||||
|
self.assertIsNone(multiproc.subprocess_compute(_sleep, 0))
|
||||||
|
|
||||||
|
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue
Block a user