Compare commits

...

2 Commits

Author SHA1 Message Date
Kevin F
cdeaa403af Added set wrapper to contain identities
Finished signature revoking implementation
2022-09-15 20:03:29 -05:00
Kevin F
fae9521d8f Made trust distance test graph generation reliable 2022-09-15 20:01:59 -05:00
11 changed files with 158 additions and 13 deletions

View File

@ -1,11 +1,11 @@
from typing import TYPE_CHECKING, Iterable, Union
from typing import TYPE_CHECKING, Union
from nacl.signing import VerifyKey
if TYPE_CHECKING:
from identity import Identity
from identityset import identities
from identity.identityset import identities
def get_identity_by_key(

View File

@ -8,8 +8,10 @@ from nacl.encoding import Base32Encoder
from nacl.exceptions import BadSignatureError
from .processtrustsignature import process_trust_signature
from .proccessrevokesignature import process_revoke_signature
from .name import IdentityName
from .name import max_len as max_name_len
from .identityset import IdentitySet, identities
from exceptions import IdentitySerializationError
from timestamp import WotTimestamp
@ -26,12 +28,15 @@ class Identity:
key: Union[SigningKey, VerifyKey],
name: 'IdentityName',
created_date: WotTimestamp = None):
self.trusted: Set[Identity] = set()
self.trusted: Set[Identity] = IdentitySet()
self.name = IdentityName(name)
self.created_date = created_date
self.private_key = self.key = None
if isinstance(key, bytes):
self.key = VerifyKey(key)
# SigningKey and VerifyKey have minimal memory overhead
# so we do not need to make them properties
if isinstance(key, SigningKey):

View File

@ -0,0 +1,30 @@
class IdentitySet(set):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __contains__(self, ob: 'Identity') -> bool:
for identity in self:
if bytes(identity.key) == bytes(ob.key):
return True
return False
def add(self, identity):
for existing_iden in self:
if bytes(existing_iden.key) == bytes(identity.key):
return
super().add(identity)
def remove(self, identity):
remove_idens = []
for existing_iden in self:
if bytes(existing_iden.key) == bytes(identity.key):
remove_idens.append(existing_iden)
for remove_iden in remove_idens:
super().remove(remove_iden)
# Set of identites within N-distance trust
identities = IdentitySet()

View File

@ -0,0 +1,44 @@
import traceback
from nacl.signing import VerifyKey
import logger
from getbykey import get_identity_by_key
from blockprocessingevent import WotCommand
def process_revoke_signature(revoke_signature_payload):
if len(revoke_signature_payload) != 129:
logger.warn(
f'Signature size is invalid for revoking an identity',
terminal=True)
# verify that this is a signature for a trust command
if revoke_signature_payload[0] != WotCommand.REVOKE_TRUST:
logger.warn(
f'Invalid command in signature' , terminal=True)
return
# signer is first 32 bytes
signer = VerifyKey(revoke_signature_payload[1:33])
# revoked is next 32 bytes
revoked = revoke_signature_payload[33:65]
# signature is last 64 bytes
signature = revoke_signature_payload[65:]
# If bad signature, it raises nacl.exceptions.BadSignatureError
signer.verify(
int.to_bytes(revoke_signature_payload[0], 1, 'big') + \
revoked, signature)
# if good signature
try:
signer_identity = get_identity_by_key(bytes(signer))
# noop if already revoked
signer_identity.trusted.remove(get_identity_by_key(revoked))
except KeyError:
# if signer or revoked identity are not in the identity set
# this means they have not been announced yet
traceback.print_exc()
pass

View File

@ -16,6 +16,7 @@ def process_trust_signature(sig_payload: bytes):
if sig_payload[0] != WotCommand.TRUST:
logger.warn(
f'Invalid command in signature')
return
# signer is first 32 bytes
signer = VerifyKey(sig_payload[1:33])
# signed is next 32 bytes

View File

@ -1,3 +0,0 @@
from typing import Set
# Set of identites within N-distance trust
identities: Set['Identity'] = set()

View File

@ -7,9 +7,8 @@ import nacl.exceptions
import logger
import blockdb
from identity import Identity, processtrustsignature
from identity import Identity, processtrustsignature, identities
from exceptions import IdentitySerializationError
from identityset import identities
from getbykey import get_identity_by_key

View File

@ -18,7 +18,7 @@ import onionrblocks
from blockdb import block_db_path
from identity import Identity
from getbykey import get_identity_by_key
from identityset import identities as iden_set
from identity.identityset import identities
import blockdb
@ -29,7 +29,7 @@ class GetIdentityByKeyTest(unittest.TestCase):
iden_public = iden_priv_key.verify_key
identity = Identity(iden_priv_key, "test")
iden_set.add(identity)
identities.add(identity)
self.assertIsInstance(get_identity_by_key(iden_public), Identity)

View File

@ -29,8 +29,14 @@ def generate_graph(iden: Identity, depth, max_neighbors):
class IdentityDistanceTest(unittest.TestCase):
def test_distance(self):
iden = Identity(os.urandom(32), "1" + secrets.token_hex(4))
generate_graph(iden, 10, 5)
iden2 = list(list(iden.trusted)[0].trusted)[0]
while True:
generate_graph(iden, 10, 5)
try:
iden2 = list(list(iden.trusted)[0].trusted)[0]
except IndexError:
pass
else:
break
self.assertEqual(get_distance(iden, iden2), 2)

View File

@ -18,7 +18,7 @@ sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/")
import identity
from identityset import identities
from identity import identities
class WotCommand(IntEnum):

View File

@ -0,0 +1,63 @@
import os, uuid
from random import randint
from time import sleep
from enum import IntEnum, auto
from nacl.signing import SigningKey, VerifyKey
import nacl
import secrets
import onionrblocks
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import sys
sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/")
import identity
from identity.identityset import identities
class WotCommand(IntEnum):
TRUST = 1
REVOKE_TRUST = auto()
ANNOUNCE = auto()
REVOKE = auto()
class TestSignatureRevokeProcessing(unittest.TestCase):
def test_revoke_trust(self):
# reset identity set
identities.clear()
fake_pubkey = secrets.token_bytes(32)
signing_key = SigningKey.generate()
main_iden = identity.Identity(signing_key.verify_key, "test")
identities.add(main_iden)
identities.add(identity.Identity(fake_pubkey, "test2"))
wot_cmd = int(WotCommand.REVOKE_TRUST).to_bytes(1, 'big')
revoke_signature = signing_key.sign(wot_cmd + fake_pubkey)
revoke_signature_payload = wot_cmd + bytes(signing_key.verify_key) + \
fake_pubkey + revoke_signature.signature
main_iden.trusted.add(
identity.Identity(VerifyKey(fake_pubkey), "test2"))
identity.process_revoke_signature(revoke_signature_payload)
self.assertEqual(len(identities), 2)
self.assertEqual(len(list(identities)[0].trusted), 0)
for iden in identities:
if iden.key == signing_key.verify_key:
for i in iden.trusted:
if i.key == VerifyKey(fake_pubkey):
raise AssertionError("Signed identity found")
break
unittest.main()