Finished tests for trust payload processing
This commit is contained in:
parent
9058f7bee5
commit
5bb43326e7
@ -5,7 +5,7 @@ from nacl.signing import VerifyKey
|
|||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from identity import Identity
|
from identity import Identity
|
||||||
|
|
||||||
from .identityset import identities
|
from identityset import identities
|
||||||
|
|
||||||
|
|
||||||
def get_identity_by_key(
|
def get_identity_by_key(
|
||||||
@ -14,7 +14,6 @@ def get_identity_by_key(
|
|||||||
if not isinstance(key, VerifyKey):
|
if not isinstance(key, VerifyKey):
|
||||||
key = VerifyKey(key)
|
key = VerifyKey(key)
|
||||||
for identity in identities:
|
for identity in identities:
|
||||||
print(identity)
|
|
||||||
if bytes(identity.key) == bytes(key):
|
if bytes(identity.key) == bytes(key):
|
||||||
return identity
|
return identity
|
||||||
raise KeyError("Identity not found")
|
raise KeyError("Identity not found")
|
||||||
|
@ -10,8 +10,8 @@ from nacl.exceptions import BadSignatureError
|
|||||||
from .processtrustsignature import process_trust_signature
|
from .processtrustsignature import process_trust_signature
|
||||||
from .name import IdentityName
|
from .name import IdentityName
|
||||||
from .name import max_len as max_name_len
|
from .name import max_len as max_name_len
|
||||||
from ..exceptions import IdentitySerializationError
|
from exceptions import IdentitySerializationError
|
||||||
from ..timestamp import WotTimestamp
|
from timestamp import WotTimestamp
|
||||||
|
|
||||||
|
|
||||||
short_identity_keys = {
|
short_identity_keys = {
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
|
import traceback
|
||||||
import logger
|
import logger
|
||||||
|
|
||||||
from nacl.signing import VerifyKey
|
from nacl.signing import VerifyKey
|
||||||
|
|
||||||
from ..getbykey import get_identity_by_key
|
from getbykey import get_identity_by_key
|
||||||
|
|
||||||
|
|
||||||
def process_trust_signature(sig_payload: bytes):
|
def process_trust_signature(sig_payload: bytes):
|
||||||
@ -27,6 +28,7 @@ def process_trust_signature(sig_payload: bytes):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
# if signer or signed identity are not in the identity set
|
# if signer or signed identity are not in the identity set
|
||||||
# this means they have not been announced yet
|
# this means they have not been announced yet
|
||||||
|
traceback.print_exc()
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
# noop if already signed
|
# noop if already signed
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
import os, uuid
|
import os, uuid
|
||||||
from random import randint
|
from random import randint
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from nacl.signing import SigningKey
|
from nacl.signing import SigningKey, VerifyKey
|
||||||
|
import nacl
|
||||||
import secrets
|
import secrets
|
||||||
import onionrblocks
|
import onionrblocks
|
||||||
|
|
||||||
@ -13,35 +14,100 @@ os.environ["ONIONR_HOME"] = TEST_DIR
|
|||||||
import unittest
|
import unittest
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(".")
|
sys.path.append(".")
|
||||||
sys.path.append('static-data/default-plugins/wot/')
|
sys.path.append('static-data/default-plugins/wot/wot')
|
||||||
sys.path.append("src/")
|
sys.path.append("src/")
|
||||||
from wot import identity
|
import identity
|
||||||
from wot import identityset
|
from identityset import identities
|
||||||
|
|
||||||
|
|
||||||
class TrustSignatureProcessing(unittest.TestCase):
|
class TrustSignatureProcessing(unittest.TestCase):
|
||||||
def test_processing_trust_payloads(self):
|
|
||||||
|
def test_processing_trust_payload_without_announced_identity(self):
|
||||||
# reset identity set
|
# reset identity set
|
||||||
identityset.identities = set()
|
identities.clear()
|
||||||
|
|
||||||
fake_pubkey = secrets.token_bytes(32)
|
fake_pubkey = secrets.token_bytes(32)
|
||||||
signing_key = SigningKey.generate()
|
signing_key = SigningKey.generate()
|
||||||
|
|
||||||
identityset.identities.add(identity.Identity(bytes(signing_key.verify_key), "test"))
|
identities.add(identity.Identity(signing_key.verify_key, "test"))
|
||||||
identityset.identities.add(identity.Identity(fake_pubkey, "test2"))
|
|
||||||
|
trust_signature = signing_key.sign(fake_pubkey)
|
||||||
|
trust_signature_payload = bytes(signing_key.verify_key) + fake_pubkey + \
|
||||||
|
trust_signature.signature
|
||||||
|
|
||||||
|
for iden in identities:
|
||||||
|
if iden.key == signing_key.verify_key:
|
||||||
|
for i in iden.trusted:
|
||||||
|
if i.key == VerifyKey(fake_pubkey):
|
||||||
|
raise AssertionError("Signed identity found")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise AssertionError("Signing identity not found")
|
||||||
|
|
||||||
|
def test_processing_invalid_trust_payloads(self):
|
||||||
|
# reset identity set
|
||||||
|
identities.clear()
|
||||||
|
|
||||||
|
fake_pubkey = secrets.token_bytes(32)
|
||||||
|
signing_key = SigningKey.generate()
|
||||||
|
|
||||||
|
identities.add(identity.Identity(signing_key.verify_key, "test"))
|
||||||
|
identities.add(identity.Identity(VerifyKey(fake_pubkey), "test2"))
|
||||||
|
|
||||||
|
trust_signature = signing_key.sign(fake_pubkey)
|
||||||
|
trust_signature_payload = bytes(signing_key.verify_key) + fake_pubkey + \
|
||||||
|
trust_signature.signature
|
||||||
|
trust_signature_payload = bytearray(trust_signature_payload)
|
||||||
|
trust_signature_payload[64] = 0
|
||||||
|
trust_signature_payload = bytes(trust_signature_payload)
|
||||||
|
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
nacl.exceptions.BadSignatureError, identity.process_trust_signature, trust_signature_payload)
|
||||||
|
|
||||||
|
for iden in identities:
|
||||||
|
if iden.key == signing_key.verify_key:
|
||||||
|
for i in iden.trusted:
|
||||||
|
if i.key == VerifyKey(fake_pubkey):
|
||||||
|
raise AssertionError("Signed identity found")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise AssertionError("Signing identity not found")
|
||||||
|
|
||||||
|
def test_processing_trust_payloads(self):
|
||||||
|
# reset identity set
|
||||||
|
identities.clear()
|
||||||
|
|
||||||
|
fake_pubkey = secrets.token_bytes(32)
|
||||||
|
signing_key = SigningKey.generate()
|
||||||
|
|
||||||
|
identities.add(identity.Identity(signing_key.verify_key, "test"))
|
||||||
|
identities.add(identity.Identity(VerifyKey(fake_pubkey), "test2"))
|
||||||
|
|
||||||
|
|
||||||
trust_signature = signing_key.sign(fake_pubkey)
|
trust_signature = signing_key.sign(fake_pubkey)
|
||||||
trust_signature_payload = bytes(signing_key.verify_key) + fake_pubkey + \
|
trust_signature_payload = bytes(signing_key.verify_key) + fake_pubkey + \
|
||||||
trust_signature.signature
|
trust_signature.signature
|
||||||
|
|
||||||
identity.process_trust_signature(trust_signature_payload)
|
identity.process_trust_signature(trust_signature_payload)
|
||||||
|
|
||||||
|
for iden in identities:
|
||||||
|
|
||||||
for iden in identityset.identities:
|
|
||||||
if iden.key == signing_key.verify_key:
|
if iden.key == signing_key.verify_key:
|
||||||
self.assertIn(fake_pubkey, iden.trusted)
|
|
||||||
|
for i in iden.trusted:
|
||||||
|
if i.key == VerifyKey(fake_pubkey):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise AssertionError("Signed identity not found")
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
|
raise AssertionError("Signing identity not found")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
"""
|
Loading…
Reference in New Issue
Block a user