diff --git a/src/anonvdf-block-creator.py b/src/anonvdf-block-creator.py
new file mode 100755
index 00000000..4ac1f08c
--- /dev/null
+++ b/src/anonvdf-block-creator.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+import sys
+import os
+
+import ujson as json
+
+from onionrblocks import blockcreator
+
+# This script creates a block without storing it. it is written to stdout
+# It is used instead of in the main process to avoid GIL locking/slow down
+
+metadata = json.loads(sys.argv[1])
+block_type = sys.argv[2]
+ttl = int(sys.argv[3])
+
+data = sys.stdin.read()
+
+with os.fdopen(sys.stdout.fileno(), 'wb') as stdout:
+ bl = blockcreator.create_anonvdf_block(data, block_type, ttl, **metadata)
+ try:
+ stdout.write(bl.id + bl.get_packed())
+ except BrokenPipeError:
+ pass
diff --git a/src/blockcreatorqueue/__init__.py b/src/blockcreatorqueue/__init__.py
new file mode 100644
index 00000000..67366878
--- /dev/null
+++ b/src/blockcreatorqueue/__init__.py
@@ -0,0 +1,69 @@
+"""Onionr - Private P2P Communication.
+
+BlockCreatorQueue, generate anonvdf blocks in a queue
+"""
+from typing import Callable
+from threading import Thread
+from hashlib import sha3_224
+from os import cpu_count
+import time
+
+import blockio
+"""
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see .
+"""
+
+
+class AlreadyGenerating(Exception): pass # noqa
+
+
+class BlockCreatorQueue:
+ def __init__(
+ self, callback_func: Callable, *additional_callback_func_args,
+ **additional_callback_func_kwargs):
+ self.callback_func = callback_func
+ self.queued = set()
+ self.max_parallel = cpu_count()
+ self.additional_callback_func_args = additional_callback_func_args
+ self.additional_callback_func_kwargs = additional_callback_func_kwargs
+
+ def block_data_in_queue(self, block_data: bytes) -> bool:
+ if sha3_224(block_data).digest() in self.queued:
+ return True
+ return False
+
+ def queue_block(
+ self, block_data, block_type, ttl: int, **block_metadata) -> bytes:
+ """Spawn a thread to make a subprocess to generate a block
+ if queue is not full, else wait"""
+
+ digest = sha3_224(block_data).digest()
+
+ def _do_create():
+ if digest in self.queued:
+ raise AlreadyGenerating()
+ self.queued.add(digest)
+ while len(self.queued) >= self.max_parallel:
+ time.sleep(1)
+ result = blockio.subprocgenerate.vdf_block(
+ block_data, block_type, ttl, **block_metadata)
+ self.queued.remove(digest)
+ self.callback_func(
+ result,
+ *self.additional_callback_func_args,
+ **self.additional_callback_func_kwargs)
+
+ Thread(target=_do_create, daemon=True).start()
+ return digest
+
diff --git a/src/blockio/__init__.py b/src/blockio/__init__.py
index df0f8d05..0cb504b8 100644
--- a/src/blockio/__init__.py
+++ b/src/blockio/__init__.py
@@ -5,6 +5,7 @@ Wrap safedb for storing and fetching blocks
from .store import store_block
from .load import load_block, list_blocks_by_type, list_all_blocks
from .clean import clean_expired_blocks, clean_block_list_entries
+from . import subprocgenerate
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
diff --git a/src/blockio/subprocgenerate.py b/src/blockio/subprocgenerate.py
new file mode 100644
index 00000000..a59c1ded
--- /dev/null
+++ b/src/blockio/subprocgenerate.py
@@ -0,0 +1,25 @@
+import os
+import subprocess
+
+import ujson as json
+
+import kasten
+from onionrblocks.generators.anonvdf import AnonVDFGenerator
+
+_DIR = os.path.dirname(os.path.realpath(__file__)) + '/../'
+
+
+def vdf_block(data, data_type, ttl, **metadata):
+ generated = subprocess.Popen(
+ [
+ f'{_DIR}anonvdf-block-creator.py',
+ json.dumps(metadata),
+ data_type,
+ str(ttl)],
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE)
+ generated = generated.communicate(data)[0]
+ return kasten.Kasten(
+ generated[:64], generated[64:],
+ AnonVDFGenerator, auto_check_generator=True)
+
diff --git a/src/httpapi/serializedapi/__init__.py b/src/httpapi/serializedapi/__init__.py
index 2866eb7c..345ccc3c 100644
--- a/src/httpapi/serializedapi/__init__.py
+++ b/src/httpapi/serializedapi/__init__.py
@@ -3,7 +3,7 @@
view and interact with onionr sites
"""
from flask import Blueprint, Response, request, abort, g
-import json
+import ujson as json
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -26,6 +26,15 @@ serialized_api_bp = Blueprint('serializedapi', __name__)
@serialized_api_bp.route(
'/serialized/', endpoint='serialized', methods=['POST'])
def serialized(name: str) -> Response:
+ def _do_call(method, *args, **kwargs):
+ try:
+ resp = method(*args, **kwargs)
+ if isinstance(resp, int):
+ resp = str(resp)
+ return Response(resp, content_type='application/octet-stream')
+ except Exception as e:
+ return Response(repr(e), content_type='text/plain', status=500)
+
initial = g.too_many.get_by_string(name.split('.')[0])
for c, i in enumerate(name.split('.')):
if i and c != 0:
@@ -45,16 +54,10 @@ def serialized(name: str) -> Response:
print('data', data)
if data:
print(*args, **data)
- resp = attr(*args, **data)
- if isinstance(resp, int):
- resp = str(resp)
- return Response(resp)
+ return _do_call(attr, *args, **data)
else:
print(*args, **data)
- resp = attr(*args)
- if isinstance(resp, int):
- resp = str(resp)
- return Response(resp, content_type='application/octet-stream')
+ return _do_call(attr, *args)
else:
if isinstance(attr, int):
attr = str(attr)
diff --git a/tests/test_blockcreatorqueue.py b/tests/test_blockcreatorqueue.py
new file mode 100644
index 00000000..9fb5cb08
--- /dev/null
+++ b/tests/test_blockcreatorqueue.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python3
+import sys, os
+import time
+sys.path.append(".")
+sys.path.append("src/")
+import uuid
+TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
+print("Test directory:", TEST_DIR)
+os.environ["ONIONR_HOME"] = TEST_DIR
+import unittest, json
+
+from utils import identifyhome, createdirs
+from onionrsetup import setup_config
+import blockcreatorqueue
+createdirs.create_dirs()
+setup_config()
+
+# BlockCreatorQueue
+# queue_block
+# queue_count
+# in_queue (sha3_256 hash)
+
+
+class TestBlockCreatorQueue(unittest.TestCase):
+
+ def test_in_queue(self):
+
+
+ def test_blockcreator_queue_1(self):
+ received_callback = [False]
+ def my_store_func(result):
+ self.assertIn(b"ok data", result.get_packed()) # if this fails received won't be true
+ received_callback[0] = True
+ blockcreatorqueue.BlockCreatorQueue(my_store_func).queue_block(b"ok data", "txt", 100)
+ time.sleep(0.1)
+ self.assertTrue(received_callback[0])
+
+ def test_blockcreator_queue_2(self):
+ queue = blockcreatorqueue.BlockCreatorQueue(lambda d: print(d))
+ queue.queued.add(os.urandom(28))
+ queue.queued.add(os.urandom(28))
+ queue.queue_block(b"test", "txt", 1000)
+ time.sleep(1)
+ self.assertEqual(len(queue.queued), 3)
+
+unittest.main()
diff --git a/tests/test_blockio.py b/tests/test_blockio.py
index 3bd1109f..d889610b 100644
--- a/tests/test_blockio.py
+++ b/tests/test_blockio.py
@@ -24,7 +24,7 @@ from utils import identifyhome
import safedb
import blockio
from blockio.clean.cleanblocklistentries import clean_block_list_entries
-
+from blockio import subprocgenerate
def _remove_db(path):
try:
@@ -35,6 +35,19 @@ def _remove_db(path):
class TestBlockIO(unittest.TestCase):
+ def test_subproc_generate(self):
+ db_file = identifyhome.identify_home() + 'test.db'
+ db = safedb.SafeDB(db_file)
+
+ bl: 'Kasten' = subprocgenerate.vdf_block(b"test", "txt", 10)
+
+ self.assertEqual(b"test", bl.data)
+ self.assertEqual("txt", bl.get_data_type())
+ self.assertEqual(330, bl.get_metadata()['rds'])
+
+ db.close()
+ _remove_db(db_file)
+
def test_list_all_blocks(self):
db_file = identifyhome.identify_home() + 'test.db'
db = safedb.SafeDB(db_file)