From 9eb2e5d4133271592e68b09b8f25ea4cf607ff77 Mon Sep 17 00:00:00 2001 From: Kevin F Date: Thu, 12 Jan 2023 21:42:22 -0600 Subject: [PATCH] Added multiprocess wrapper to do simple function calls in a seperate process --- src/utils/multiproc.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/utils/multiproc.py diff --git a/src/utils/multiproc.py b/src/utils/multiproc.py new file mode 100644 index 00000000..661c15dc --- /dev/null +++ b/src/utils/multiproc.py @@ -0,0 +1,31 @@ +from typing import Callable +import multiprocessing +import time + + +def _compute(q: multiprocessing.Queue, func: Callable, *args, **kwargs): + q.put(func(*args, **kwargs)) + +def subprocess_compute(func: Callable, wallclock_timeout: int, *args, **kwargs): + """ + Call func in a subprocess, and return the result. Set wallclock_timeout to <= 0 to disable + Raises TimeoutError if the function does not return in time + Raises ChildProcessError if the subprocess dies before returning + """ + q = multiprocessing.Queue() + p = multiprocessing.Process( + target=_compute, args=(q, func, *args), kwargs=kwargs, daemon=True) + wallclock_timeout = max(wallclock_timeout, 0) + + p.start() + start = time.time() + while True: + try: + return q.get(timeout=1) + except multiprocessing.queues.Empty: + if not p.is_alive(): + raise ChildProcessError("Process died before returning") + if wallclock_timeout: + if time.time() - start >= wallclock_timeout: + raise TimeoutError("Process did not return in time") +