Browse Source

work on main object

master
Kevin Froman 2 years ago
parent
commit
5662259e19
  1. 7
      kasten/generator/pack.py
  2. 31
      kasten/main.py
  3. BIN
      kasten/pack/__pycache__/__init__.cpython-38.pyc
  4. 8
      tests/test_generator_base.py
  5. 14
      tests/test_kasten.py
  6. 19
      tests/test_kasten_with_base_generator.py
  7. 33
      tests/test_pack.py

7
kasten/generator/pack.py

@ -45,8 +45,7 @@ def pack(data: bytes, data_type: 'KastenDataType',
pass
if timestamp is None:
timestamp = floor(time())
assert int(timestamp)
timestamp = int(timestamp)
kasten_header = [data_type, enc_mode, timestamp]
if signer:
@ -56,6 +55,6 @@ def pack(data: bytes, data_type: 'KastenDataType',
if app_metadata is not None:
kasten_header.append(app_metadata)
kasten_header = packb(kasten_header) + packb(b'\n')
kasten_header = packb(kasten_header) + b'\n'
print(kasten_header + data)
return kasten_header + data

31
kasten/main.py

@ -1,17 +1,40 @@
from msgpack import unpackb
from .types import KastenChecksum
from .types import KastenPacked
from .generator import pack
class Kasten:
def __init__(self, id: KastenChecksum,
packed_bytes: KastenPacked,
generator: 'KastenBaseGenerator',
auto_check_generator = False): # noqa
auto_check_generator = True): # noqa
if auto_check_generator:
generator.validate_id(id, packed_bytes)
self.id = id
self.packed_bytes = packed_bytes
self.generator = generator
header, data = packed_bytes.split(b'\n', 1)
header = unpackb(header, strict_map_key=True)
self.header = header
self.data = data
def check_generator(self, generator=None):
packed = self.get_packed()
if generator is None:
self.generator.validate_id(self.id, self.packed_bytes)
else:
generator(self.id, self.packed_bytes)
# Getters are gross, but they are used here to preserve space
def get_packed(self) -> KastenPacked:
return pack.pack(self.data, self.get_data_type(),
self.get_encryption_mode(),
timestamp=self.get_timestamp())
def get_data_type(self) -> str: return self.header[0]
def get_encryption_mode(self): return self.header[1]
def check_generator(self):
self.generator.validate_id(self.id, self.packed_bytes)
def get_timestamp(self): return self.header[2]

BIN
kasten/pack/__pycache__/__init__.cpython-38.pyc

Binary file not shown.

8
tests/test_generator_base.py

@ -7,12 +7,14 @@ from kasten.generator import KastenBaseGenerator
class TestBaseGenerator(unittest.TestCase):
def test_base_generator(self):
k = b'\x92\xa3bin\x00\xc4\x01\n(\x86!\xd7\xb5\x8ar\xae\x97z'
k = b'\x93\xa3txt\x01\xce^\x97\xe3\xdc\ntest'
invalid = b'\x93\xa3txt\x01\xce^\x97\xe3\xdc\ntes2'
K = KastenBaseGenerator.generate(k)
h = sha3_384(k).digest()
self.assertTrue(len(K.packed_bytes) > 0)
self.assertTrue(len(K.get_packed()) > 0)
KastenBaseGenerator.validate_id(h, k)
self.assertRaises(exceptions.InvalidID, KastenBaseGenerator.validate_id, h, b"\x92\xa3txt\x00\xc4\x01\n(\x86!\xd7\xb5\x8ar\xae\x97z")
self.assertRaises(exceptions.InvalidID, KastenBaseGenerator.validate_id, h, invalid)
self.assertEqual(K.get_packed(), k)
unittest.main()

14
tests/test_kasten.py

@ -1,14 +0,0 @@
import unittest
from kasten import Kasten
from hashlib import sha3_384
from kasten import exceptions
from kasten.generator import KastenBaseGenerator
class TestKasten(unittest.TestCase):
def test_kasten(self):
k = b'\x92\xa3bin\x00\xc4\x01\n(\x86!\xd7\xb5\x8ar\xae\x97z'
unittest.main()

19
tests/test_kasten_with_base_generator.py

@ -0,0 +1,19 @@
import unittest
from hashlib import sha3_384
from kasten import Kasten
from kasten import exceptions
from kasten.generator import KastenBaseGenerator
class TestKastenBaseGenerator(unittest.TestCase):
def test_kasten(self):
k = b'\x93\xa3txt\x01\xce^\x97\xe3\xdc\ntest'
K = Kasten(sha3_384(k).digest(), k, KastenBaseGenerator)
def test_kasten_invalid(self):
k = b'\x93\xa3txt\x01\xce^\x97\xe3\xdc\ntest'
self.assertRaises(exceptions.InvalidID, Kasten, sha3_384(k + b'invalid').digest(), k, KastenBaseGenerator)
unittest.main()

33
tests/test_pack.py

@ -1,5 +1,8 @@
import unittest
import os
from time import time
from math import floor
from msgpack import unpackb
from kasten.generator import pack
from kasten import exceptions
@ -9,17 +12,37 @@ class TestPack(unittest.TestCase):
def test_unsigned_pack(self):
data = os.urandom(10)
t = floor(time())
packed = pack.pack(data, 'bin', 0)
parts = packed.split(b'\n', 1)
self.assertEqual(parts[0], b'\x93\xa3bin\x00\xce^\x95\x82:\xc4\x01')
self.assertEqual(parts[1], data)
unpacked = unpackb(parts[0])
self.assertEqual(unpacked[0], 'bin')
self.assertEqual(unpacked[1], 0)
self.assertAlmostEqual(unpacked[2], t)
self.assertEqual(len(unpacked), 3)
def test_unsigned_with_meta(self):
data = os.urandom(10)
t = floor(time())
packed = pack.pack(data, 'bin', 0, app_metadata={"meme": "doge"})
parts = packed.split(b'\n', 1)
unpacked = unpackb(parts[0])
self.assertEqual(unpacked[0], 'bin')
self.assertEqual(unpacked[1], 0)
self.assertAlmostEqual(unpacked[2], t)
self.assertEqual(unpacked[3], {"meme": "doge"})
self.assertEqual(len(unpacked), 4)
def test_linebreak_data(self):
data = os.urandom(9) + b'\n' + b"okay"
data = os.urandom(10) + b'\n'
t = floor(time())
packed = pack.pack(data, 'bin', 0)
parts = packed.split(b'\n', 1)
self.assertEqual(parts[0], b'\x93\xa3bin\x00\xce^\x95\x82:\xc4\x01')
self.assertEqual(parts[1], data)
unpacked = unpackb(parts[0])
self.assertEqual(unpacked[0], 'bin')
self.assertEqual(unpacked[1], 0)
self.assertAlmostEqual(unpacked[2], t)
self.assertEqual(len(unpacked[0]), 3)
def test_invalid_data_type(self):
data = os.urandom(10)

Loading…
Cancel
Save