Work on WOT block processing and signatures

This commit is contained in:
Kevin F 2022-09-08 18:44:23 -05:00
parent e3d06ff0f5
commit 650e943182
10 changed files with 118 additions and 8 deletions

View File

@ -1 +1 @@
jjvq7itovbt6gcttj5p25zgalht3zraqfvgzojhdqdd2rarriehrfnyd,m4vae3qvhnpz65jscbulv5j7lymhnbfv3hdhfwk7qztknldbgn3b3oqd
hc5qc2vfuq2rkz4mjrrobcjgdjnsni6rovhfkbpc7huupbuiztx6wiid

View File

@ -4,8 +4,6 @@ from typing import TYPE_CHECKING, Set
from .identity import Identity
from .blockprocessing import process_block
# Set of identites within N-distance trust
identities: Set['Identity'] = set()
from .getbykey import get_identity_by_key
from .identityset import identities

View File

@ -16,6 +16,7 @@ class WotCommand(IntEnum):
ANNOUNCE = auto()
REVOKE = auto()
class WotPayload:
def __init__(self, block_data: bytes):
wot_command = WotCommand(

View File

@ -0,0 +1,20 @@
from typing import TYPE_CHECKING, Iterable, Union
from nacl.signing import VerifyKey
if TYPE_CHECKING:
from identity import Identity
from identityset import identities
def get_identity_by_key(
key: Union[bytes, 'VerifyKey']) -> 'Identity':
if not isinstance(key, VerifyKey):
key = VerifyKey(key)
for identity in identities:
print(identity)
if bytes(identity.key) == bytes(key):
return identity
return KeyError("Identity not found")

View File

@ -19,8 +19,6 @@ short_identity_keys = {
'key': 'k'
}
class Identity:
def __init__(
self,

View File

@ -0,0 +1,31 @@
import logger
from nacl.signing import VerifyKey
import nacl.exceptions
from getbykey import get_identity_by_key
def process_trust_signature(signature: bytes):
if len(signature) != 128:
logger.warn(
f'Signature size is invalid for a signed identity')
signer = signature[:32]
signed = signature[32:65]
signature = signature[65:]
# If bad signature, it raises nacl.exceptions.BadSignatureError
VerifyKey.verify(signer, signed, signature)
else:
# if good signature
try:
signer_identity = get_identity_by_key(signer)
signed_identity = get_identity_by_key(signed)
except KeyError:
# if signer or signed identity are not in the identity set
# this means they have not been announced yet
pass
else:
# noop if already signed
signer_identity.trusted.add(signed_identity)

View File

@ -0,0 +1,3 @@
from typing import Set
# Set of identites within N-distance trust
identities: Set['Identity'] = set()

View File

@ -1,8 +1,16 @@
from typing import Generator
import traceback
from nacl.signing import VerifyKey
import nacl.exceptions
import logger
import blockdb
from identity import Identity
from identity import Identity, processtrustsignature
from exceptions import IdentitySerializationError
from identityset import identities
from getbykey import get_identity_by_key
def load_identity_from_block(block) -> Identity:
@ -15,3 +23,15 @@ def load_identities_from_blocks() -> Generator[Identity, None, None]:
yield load_identity_from_block(block)
except IdentitySerializationError:
pass
def load_signatures_from_blocks() -> None:
for block in blockdb.get_blocks_by_type(b'wots'):
try:
# If good signature,
# it adds the signature to the signed identity's trust set
# noop if already signed
processtrustsignature.process_trust_signature(block.data)
except nacl.exceptions.BadSignatureError:
logger.warn('Bad signature in block:')
logger.warn(traceback.format_exc())

View File

@ -0,0 +1,38 @@
import dbm
import os, uuid
import time
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
print("Test directory:", TEST_DIR)
os.environ["ONIONR_HOME"] = TEST_DIR
os.makedirs(TEST_DIR)
from nacl import signing
import unittest
import sys
sys.path.append('static-data/default-plugins/wot/wot')
sys.path.append("src/")
import onionrblocks
from blockdb import block_db_path
from identity import Identity
from getbykey import get_identity_by_key
from identityset import identities as iden_set
import blockdb
class GetIdentityByKeyTest(unittest.TestCase):
def test_get_identity_by_key(self):
iden_priv_key = signing.SigningKey.generate()
iden_public = iden_priv_key.verify_key
identity = Identity(iden_priv_key, "test")
iden_set.add(identity)
self.assertIsInstance(get_identity_by_key(iden_public), Identity)
unittest.main()

View File

@ -18,6 +18,7 @@ from identity import Identity
from loadfromblocks import load_identities_from_blocks
import blockdb
def _safe_remove(path):
try:
os.remove(path)