Added basic test for trust payload processing

This commit is contained in:
Kevin F 2022-09-13 21:40:39 -05:00
parent 08d8fda857
commit 9058f7bee5
5 changed files with 49 additions and 27 deletions

View File

@ -3,7 +3,6 @@
from typing import TYPE_CHECKING, Set from typing import TYPE_CHECKING, Set
from .identity import Identity from .identity import Identity
from .blockprocessing import process_block
from .getbykey import get_identity_by_key from .getbykey import get_identity_by_key
from .identityset import identities from .identityset import identities

View File

@ -5,7 +5,7 @@ from nacl.signing import VerifyKey
if TYPE_CHECKING: if TYPE_CHECKING:
from identity import Identity from identity import Identity
from identityset import identities from .identityset import identities
def get_identity_by_key( def get_identity_by_key(
@ -17,4 +17,4 @@ def get_identity_by_key(
print(identity) print(identity)
if bytes(identity.key) == bytes(key): if bytes(identity.key) == bytes(key):
return identity return identity
return KeyError("Identity not found") raise KeyError("Identity not found")

View File

@ -7,10 +7,11 @@ from nacl.signing import SigningKey, VerifyKey
from nacl.encoding import Base32Encoder from nacl.encoding import Base32Encoder
from nacl.exceptions import BadSignatureError from nacl.exceptions import BadSignatureError
from .processtrustsignature import process_trust_signature
from .name import IdentityName from .name import IdentityName
from .name import max_len as max_name_len from .name import max_len as max_name_len
from exceptions import IdentitySerializationError from ..exceptions import IdentitySerializationError
from timestamp import WotTimestamp from ..timestamp import WotTimestamp
short_identity_keys = { short_identity_keys = {

View File

@ -1,31 +1,33 @@
import logger import logger
from nacl.signing import VerifyKey from nacl.signing import VerifyKey
import nacl.exceptions
from getbykey import get_identity_by_key from ..getbykey import get_identity_by_key
def process_trust_signature(sig_payload: bytes): def process_trust_signature(sig_payload: bytes):
if len(sig_payload) != 128: if len(sig_payload) != 128:
logger.warn( logger.warn(
f'Signature size is invalid for a signed identity') f'Signature size is invalid for a signed identity')
signer = sig_payload[:32]
signed = sig_payload[32:65] # signer is first 32 bytes
signature = signature[65:] signer = VerifyKey(sig_payload[:32])
# signed is next 32 bytes
signed = sig_payload[32:64]
# signature is last 64 bytes
signature = sig_payload[64:]
# If bad signature, it raises nacl.exceptions.BadSignatureError # If bad signature, it raises nacl.exceptions.BadSignatureError
VerifyKey.verify(signer, signed, signature) signer.verify(signed, signature)
# if good signature
try:
signer_identity = get_identity_by_key(signer)
signed_identity = get_identity_by_key(signed)
except KeyError:
# if signer or signed identity are not in the identity set
# this means they have not been announced yet
pass
else: else:
# if good signature # noop if already signed
try: signer_identity.trusted.add(signed_identity)
signer_identity = get_identity_by_key(signer)
signed_identity = get_identity_by_key(signed)
except KeyError:
# if signer or signed identity are not in the identity set
# this means they have not been announced yet
pass
else:
# noop if already signed
signer_identity.trusted.add(signed_identity)

View File

@ -1,6 +1,7 @@
import os, uuid import os, uuid
from random import randint from random import randint
from time import sleep from time import sleep
from nacl.signing import SigningKey
import secrets import secrets
import onionrblocks import onionrblocks
@ -15,13 +16,32 @@ sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/') sys.path.append('static-data/default-plugins/wot/')
sys.path.append("src/") sys.path.append("src/")
from wot import identity from wot import identity
from wot import identityset
class TrustSignatureProcessing(unittest.TestCase): class TrustSignatureProcessing(unittest.TestCase):
def test_block_processing_trust(self): def test_processing_trust_payloads(self):
identity1 = identity.Identity() # reset identity set
identity2 = identity.Identity() identityset.identities = set()
identity1.trust(identity2)
fake_pubkey = secrets.token_bytes(32)
signing_key = SigningKey.generate()
identityset.identities.add(identity.Identity(bytes(signing_key.verify_key), "test"))
identityset.identities.add(identity.Identity(fake_pubkey, "test2"))
trust_signature = signing_key.sign(fake_pubkey)
trust_signature_payload = bytes(signing_key.verify_key) + fake_pubkey + \
trust_signature.signature
identity.process_trust_signature(trust_signature_payload)
for iden in identityset.identities:
if iden.key == signing_key.verify_key:
self.assertIn(fake_pubkey, iden.trusted)
break
unittest.main() unittest.main()