Finished identity serialization

This commit is contained in:
Kevin F 2022-08-31 00:30:28 -05:00
parent bddddd7c5b
commit 57b1e07715
10 changed files with 322 additions and 0 deletions

View File

@ -0,0 +1,32 @@
from typing import TYPE_CHECKING
from enum import IntEnum, auto
import struct
import msgpack
if TYPE_CHECKING:
from onionrblocks import Block
from .exceptions import InvalidWotBlock
class WotCommand(IntEnum):
TRUST = 1
UNTRUST = auto()
ANNOUNCE = auto()
REVOKE = auto()
class WotPayload:
def __init__(self, block_data: bytes):
wot_command = WotCommand(
int.from_bytes(block_data[0], byteorder='big'))
match wot_command(WotCommand):
case WotCommand.TRUST:
pass
def process_block(bl: 'Block'):
assert bl.type == 'iden'
wot_command = WotCommand(bl.data)

View File

@ -0,0 +1,2 @@
class InvalidWotBlock(Exception):
pass

View File

@ -0,0 +1,106 @@
from collections import deque
from enum import Enum
from typing import Set, Union
import time
from nacl.signing import SigningKey, VerifyKey
from nacl.encoding import Base32Encoder
from nacl.exceptions import BadSignatureError
from .name import IdentityName
from .name import max_len as max_name_len
from timestamp import WotTimestamp
short_identity_keys = {
'trusted': 't',
'name': 'n',
'key': 'k'
}
class WotSerializationError(Exception): pass
class Identity:
def __init__(
self,
key: Union[SigningKey, VerifyKey],
name: 'IdentityName',
created_date: WotTimestamp = None):
self.trusted: Set[Identity] = set()
self.name = IdentityName(name)
self.created_date = created_date
self.private_key = self.key = None
# SigningKey and VerifyKey have minimal memory overhead
# so we do not need to make them properties
if isinstance(key, SigningKey):
self.private_key = key
self.key = key.verify_key
elif isinstance(key, VerifyKey):
self.key = key
def __eq__(self, other):
return self.key == other
def __str__(self):
return self.key.encode(encoder=Base32Encoder).decode('utf-8')
def __hash__(self):
return hash(self.key)
def serialize(self) -> bytes:
"""
A serialized identity is the name signed by the private key plus
the public key and the date (used to prevent replay attacks)
"""
if not self.private_key:
raise WotSerializationError("Cannot serialize public identity")
signed = self.private_key.sign(
self.name.zfill(max_name_len).encode('utf-8') + bytes(self.key) +
str(int(time.time())).encode('utf-8'))
return signed.signature + signed.message
@classmethod
def deserialize(cls, serialized: bytes):
signature = serialized[:64]
message = serialized[64:]
name = message[:max_name_len].decode('utf-8').lstrip('0')
key = VerifyKey(message[max_name_len:max_name_len + 32])
date = WotTimestamp(message[max_name_len + 32:].decode('utf-8'))
if date > time.time():
raise WotSerializationError(
"Date in serialized identity is in the future")
elif date <= 0:
raise WotSerializationError("Date in serialized identity is <= 0")
try:
VerifyKey.verify(key, message, signature)
except BadSignatureError:
raise WotSerializationError(
"Signature in serialized identity is invalid")
return cls(key, name)
def get_distance(identity: Identity, identity2: Identity):
distance = 0
visited = set()
stack = deque([identity])
while stack:
current_iden = stack.popleft()
if current_iden == identity2:
return distance
distance += 1
if identity2 in current_iden.trusted:
return distance
for trusted in current_iden.trusted:
if trusted not in visited:
visited.add(trusted)
stack.append(trusted)
raise ValueError

View File

@ -0,0 +1,10 @@
max_len = 21
min_len = 1
class IdentityName(str):
def __new__(cls, data):
if data[0] == '0':
raise ValueError("Name cannot start with 0")
if not len(data) in list(range(1, 21)):
raise ValueError(f"Must be in range({min_len}, {max_len})")
return super().__new__(cls, data)

View File

@ -0,0 +1,9 @@
from typing import Generator
import blockdb
from .identity import Identity
def load_identities_from_blocks(blocks) -> Generator[Identity]:
for block in blockdb.get_blocks_by_type('wotb'):
yield Identity.deserialize(block.data)

View File

@ -0,0 +1,11 @@
from time import time
class WotTimestamp(int):
def __new__(cls, value: int):
value = int(value)
if value <= 0:
raise ValueError("Timestamp cannot be negative or zero")
elif value > time():
raise ValueError("Timestamp cannot be in the future")
return super().__new__(cls, value)

View File

@ -0,0 +1,25 @@
import os, uuid
from random import randint
from time import sleep
import secrets
import onionrblocks
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import sys
sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/')
sys.path.append("src/")
from wot import identity
from wot import process_block
class BlockProcessingTest(unittest.TestCase):
def test_block_processing_trust(self):
unittest.main()

View File

@ -0,0 +1,37 @@
import os, uuid
from random import randint
from time import sleep
import secrets
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import sys
sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/')
sys.path.append("src/")
from wot import identity
def generate_graph(iden: identity.Identity, depth, max_neighbors):
c = 0
if depth == 0:
return
for i in range(randint(0, max_neighbors)):
i = identity.Identity(secrets.token_hex(16))
iden.trusted.add(i)
generate_graph(i, depth - 1, max_neighbors)
class IdentityDistanceTest(unittest.TestCase):
def test_distance(self):
iden = identity.Identity(secrets.token_hex(16))
generate_graph(iden, 10, 5)
iden2 = list(list(iden.trusted)[0].trusted)[0]
self.assertEqual(identity.get_distance(iden, iden2), 2)
unittest.main()

View File

@ -0,0 +1,42 @@
import os, uuid
from random import randint
import secrets
from nacl import signing
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import sys
sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/wot/')
sys.path.append("src/")
from identity import Identity
class IdentityInitTest(unittest.TestCase):
def test_inden_init_privkey(self):
private_key = signing.SigningKey.generate()
iden = Identity(private_key, "test")
self.assertEqual(iden.name, "test")
self.assertEqual(iden.key, private_key.verify_key)
self.assertEqual(iden.private_key, private_key)
def test_iden_init_pubkey(self):
public = signing.SigningKey.generate().verify_key
iden = Identity(public, "test")
self.assertEqual(iden.name, "test")
self.assertEqual(iden.key, public)
self.assertEqual(iden.private_key, None)
def test_iden_init_pubkey_invalid_name(self):
public = signing.SigningKey.generate().verify_key
self.assertRaises(ValueError, Identity, public, secrets.token_hex(32))
unittest.main()

View File

@ -0,0 +1,48 @@
import os, uuid
from random import randint
import time
from nacl import signing
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import sys
sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/")
from identity import Identity
from identity.name import max_len
class IdentitySerializeTest(unittest.TestCase):
def test_iden_deserialize(self):
iden_priv_key = signing.SigningKey.generate()
iden_public = iden_priv_key.verify_key
serialized = iden_priv_key.sign("test".zfill(max_len).encode('utf-8') +
bytes(iden_public) +
str(int(time.time())).encode('utf-8'))
iden = Identity.deserialize(serialized)
self.assertEqual(iden.name, "test")
self.assertEqual(iden.key, iden_public)
self.assertEqual(iden.private_key, None)
def test_iden_serialize(self):
iden_priv_key = signing.SigningKey.generate()
iden_public = iden_priv_key.verify_key
# Onionr keys sign themselves + the date
# in order to prevent replay attacks
expected_serialized = \
iden_priv_key.sign("test".zfill(max_len).encode('utf-8') +
bytes(iden_public) +
str(int(time.time())).encode('utf-8'))
expected_serialized_len = len(expected_serialized)
identity = Identity(iden_priv_key, "test")
serialized = identity.serialize()
self.assertEqual(len(serialized), expected_serialized_len)
self.assertEqual(serialized, expected_serialized)
unittest.main()