Compare commits
2 Commits
57b1e07715
...
e3d06ff0f5
Author | SHA1 | Date | |
---|---|---|---|
|
e3d06ff0f5 | ||
|
83007cb28d |
@ -6,7 +6,12 @@ from onionrblocks import Block
|
|||||||
|
|
||||||
from .dbpath import block_db_path
|
from .dbpath import block_db_path
|
||||||
|
|
||||||
|
|
||||||
def get_blocks_by_type(block_type: str) -> "Generator[Block]":
|
def get_blocks_by_type(block_type: str) -> "Generator[Block]":
|
||||||
|
try:
|
||||||
|
block_type = block_type.decode('utf-8')
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
block_db = db.get_db_obj(block_db_path, 'u')
|
block_db = db.get_db_obj(block_db_path, 'u')
|
||||||
for block_hash in db.list_keys(block_db_path):
|
for block_hash in db.list_keys(block_db_path):
|
||||||
block = Block(block_hash, block_db[block_hash], auto_verify=False)
|
block = Block(block_hash, block_db[block_hash], auto_verify=False)
|
||||||
@ -25,4 +30,4 @@ def get_blocks_after_timestamp(
|
|||||||
if block_type == block.type:
|
if block_type == block.type:
|
||||||
yield block
|
yield block
|
||||||
else:
|
else:
|
||||||
yield block
|
yield block
|
||||||
|
@ -9,11 +9,6 @@ gossip_server_socket_file = home + 'gossip-server.sock'
|
|||||||
|
|
||||||
usage_file = home + 'disk-usage.txt'
|
usage_file = home + 'disk-usage.txt'
|
||||||
contacts_location = home + 'contacts/'
|
contacts_location = home + 'contacts/'
|
||||||
public_API_host_file = home + 'public-host.txt'
|
|
||||||
private_API_host_file = home + 'private-host.txt'
|
|
||||||
bootstrap_file_location = 'static-data/bootstrap-nodes.txt'
|
|
||||||
data_nonce_file = home + 'block-nonces.dat'
|
|
||||||
forward_keys_file = home + 'forward-keys.db'
|
|
||||||
cached_storage = home + 'cachedstorage.dat'
|
cached_storage = home + 'cachedstorage.dat'
|
||||||
export_location = home + 'block-export/'
|
export_location = home + 'block-export/'
|
||||||
upload_list = home + 'upload-list.json'
|
upload_list = home + 'upload-list.json'
|
||||||
@ -27,8 +22,6 @@ site_cache = home + 'onionr-sites.txt'
|
|||||||
tor_hs_loc = home + 'hs/'
|
tor_hs_loc = home + 'hs/'
|
||||||
tor_hs_address_file = home + 'hs/hostname'
|
tor_hs_address_file = home + 'hs/hostname'
|
||||||
|
|
||||||
data_nonce_file = home + 'block-nonces.dat'
|
|
||||||
|
|
||||||
keys_file = home + 'keys.txt'
|
keys_file = home + 'keys.txt'
|
||||||
|
|
||||||
onboarding_mark_file = home + 'onboarding-completed'
|
onboarding_mark_file = home + 'onboarding-completed'
|
||||||
@ -38,5 +31,3 @@ log_file = home + 'onionr.log'
|
|||||||
ephemeral_services_file = home + 'ephemeral-services.list'
|
ephemeral_services_file = home + 'ephemeral-services.list'
|
||||||
|
|
||||||
restarting_indicator = home + "is-restarting"
|
restarting_indicator = home + "is-restarting"
|
||||||
|
|
||||||
block_database = home + "blocks.db"
|
|
||||||
|
@ -36,5 +36,6 @@ def store_blocks(dandelion_phase: 'DandelionPhase'):
|
|||||||
bl = new_queue.get(timeout=dandelion_phase.remaining_time() + 1)
|
bl = new_queue.get(timeout=dandelion_phase.remaining_time() + 1)
|
||||||
blockdb.add_block_to_db(bl)
|
blockdb.add_block_to_db(bl)
|
||||||
event('gotblock', data=bl, threaded=True)
|
event('gotblock', data=bl, threaded=True)
|
||||||
|
event(f'gotblock{bl.type}', data=bl, threaded=True)
|
||||||
except Empty:
|
except Empty:
|
||||||
pass
|
pass
|
||||||
|
@ -1,81 +0,0 @@
|
|||||||
"""Onionr - Private P2P Communication.
|
|
||||||
|
|
||||||
Load, save, and delete the user's public key pairs (does not handle peer keys)
|
|
||||||
"""
|
|
||||||
from onionrutils import bytesconverter
|
|
||||||
from onionrcrypto import generate
|
|
||||||
import filepaths
|
|
||||||
"""
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
|
||||||
it under the terms of the GNU General Public License as published by
|
|
||||||
the Free Software Foundation, either version 3 of the License, or
|
|
||||||
(at your option) any later version.
|
|
||||||
|
|
||||||
This program is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
GNU General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class KeyManager:
|
|
||||||
def __init__(self):
|
|
||||||
self.keyFile = filepaths.keys_file
|
|
||||||
|
|
||||||
def addKey(self, pubKey=None, privKey=None):
|
|
||||||
"""Add a new key pair.
|
|
||||||
|
|
||||||
either specified or None to generate a new pair automatically
|
|
||||||
"""
|
|
||||||
if type(pubKey) is type(None) and type(privKey) is type(None):
|
|
||||||
pubKey, privKey = generate.generate_pub_key()
|
|
||||||
pubKey = bytesconverter.bytes_to_str(pubKey)
|
|
||||||
privKey = bytesconverter.bytes_to_str(privKey)
|
|
||||||
try:
|
|
||||||
if pubKey in self.getPubkeyList():
|
|
||||||
raise ValueError('Pubkey already in list: %s' % (pubKey,))
|
|
||||||
except FileNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
with open(self.keyFile, "a") as keyFile:
|
|
||||||
keyFile.write(pubKey + ',' + privKey + '\n')
|
|
||||||
return (pubKey, privKey)
|
|
||||||
|
|
||||||
def removeKey(self, pubKey):
|
|
||||||
"""Remove a key pair by pubkey"""
|
|
||||||
keyList = self.getPubkeyList()
|
|
||||||
keyData = ''
|
|
||||||
try:
|
|
||||||
keyList.remove(pubKey)
|
|
||||||
except ValueError:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
keyData = ','.join(keyList)
|
|
||||||
with open(self.keyFile, "w") as keyFile:
|
|
||||||
keyFile.write(keyData)
|
|
||||||
|
|
||||||
def getPubkeyList(self):
|
|
||||||
"""Return a list of the user's keys"""
|
|
||||||
keyList = []
|
|
||||||
try:
|
|
||||||
with open(self.keyFile, "r") as keyFile:
|
|
||||||
keyData = keyFile.read()
|
|
||||||
except FileNotFoundError:
|
|
||||||
keyData = ''
|
|
||||||
keyData = keyData.split('\n')
|
|
||||||
for pair in keyData:
|
|
||||||
if len(pair) > 0:
|
|
||||||
keyList.append(pair.split(',')[0])
|
|
||||||
return keyList
|
|
||||||
|
|
||||||
def getPrivkey(self, pubKey):
|
|
||||||
privKey = None
|
|
||||||
with open(self.keyFile, "r") as keyFile:
|
|
||||||
keyData = keyFile.read()
|
|
||||||
for pair in keyData.split('\n'):
|
|
||||||
if pubKey in pair or pubKey.replace('=', '') in pair:
|
|
||||||
privKey = pair.split(',')[1]
|
|
||||||
return privKey
|
|
@ -18,7 +18,7 @@
|
|||||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
from . import generate, hashers, getourkeypair, signing, encryption, cryptoutils
|
from . import generate, getourkeypair, signing, encryption, cryptoutils
|
||||||
generate_deterministic = generate.generate_deterministic
|
generate_deterministic = generate.generate_deterministic
|
||||||
generate = generate.generate_pub_key
|
generate = generate.generate_pub_key
|
||||||
|
|
||||||
|
@ -31,10 +31,8 @@ def generate_pub_key():
|
|||||||
return tuple of base32encoded pubkey, privkey
|
return tuple of base32encoded pubkey, privkey
|
||||||
"""
|
"""
|
||||||
private_key = nacl.signing.SigningKey.generate()
|
private_key = nacl.signing.SigningKey.generate()
|
||||||
public_key = private_key.verify_key.encode(
|
public_key = private_key.verify_key
|
||||||
encoder=nacl.encoding.Base32Encoder())
|
return (public_key, private_key)
|
||||||
return (public_key.decode(), private_key.encode(
|
|
||||||
encoder=nacl.encoding.Base32Encoder()).decode())
|
|
||||||
|
|
||||||
|
|
||||||
def generate_deterministic(passphrase, bypassCheck=False):
|
def generate_deterministic(passphrase, bypassCheck=False):
|
||||||
|
@ -1,21 +0,0 @@
|
|||||||
import hashlib
|
|
||||||
|
|
||||||
import nacl.hash
|
|
||||||
|
|
||||||
|
|
||||||
def sha3_hash(data):
|
|
||||||
try:
|
|
||||||
data = data.encode()
|
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
hasher = hashlib.sha3_256()
|
|
||||||
hasher.update(data)
|
|
||||||
return hasher.hexdigest()
|
|
||||||
|
|
||||||
|
|
||||||
def blake2b_hash(data):
|
|
||||||
try:
|
|
||||||
data = data.encode()
|
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
return nacl.hash.blake2b(data)
|
|
@ -48,7 +48,9 @@ class TorPeer:
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
logger.debug(f"Could not create socket to peer {self.transport_address}", terminal=True)
|
logger.debug(
|
||||||
|
f"Could not create socket to peer {self.transport_address}",
|
||||||
|
terminal=True)
|
||||||
raise TimeoutError
|
raise TimeoutError
|
||||||
mock_recv = HandleRevc(s)
|
mock_recv = HandleRevc(s)
|
||||||
s.recv = mock_recv.recv
|
s.recv = mock_recv.recv
|
||||||
|
@ -35,7 +35,7 @@ GNU General Public License for more details.
|
|||||||
You should have received a copy of the GNU General Public License
|
You should have received a copy of the GNU General Public License
|
||||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
"""
|
"""
|
||||||
|
from wot import loadfromblocks, identities
|
||||||
|
|
||||||
plugin_name = 'wot'
|
plugin_name = 'wot'
|
||||||
PLUGIN_VERSION = '0.0.0'
|
PLUGIN_VERSION = '0.0.0'
|
||||||
@ -50,3 +50,5 @@ def on_init(api, data=None):
|
|||||||
logger.info(
|
logger.info(
|
||||||
f"Web of Trust Plugin v{PLUGIN_VERSION} enabled", terminal=True)
|
f"Web of Trust Plugin v{PLUGIN_VERSION} enabled", terminal=True)
|
||||||
onionrplugins.plugin_apis['wot'] = wot_test
|
onionrplugins.plugin_apis['wot'] = wot_test
|
||||||
|
|
||||||
|
list(map(lambda x: identities.add(x), loadfromblocks.load_identities()))
|
||||||
|
@ -1,40 +0,0 @@
|
|||||||
import os, uuid
|
|
||||||
from random import randint
|
|
||||||
from sqlite3 import Time
|
|
||||||
import socket
|
|
||||||
from queue import Queue
|
|
||||||
from time import sleep
|
|
||||||
import secrets
|
|
||||||
|
|
||||||
|
|
||||||
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
|
|
||||||
print("Test directory:", TEST_DIR)
|
|
||||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
|
||||||
|
|
||||||
import unittest
|
|
||||||
import sys
|
|
||||||
sys.path.append(".")
|
|
||||||
sys.path.append('static-data/default-plugins/wot/')
|
|
||||||
sys.path.append("src/")
|
|
||||||
from wot import identity
|
|
||||||
|
|
||||||
|
|
||||||
def generate_graph(iden: identity.Identity, depth, max_neighbors):
|
|
||||||
c = 0
|
|
||||||
if depth == 0:
|
|
||||||
return
|
|
||||||
for i in range(randint(0, max_neighbors)):
|
|
||||||
i = identity.Identity(secrets.token_hex(16))
|
|
||||||
iden.trusted.add(i)
|
|
||||||
generate_graph(i, depth - 1, max_neighbors)
|
|
||||||
|
|
||||||
|
|
||||||
class IdentityDistanceTest(unittest.TestCase):
|
|
||||||
def test_distance(self):
|
|
||||||
iden = identity.Identity(secrets.token_hex(16))
|
|
||||||
generate_graph(iden, 10, 5)
|
|
||||||
iden2 = list(list(iden.trusted)[0].trusted)[0]
|
|
||||||
|
|
||||||
self.assertEqual(identity.get_distance(iden, iden2), 2)
|
|
||||||
|
|
||||||
unittest.main()
|
|
@ -1,2 +1,11 @@
|
|||||||
# The web of trust is a graph of identities where each edge is a signature
|
# The web of trust is a graph of identities where each edge is a signature
|
||||||
# of a byte representing a trust level and an identity's public key
|
# of a byte representing a trust level and an identity's public key
|
||||||
|
from typing import TYPE_CHECKING, Set
|
||||||
|
|
||||||
|
from .identity import Identity
|
||||||
|
from .blockprocessing import process_block
|
||||||
|
|
||||||
|
|
||||||
|
# Set of identites within N-distance trust
|
||||||
|
identities: Set['Identity'] = set()
|
||||||
|
|
||||||
|
@ -1,2 +1,2 @@
|
|||||||
class InvalidWotBlock(Exception):
|
class InvalidWotBlock(Exception): pass
|
||||||
pass
|
class IdentitySerializationError(Exception): pass
|
@ -1,42 +0,0 @@
|
|||||||
from collections import deque
|
|
||||||
from typing import Set, Union, List
|
|
||||||
|
|
||||||
|
|
||||||
identities: List['Identity'] = []
|
|
||||||
|
|
||||||
|
|
||||||
class Identity:
|
|
||||||
def __init__(self, key: Union['Ed25519PublicKey', 'Ed25519PrivateKey']):
|
|
||||||
self.trusted: Set[Identity] = set()
|
|
||||||
self.key = key
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
return self.key == other
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.key
|
|
||||||
|
|
||||||
def __hash__(self):
|
|
||||||
return hash(self.key)
|
|
||||||
|
|
||||||
|
|
||||||
def get_distance(identity: Identity, identity2: Identity):
|
|
||||||
distance = 0
|
|
||||||
visited = set()
|
|
||||||
stack = deque([identity])
|
|
||||||
|
|
||||||
while stack:
|
|
||||||
current_iden = stack.popleft()
|
|
||||||
|
|
||||||
if current_iden == identity2:
|
|
||||||
return distance
|
|
||||||
distance += 1
|
|
||||||
|
|
||||||
if identity2 in current_iden.trusted:
|
|
||||||
return distance
|
|
||||||
|
|
||||||
for trusted in current_iden.trusted:
|
|
||||||
if trusted not in visited:
|
|
||||||
visited.add(trusted)
|
|
||||||
stack.append(trusted)
|
|
||||||
raise ValueError
|
|
@ -9,6 +9,7 @@ from nacl.exceptions import BadSignatureError
|
|||||||
|
|
||||||
from .name import IdentityName
|
from .name import IdentityName
|
||||||
from .name import max_len as max_name_len
|
from .name import max_len as max_name_len
|
||||||
|
from exceptions import IdentitySerializationError
|
||||||
from timestamp import WotTimestamp
|
from timestamp import WotTimestamp
|
||||||
|
|
||||||
|
|
||||||
@ -18,7 +19,6 @@ short_identity_keys = {
|
|||||||
'key': 'k'
|
'key': 'k'
|
||||||
}
|
}
|
||||||
|
|
||||||
class WotSerializationError(Exception): pass
|
|
||||||
|
|
||||||
|
|
||||||
class Identity:
|
class Identity:
|
||||||
@ -57,7 +57,7 @@ class Identity:
|
|||||||
the public key and the date (used to prevent replay attacks)
|
the public key and the date (used to prevent replay attacks)
|
||||||
"""
|
"""
|
||||||
if not self.private_key:
|
if not self.private_key:
|
||||||
raise WotSerializationError("Cannot serialize public identity")
|
raise IdentitySerializationError("Cannot serialize public identity")
|
||||||
signed = self.private_key.sign(
|
signed = self.private_key.sign(
|
||||||
self.name.zfill(max_name_len).encode('utf-8') + bytes(self.key) +
|
self.name.zfill(max_name_len).encode('utf-8') + bytes(self.key) +
|
||||||
str(int(time.time())).encode('utf-8'))
|
str(int(time.time())).encode('utf-8'))
|
||||||
@ -72,14 +72,14 @@ class Identity:
|
|||||||
key = VerifyKey(message[max_name_len:max_name_len + 32])
|
key = VerifyKey(message[max_name_len:max_name_len + 32])
|
||||||
date = WotTimestamp(message[max_name_len + 32:].decode('utf-8'))
|
date = WotTimestamp(message[max_name_len + 32:].decode('utf-8'))
|
||||||
if date > time.time():
|
if date > time.time():
|
||||||
raise WotSerializationError(
|
raise IdentitySerializationError(
|
||||||
"Date in serialized identity is in the future")
|
"Date in serialized identity is in the future")
|
||||||
elif date <= 0:
|
elif date <= 0:
|
||||||
raise WotSerializationError("Date in serialized identity is <= 0")
|
raise IdentitySerializationError("Date in serialized identity is <= 0")
|
||||||
try:
|
try:
|
||||||
VerifyKey.verify(key, message, signature)
|
VerifyKey.verify(key, message, signature)
|
||||||
except BadSignatureError:
|
except BadSignatureError:
|
||||||
raise WotSerializationError(
|
raise IdentitySerializationError(
|
||||||
"Signature in serialized identity is invalid")
|
"Signature in serialized identity is invalid")
|
||||||
return cls(key, name)
|
return cls(key, name)
|
||||||
|
|
||||||
|
@ -1,9 +1,17 @@
|
|||||||
from typing import Generator
|
from typing import Generator
|
||||||
import blockdb
|
import blockdb
|
||||||
|
|
||||||
from .identity import Identity
|
from identity import Identity
|
||||||
|
from exceptions import IdentitySerializationError
|
||||||
|
|
||||||
|
|
||||||
def load_identities_from_blocks(blocks) -> Generator[Identity]:
|
def load_identity_from_block(block) -> Identity:
|
||||||
for block in blockdb.get_blocks_by_type('wotb'):
|
return Identity.deserialize(block.data)
|
||||||
yield Identity.deserialize(block.data)
|
|
||||||
|
|
||||||
|
def load_identities_from_blocks() -> Generator[Identity, None, None]:
|
||||||
|
for block in blockdb.get_blocks_by_type(b'wotb'):
|
||||||
|
try:
|
||||||
|
yield load_identity_from_block(block)
|
||||||
|
except IdentitySerializationError:
|
||||||
|
pass
|
||||||
|
@ -11,27 +11,27 @@ os.environ["ONIONR_HOME"] = TEST_DIR
|
|||||||
import unittest
|
import unittest
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(".")
|
sys.path.append(".")
|
||||||
sys.path.append('static-data/default-plugins/wot/')
|
sys.path.append('static-data/default-plugins/wot/wot')
|
||||||
sys.path.append("src/")
|
sys.path.append("src/")
|
||||||
from wot import identity
|
from identity import Identity, get_distance
|
||||||
|
|
||||||
|
|
||||||
def generate_graph(iden: identity.Identity, depth, max_neighbors):
|
def generate_graph(iden: Identity, depth, max_neighbors):
|
||||||
c = 0
|
c = 0
|
||||||
if depth == 0:
|
if depth == 0:
|
||||||
return
|
return
|
||||||
for i in range(randint(0, max_neighbors)):
|
for i in range(randint(0, max_neighbors)):
|
||||||
i = identity.Identity(secrets.token_hex(16))
|
i = Identity(os.urandom(32), "1" + secrets.token_hex(4))
|
||||||
iden.trusted.add(i)
|
iden.trusted.add(i)
|
||||||
generate_graph(i, depth - 1, max_neighbors)
|
generate_graph(i, depth - 1, max_neighbors)
|
||||||
|
|
||||||
|
|
||||||
class IdentityDistanceTest(unittest.TestCase):
|
class IdentityDistanceTest(unittest.TestCase):
|
||||||
def test_distance(self):
|
def test_distance(self):
|
||||||
iden = identity.Identity(secrets.token_hex(16))
|
iden = Identity(os.urandom(32), "1" + secrets.token_hex(4))
|
||||||
generate_graph(iden, 10, 5)
|
generate_graph(iden, 10, 5)
|
||||||
iden2 = list(list(iden.trusted)[0].trusted)[0]
|
iden2 = list(list(iden.trusted)[0].trusted)[0]
|
||||||
|
|
||||||
self.assertEqual(identity.get_distance(iden, iden2), 2)
|
self.assertEqual(get_distance(iden, iden2), 2)
|
||||||
|
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -0,0 +1,59 @@
|
|||||||
|
import dbm
|
||||||
|
import os, uuid
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
|
||||||
|
print("Test directory:", TEST_DIR)
|
||||||
|
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||||
|
os.makedirs(TEST_DIR)
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import sys
|
||||||
|
sys.path.append('static-data/default-plugins/wot/wot')
|
||||||
|
sys.path.append("src/")
|
||||||
|
import onionrblocks
|
||||||
|
from blockdb import block_db_path
|
||||||
|
from identity import Identity
|
||||||
|
from loadfromblocks import load_identities_from_blocks
|
||||||
|
import blockdb
|
||||||
|
|
||||||
|
def _safe_remove(path):
|
||||||
|
try:
|
||||||
|
os.remove(path)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class LoadIdentitiesFromBlocksTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_load_from_blocks_no_blocks(self):
|
||||||
|
_safe_remove(block_db_path)
|
||||||
|
self.assertEqual(len(list(load_identities_from_blocks())), 0)
|
||||||
|
|
||||||
|
def test_load_from_blocks_one(self):
|
||||||
|
_safe_remove(block_db_path)
|
||||||
|
|
||||||
|
serialized_identity = b'jp\x18\xccB\xbb\xb5T\xae%\xc2NfvF\xd9e\xdb\xd1\x11\x13\x8al\x9f\x9d\xb7/\xc5\x0eG\xe9g{f\xa2\n\r\xe3cK\x96E\x01d\xbbz\xb5\xb1\x1eRA`\x94\xab\xf2\n",\xfe\xca\x0b\xb4v\x0500000000000000000test\x1b\xc8\x8d\x88\xe39\xeb\xbe\\\xbd\xc8[xD\xbcr\x1f\xa4\x03%p\x19\xf7\xd7%6S\xef*\x03\x91\xe31662057071'
|
||||||
|
|
||||||
|
bl = onionrblocks.create_anonvdf_block(
|
||||||
|
serialized_identity, b'wotb', 3600)
|
||||||
|
|
||||||
|
with dbm.open(block_db_path, 'c') as db:
|
||||||
|
db[bl.id] = bl.raw
|
||||||
|
|
||||||
|
self.assertEqual(len(list(load_identities_from_blocks())), 1)
|
||||||
|
|
||||||
|
def test_load_from_blocks_one_invalid(self):
|
||||||
|
_safe_remove(block_db_path)
|
||||||
|
serialized_identity_invalid = b'jp\x18\xccB\xbb\xb5T\xae%\xc2NfvF\xd9e\xdb\xd1\x12\x14\x8al\x9f\x9d\xb7/\xc5\x0eG\xe9g{f\xa2\n\r\xe3cK\x96E\x01d\xbbz\xb5\xb1\x1eRA`\x94\xab\xf2\n",\xfe\xca\x0b\xb4v\x0500000000000000000test\x1b\xc8\x8d\x88\xe39\xeb\xbe\\\xbd\xc8[xD\xbcr\x1f\xa4\x03%p\x19\xf7\xd7%6S\xef*\x03\x91\xe31662057071'
|
||||||
|
bl = onionrblocks.create_anonvdf_block(
|
||||||
|
serialized_identity_invalid, b'wotb', 3600)
|
||||||
|
|
||||||
|
with dbm.open(block_db_path, 'c') as db:
|
||||||
|
db[bl.id] = bl.raw
|
||||||
|
|
||||||
|
self.assertEqual(len(list(load_identities_from_blocks())), 0)
|
||||||
|
|
||||||
|
|
||||||
|
unittest.main()
|
@ -42,6 +42,7 @@ class IdentitySerializeTest(unittest.TestCase):
|
|||||||
|
|
||||||
identity = Identity(iden_priv_key, "test")
|
identity = Identity(iden_priv_key, "test")
|
||||||
serialized = identity.serialize()
|
serialized = identity.serialize()
|
||||||
|
print(serialized)
|
||||||
self.assertEqual(len(serialized), expected_serialized_len)
|
self.assertEqual(len(serialized), expected_serialized_len)
|
||||||
self.assertEqual(serialized, expected_serialized)
|
self.assertEqual(serialized, expected_serialized)
|
||||||
|
|
||||||
|
@ -1,28 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
import sys, os
|
|
||||||
sys.path.append(".")
|
|
||||||
sys.path.append("src/")
|
|
||||||
import uuid
|
|
||||||
TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/'
|
|
||||||
print("Test directory:", TEST_DIR)
|
|
||||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
|
||||||
import unittest, json
|
|
||||||
|
|
||||||
import time, math
|
|
||||||
|
|
||||||
from utils import bettersleep
|
|
||||||
|
|
||||||
|
|
||||||
class TestBetterSleep(unittest.TestCase):
|
|
||||||
def test_better_sleep(self):
|
|
||||||
s = math.floor(time.time())
|
|
||||||
t = 1
|
|
||||||
bettersleep.sleep(t)
|
|
||||||
self.assertEqual(math.floor(time.time()) - s, t)
|
|
||||||
|
|
||||||
def test_no_ctrl_c(self):
|
|
||||||
# TODO: figure out how to automate ctrl-c test
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
unittest.main()
|
|
Loading…
Reference in New Issue
Block a user