Compare commits

...

2 Commits

Author SHA1 Message Date
Kevin F
cdeaa403af Added set wrapper to contain identities
Finished signature revoking implementation
2022-09-15 20:03:29 -05:00
Kevin F
fae9521d8f Made trust distance test graph generation reliable 2022-09-15 20:01:59 -05:00
11 changed files with 158 additions and 13 deletions

View File

@ -1,11 +1,11 @@
from typing import TYPE_CHECKING, Iterable, Union from typing import TYPE_CHECKING, Union
from nacl.signing import VerifyKey from nacl.signing import VerifyKey
if TYPE_CHECKING: if TYPE_CHECKING:
from identity import Identity from identity import Identity
from identityset import identities from identity.identityset import identities
def get_identity_by_key( def get_identity_by_key(

View File

@ -8,8 +8,10 @@ from nacl.encoding import Base32Encoder
from nacl.exceptions import BadSignatureError from nacl.exceptions import BadSignatureError
from .processtrustsignature import process_trust_signature from .processtrustsignature import process_trust_signature
from .proccessrevokesignature import process_revoke_signature
from .name import IdentityName from .name import IdentityName
from .name import max_len as max_name_len from .name import max_len as max_name_len
from .identityset import IdentitySet, identities
from exceptions import IdentitySerializationError from exceptions import IdentitySerializationError
from timestamp import WotTimestamp from timestamp import WotTimestamp
@ -26,12 +28,15 @@ class Identity:
key: Union[SigningKey, VerifyKey], key: Union[SigningKey, VerifyKey],
name: 'IdentityName', name: 'IdentityName',
created_date: WotTimestamp = None): created_date: WotTimestamp = None):
self.trusted: Set[Identity] = set() self.trusted: Set[Identity] = IdentitySet()
self.name = IdentityName(name) self.name = IdentityName(name)
self.created_date = created_date self.created_date = created_date
self.private_key = self.key = None self.private_key = self.key = None
if isinstance(key, bytes):
self.key = VerifyKey(key)
# SigningKey and VerifyKey have minimal memory overhead # SigningKey and VerifyKey have minimal memory overhead
# so we do not need to make them properties # so we do not need to make them properties
if isinstance(key, SigningKey): if isinstance(key, SigningKey):

View File

@ -0,0 +1,30 @@
class IdentitySet(set):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __contains__(self, ob: 'Identity') -> bool:
for identity in self:
if bytes(identity.key) == bytes(ob.key):
return True
return False
def add(self, identity):
for existing_iden in self:
if bytes(existing_iden.key) == bytes(identity.key):
return
super().add(identity)
def remove(self, identity):
remove_idens = []
for existing_iden in self:
if bytes(existing_iden.key) == bytes(identity.key):
remove_idens.append(existing_iden)
for remove_iden in remove_idens:
super().remove(remove_iden)
# Set of identites within N-distance trust
identities = IdentitySet()

View File

@ -0,0 +1,44 @@
import traceback
from nacl.signing import VerifyKey
import logger
from getbykey import get_identity_by_key
from blockprocessingevent import WotCommand
def process_revoke_signature(revoke_signature_payload):
if len(revoke_signature_payload) != 129:
logger.warn(
f'Signature size is invalid for revoking an identity',
terminal=True)
# verify that this is a signature for a trust command
if revoke_signature_payload[0] != WotCommand.REVOKE_TRUST:
logger.warn(
f'Invalid command in signature' , terminal=True)
return
# signer is first 32 bytes
signer = VerifyKey(revoke_signature_payload[1:33])
# revoked is next 32 bytes
revoked = revoke_signature_payload[33:65]
# signature is last 64 bytes
signature = revoke_signature_payload[65:]
# If bad signature, it raises nacl.exceptions.BadSignatureError
signer.verify(
int.to_bytes(revoke_signature_payload[0], 1, 'big') + \
revoked, signature)
# if good signature
try:
signer_identity = get_identity_by_key(bytes(signer))
# noop if already revoked
signer_identity.trusted.remove(get_identity_by_key(revoked))
except KeyError:
# if signer or revoked identity are not in the identity set
# this means they have not been announced yet
traceback.print_exc()
pass

View File

@ -16,6 +16,7 @@ def process_trust_signature(sig_payload: bytes):
if sig_payload[0] != WotCommand.TRUST: if sig_payload[0] != WotCommand.TRUST:
logger.warn( logger.warn(
f'Invalid command in signature') f'Invalid command in signature')
return
# signer is first 32 bytes # signer is first 32 bytes
signer = VerifyKey(sig_payload[1:33]) signer = VerifyKey(sig_payload[1:33])
# signed is next 32 bytes # signed is next 32 bytes

View File

@ -1,3 +0,0 @@
from typing import Set
# Set of identites within N-distance trust
identities: Set['Identity'] = set()

View File

@ -7,9 +7,8 @@ import nacl.exceptions
import logger import logger
import blockdb import blockdb
from identity import Identity, processtrustsignature from identity import Identity, processtrustsignature, identities
from exceptions import IdentitySerializationError from exceptions import IdentitySerializationError
from identityset import identities
from getbykey import get_identity_by_key from getbykey import get_identity_by_key

View File

@ -18,7 +18,7 @@ import onionrblocks
from blockdb import block_db_path from blockdb import block_db_path
from identity import Identity from identity import Identity
from getbykey import get_identity_by_key from getbykey import get_identity_by_key
from identityset import identities as iden_set from identity.identityset import identities
import blockdb import blockdb
@ -29,7 +29,7 @@ class GetIdentityByKeyTest(unittest.TestCase):
iden_public = iden_priv_key.verify_key iden_public = iden_priv_key.verify_key
identity = Identity(iden_priv_key, "test") identity = Identity(iden_priv_key, "test")
iden_set.add(identity) identities.add(identity)
self.assertIsInstance(get_identity_by_key(iden_public), Identity) self.assertIsInstance(get_identity_by_key(iden_public), Identity)

View File

@ -29,8 +29,14 @@ def generate_graph(iden: Identity, depth, max_neighbors):
class IdentityDistanceTest(unittest.TestCase): class IdentityDistanceTest(unittest.TestCase):
def test_distance(self): def test_distance(self):
iden = Identity(os.urandom(32), "1" + secrets.token_hex(4)) iden = Identity(os.urandom(32), "1" + secrets.token_hex(4))
generate_graph(iden, 10, 5) while True:
iden2 = list(list(iden.trusted)[0].trusted)[0] generate_graph(iden, 10, 5)
try:
iden2 = list(list(iden.trusted)[0].trusted)[0]
except IndexError:
pass
else:
break
self.assertEqual(get_distance(iden, iden2), 2) self.assertEqual(get_distance(iden, iden2), 2)

View File

@ -18,7 +18,7 @@ sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/wot') sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/") sys.path.append("src/")
import identity import identity
from identityset import identities from identity import identities
class WotCommand(IntEnum): class WotCommand(IntEnum):

View File

@ -0,0 +1,63 @@
import os, uuid
from random import randint
from time import sleep
from enum import IntEnum, auto
from nacl.signing import SigningKey, VerifyKey
import nacl
import secrets
import onionrblocks
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import sys
sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/")
import identity
from identity.identityset import identities
class WotCommand(IntEnum):
TRUST = 1
REVOKE_TRUST = auto()
ANNOUNCE = auto()
REVOKE = auto()
class TestSignatureRevokeProcessing(unittest.TestCase):
def test_revoke_trust(self):
# reset identity set
identities.clear()
fake_pubkey = secrets.token_bytes(32)
signing_key = SigningKey.generate()
main_iden = identity.Identity(signing_key.verify_key, "test")
identities.add(main_iden)
identities.add(identity.Identity(fake_pubkey, "test2"))
wot_cmd = int(WotCommand.REVOKE_TRUST).to_bytes(1, 'big')
revoke_signature = signing_key.sign(wot_cmd + fake_pubkey)
revoke_signature_payload = wot_cmd + bytes(signing_key.verify_key) + \
fake_pubkey + revoke_signature.signature
main_iden.trusted.add(
identity.Identity(VerifyKey(fake_pubkey), "test2"))
identity.process_revoke_signature(revoke_signature_payload)
self.assertEqual(len(identities), 2)
self.assertEqual(len(list(identities)[0].trusted), 0)
for iden in identities:
if iden.key == signing_key.verify_key:
for i in iden.trusted:
if i.key == VerifyKey(fake_pubkey):
raise AssertionError("Signed identity found")
break
unittest.main()