Compare commits
No commits in common. "8712a1c40123cb8e01642ce6749d3fc96ce6ba5b" and "39c01fdbc5dfd901f510b3f6193eb30ffe0bdefb" have entirely different histories.
8712a1c401
...
39c01fdbc5
@ -1,31 +0,0 @@
|
|||||||
from typing import Callable
|
|
||||||
import multiprocessing
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
def _compute(q: multiprocessing.Queue, func: Callable, *args, **kwargs):
|
|
||||||
q.put(func(*args, **kwargs))
|
|
||||||
|
|
||||||
def subprocess_compute(func: Callable, wallclock_timeout: int, *args, **kwargs):
|
|
||||||
"""
|
|
||||||
Call func in a subprocess, and return the result. Set wallclock_timeout to <= 0 to disable
|
|
||||||
Raises TimeoutError if the function does not return in time
|
|
||||||
Raises ChildProcessError if the subprocess dies before returning
|
|
||||||
"""
|
|
||||||
q = multiprocessing.Queue()
|
|
||||||
p = multiprocessing.Process(
|
|
||||||
target=_compute, args=(q, func, *args), kwargs=kwargs, daemon=True)
|
|
||||||
wallclock_timeout = max(wallclock_timeout, 0)
|
|
||||||
|
|
||||||
p.start()
|
|
||||||
start = time.time()
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
return q.get(timeout=1)
|
|
||||||
except multiprocessing.queues.Empty:
|
|
||||||
if not p.is_alive():
|
|
||||||
raise ChildProcessError("Process died before returning")
|
|
||||||
if wallclock_timeout:
|
|
||||||
if time.time() - start >= wallclock_timeout:
|
|
||||||
raise TimeoutError("Process did not return in time")
|
|
||||||
|
|
@ -67,7 +67,7 @@ def _detect_cors_and_add_headers():
|
|||||||
|
|
||||||
class OnionrRPC(object):
|
class OnionrRPC(object):
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
def threaded_rpc(self):
|
def queue_rpc(self):
|
||||||
if _detect_cors_and_add_headers():
|
if _detect_cors_and_add_headers():
|
||||||
return ''
|
return ''
|
||||||
|
|
||||||
|
@ -5,10 +5,10 @@ from base64 import b85decode
|
|||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
import onionrblocks
|
import onionrblocks
|
||||||
from jsonrpc import dispatcher
|
from jsonrpc import dispatcher
|
||||||
|
import ujson
|
||||||
|
|
||||||
from gossip.blockqueues import gossip_block_queues
|
from gossip.blockqueues import gossip_block_queues
|
||||||
from blockdb import get_blocks_after_timestamp
|
from blockdb import get_blocks_after_timestamp
|
||||||
from utils import multiproc
|
|
||||||
|
|
||||||
|
|
||||||
@dispatcher.add_method
|
@dispatcher.add_method
|
||||||
@ -19,16 +19,10 @@ def get_blocks(timestamp):
|
|||||||
@dispatcher.add_method
|
@dispatcher.add_method
|
||||||
def create_block(
|
def create_block(
|
||||||
block_data: 'base64', block_type: str, ttl: int, metadata: dict):
|
block_data: 'base64', block_type: str, ttl: int, metadata: dict):
|
||||||
|
# TODO use a module from an old version to use multiprocessing to avoid blocking GIL
|
||||||
# Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC)
|
# Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC)
|
||||||
bl = multiproc.subprocess_compute(
|
bl = onionrblocks.create_anonvdf_block(
|
||||||
onionrblocks.create_anonvdf_block,
|
base64.b64decode(block_data), block_type, ttl, **metadata)
|
||||||
3600,
|
|
||||||
base64.b64decode(block_data),
|
|
||||||
block_type,
|
|
||||||
ttl,
|
|
||||||
**metadata
|
|
||||||
)
|
|
||||||
|
|
||||||
return base64.b85encode(bl.raw).decode('utf-8')
|
return base64.b85encode(bl.raw).decode('utf-8')
|
||||||
|
|
||||||
queue_to_use = randbits(1)
|
queue_to_use = randbits(1)
|
@ -1,61 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
import sys, os
|
|
||||||
import time
|
|
||||||
sys.path.append(".")
|
|
||||||
sys.path.append("src/")
|
|
||||||
import uuid
|
|
||||||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
|
||||||
print("Test directory:", TEST_DIR)
|
|
||||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
|
||||||
import unittest, json
|
|
||||||
|
|
||||||
from utils import identifyhome, createdirs
|
|
||||||
from onionrsetup import setup_config
|
|
||||||
from utils import multiproc
|
|
||||||
createdirs.create_dirs()
|
|
||||||
setup_config()
|
|
||||||
|
|
||||||
class TestMultiProc(unittest.TestCase):
|
|
||||||
def test_list_args(self):
|
|
||||||
answer = multiproc.subprocess_compute(sum, 10, [1, 3])
|
|
||||||
self.assertEqual(answer, 4)
|
|
||||||
|
|
||||||
def test_two_args(self):
|
|
||||||
def _add(a, b):
|
|
||||||
return a + b
|
|
||||||
answer = multiproc.subprocess_compute(_add, 10, 1, 3)
|
|
||||||
self.assertEqual(answer, 4)
|
|
||||||
|
|
||||||
def test_kwargs(self):
|
|
||||||
def _add(a=0, b=0):
|
|
||||||
return a + b
|
|
||||||
answer = multiproc.subprocess_compute(_add, 10, a=1, b=3)
|
|
||||||
self.assertEqual(answer, 4)
|
|
||||||
|
|
||||||
def test_exception(self):
|
|
||||||
def _fail():
|
|
||||||
raise Exception("This always fails")
|
|
||||||
with self.assertRaises(ChildProcessError):
|
|
||||||
multiproc.subprocess_compute(_fail, 10)
|
|
||||||
|
|
||||||
def test_delayed_exception(self):
|
|
||||||
def _fail():
|
|
||||||
time.sleep(3)
|
|
||||||
raise Exception("This always fails")
|
|
||||||
with self.assertRaises(ChildProcessError):
|
|
||||||
multiproc.subprocess_compute(_fail, 10)
|
|
||||||
|
|
||||||
def test_timeout(self):
|
|
||||||
def _sleep():
|
|
||||||
time.sleep(3)
|
|
||||||
with self.assertRaises(TimeoutError):
|
|
||||||
multiproc.subprocess_compute(_sleep, 1)
|
|
||||||
|
|
||||||
def test_timeout_disabled(self):
|
|
||||||
def _sleep():
|
|
||||||
time.sleep(3)
|
|
||||||
self.assertIsNone(multiproc.subprocess_compute(_sleep, -1))
|
|
||||||
self.assertIsNone(multiproc.subprocess_compute(_sleep, 0))
|
|
||||||
|
|
||||||
|
|
||||||
unittest.main()
|
|
Loading…
Reference in New Issue
Block a user