Onionr/tests/test_blockio.py
2021-02-09 23:02:19 +00:00

144 lines
4.6 KiB
Python

#!/usr/bin/env python3
import sys, os
sys.path.append(".")
sys.path.append("src/")
import uuid
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import time
from utils import identifyhome, createdirs
from onionrsetup import setup_config, setup_default_plugins
import random
createdirs.create_dirs()
setup_config()
setup_default_plugins()
import kasten
from onionrblocks.generators import anonvdf
from onionrblocks import blockcreator
from utils import identifyhome
import safedb
import blockio
from blockio.clean.cleanblocklistentries import clean_block_list_entries
from blockio import subprocgenerate, subprocvalidate
def _remove_db(path):
try:
os.remove(path)
except FileNotFoundError:
pass
class TestBlockIO(unittest.TestCase):
def test_subproc_validate(self):
bl = blockcreator.create_anonvdf_block(b"hello" + int(3).to_bytes(1, "big"), b"txt" + int(3).to_bytes(1, "big"), 5)
invalid = os.urandom(64)
subprocvalidate.vdf_block(bl.id, bl.get_packed())
self.assertRaises(anonvdf.InvalidID, subprocvalidate.vdf_block, invalid, bl.get_packed())
def test_subproc_generate(self):
db_file = identifyhome.identify_home() + 'test.db'
db = safedb.SafeDB(db_file)
bl: 'Kasten' = subprocgenerate.vdf_block(b"test", "txt", 10)
self.assertEqual(b"test", bl.data)
self.assertEqual("txt", bl.get_data_type())
self.assertEqual(330, bl.get_metadata()['rds'])
db.close()
_remove_db(db_file)
def test_list_all_blocks(self):
db_file = identifyhome.identify_home() + 'test.db'
db = safedb.SafeDB(db_file)
expected_l = []
for i in range(5):
bl = blockcreator.create_anonvdf_block(b"hello" + int(i).to_bytes(1, "big"), b"txt" + int(i).to_bytes(1, "big"), 5)
blockio.store_block(bl, db)
expected_l.append(bl.id)
l = blockio.load.list_all_blocks(db)
self.assertEqual(len(l), 5)
for i in l:
self.assertIn(bytes(i), expected_l)
db.close()
_remove_db(db_file)
def test_clean_blocklist_entries(self):
db_file = identifyhome.identify_home() + 'test.db'
db = safedb.SafeDB(db_file)
bl = blockcreator.create_anonvdf_block(b"hello" + int(10).to_bytes(1, "big"), b"txt", 5)
blockio.store_block(bl, db)
db.db_conn[b'bl-txt'] = b''
clean_block_list_entries(db)
self.assertRaises(KeyError, db.get, 'bl-txt')
db.close()
_remove_db(db_file)
def test_clean_expired(self):
db_file = identifyhome.identify_home() + 'test.db'
db = safedb.SafeDB(db_file)
for i in range(3):
bl = blockcreator.create_anonvdf_block(b"hello" + int(i).to_bytes(1, "big"), b"txt", 5)
blockio.store_block(bl, db)
blockio.clean_expired_blocks(db)
time.sleep(1)
self.assertEqual(len(list(blockio.list_blocks_by_type("txt", db))), 3)
time.sleep(10.1)
blockio.clean_expired_blocks(db)
self.assertEqual(len(db.db_conn[b'bl-txt']), 0)
db.close()
_remove_db(db_file)
def test_store_block(self):
packed = kasten.generator.pack.pack(b"test", "tst")
bl: kasten.Kasten = anonvdf.AnonVDFGenerator.generate(packed, rounds=1000)
db_file = identifyhome.identify_home() + 'test.db'
db = safedb.SafeDB(db_file)
blockio.store_block(bl, db)
db.close()
_remove_db(db_file)
def test_store_dupe(self):
db_file = identifyhome.identify_home() + 'test.db'
_remove_db(db_file)
packed = kasten.generator.pack.pack(b"test", "tst")
bl: kasten.Kasten = anonvdf.AnonVDFGenerator.generate(packed, rounds=1000)
db = safedb.SafeDB(db_file)
blockio.store_block(bl, db)
self.assertRaises(ValueError, blockio.store_block, bl, db)
db.close()
_remove_db(db_file)
def test_list_blocks(self):
db_file = identifyhome.identify_home() + 'test.db'
_remove_db(db_file)
db = safedb.SafeDB(db_file)
expected_list = []
for i in range(10):
bl = blockcreator.create_anonvdf_block(b'test' + int(i).to_bytes(1, 'big'), 'txt', 60)
blockio.store_block(bl, db)
expected_list.append(bl.id)
l = blockio.list_blocks_by_type('txt', db)
self.assertEqual(len(list(l)), len(expected_list))
for i in l:
self.assertIn(bytes(i), expected_list)
db.close()
_remove_db(db_file)
unittest.main()