Compare commits
No commits in common. "8712a1c40123cb8e01642ce6749d3fc96ce6ba5b" and "39c01fdbc5dfd901f510b3f6193eb30ffe0bdefb" have entirely different histories.
8712a1c401
...
39c01fdbc5
@ -1,31 +0,0 @@
|
||||
from typing import Callable
|
||||
import multiprocessing
|
||||
import time
|
||||
|
||||
|
||||
def _compute(q: multiprocessing.Queue, func: Callable, *args, **kwargs):
|
||||
q.put(func(*args, **kwargs))
|
||||
|
||||
def subprocess_compute(func: Callable, wallclock_timeout: int, *args, **kwargs):
|
||||
"""
|
||||
Call func in a subprocess, and return the result. Set wallclock_timeout to <= 0 to disable
|
||||
Raises TimeoutError if the function does not return in time
|
||||
Raises ChildProcessError if the subprocess dies before returning
|
||||
"""
|
||||
q = multiprocessing.Queue()
|
||||
p = multiprocessing.Process(
|
||||
target=_compute, args=(q, func, *args), kwargs=kwargs, daemon=True)
|
||||
wallclock_timeout = max(wallclock_timeout, 0)
|
||||
|
||||
p.start()
|
||||
start = time.time()
|
||||
while True:
|
||||
try:
|
||||
return q.get(timeout=1)
|
||||
except multiprocessing.queues.Empty:
|
||||
if not p.is_alive():
|
||||
raise ChildProcessError("Process died before returning")
|
||||
if wallclock_timeout:
|
||||
if time.time() - start >= wallclock_timeout:
|
||||
raise TimeoutError("Process did not return in time")
|
||||
|
@ -67,7 +67,7 @@ def _detect_cors_and_add_headers():
|
||||
|
||||
class OnionrRPC(object):
|
||||
@cherrypy.expose
|
||||
def threaded_rpc(self):
|
||||
def queue_rpc(self):
|
||||
if _detect_cors_and_add_headers():
|
||||
return ''
|
||||
|
||||
|
@ -5,10 +5,10 @@ from base64 import b85decode
|
||||
from onionrblocks import Block
|
||||
import onionrblocks
|
||||
from jsonrpc import dispatcher
|
||||
import ujson
|
||||
|
||||
from gossip.blockqueues import gossip_block_queues
|
||||
from blockdb import get_blocks_after_timestamp
|
||||
from utils import multiproc
|
||||
|
||||
|
||||
@dispatcher.add_method
|
||||
@ -19,16 +19,10 @@ def get_blocks(timestamp):
|
||||
@dispatcher.add_method
|
||||
def create_block(
|
||||
block_data: 'base64', block_type: str, ttl: int, metadata: dict):
|
||||
# TODO use a module from an old version to use multiprocessing to avoid blocking GIL
|
||||
# Wrapper for onionrblocks.create_block (take base64 to be compatible with RPC)
|
||||
bl = multiproc.subprocess_compute(
|
||||
onionrblocks.create_anonvdf_block,
|
||||
3600,
|
||||
base64.b64decode(block_data),
|
||||
block_type,
|
||||
ttl,
|
||||
**metadata
|
||||
)
|
||||
|
||||
bl = onionrblocks.create_anonvdf_block(
|
||||
base64.b64decode(block_data), block_type, ttl, **metadata)
|
||||
return base64.b85encode(bl.raw).decode('utf-8')
|
||||
|
||||
queue_to_use = randbits(1)
|
@ -1,61 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys, os
|
||||
import time
|
||||
sys.path.append(".")
|
||||
sys.path.append("src/")
|
||||
import uuid
|
||||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
import unittest, json
|
||||
|
||||
from utils import identifyhome, createdirs
|
||||
from onionrsetup import setup_config
|
||||
from utils import multiproc
|
||||
createdirs.create_dirs()
|
||||
setup_config()
|
||||
|
||||
class TestMultiProc(unittest.TestCase):
|
||||
def test_list_args(self):
|
||||
answer = multiproc.subprocess_compute(sum, 10, [1, 3])
|
||||
self.assertEqual(answer, 4)
|
||||
|
||||
def test_two_args(self):
|
||||
def _add(a, b):
|
||||
return a + b
|
||||
answer = multiproc.subprocess_compute(_add, 10, 1, 3)
|
||||
self.assertEqual(answer, 4)
|
||||
|
||||
def test_kwargs(self):
|
||||
def _add(a=0, b=0):
|
||||
return a + b
|
||||
answer = multiproc.subprocess_compute(_add, 10, a=1, b=3)
|
||||
self.assertEqual(answer, 4)
|
||||
|
||||
def test_exception(self):
|
||||
def _fail():
|
||||
raise Exception("This always fails")
|
||||
with self.assertRaises(ChildProcessError):
|
||||
multiproc.subprocess_compute(_fail, 10)
|
||||
|
||||
def test_delayed_exception(self):
|
||||
def _fail():
|
||||
time.sleep(3)
|
||||
raise Exception("This always fails")
|
||||
with self.assertRaises(ChildProcessError):
|
||||
multiproc.subprocess_compute(_fail, 10)
|
||||
|
||||
def test_timeout(self):
|
||||
def _sleep():
|
||||
time.sleep(3)
|
||||
with self.assertRaises(TimeoutError):
|
||||
multiproc.subprocess_compute(_sleep, 1)
|
||||
|
||||
def test_timeout_disabled(self):
|
||||
def _sleep():
|
||||
time.sleep(3)
|
||||
self.assertIsNone(multiproc.subprocess_compute(_sleep, -1))
|
||||
self.assertIsNone(multiproc.subprocess_compute(_sleep, 0))
|
||||
|
||||
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user