From 63f7209bddf02df290eac70e0ac9bc17f7098029 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Fri, 29 Jan 2021 21:15:18 +0000 Subject: [PATCH] Added blockcreatorqueue and subprocess generator for new block system --- src/anonvdf-block-creator.py | 23 +++++++++ src/blockcreatorqueue/__init__.py | 69 +++++++++++++++++++++++++++ src/blockio/__init__.py | 1 + src/blockio/subprocgenerate.py | 25 ++++++++++ src/httpapi/serializedapi/__init__.py | 21 ++++---- tests/test_blockcreatorqueue.py | 46 ++++++++++++++++++ tests/test_blockio.py | 15 +++++- 7 files changed, 190 insertions(+), 10 deletions(-) create mode 100755 src/anonvdf-block-creator.py create mode 100644 src/blockcreatorqueue/__init__.py create mode 100644 src/blockio/subprocgenerate.py create mode 100644 tests/test_blockcreatorqueue.py diff --git a/src/anonvdf-block-creator.py b/src/anonvdf-block-creator.py new file mode 100755 index 00000000..4ac1f08c --- /dev/null +++ b/src/anonvdf-block-creator.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +import sys +import os + +import ujson as json + +from onionrblocks import blockcreator + +# This script creates a block without storing it. it is written to stdout +# It is used instead of in the main process to avoid GIL locking/slow down + +metadata = json.loads(sys.argv[1]) +block_type = sys.argv[2] +ttl = int(sys.argv[3]) + +data = sys.stdin.read() + +with os.fdopen(sys.stdout.fileno(), 'wb') as stdout: + bl = blockcreator.create_anonvdf_block(data, block_type, ttl, **metadata) + try: + stdout.write(bl.id + bl.get_packed()) + except BrokenPipeError: + pass diff --git a/src/blockcreatorqueue/__init__.py b/src/blockcreatorqueue/__init__.py new file mode 100644 index 00000000..67366878 --- /dev/null +++ b/src/blockcreatorqueue/__init__.py @@ -0,0 +1,69 @@ +"""Onionr - Private P2P Communication. + +BlockCreatorQueue, generate anonvdf blocks in a queue +""" +from typing import Callable +from threading import Thread +from hashlib import sha3_224 +from os import cpu_count +import time + +import blockio +""" +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + + +class AlreadyGenerating(Exception): pass # noqa + + +class BlockCreatorQueue: + def __init__( + self, callback_func: Callable, *additional_callback_func_args, + **additional_callback_func_kwargs): + self.callback_func = callback_func + self.queued = set() + self.max_parallel = cpu_count() + self.additional_callback_func_args = additional_callback_func_args + self.additional_callback_func_kwargs = additional_callback_func_kwargs + + def block_data_in_queue(self, block_data: bytes) -> bool: + if sha3_224(block_data).digest() in self.queued: + return True + return False + + def queue_block( + self, block_data, block_type, ttl: int, **block_metadata) -> bytes: + """Spawn a thread to make a subprocess to generate a block + if queue is not full, else wait""" + + digest = sha3_224(block_data).digest() + + def _do_create(): + if digest in self.queued: + raise AlreadyGenerating() + self.queued.add(digest) + while len(self.queued) >= self.max_parallel: + time.sleep(1) + result = blockio.subprocgenerate.vdf_block( + block_data, block_type, ttl, **block_metadata) + self.queued.remove(digest) + self.callback_func( + result, + *self.additional_callback_func_args, + **self.additional_callback_func_kwargs) + + Thread(target=_do_create, daemon=True).start() + return digest + diff --git a/src/blockio/__init__.py b/src/blockio/__init__.py index df0f8d05..0cb504b8 100644 --- a/src/blockio/__init__.py +++ b/src/blockio/__init__.py @@ -5,6 +5,7 @@ Wrap safedb for storing and fetching blocks from .store import store_block from .load import load_block, list_blocks_by_type, list_all_blocks from .clean import clean_expired_blocks, clean_block_list_entries +from . import subprocgenerate """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by diff --git a/src/blockio/subprocgenerate.py b/src/blockio/subprocgenerate.py new file mode 100644 index 00000000..a59c1ded --- /dev/null +++ b/src/blockio/subprocgenerate.py @@ -0,0 +1,25 @@ +import os +import subprocess + +import ujson as json + +import kasten +from onionrblocks.generators.anonvdf import AnonVDFGenerator + +_DIR = os.path.dirname(os.path.realpath(__file__)) + '/../' + + +def vdf_block(data, data_type, ttl, **metadata): + generated = subprocess.Popen( + [ + f'{_DIR}anonvdf-block-creator.py', + json.dumps(metadata), + data_type, + str(ttl)], + stdout=subprocess.PIPE, + stdin=subprocess.PIPE) + generated = generated.communicate(data)[0] + return kasten.Kasten( + generated[:64], generated[64:], + AnonVDFGenerator, auto_check_generator=True) + diff --git a/src/httpapi/serializedapi/__init__.py b/src/httpapi/serializedapi/__init__.py index 2866eb7c..345ccc3c 100644 --- a/src/httpapi/serializedapi/__init__.py +++ b/src/httpapi/serializedapi/__init__.py @@ -3,7 +3,7 @@ view and interact with onionr sites """ from flask import Blueprint, Response, request, abort, g -import json +import ujson as json """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,6 +26,15 @@ serialized_api_bp = Blueprint('serializedapi', __name__) @serialized_api_bp.route( '/serialized/', endpoint='serialized', methods=['POST']) def serialized(name: str) -> Response: + def _do_call(method, *args, **kwargs): + try: + resp = method(*args, **kwargs) + if isinstance(resp, int): + resp = str(resp) + return Response(resp, content_type='application/octet-stream') + except Exception as e: + return Response(repr(e), content_type='text/plain', status=500) + initial = g.too_many.get_by_string(name.split('.')[0]) for c, i in enumerate(name.split('.')): if i and c != 0: @@ -45,16 +54,10 @@ def serialized(name: str) -> Response: print('data', data) if data: print(*args, **data) - resp = attr(*args, **data) - if isinstance(resp, int): - resp = str(resp) - return Response(resp) + return _do_call(attr, *args, **data) else: print(*args, **data) - resp = attr(*args) - if isinstance(resp, int): - resp = str(resp) - return Response(resp, content_type='application/octet-stream') + return _do_call(attr, *args) else: if isinstance(attr, int): attr = str(attr) diff --git a/tests/test_blockcreatorqueue.py b/tests/test_blockcreatorqueue.py new file mode 100644 index 00000000..9fb5cb08 --- /dev/null +++ b/tests/test_blockcreatorqueue.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +import sys, os +import time +sys.path.append(".") +sys.path.append("src/") +import uuid +TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR +import unittest, json + +from utils import identifyhome, createdirs +from onionrsetup import setup_config +import blockcreatorqueue +createdirs.create_dirs() +setup_config() + +# BlockCreatorQueue +# queue_block +# queue_count +# in_queue (sha3_256 hash) + + +class TestBlockCreatorQueue(unittest.TestCase): + + def test_in_queue(self): + + + def test_blockcreator_queue_1(self): + received_callback = [False] + def my_store_func(result): + self.assertIn(b"ok data", result.get_packed()) # if this fails received won't be true + received_callback[0] = True + blockcreatorqueue.BlockCreatorQueue(my_store_func).queue_block(b"ok data", "txt", 100) + time.sleep(0.1) + self.assertTrue(received_callback[0]) + + def test_blockcreator_queue_2(self): + queue = blockcreatorqueue.BlockCreatorQueue(lambda d: print(d)) + queue.queued.add(os.urandom(28)) + queue.queued.add(os.urandom(28)) + queue.queue_block(b"test", "txt", 1000) + time.sleep(1) + self.assertEqual(len(queue.queued), 3) + +unittest.main() diff --git a/tests/test_blockio.py b/tests/test_blockio.py index 3bd1109f..d889610b 100644 --- a/tests/test_blockio.py +++ b/tests/test_blockio.py @@ -24,7 +24,7 @@ from utils import identifyhome import safedb import blockio from blockio.clean.cleanblocklistentries import clean_block_list_entries - +from blockio import subprocgenerate def _remove_db(path): try: @@ -35,6 +35,19 @@ def _remove_db(path): class TestBlockIO(unittest.TestCase): + def test_subproc_generate(self): + db_file = identifyhome.identify_home() + 'test.db' + db = safedb.SafeDB(db_file) + + bl: 'Kasten' = subprocgenerate.vdf_block(b"test", "txt", 10) + + self.assertEqual(b"test", bl.data) + self.assertEqual("txt", bl.get_data_type()) + self.assertEqual(330, bl.get_metadata()['rds']) + + db.close() + _remove_db(db_file) + def test_list_all_blocks(self): db_file = identifyhome.identify_home() + 'test.db' db = safedb.SafeDB(db_file)