diff --git a/static-data/default-plugins/wot/wot/getbykey.py b/static-data/default-plugins/wot/wot/getbykey.py index 758fa23e..c965c11d 100644 --- a/static-data/default-plugins/wot/wot/getbykey.py +++ b/static-data/default-plugins/wot/wot/getbykey.py @@ -1,11 +1,11 @@ -from typing import TYPE_CHECKING, Iterable, Union +from typing import TYPE_CHECKING, Union from nacl.signing import VerifyKey if TYPE_CHECKING: from identity import Identity -from identityset import identities +from identity.identityset import identities def get_identity_by_key( diff --git a/static-data/default-plugins/wot/wot/identity/__init__.py b/static-data/default-plugins/wot/wot/identity/__init__.py index 49656c8a..1bf8282c 100644 --- a/static-data/default-plugins/wot/wot/identity/__init__.py +++ b/static-data/default-plugins/wot/wot/identity/__init__.py @@ -8,8 +8,10 @@ from nacl.encoding import Base32Encoder from nacl.exceptions import BadSignatureError from .processtrustsignature import process_trust_signature +from .proccessrevokesignature import process_revoke_signature from .name import IdentityName from .name import max_len as max_name_len +from .identityset import IdentitySet, identities from exceptions import IdentitySerializationError from timestamp import WotTimestamp @@ -26,12 +28,15 @@ class Identity: key: Union[SigningKey, VerifyKey], name: 'IdentityName', created_date: WotTimestamp = None): - self.trusted: Set[Identity] = set() + self.trusted: Set[Identity] = IdentitySet() self.name = IdentityName(name) self.created_date = created_date self.private_key = self.key = None + if isinstance(key, bytes): + self.key = VerifyKey(key) + # SigningKey and VerifyKey have minimal memory overhead # so we do not need to make them properties if isinstance(key, SigningKey): diff --git a/static-data/default-plugins/wot/wot/identity/identityset.py b/static-data/default-plugins/wot/wot/identity/identityset.py new file mode 100644 index 00000000..e042bdbc --- /dev/null +++ b/static-data/default-plugins/wot/wot/identity/identityset.py @@ -0,0 +1,30 @@ +class IdentitySet(set): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + + def __contains__(self, ob: 'Identity') -> bool: + for identity in self: + if bytes(identity.key) == bytes(ob.key): + return True + return False + + def add(self, identity): + for existing_iden in self: + + if bytes(existing_iden.key) == bytes(identity.key): + return + super().add(identity) + + def remove(self, identity): + remove_idens = [] + for existing_iden in self: + if bytes(existing_iden.key) == bytes(identity.key): + remove_idens.append(existing_iden) + for remove_iden in remove_idens: + super().remove(remove_iden) + + +# Set of identites within N-distance trust + +identities = IdentitySet() \ No newline at end of file diff --git a/static-data/default-plugins/wot/wot/identity/proccessrevokesignature.py b/static-data/default-plugins/wot/wot/identity/proccessrevokesignature.py new file mode 100644 index 00000000..e2b2fe47 --- /dev/null +++ b/static-data/default-plugins/wot/wot/identity/proccessrevokesignature.py @@ -0,0 +1,44 @@ +import traceback + +from nacl.signing import VerifyKey + +import logger + +from getbykey import get_identity_by_key +from blockprocessingevent import WotCommand + + +def process_revoke_signature(revoke_signature_payload): + if len(revoke_signature_payload) != 129: + logger.warn( + f'Signature size is invalid for revoking an identity', + terminal=True) + + # verify that this is a signature for a trust command + if revoke_signature_payload[0] != WotCommand.REVOKE_TRUST: + logger.warn( + f'Invalid command in signature' , terminal=True) + return + # signer is first 32 bytes + signer = VerifyKey(revoke_signature_payload[1:33]) + # revoked is next 32 bytes + revoked = revoke_signature_payload[33:65] + # signature is last 64 bytes + signature = revoke_signature_payload[65:] + + # If bad signature, it raises nacl.exceptions.BadSignatureError + signer.verify( + int.to_bytes(revoke_signature_payload[0], 1, 'big') + \ + revoked, signature) + + # if good signature + try: + + signer_identity = get_identity_by_key(bytes(signer)) + # noop if already revoked + signer_identity.trusted.remove(get_identity_by_key(revoked)) + except KeyError: + # if signer or revoked identity are not in the identity set + # this means they have not been announced yet + traceback.print_exc() + pass diff --git a/static-data/default-plugins/wot/wot/identity/processtrustsignature.py b/static-data/default-plugins/wot/wot/identity/processtrustsignature.py index 47b54245..225b6f1b 100644 --- a/static-data/default-plugins/wot/wot/identity/processtrustsignature.py +++ b/static-data/default-plugins/wot/wot/identity/processtrustsignature.py @@ -16,6 +16,7 @@ def process_trust_signature(sig_payload: bytes): if sig_payload[0] != WotCommand.TRUST: logger.warn( f'Invalid command in signature') + return # signer is first 32 bytes signer = VerifyKey(sig_payload[1:33]) # signed is next 32 bytes diff --git a/static-data/default-plugins/wot/wot/identityset.py b/static-data/default-plugins/wot/wot/identityset.py deleted file mode 100644 index 0caccf3f..00000000 --- a/static-data/default-plugins/wot/wot/identityset.py +++ /dev/null @@ -1,3 +0,0 @@ -from typing import Set -# Set of identites within N-distance trust -identities: Set['Identity'] = set() diff --git a/static-data/default-plugins/wot/wot/loadfromblocks.py b/static-data/default-plugins/wot/wot/loadfromblocks.py index 34fe45f5..a29862e6 100644 --- a/static-data/default-plugins/wot/wot/loadfromblocks.py +++ b/static-data/default-plugins/wot/wot/loadfromblocks.py @@ -7,9 +7,8 @@ import nacl.exceptions import logger import blockdb -from identity import Identity, processtrustsignature +from identity import Identity, processtrustsignature, identities from exceptions import IdentitySerializationError -from identityset import identities from getbykey import get_identity_by_key diff --git a/tests/default-plugin-tests/wot/test_get_identity_by_key.py b/tests/default-plugin-tests/wot/test_get_identity_by_key.py index 694cd1f1..67e68c97 100644 --- a/tests/default-plugin-tests/wot/test_get_identity_by_key.py +++ b/tests/default-plugin-tests/wot/test_get_identity_by_key.py @@ -18,7 +18,7 @@ import onionrblocks from blockdb import block_db_path from identity import Identity from getbykey import get_identity_by_key -from identityset import identities as iden_set +from identity.identityset import identities import blockdb @@ -29,7 +29,7 @@ class GetIdentityByKeyTest(unittest.TestCase): iden_public = iden_priv_key.verify_key identity = Identity(iden_priv_key, "test") - iden_set.add(identity) + identities.add(identity) self.assertIsInstance(get_identity_by_key(iden_public), Identity) diff --git a/tests/default-plugin-tests/wot/test_proccess_trust_signature.py b/tests/default-plugin-tests/wot/test_proccess_trust_signature.py index 4a76da35..563b52e6 100644 --- a/tests/default-plugin-tests/wot/test_proccess_trust_signature.py +++ b/tests/default-plugin-tests/wot/test_proccess_trust_signature.py @@ -18,7 +18,7 @@ sys.path.append(".") sys.path.append('static-data/default-plugins/wot/wot') sys.path.append("src/") import identity -from identityset import identities +from identity import identities class WotCommand(IntEnum): diff --git a/tests/default-plugin-tests/wot/test_process_revoke_signature.py b/tests/default-plugin-tests/wot/test_process_revoke_signature.py new file mode 100644 index 00000000..f9feec5b --- /dev/null +++ b/tests/default-plugin-tests/wot/test_process_revoke_signature.py @@ -0,0 +1,63 @@ +import os, uuid +from random import randint +from time import sleep +from enum import IntEnum, auto +from nacl.signing import SigningKey, VerifyKey +import nacl +import secrets +import onionrblocks + + +TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/' +print("Test directory:", TEST_DIR) +os.environ["ONIONR_HOME"] = TEST_DIR + +import unittest +import sys +sys.path.append(".") +sys.path.append('static-data/default-plugins/wot/wot') +sys.path.append("src/") +import identity +from identity.identityset import identities + + +class WotCommand(IntEnum): + TRUST = 1 + REVOKE_TRUST = auto() + ANNOUNCE = auto() + REVOKE = auto() + + +class TestSignatureRevokeProcessing(unittest.TestCase): + def test_revoke_trust(self): + # reset identity set + identities.clear() + + fake_pubkey = secrets.token_bytes(32) + signing_key = SigningKey.generate() + + main_iden = identity.Identity(signing_key.verify_key, "test") + + identities.add(main_iden) + identities.add(identity.Identity(fake_pubkey, "test2")) + + wot_cmd = int(WotCommand.REVOKE_TRUST).to_bytes(1, 'big') + revoke_signature = signing_key.sign(wot_cmd + fake_pubkey) + revoke_signature_payload = wot_cmd + bytes(signing_key.verify_key) + \ + fake_pubkey + revoke_signature.signature + + main_iden.trusted.add( + identity.Identity(VerifyKey(fake_pubkey), "test2")) + + identity.process_revoke_signature(revoke_signature_payload) + + self.assertEqual(len(identities), 2) + self.assertEqual(len(list(identities)[0].trusted), 0) + for iden in identities: + if iden.key == signing_key.verify_key: + for i in iden.trusted: + if i.key == VerifyKey(fake_pubkey): + raise AssertionError("Signed identity found") + break + +unittest.main()