Compare commits

..

No commits in common. "cdeaa403afe0d5f3c1e8979277fb9c41b0832b76" and "05e04ef557cfd34217de87cda0d42d8b377d3752" have entirely different histories.

11 changed files with 13 additions and 158 deletions

View File

@ -1,11 +1,11 @@
from typing import TYPE_CHECKING, Union from typing import TYPE_CHECKING, Iterable, Union
from nacl.signing import VerifyKey from nacl.signing import VerifyKey
if TYPE_CHECKING: if TYPE_CHECKING:
from identity import Identity from identity import Identity
from identity.identityset import identities from identityset import identities
def get_identity_by_key( def get_identity_by_key(

View File

@ -8,10 +8,8 @@ from nacl.encoding import Base32Encoder
from nacl.exceptions import BadSignatureError from nacl.exceptions import BadSignatureError
from .processtrustsignature import process_trust_signature from .processtrustsignature import process_trust_signature
from .proccessrevokesignature import process_revoke_signature
from .name import IdentityName from .name import IdentityName
from .name import max_len as max_name_len from .name import max_len as max_name_len
from .identityset import IdentitySet, identities
from exceptions import IdentitySerializationError from exceptions import IdentitySerializationError
from timestamp import WotTimestamp from timestamp import WotTimestamp
@ -28,15 +26,12 @@ class Identity:
key: Union[SigningKey, VerifyKey], key: Union[SigningKey, VerifyKey],
name: 'IdentityName', name: 'IdentityName',
created_date: WotTimestamp = None): created_date: WotTimestamp = None):
self.trusted: Set[Identity] = IdentitySet() self.trusted: Set[Identity] = set()
self.name = IdentityName(name) self.name = IdentityName(name)
self.created_date = created_date self.created_date = created_date
self.private_key = self.key = None self.private_key = self.key = None
if isinstance(key, bytes):
self.key = VerifyKey(key)
# SigningKey and VerifyKey have minimal memory overhead # SigningKey and VerifyKey have minimal memory overhead
# so we do not need to make them properties # so we do not need to make them properties
if isinstance(key, SigningKey): if isinstance(key, SigningKey):

View File

@ -1,30 +0,0 @@
class IdentitySet(set):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __contains__(self, ob: 'Identity') -> bool:
for identity in self:
if bytes(identity.key) == bytes(ob.key):
return True
return False
def add(self, identity):
for existing_iden in self:
if bytes(existing_iden.key) == bytes(identity.key):
return
super().add(identity)
def remove(self, identity):
remove_idens = []
for existing_iden in self:
if bytes(existing_iden.key) == bytes(identity.key):
remove_idens.append(existing_iden)
for remove_iden in remove_idens:
super().remove(remove_iden)
# Set of identites within N-distance trust
identities = IdentitySet()

View File

@ -1,44 +0,0 @@
import traceback
from nacl.signing import VerifyKey
import logger
from getbykey import get_identity_by_key
from blockprocessingevent import WotCommand
def process_revoke_signature(revoke_signature_payload):
if len(revoke_signature_payload) != 129:
logger.warn(
f'Signature size is invalid for revoking an identity',
terminal=True)
# verify that this is a signature for a trust command
if revoke_signature_payload[0] != WotCommand.REVOKE_TRUST:
logger.warn(
f'Invalid command in signature' , terminal=True)
return
# signer is first 32 bytes
signer = VerifyKey(revoke_signature_payload[1:33])
# revoked is next 32 bytes
revoked = revoke_signature_payload[33:65]
# signature is last 64 bytes
signature = revoke_signature_payload[65:]
# If bad signature, it raises nacl.exceptions.BadSignatureError
signer.verify(
int.to_bytes(revoke_signature_payload[0], 1, 'big') + \
revoked, signature)
# if good signature
try:
signer_identity = get_identity_by_key(bytes(signer))
# noop if already revoked
signer_identity.trusted.remove(get_identity_by_key(revoked))
except KeyError:
# if signer or revoked identity are not in the identity set
# this means they have not been announced yet
traceback.print_exc()
pass

View File

@ -16,7 +16,6 @@ def process_trust_signature(sig_payload: bytes):
if sig_payload[0] != WotCommand.TRUST: if sig_payload[0] != WotCommand.TRUST:
logger.warn( logger.warn(
f'Invalid command in signature') f'Invalid command in signature')
return
# signer is first 32 bytes # signer is first 32 bytes
signer = VerifyKey(sig_payload[1:33]) signer = VerifyKey(sig_payload[1:33])
# signed is next 32 bytes # signed is next 32 bytes

View File

@ -0,0 +1,3 @@
from typing import Set
# Set of identites within N-distance trust
identities: Set['Identity'] = set()

View File

@ -7,8 +7,9 @@ import nacl.exceptions
import logger import logger
import blockdb import blockdb
from identity import Identity, processtrustsignature, identities from identity import Identity, processtrustsignature
from exceptions import IdentitySerializationError from exceptions import IdentitySerializationError
from identityset import identities
from getbykey import get_identity_by_key from getbykey import get_identity_by_key

View File

@ -18,7 +18,7 @@ import onionrblocks
from blockdb import block_db_path from blockdb import block_db_path
from identity import Identity from identity import Identity
from getbykey import get_identity_by_key from getbykey import get_identity_by_key
from identity.identityset import identities from identityset import identities as iden_set
import blockdb import blockdb
@ -29,7 +29,7 @@ class GetIdentityByKeyTest(unittest.TestCase):
iden_public = iden_priv_key.verify_key iden_public = iden_priv_key.verify_key
identity = Identity(iden_priv_key, "test") identity = Identity(iden_priv_key, "test")
identities.add(identity) iden_set.add(identity)
self.assertIsInstance(get_identity_by_key(iden_public), Identity) self.assertIsInstance(get_identity_by_key(iden_public), Identity)

View File

@ -29,14 +29,8 @@ def generate_graph(iden: Identity, depth, max_neighbors):
class IdentityDistanceTest(unittest.TestCase): class IdentityDistanceTest(unittest.TestCase):
def test_distance(self): def test_distance(self):
iden = Identity(os.urandom(32), "1" + secrets.token_hex(4)) iden = Identity(os.urandom(32), "1" + secrets.token_hex(4))
while True: generate_graph(iden, 10, 5)
generate_graph(iden, 10, 5) iden2 = list(list(iden.trusted)[0].trusted)[0]
try:
iden2 = list(list(iden.trusted)[0].trusted)[0]
except IndexError:
pass
else:
break
self.assertEqual(get_distance(iden, iden2), 2) self.assertEqual(get_distance(iden, iden2), 2)

View File

@ -18,7 +18,7 @@ sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/wot') sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/") sys.path.append("src/")
import identity import identity
from identity import identities from identityset import identities
class WotCommand(IntEnum): class WotCommand(IntEnum):

View File

@ -1,63 +0,0 @@
import os, uuid
from random import randint
from time import sleep
from enum import IntEnum, auto
from nacl.signing import SigningKey, VerifyKey
import nacl
import secrets
import onionrblocks
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
import unittest
import sys
sys.path.append(".")
sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/")
import identity
from identity.identityset import identities
class WotCommand(IntEnum):
TRUST = 1
REVOKE_TRUST = auto()
ANNOUNCE = auto()
REVOKE = auto()
class TestSignatureRevokeProcessing(unittest.TestCase):
def test_revoke_trust(self):
# reset identity set
identities.clear()
fake_pubkey = secrets.token_bytes(32)
signing_key = SigningKey.generate()
main_iden = identity.Identity(signing_key.verify_key, "test")
identities.add(main_iden)
identities.add(identity.Identity(fake_pubkey, "test2"))
wot_cmd = int(WotCommand.REVOKE_TRUST).to_bytes(1, 'big')
revoke_signature = signing_key.sign(wot_cmd + fake_pubkey)
revoke_signature_payload = wot_cmd + bytes(signing_key.verify_key) + \
fake_pubkey + revoke_signature.signature
main_iden.trusted.add(
identity.Identity(VerifyKey(fake_pubkey), "test2"))
identity.process_revoke_signature(revoke_signature_payload)
self.assertEqual(len(identities), 2)
self.assertEqual(len(list(identities)[0].trusted), 0)
for iden in identities:
if iden.key == signing_key.verify_key:
for i in iden.trusted:
if i.key == VerifyKey(fake_pubkey):
raise AssertionError("Signed identity found")
break
unittest.main()