Finished loading identities from blocks
This commit is contained in:
parent
57b1e07715
commit
83007cb28d
@ -35,7 +35,7 @@ GNU General Public License for more details.
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
from wot import loadfromblocks, identities
|
||||
|
||||
plugin_name = 'wot'
|
||||
PLUGIN_VERSION = '0.0.0'
|
||||
@ -50,3 +50,5 @@ def on_init(api, data=None):
|
||||
logger.info(
|
||||
f"Web of Trust Plugin v{PLUGIN_VERSION} enabled", terminal=True)
|
||||
onionrplugins.plugin_apis['wot'] = wot_test
|
||||
|
||||
list(map(lambda x: identities.add(x), loadfromblocks.load_identities()))
|
||||
|
@ -1,40 +0,0 @@
|
||||
import os, uuid
|
||||
from random import randint
|
||||
from sqlite3 import Time
|
||||
import socket
|
||||
from queue import Queue
|
||||
from time import sleep
|
||||
import secrets
|
||||
|
||||
|
||||
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
sys.path.append(".")
|
||||
sys.path.append('static-data/default-plugins/wot/')
|
||||
sys.path.append("src/")
|
||||
from wot import identity
|
||||
|
||||
|
||||
def generate_graph(iden: identity.Identity, depth, max_neighbors):
|
||||
c = 0
|
||||
if depth == 0:
|
||||
return
|
||||
for i in range(randint(0, max_neighbors)):
|
||||
i = identity.Identity(secrets.token_hex(16))
|
||||
iden.trusted.add(i)
|
||||
generate_graph(i, depth - 1, max_neighbors)
|
||||
|
||||
|
||||
class IdentityDistanceTest(unittest.TestCase):
|
||||
def test_distance(self):
|
||||
iden = identity.Identity(secrets.token_hex(16))
|
||||
generate_graph(iden, 10, 5)
|
||||
iden2 = list(list(iden.trusted)[0].trusted)[0]
|
||||
|
||||
self.assertEqual(identity.get_distance(iden, iden2), 2)
|
||||
|
||||
unittest.main()
|
@ -1,2 +1,11 @@
|
||||
# The web of trust is a graph of identities where each edge is a signature
|
||||
# of a byte representing a trust level and an identity's public key
|
||||
from typing import TYPE_CHECKING, Set
|
||||
|
||||
from .identity import Identity
|
||||
from .blockprocessing import process_block
|
||||
|
||||
|
||||
# Set of identites within N-distance trust
|
||||
identities: Set['Identity'] = set()
|
||||
|
||||
|
@ -1,2 +1,2 @@
|
||||
class InvalidWotBlock(Exception):
|
||||
pass
|
||||
class InvalidWotBlock(Exception): pass
|
||||
class IdentitySerializationError(Exception): pass
|
@ -1,42 +0,0 @@
|
||||
from collections import deque
|
||||
from typing import Set, Union, List
|
||||
|
||||
|
||||
identities: List['Identity'] = []
|
||||
|
||||
|
||||
class Identity:
|
||||
def __init__(self, key: Union['Ed25519PublicKey', 'Ed25519PrivateKey']):
|
||||
self.trusted: Set[Identity] = set()
|
||||
self.key = key
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.key == other
|
||||
|
||||
def __str__(self):
|
||||
return self.key
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.key)
|
||||
|
||||
|
||||
def get_distance(identity: Identity, identity2: Identity):
|
||||
distance = 0
|
||||
visited = set()
|
||||
stack = deque([identity])
|
||||
|
||||
while stack:
|
||||
current_iden = stack.popleft()
|
||||
|
||||
if current_iden == identity2:
|
||||
return distance
|
||||
distance += 1
|
||||
|
||||
if identity2 in current_iden.trusted:
|
||||
return distance
|
||||
|
||||
for trusted in current_iden.trusted:
|
||||
if trusted not in visited:
|
||||
visited.add(trusted)
|
||||
stack.append(trusted)
|
||||
raise ValueError
|
@ -9,6 +9,7 @@ from nacl.exceptions import BadSignatureError
|
||||
|
||||
from .name import IdentityName
|
||||
from .name import max_len as max_name_len
|
||||
from exceptions import IdentitySerializationError
|
||||
from timestamp import WotTimestamp
|
||||
|
||||
|
||||
@ -18,7 +19,6 @@ short_identity_keys = {
|
||||
'key': 'k'
|
||||
}
|
||||
|
||||
class WotSerializationError(Exception): pass
|
||||
|
||||
|
||||
class Identity:
|
||||
@ -57,7 +57,7 @@ class Identity:
|
||||
the public key and the date (used to prevent replay attacks)
|
||||
"""
|
||||
if not self.private_key:
|
||||
raise WotSerializationError("Cannot serialize public identity")
|
||||
raise IdentitySerializationError("Cannot serialize public identity")
|
||||
signed = self.private_key.sign(
|
||||
self.name.zfill(max_name_len).encode('utf-8') + bytes(self.key) +
|
||||
str(int(time.time())).encode('utf-8'))
|
||||
@ -72,14 +72,14 @@ class Identity:
|
||||
key = VerifyKey(message[max_name_len:max_name_len + 32])
|
||||
date = WotTimestamp(message[max_name_len + 32:].decode('utf-8'))
|
||||
if date > time.time():
|
||||
raise WotSerializationError(
|
||||
raise IdentitySerializationError(
|
||||
"Date in serialized identity is in the future")
|
||||
elif date <= 0:
|
||||
raise WotSerializationError("Date in serialized identity is <= 0")
|
||||
raise IdentitySerializationError("Date in serialized identity is <= 0")
|
||||
try:
|
||||
VerifyKey.verify(key, message, signature)
|
||||
except BadSignatureError:
|
||||
raise WotSerializationError(
|
||||
raise IdentitySerializationError(
|
||||
"Signature in serialized identity is invalid")
|
||||
return cls(key, name)
|
||||
|
||||
|
@ -1,9 +1,17 @@
|
||||
from typing import Generator
|
||||
import blockdb
|
||||
|
||||
from .identity import Identity
|
||||
from identity import Identity
|
||||
from exceptions import IdentitySerializationError
|
||||
|
||||
|
||||
def load_identities_from_blocks(blocks) -> Generator[Identity]:
|
||||
for block in blockdb.get_blocks_by_type('wotb'):
|
||||
yield Identity.deserialize(block.data)
|
||||
def load_identity_from_block(block) -> Identity:
|
||||
return Identity.deserialize(block.data)
|
||||
|
||||
|
||||
def load_identities_from_blocks() -> Generator[Identity, None, None]:
|
||||
for block in blockdb.get_blocks_by_type(b'wotb'):
|
||||
try:
|
||||
yield load_identity_from_block(block)
|
||||
except IdentitySerializationError:
|
||||
pass
|
||||
|
@ -11,27 +11,27 @@ os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
import unittest
|
||||
import sys
|
||||
sys.path.append(".")
|
||||
sys.path.append('static-data/default-plugins/wot/')
|
||||
sys.path.append('static-data/default-plugins/wot/wot')
|
||||
sys.path.append("src/")
|
||||
from wot import identity
|
||||
from identity import Identity, get_distance
|
||||
|
||||
|
||||
def generate_graph(iden: identity.Identity, depth, max_neighbors):
|
||||
def generate_graph(iden: Identity, depth, max_neighbors):
|
||||
c = 0
|
||||
if depth == 0:
|
||||
return
|
||||
for i in range(randint(0, max_neighbors)):
|
||||
i = identity.Identity(secrets.token_hex(16))
|
||||
i = Identity(os.urandom(32), "1" + secrets.token_hex(4))
|
||||
iden.trusted.add(i)
|
||||
generate_graph(i, depth - 1, max_neighbors)
|
||||
|
||||
|
||||
class IdentityDistanceTest(unittest.TestCase):
|
||||
def test_distance(self):
|
||||
iden = identity.Identity(secrets.token_hex(16))
|
||||
iden = Identity(os.urandom(32), "1" + secrets.token_hex(4))
|
||||
generate_graph(iden, 10, 5)
|
||||
iden2 = list(list(iden.trusted)[0].trusted)[0]
|
||||
|
||||
self.assertEqual(identity.get_distance(iden, iden2), 2)
|
||||
self.assertEqual(get_distance(iden, iden2), 2)
|
||||
|
||||
unittest.main()
|
||||
|
@ -42,6 +42,7 @@ class IdentitySerializeTest(unittest.TestCase):
|
||||
|
||||
identity = Identity(iden_priv_key, "test")
|
||||
serialized = identity.serialize()
|
||||
print(serialized)
|
||||
self.assertEqual(len(serialized), expected_serialized_len)
|
||||
self.assertEqual(serialized, expected_serialized)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user