Wot import fixes
This commit is contained in:
parent
2c9836c54f
commit
171ea25f46
@ -1,4 +1,4 @@
|
||||
{ "name": "example",
|
||||
{ "name": "wot",
|
||||
"version": "0.0.0",
|
||||
"author": "onionr"
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
"""Onionr - Private P2P Communication.
|
||||
|
||||
Default example plugin for devs or to test blocks
|
||||
Web of Trust Plugin
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
@ -35,10 +35,10 @@ GNU General Public License for more details.
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
from wot import loadfromblocks, identities
|
||||
|
||||
plugin_name = 'wot'
|
||||
PLUGIN_VERSION = '0.0.0'
|
||||
from wot import loadfromblocks, identities
|
||||
|
||||
|
||||
|
||||
def wot_test(arg: int):
|
||||
|
@ -4,5 +4,5 @@ from typing import TYPE_CHECKING, Set
|
||||
|
||||
from .identity import Identity
|
||||
from .getbykey import get_identity_by_key
|
||||
from .identityset import identities
|
||||
from .identity import identities
|
||||
|
||||
|
@ -7,7 +7,7 @@ import msgpack
|
||||
if TYPE_CHECKING:
|
||||
from onionrblocks import Block
|
||||
|
||||
from exceptions import InvalidWotBlock
|
||||
from wot.exceptions import InvalidWotBlock
|
||||
|
||||
|
||||
class WotCommand(IntEnum):
|
||||
|
@ -5,7 +5,7 @@ from nacl.signing import VerifyKey
|
||||
if TYPE_CHECKING:
|
||||
from identity import Identity
|
||||
|
||||
from identity.identityset import identities
|
||||
from wot.identity.identityset import identities
|
||||
|
||||
|
||||
def get_identity_by_key(
|
||||
|
@ -7,13 +7,14 @@ from nacl.signing import SigningKey, VerifyKey
|
||||
from nacl.encoding import Base32Encoder
|
||||
from nacl.exceptions import BadSignatureError
|
||||
|
||||
from .processtrustsignature import process_trust_signature
|
||||
from .proccessrevokesignature import process_revoke_signature
|
||||
from .name import IdentityName
|
||||
from .name import max_len as max_name_len
|
||||
from .identityset import IdentitySet, identities
|
||||
from exceptions import IdentitySerializationError
|
||||
from timestamp import WotTimestamp
|
||||
from wot.identity.processtrustsignature import process_trust_signature
|
||||
from wot.identity.processrevokesignature import process_revoke_signature
|
||||
from wot.identity.processidentityannounce import process_identity_announce
|
||||
from wot.identity.name import IdentityName
|
||||
from wot.identity.name import max_len as max_name_len
|
||||
from wot.identity.identityset import IdentitySet, identities
|
||||
from wot.exceptions import IdentitySerializationError
|
||||
from wot.timestamp import WotTimestamp
|
||||
|
||||
|
||||
short_identity_keys = {
|
||||
@ -58,7 +59,7 @@ class Identity:
|
||||
def serialize(self) -> bytes:
|
||||
"""
|
||||
A serialized identity is the name signed by the private key plus
|
||||
the public key and the date (used to prevent replay attacks)
|
||||
the public key
|
||||
"""
|
||||
if not self.private_key:
|
||||
raise IdentitySerializationError("Cannot serialize public identity")
|
||||
|
@ -0,0 +1,28 @@
|
||||
import logger
|
||||
|
||||
from nacl.signing import VerifyKey
|
||||
|
||||
from wot.blockprocessingevent import WotCommand
|
||||
from wot.identity.identityset import identities
|
||||
|
||||
def process_identity_announce(identity_announce_payload):
|
||||
if len(identity_announce_payload) != 97:
|
||||
logger.warn(
|
||||
f'Identity announce signature size is invalid',
|
||||
terminal=True)
|
||||
|
||||
# verify that this is a signature for an announce command
|
||||
if identity_announce_payload[0] != WotCommand.ANNOUNCE:
|
||||
logger.warn(
|
||||
f'Invalid command in signature' , terminal=True)
|
||||
return
|
||||
# signer is first 32 bytes
|
||||
signer = identity_announce_payload[1:33]
|
||||
# signature is last 64 bytes
|
||||
signature = identity_announce_payload[33:]
|
||||
|
||||
# If bad signature, it raises nacl.exceptions.BadSignatureError
|
||||
VerifyKey(signer).verify(identity_announce_payload[0] + signer, signature)
|
||||
|
||||
# noop if already announced
|
||||
identities.add
|
@ -4,8 +4,8 @@ from nacl.signing import VerifyKey
|
||||
|
||||
import logger
|
||||
|
||||
from getbykey import get_identity_by_key
|
||||
from blockprocessingevent import WotCommand
|
||||
from wot.getbykey import get_identity_by_key
|
||||
from wot.blockprocessingevent import WotCommand
|
||||
|
||||
|
||||
def process_revoke_signature(revoke_signature_payload):
|
@ -3,8 +3,8 @@ import logger
|
||||
|
||||
from nacl.signing import VerifyKey
|
||||
|
||||
from getbykey import get_identity_by_key
|
||||
from blockprocessingevent import WotCommand
|
||||
from wot.getbykey import get_identity_by_key
|
||||
from wot.blockprocessingevent import WotCommand
|
||||
|
||||
|
||||
def process_trust_signature(sig_payload: bytes):
|
||||
|
@ -7,9 +7,9 @@ import nacl.exceptions
|
||||
import logger
|
||||
import blockdb
|
||||
|
||||
from identity import Identity, processtrustsignature, identities
|
||||
from exceptions import IdentitySerializationError
|
||||
from getbykey import get_identity_by_key
|
||||
from wot.identity import Identity, processtrustsignature, identities
|
||||
from wot.exceptions import IdentitySerializationError
|
||||
from wot.getbykey import get_identity_by_key
|
||||
|
||||
|
||||
def load_identity_from_block(block) -> Identity:
|
||||
|
@ -0,0 +1,52 @@
|
||||
import os, uuid
|
||||
from random import randint
|
||||
from time import sleep
|
||||
from enum import IntEnum, auto
|
||||
from nacl.signing import SigningKey, VerifyKey
|
||||
import nacl
|
||||
import secrets
|
||||
import onionrblocks
|
||||
|
||||
|
||||
TEST_DIR = 'testdata/%s-%s' % (str(uuid.uuid4())[:6], os.path.basename(__file__)) + '/'
|
||||
print("Test directory:", TEST_DIR)
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
sys.path.append(".")
|
||||
sys.path.append('static-data/default-plugins/wot/wot')
|
||||
sys.path.append("src/")
|
||||
import identity
|
||||
from identity.identityset import identities
|
||||
|
||||
|
||||
class WotCommand(IntEnum):
|
||||
TRUST = 1
|
||||
REVOKE_TRUST = auto()
|
||||
ANNOUNCE = auto()
|
||||
REVOKE = auto()
|
||||
|
||||
|
||||
class TestAnnounceIdentityPayload(unittest.TestCase):
|
||||
def test_announce_identity_payload(self):
|
||||
# reset identity set
|
||||
identities.clear()
|
||||
|
||||
signing_key = SigningKey.generate()
|
||||
main_iden = identity.Identity(signing_key.verify_key, "test")
|
||||
|
||||
wot_cmd = int(WotCommand.ANNOUNCE).to_bytes(1, 'big')
|
||||
announce_signature = signing_key.sign(wot_cmd + bytes(main_iden))
|
||||
announce_signature_payload = wot_cmd + bytes(signing_key.verify_key) + \
|
||||
bytes(announce_signature)
|
||||
|
||||
identity.process_identity_announce(announce_signature_payload)
|
||||
|
||||
self.assertEqual(main_iden, identities[0])
|
||||
self.assertEqual(len(identities), 1)
|
||||
self.assertEqual(len(main_iden.trusted), 0)
|
||||
|
||||
|
||||
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user