From 57b1e077150f08a12aacf776037d841f388e12db Mon Sep 17 00:00:00 2001 From: Kevin F Date: Wed, 31 Aug 2022 00:30:28 -0500 Subject: [PATCH] Finished identity serialization --- .../wot/wot/blockprocessingevent.py | 32 ++++++ .../default-plugins/wot/wot/exceptions.py | 2 + .../wot/wot/identity/__init__.py | 106 ++++++++++++++++++ .../default-plugins/wot/wot/identity/name.py | 10 ++ .../default-plugins/wot/wot/loadfromblocks.py | 9 ++ .../default-plugins/wot/wot/timestamp.py | 11 ++ .../wot/test_block_processing.py | 25 +++++ .../wot/test_identity_distance.py | 37 ++++++ .../wot/test_identity_init.py | 42 +++++++ .../wot/test_identity_serialize.py | 48 ++++++++ 10 files changed, 322 insertions(+) create mode 100644 static-data/default-plugins/wot/wot/blockprocessingevent.py create mode 100644 static-data/default-plugins/wot/wot/exceptions.py create mode 100644 static-data/default-plugins/wot/wot/identity/__init__.py create mode 100644 static-data/default-plugins/wot/wot/identity/name.py create mode 100644 static-data/default-plugins/wot/wot/loadfromblocks.py create mode 100644 static-data/default-plugins/wot/wot/timestamp.py create mode 100644 tests/default-plugin-tests/wot/test_block_processing.py create mode 100644 tests/default-plugin-tests/wot/test_identity_distance.py create mode 100644 tests/default-plugin-tests/wot/test_identity_init.py create mode 100644 tests/default-plugin-tests/wot/test_identity_serialize.py diff --git a/static-data/default-plugins/wot/wot/blockprocessingevent.py b/static-data/default-plugins/wot/wot/blockprocessingevent.py new file mode 100644 index 00000000..95106872 --- /dev/null +++ b/static-data/default-plugins/wot/wot/blockprocessingevent.py @@ -0,0 +1,32 @@ +from typing import TYPE_CHECKING +from enum import IntEnum, auto +import struct + +import msgpack + +if TYPE_CHECKING: + from onionrblocks import Block + +from .exceptions import InvalidWotBlock + + +class WotCommand(IntEnum): + TRUST = 1 + UNTRUST = auto() + ANNOUNCE = auto() + REVOKE = auto() + +class WotPayload: + def __init__(self, block_data: bytes): + wot_command = WotCommand( + int.from_bytes(block_data[0], byteorder='big')) + + + match wot_command(WotCommand): + case WotCommand.TRUST: + pass + + +def process_block(bl: 'Block'): + assert bl.type == 'iden' + wot_command = WotCommand(bl.data) diff --git a/static-data/default-plugins/wot/wot/exceptions.py b/static-data/default-plugins/wot/wot/exceptions.py new file mode 100644 index 00000000..bc70c9a9 --- /dev/null +++ b/static-data/default-plugins/wot/wot/exceptions.py @@ -0,0 +1,2 @@ +class InvalidWotBlock(Exception): + pass \ No newline at end of file diff --git a/static-data/default-plugins/wot/wot/identity/__init__.py b/static-data/default-plugins/wot/wot/identity/__init__.py new file mode 100644 index 00000000..f80024ec --- /dev/null +++ b/static-data/default-plugins/wot/wot/identity/__init__.py @@ -0,0 +1,106 @@ +from collections import deque +from enum import Enum +from typing import Set, Union +import time + +from nacl.signing import SigningKey, VerifyKey +from nacl.encoding import Base32Encoder +from nacl.exceptions import BadSignatureError + +from .name import IdentityName +from .name import max_len as max_name_len +from timestamp import WotTimestamp + + +short_identity_keys = { + 'trusted': 't', + 'name': 'n', + 'key': 'k' +} + +class WotSerializationError(Exception): pass + + +class Identity: + def __init__( + self, + key: Union[SigningKey, VerifyKey], + name: 'IdentityName', + created_date: WotTimestamp = None): + self.trusted: Set[Identity] = set() + self.name = IdentityName(name) + self.created_date = created_date + + self.private_key = self.key = None + + # SigningKey and VerifyKey have minimal memory overhead + # so we do not need to make them properties + if isinstance(key, SigningKey): + self.private_key = key + self.key = key.verify_key + elif isinstance(key, VerifyKey): + self.key = key + + + def __eq__(self, other): + return self.key == other + + def __str__(self): + return self.key.encode(encoder=Base32Encoder).decode('utf-8') + + def __hash__(self): + return hash(self.key) + + def serialize(self) -> bytes: + """ + A serialized identity is the name signed by the private key plus + the public key and the date (used to prevent replay attacks) + """ + if not self.private_key: + raise WotSerializationError("Cannot serialize public identity") + signed = self.private_key.sign( + self.name.zfill(max_name_len).encode('utf-8') + bytes(self.key) + + str(int(time.time())).encode('utf-8')) + + return signed.signature + signed.message + + @classmethod + def deserialize(cls, serialized: bytes): + signature = serialized[:64] + message = serialized[64:] + name = message[:max_name_len].decode('utf-8').lstrip('0') + key = VerifyKey(message[max_name_len:max_name_len + 32]) + date = WotTimestamp(message[max_name_len + 32:].decode('utf-8')) + if date > time.time(): + raise WotSerializationError( + "Date in serialized identity is in the future") + elif date <= 0: + raise WotSerializationError("Date in serialized identity is <= 0") + try: + VerifyKey.verify(key, message, signature) + except BadSignatureError: + raise WotSerializationError( + "Signature in serialized identity is invalid") + return cls(key, name) + + +def get_distance(identity: Identity, identity2: Identity): + distance = 0 + visited = set() + stack = deque([identity]) + + while stack: + current_iden = stack.popleft() + + if current_iden == identity2: + return distance + distance += 1 + + if identity2 in current_iden.trusted: + return distance + + for trusted in current_iden.trusted: + if trusted not in visited: + visited.add(trusted) + stack.append(trusted) + raise ValueError diff --git a/static-data/default-plugins/wot/wot/identity/name.py b/static-data/default-plugins/wot/wot/identity/name.py new file mode 100644 index 00000000..d89a435f --- /dev/null +++ b/static-data/default-plugins/wot/wot/identity/name.py @@ -0,0 +1,10 @@ +max_len = 21 +min_len = 1 + +class IdentityName(str): + def __new__(cls, data): + if data[0] == '0': + raise ValueError("Name cannot start with 0") + if not len(data) in list(range(1, 21)): + raise ValueError(f"Must be in range({min_len}, {max_len})") + return super().__new__(cls, data) \ No newline at end of file diff --git a/static-data/default-plugins/wot/wot/loadfromblocks.py b/static-data/default-plugins/wot/wot/loadfromblocks.py new file mode 100644 index 00000000..a8fcb8a0 --- /dev/null +++ b/static-data/default-plugins/wot/wot/loadfromblocks.py @@ -0,0 +1,9 @@ +from typing import Generator +import blockdb + +from .identity import Identity + + +def load_identities_from_blocks(blocks) -> Generator[Identity]: + for block in blockdb.get_blocks_by_type('wotb'): + yield Identity.deserialize(block.data) diff --git a/static-data/default-plugins/wot/wot/timestamp.py b/static-data/default-plugins/wot/wot/timestamp.py new file mode 100644 index 00000000..db736bfc --- /dev/null +++ b/static-data/default-plugins/wot/wot/timestamp.py @@ -0,0 +1,11 @@ +from time import time + + +class WotTimestamp(int): + def __new__(cls, value: int): + value = int(value) + if value <= 0: + raise ValueError("Timestamp cannot be negative or zero") + elif value > time(): + raise ValueError("Timestamp cannot be in the future") + return super().__new__(cls, value) \ No newline at end of file diff --git a/tests/default-plugin-tests/wot/test_block_processing.py b/tests/default-plugin-tests/wot/test_block_processing.py new file mode 100644 index 00000000..0ec07ba7 --- /dev/null +++ b/tests/default-plugin-tests/wot/test_block_processing.py @@ -0,0 +1,25 @@ +import os, uuid +from random import randint +from time import sleep +import secrets +import onionrblocks + + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +import unittest +import sys +sys.path.append(".") +sys.path.append('static-data/default-plugins/wot/') +sys.path.append("src/") +from wot import identity +from wot import process_block + + +class BlockProcessingTest(unittest.TestCase): + def test_block_processing_trust(self): + + +unittest.main() diff --git a/tests/default-plugin-tests/wot/test_identity_distance.py b/tests/default-plugin-tests/wot/test_identity_distance.py new file mode 100644 index 00000000..15753cce --- /dev/null +++ b/tests/default-plugin-tests/wot/test_identity_distance.py @@ -0,0 +1,37 @@ +import os, uuid +from random import randint +from time import sleep +import secrets + + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +import unittest +import sys +sys.path.append(".") +sys.path.append('static-data/default-plugins/wot/') +sys.path.append("src/") +from wot import identity + + +def generate_graph(iden: identity.Identity, depth, max_neighbors): + c = 0 + if depth == 0: + return + for i in range(randint(0, max_neighbors)): + i = identity.Identity(secrets.token_hex(16)) + iden.trusted.add(i) + generate_graph(i, depth - 1, max_neighbors) + + +class IdentityDistanceTest(unittest.TestCase): + def test_distance(self): + iden = identity.Identity(secrets.token_hex(16)) + generate_graph(iden, 10, 5) + iden2 = list(list(iden.trusted)[0].trusted)[0] + + self.assertEqual(identity.get_distance(iden, iden2), 2) + +unittest.main() diff --git a/tests/default-plugin-tests/wot/test_identity_init.py b/tests/default-plugin-tests/wot/test_identity_init.py new file mode 100644 index 00000000..1fc76a04 --- /dev/null +++ b/tests/default-plugin-tests/wot/test_identity_init.py @@ -0,0 +1,42 @@ +import os, uuid +from random import randint + +import secrets +from nacl import signing + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +import unittest +import sys +sys.path.append(".") +sys.path.append('static-data/default-plugins/wot/wot/') +sys.path.append("src/") +from identity import Identity + + + +class IdentityInitTest(unittest.TestCase): + def test_inden_init_privkey(self): + private_key = signing.SigningKey.generate() + iden = Identity(private_key, "test") + self.assertEqual(iden.name, "test") + self.assertEqual(iden.key, private_key.verify_key) + self.assertEqual(iden.private_key, private_key) + + + def test_iden_init_pubkey(self): + public = signing.SigningKey.generate().verify_key + iden = Identity(public, "test") + self.assertEqual(iden.name, "test") + self.assertEqual(iden.key, public) + self.assertEqual(iden.private_key, None) + + def test_iden_init_pubkey_invalid_name(self): + public = signing.SigningKey.generate().verify_key + + self.assertRaises(ValueError, Identity, public, secrets.token_hex(32)) + + +unittest.main() diff --git a/tests/default-plugin-tests/wot/test_identity_serialize.py b/tests/default-plugin-tests/wot/test_identity_serialize.py new file mode 100644 index 00000000..6a1c55c2 --- /dev/null +++ b/tests/default-plugin-tests/wot/test_identity_serialize.py @@ -0,0 +1,48 @@ +import os, uuid +from random import randint + +import time +from nacl import signing + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +import unittest +import sys +sys.path.append('static-data/default-plugins/wot/wot') +sys.path.append("src/") +from identity import Identity +from identity.name import max_len + + +class IdentitySerializeTest(unittest.TestCase): + + def test_iden_deserialize(self): + iden_priv_key = signing.SigningKey.generate() + iden_public = iden_priv_key.verify_key + serialized = iden_priv_key.sign("test".zfill(max_len).encode('utf-8') + + bytes(iden_public) + + str(int(time.time())).encode('utf-8')) + iden = Identity.deserialize(serialized) + self.assertEqual(iden.name, "test") + self.assertEqual(iden.key, iden_public) + self.assertEqual(iden.private_key, None) + + def test_iden_serialize(self): + iden_priv_key = signing.SigningKey.generate() + iden_public = iden_priv_key.verify_key + # Onionr keys sign themselves + the date + # in order to prevent replay attacks + expected_serialized = \ + iden_priv_key.sign("test".zfill(max_len).encode('utf-8') + + bytes(iden_public) + + str(int(time.time())).encode('utf-8')) + expected_serialized_len = len(expected_serialized) + + identity = Identity(iden_priv_key, "test") + serialized = identity.serialize() + self.assertEqual(len(serialized), expected_serialized_len) + self.assertEqual(serialized, expected_serialized) + +unittest.main()