From 19fa1287106a6e4b380d42fe89aa7fc7b4f00ef6 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Thu, 25 Jul 2019 11:14:13 -0500 Subject: [PATCH] progress in removing core --- onionr/dbcreator.py | 6 +- onionr/filepaths/__init__.py | 1 + onionr/onionr.py | 2 +- onionr/onionrcrypto/__init__.py | 3 +- onionr/onionrcrypto/generate.py | 24 +- .../static-data/default-plugins/pms/main.py | 258 ------------------ onionr/tests/test_blocks.py | 9 +- onionr/tests/test_database_actions.py | 23 +- onionr/tests/test_database_creation.py | 70 ----- onionr/tests/test_forward_secrecy.py | 15 +- onionr/tests/test_highlevelcrypto.py | 99 +++---- onionr/tests/test_onionrusers.py | 47 ++-- onionr/tests/test_stringvalidations.py | 4 - onionr/utils/createdirs.py | 2 + 14 files changed, 114 insertions(+), 449 deletions(-) delete mode 100755 onionr/tests/test_database_creation.py diff --git a/onionr/dbcreator.py b/onionr/dbcreator.py index 596112af..a864ff26 100755 --- a/onionr/dbcreator.py +++ b/onionr/dbcreator.py @@ -17,8 +17,9 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . ''' -from coredb import dbfiles import sqlite3, os +from coredb import dbfiles +import filepaths def createAddressDB(): ''' @@ -116,6 +117,9 @@ def createBlockDB(): def createBlockDataDB(): if os.path.exists(dbfiles.block_data_db): raise FileExistsError("Block data database already exists") + else: + if not os.path.exists(filepaths.block_data_location): + os.mkdir(filepaths.block_data_location) conn = sqlite3.connect(dbfiles.block_data_db) c = conn.cursor() c.execute('''CREATE TABLE blockData( diff --git a/onionr/filepaths/__init__.py b/onionr/filepaths/__init__.py index a75d1fb1..695a4b3e 100644 --- a/onionr/filepaths/__init__.py +++ b/onionr/filepaths/__init__.py @@ -4,6 +4,7 @@ if not home.endswith('/'): home += '/' usage_file = home + 'disk-usage.txt' block_data_location = home + 'blocks/' +contacts_location = home + 'contacts/' public_API_host_file = home + 'public-host.txt' private_API_host_file = home + 'private-host.txt' bootstrap_file_location = 'static-data/bootstrap-nodes.txt' diff --git a/onionr/onionr.py b/onionr/onionr.py index f2ead36f..a87497fa 100755 --- a/onionr/onionr.py +++ b/onionr/onionr.py @@ -32,7 +32,7 @@ if sys.version_info[0] == 2 or sys.version_info[1] < MIN_PY_VERSION: from utils import detectoptimization if detectoptimization.detect_optimization(): - sys.stderr.write('Error, Onionr cannot be run in optimized mode\n') + sys.stderr.write('Error, Onionr cannot be run with an optimized Python interpreter\n') sys.exit(1) from utils import createdirs createdirs.create_dirs() diff --git a/onionr/onionrcrypto/__init__.py b/onionr/onionrcrypto/__init__.py index 573faa2c..7b4a317f 100755 --- a/onionr/onionrcrypto/__init__.py +++ b/onionr/onionrcrypto/__init__.py @@ -18,7 +18,8 @@ along with this program. If not, see . ''' -from . import generate, hashers, getourkeypair, signing, encryption +from . import generate, hashers, getourkeypair, signing, encryption, cryptoutils +generate_deterministic = generate.generate_deterministic generate = generate.generate_pub_key keypair = getourkeypair.get_keypair() diff --git a/onionr/onionrcrypto/generate.py b/onionr/onionrcrypto/generate.py index a92271de..37f40826 100644 --- a/onionr/onionrcrypto/generate.py +++ b/onionr/onionrcrypto/generate.py @@ -1,6 +1,26 @@ -import nacl.signing, nacl.encoding +import nacl.signing, nacl.encoding, nacl.pwhash +import onionrexceptions +from onionrutils import bytesconverter def generate_pub_key(): '''Generate a Ed25519 public key pair, return tuple of base32encoded pubkey, privkey''' private_key = nacl.signing.SigningKey.generate() public_key = private_key.verify_key.encode(encoder=nacl.encoding.Base32Encoder()) - return (public_key.decode(), private_key.encode(encoder=nacl.encoding.Base32Encoder()).decode()) \ No newline at end of file + return (public_key.decode(), private_key.encode(encoder=nacl.encoding.Base32Encoder()).decode()) + +def generate_deterministic(passphrase, bypassCheck=False): + '''Generate a Ed25519 public key pair from a password''' + passStrength = 25 + passphrase = bytesconverter.str_to_bytes(passphrase) # Convert to bytes if not already + # Validate passphrase length + if not bypassCheck: + if len(passphrase) < passStrength: + raise onionrexceptions.PasswordStrengthError("Passphase must be at least %s characters" % (passStrength,)) + # KDF values + kdf = nacl.pwhash.argon2id.kdf + salt = b"U81Q7llrQcdTP0Ux" # Does not need to be unique or secret, but must be 16 bytes + ops = nacl.pwhash.argon2id.OPSLIMIT_SENSITIVE + mem = nacl.pwhash.argon2id.MEMLIMIT_SENSITIVE + + key = kdf(32, passphrase, salt, opslimit=ops, memlimit=mem) # Generate seed for ed25519 key + key = nacl.signing.SigningKey(key) + return (key.verify_key.encode(nacl.encoding.Base32Encoder).decode(), key.encode(nacl.encoding.Base32Encoder).decode()) \ No newline at end of file diff --git a/onionr/static-data/default-plugins/pms/main.py b/onionr/static-data/default-plugins/pms/main.py index 48ec75b0..4f1cc23a 100755 --- a/onionr/static-data/default-plugins/pms/main.py +++ b/onionr/static-data/default-plugins/pms/main.py @@ -34,264 +34,6 @@ PLUGIN_VERSION = '0.0.1' sys.path.insert(0, os.path.dirname(os.path.realpath(__file__))) import sentboxdb, mailapi, loadinbox # import after path insert flask_blueprint = mailapi.flask_blueprint -""" -def draw_border(text): - # This function taken from https://stackoverflow.com/a/20757491 by https://stackoverflow.com/users/816449/bunyk, under https://creativecommons.org/licenses/by-sa/3.0/ - lines = text.splitlines() - width = max(len(s) for s in lines) - res = ['┌' + '─' * width + '┐'] - for s in lines: - res.append('│' + (s + ' ' * width)[:width] + '│') - res.append('└' + '─' * width + '┘') - return '\n'.join(res) - -class MailStrings: - def __init__(self, mailInstance): - self.mailInstance = mailInstance - - self.programTag = 'OnionrMail v%s' % (PLUGIN_VERSION) - choices = ['view inbox', 'view sentbox', 'send message', 'toggle pseudonymity', 'quit'] - self.mainMenuChoices = choices - self.mainMenu = '''----------------- - 1. %s - 2. %s - 3. %s - 4. %s - 5. %s''' % (choices[0], choices[1], choices[2], choices[3], choices[4]) - -class OnionrMail: - def __init__(self, pluginapi): - self.strings = MailStrings(self) - - self.sentboxTools = sentboxdb.SentBox() - self.sentboxList = [] - self.sentMessages = {} - self.doSigs = True - return - - def inbox(self): - blockCount = 0 - pmBlockMap = {} - pmBlocks = {} - logger.info('Decrypting messages...', terminal=True) - choice = '' - displayList = [] - subject = '' - - # this could use a lot of memory if someone has received a lot of messages - for blockHash in blockmetadb.get_blocks_by_type('pm'): - pmBlocks[blockHash] = Block(blockHash, core=self.myCore) - pmBlocks[blockHash].decrypt() - blockCount = 0 - for blockHash in pmBlocks: - if not pmBlocks[blockHash].decrypted: - continue - blockCount += 1 - pmBlockMap[blockCount] = blockHash - - block = pmBlocks[blockHash] - senderKey = block.signer - try: - senderKey = senderKey.decode() - except AttributeError: - pass - senderDisplay = onionrusers.OnionrUser(self.myCore, senderKey).getName() - if senderDisplay == 'anonymous': - senderDisplay = senderKey - - blockDate = pmBlocks[blockHash].getDate().strftime("%m/%d %H:%M") - try: - subject = pmBlocks[blockHash].bmetadata['subject'] - except KeyError: - subject = '' - - displayList.append('%s. %s - %s - <%s>: %s' % (blockCount, blockDate, senderDisplay[:12], subject[:10], blockHash)) - while choice not in ('-q', 'q', 'quit'): - for i in displayList: - logger.info(i, terminal=True) - try: - choice = logger.readline('Enter a block number, -r to refresh, or -q to stop: ').strip().lower() - except (EOFError, KeyboardInterrupt): - choice = '-q' - - if choice in ('-q', 'q', 'quit'): - continue - - if choice in ('-r', 'r', 'refresh'): - # dirty hack - self.inbox() - return - - try: - choice = int(choice) - except ValueError: - pass - else: - try: - pmBlockMap[choice] - readBlock = pmBlocks[pmBlockMap[choice]] - except KeyError: - pass - else: - cancel = '' - readBlock.verifySig() - senderDisplay = bytesconverter.bytes_to_str(readBlock.signer) - if len(senderDisplay.strip()) == 0: - senderDisplay = 'Anonymous' - logger.info('Message received from %s' % (senderDisplay,), terminal=True) - logger.info('Valid signature: %s' % readBlock.validSig, terminal=True) - - if not readBlock.validSig: - logger.warn('This message has an INVALID/NO signature. ANYONE could have sent this message.', terminal=True) - cancel = logger.readline('Press enter to continue to message, or -q to not open the message (recommended).') - print('') - if cancel != '-q': - try: - print(draw_border(escapeansi.escape_ANSI(readBlock.bcontent.decode().strip()))) - except ValueError: - logger.warn('Error presenting message. This is usually due to a malformed or blank message.', terminal=True) - pass - if readBlock.validSig: - reply = logger.readline("Press enter to continue, or enter %s to reply" % ("-r",)) - print('') - if reply == "-r": - self.draft_message(bytesconverter.bytes_to_str(readBlock.signer,)) - else: - logger.readline("Press enter to continue") - print('') - return - - def sentbox(self): - ''' - Display sent mail messages - ''' - entering = True - while entering: - self.get_sent_list() - logger.info('Enter a block number or -q to return', terminal=True) - try: - choice = input('>') - except (EOFError, KeyboardInterrupt) as e: - entering = False - else: - try: - choice = int(choice) - 1 - except ValueError: - pass - else: - try: - self.sentboxList[int(choice)] - except (IndexError, ValueError) as e: - logger.warn('Invalid block.', terminal=True) - else: - logger.info('Sent to: ' + self.sentMessages[self.sentboxList[int(choice)]][1], terminal=True) - # Print ansi escaped sent message - logger.info(escapeansi.escape_ANSI(self.sentMessages[self.sentboxList[int(choice)]][0]), terminal=True) - input('Press enter to continue...') - finally: - if choice == '-q': - entering = False - return - - def get_sent_list(self, display=True): - count = 1 - self.sentboxList = [] - self.sentMessages = {} - for i in self.sentboxTools.listSent(): - self.sentboxList.append(i['hash']) - self.sentMessages[i['hash']] = (bytesconverter.bytes_to_str(i['message']), i['peer'], i['subject']) - if display: - logger.info('%s. %s - %s - (%s) - %s' % (count, i['hash'], i['peer'][:12], i['subject'], i['date']), terminal=True) - count += 1 - return json.dumps(self.sentMessages) - - def draft_message(self, recip=''): - message = '' - newLine = '' - subject = '' - entering = False - if len(recip) == 0: - entering = True - while entering: - try: - recip = logger.readline('Enter peer address, or -q to stop:').strip() - if recip in ('-q', 'q'): - raise EOFError - if not stringvalidators.validate_pub_key(recip): - raise onionrexceptions.InvalidPubkey('Must be a valid ed25519 base32 encoded public key') - except onionrexceptions.InvalidPubkey: - logger.warn('Invalid public key', terminal=True) - except (KeyboardInterrupt, EOFError): - entering = False - else: - break - else: - # if -q or ctrl-c/d, exit function here, otherwise we successfully got the public key - return - try: - subject = logger.readline('Message subject: ') - except (KeyboardInterrupt, EOFError): - pass - - cancelEnter = False - logger.info('Enter your message, stop by entering -q on a new line. -c to cancel', terminal=True) - while newLine != '-q': - try: - newLine = input() - except (KeyboardInterrupt, EOFError): - cancelEnter = True - if newLine == '-c': - cancelEnter = True - break - if newLine == '-q': - continue - newLine += '\n' - message += newLine - - if not cancelEnter: - logger.info('Inserting encrypted message as Onionr block....', terminal=True) - - blockID = self.myCore.insertBlock(message, header='pm', encryptType='asym', asymPeer=recip, sign=self.doSigs, meta={'subject': subject}) - - def toggle_signing(self): - self.doSigs = not self.doSigs - - def menu(self): - choice = '' - while True: - sigMsg = 'Message Signing: %s' - - logger.info(self.strings.programTag + '\n\nUser ID: ' + self.myCore._crypto.pubKey, terminal=True) - if self.doSigs: - sigMsg = sigMsg % ('enabled',) - else: - sigMsg = sigMsg % ('disabled (Your messages cannot be trusted)',) - if self.doSigs: - logger.info(sigMsg, terminal=True) - else: - logger.warn(sigMsg, terminal=True) - logger.info(self.strings.mainMenu.title(), terminal=True) # print out main menu - try: - choice = logger.readline('Enter 1-%s:\n' % (len(self.strings.mainMenuChoices))).lower().strip() - except (KeyboardInterrupt, EOFError): - choice = '5' - - if choice in (self.strings.mainMenuChoices[0], '1'): - self.inbox() - elif choice in (self.strings.mainMenuChoices[1], '2'): - self.sentbox() - elif choice in (self.strings.mainMenuChoices[2], '3'): - self.draft_message() - elif choice in (self.strings.mainMenuChoices[3], '4'): - self.toggle_signing() - elif choice in (self.strings.mainMenuChoices[4], '5'): - logger.info('Goodbye.', terminal=True) - break - elif choice == '': - pass - else: - logger.warn('Invalid choice.', terminal=True) - return """ def add_deleted(keyStore, bHash): existing = keyStore.get('deleted_mail') diff --git a/onionr/tests/test_blocks.py b/onionr/tests/test_blocks.py index 257238b1..13332e08 100755 --- a/onionr/tests/test_blocks.py +++ b/onionr/tests/test_blocks.py @@ -7,13 +7,12 @@ import nacl.signing, nacl.hash, nacl.encoding TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' print("Test directory:", TEST_DIR) os.environ["ONIONR_HOME"] = TEST_DIR -import core, onionr - -c = core.Core() - +import onionrblocks +from utils import createdirs +createdirs.create_dirs() class OnionrBlockTests(unittest.TestCase): def test_plaintext_insert(self): message = 'hello world' - c.insertBlock(message) + onionrblocks.insert(message) unittest.main() \ No newline at end of file diff --git a/onionr/tests/test_database_actions.py b/onionr/tests/test_database_actions.py index 76a1350c..54607f43 100755 --- a/onionr/tests/test_database_actions.py +++ b/onionr/tests/test_database_actions.py @@ -6,35 +6,34 @@ TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' print("Test directory:", TEST_DIR) os.environ["ONIONR_HOME"] = TEST_DIR from urllib.request import pathname2url -import core, onionr - -c = core.Core() - +from coredb import keydb +from utils import createdirs +createdirs.create_dirs() class OnionrTests(unittest.TestCase): def test_address_add(self): testAddresses = ['facebookcorewwwi.onion', '56kmnycrvepfarolhnx6t2dvmldfeyg7jdymwgjb7jjzg47u2lqw2sad.onion', '5bvb5ncnfr4dlsfriwczpzcvo65kn7fnnlnt2ln7qvhzna2xaldq.b32.i2p'] for address in testAddresses: - c.addAddress(address) - dbAddresses = c.listAdders() + keydb.addkeys.add_address(address) + dbAddresses = keydb.listkeys.list_adders() for address in testAddresses: self.assertIn(address, dbAddresses) invalidAddresses = [None, '', ' ', '\t', '\n', ' test ', 24, 'fake.onion', 'fake.b32.i2p'] for address in invalidAddresses: try: - c.addAddress(address) + keydb.addkeys.add_address(address) except TypeError: pass - dbAddresses = c.listAdders() + dbAddresses = keydb.listkeys.list_adders() for address in invalidAddresses: self.assertNotIn(address, dbAddresses) def test_address_info(self): adder = 'nytimes3xbfgragh.onion' - c.addAddress(adder) - self.assertNotEqual(c.getAddressInfo(adder, 'success'), 1000) - c.setAddressInfo(adder, 'success', 1000) - self.assertEqual(c.getAddressInfo(adder, 'success'), 1000) + keydb.addkeys.add_address(adder) + self.assertNotEqual(keydb.transportinfo.get_address_info(adder, 'success'), 1000) + keydb.transportinfo.set_address_info(adder, 'success', 1000) + self.assertEqual(keydb.transportinfo.get_address_info(adder, 'success'), 1000) unittest.main() \ No newline at end of file diff --git a/onionr/tests/test_database_creation.py b/onionr/tests/test_database_creation.py deleted file mode 100755 index 09da1aaa..00000000 --- a/onionr/tests/test_database_creation.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python3 -import sys, os -sys.path.append(".") -import unittest, uuid, sqlite3 -TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' -print("Test directory:", TEST_DIR) -os.environ["ONIONR_HOME"] = TEST_DIR -from urllib.request import pathname2url -import core, onionr - -core.Core() - -class OnionrTests(unittest.TestCase): - - def test_peer_db_creation(self): - try: - dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'peers.db')) - conn = sqlite3.connect(dburi, uri=True, timeout=30) - cursor = conn.cursor() - conn.close() - except sqlite3.OperationalError: - self.assertTrue(False) - else: - self.assertTrue(True) - - def test_block_db_creation(self): - try: - dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'blocks.db')) - conn = sqlite3.connect(dburi, uri=True, timeout=30) - cursor = conn.cursor() - conn.close() - except sqlite3.OperationalError: - self.assertTrue(False) - else: - self.assertTrue(True) - - def test_forward_keys_db_creation(self): - try: - dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'forward-keys.db')) - conn = sqlite3.connect(dburi, uri=True, timeout=30) - cursor = conn.cursor() - conn.close() - except sqlite3.OperationalError: - self.assertTrue(False) - else: - self.assertTrue(True) - - def test_address_db_creation(self): - try: - dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'address.db')) - conn = sqlite3.connect(dburi, uri=True, timeout=30) - cursor = conn.cursor() - conn.close() - except sqlite3.OperationalError: - self.assertTrue(False) - else: - self.assertTrue(True) - - def blacklist_db_creation(self): - try: - dburi = 'file:{}?mode=rw'.format(pathname2url(TEST_DIR + 'blacklist.db')) - conn = sqlite3.connect(dburi, uri=True, timeout=30) - cursor = conn.cursor() - conn.close() - except sqlite3.OperationalError: - self.assertTrue(False) - else: - self.assertTrue(True) - -unittest.main() \ No newline at end of file diff --git a/onionr/tests/test_forward_secrecy.py b/onionr/tests/test_forward_secrecy.py index 0110be12..897dc1b6 100755 --- a/onionr/tests/test_forward_secrecy.py +++ b/onionr/tests/test_forward_secrecy.py @@ -4,11 +4,12 @@ sys.path.append(".") import unittest, uuid TEST_DIR_1 = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' TEST_DIR_2 = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' -import core, onionr, time +import onionr, time -import onionrexceptions +import onionrexceptions, onionrcrypto as crypto from onionrusers import onionrusers from onionrusers import contactmanager +from utils import createdirs class OnionrForwardSecrecyTests(unittest.TestCase): ''' @@ -17,23 +18,23 @@ class OnionrForwardSecrecyTests(unittest.TestCase): def test_forward_encrypt(self): os.environ["ONIONR_HOME"] = TEST_DIR_1 - o = onionr.Onionr() + createdirs.create_dirs() - friend = o.onionrCore._crypto.generatePubKey() + friend = crypto.generate() - friendUser = onionrusers.OnionrUser(o.onionrCore, friend[0], saveUser=True) + friendUser = onionrusers.OnionrUser(friend[0], saveUser=True) for x in range(5): message = 'hello world %s' % (random.randint(1, 1000)) forwardKey = friendUser.generateForwardKey() - fakeForwardPair = o.onionrCore._crypto.generatePubKey() + fakeForwardPair = crypto.generate() self.assertTrue(friendUser.addForwardKey(fakeForwardPair[0])) encrypted = friendUser.forwardEncrypt(message) - decrypted = o.onionrCore._crypto.pubKeyDecrypt(encrypted[0], privkey=fakeForwardPair[1], encodedData=True) + decrypted = crypto.encryption.pub_key_decrypt(encrypted[0], privkey=fakeForwardPair[1], encodedData=True) self.assertEqual(decrypted, message.encode()) return diff --git a/onionr/tests/test_highlevelcrypto.py b/onionr/tests/test_highlevelcrypto.py index aab2e198..a3995a4e 100755 --- a/onionr/tests/test_highlevelcrypto.py +++ b/onionr/tests/test_highlevelcrypto.py @@ -8,34 +8,32 @@ from onionrutils import stringvalidators, mnemonickeys TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' print("Test directory:", TEST_DIR) os.environ["ONIONR_HOME"] = TEST_DIR -import core, onionr, onionrexceptions +import onionrcrypto as crypto, onionrexceptions -c = core.Core() -crypto = c._crypto class OnionrCryptoTests(unittest.TestCase): def test_blake2b(self): - self.assertEqual(crypto.blake2bHash('test'), crypto.blake2bHash(b'test')) - self.assertEqual(crypto.blake2bHash(b'test'), crypto.blake2bHash(b'test')) + self.assertEqual(crypto.hashers.blake2b_hash('test'), crypto.hashers.blake2b_hash(b'test')) + self.assertEqual(crypto.hashers.blake2b_hash(b'test'), crypto.hashers.blake2b_hash(b'test')) - self.assertNotEqual(crypto.blake2bHash(''), crypto.blake2bHash(b'test')) + self.assertNotEqual(crypto.hashers.blake2b_hash(''), crypto.hashers.blake2b_hash(b'test')) try: - crypto.blake2bHash(None) + crypto.hashers.blake2b_hash(None) except nacl.exceptions.TypeError: pass else: self.assertTrue(False) - self.assertEqual(nacl.hash.blake2b(b'test'), crypto.blake2bHash(b'test')) + self.assertEqual(nacl.hash.blake2b(b'test'), crypto.hashers.blake2b_hash(b'test')) def test_sha3256(self): hasher = hashlib.sha3_256() - self.assertEqual(crypto.sha3Hash('test'), crypto.sha3Hash(b'test')) - self.assertEqual(crypto.sha3Hash(b'test'), crypto.sha3Hash(b'test')) + self.assertEqual(crypto.hashers.sha3_hash('test'), crypto.hashers.sha3_hash(b'test')) + self.assertEqual(crypto.hashers.sha3_hash(b'test'), crypto.hashers.sha3_hash(b'test')) - self.assertNotEqual(crypto.sha3Hash(''), crypto.sha3Hash(b'test')) + self.assertNotEqual(crypto.hashers.sha3_hash(''), crypto.hashers.sha3_hash(b'test')) try: - crypto.sha3Hash(None) + crypto.hashers.sha3_hash(None) except TypeError: pass else: @@ -43,21 +41,21 @@ class OnionrCryptoTests(unittest.TestCase): hasher.update(b'test') normal = hasher.hexdigest() - self.assertEqual(crypto.sha3Hash(b'test'), normal) + self.assertEqual(crypto.hashers.sha3_hash(b'test'), normal) def valid_default_id(self): - self.assertTrue(stringvalidators.validate_pub_key(crypto.pubKey)) + self.assertTrue(stringvalidators.validate_pub_key(crypto.pub_key)) def test_human_readable_length(self): - human = mnemonickeys.get_human_readable_ID(c) + human = mnemonickeys.get_human_readable_ID() self.assertTrue(len(human.split(' ')) == 32) def test_safe_compare(self): - self.assertTrue(crypto.safeCompare('test', 'test')) - self.assertTrue(crypto.safeCompare('test', b'test')) - self.assertFalse(crypto.safeCompare('test', 'test2')) + self.assertTrue(crypto.cryptoutils.safe_compare('test', 'test')) + self.assertTrue(crypto.cryptoutils.safe_compare('test', b'test')) + self.assertFalse(crypto.cryptoutils.safe_compare('test', 'test2')) try: - crypto.safeCompare('test', None) + crypto.cryptoutils.safe_compare('test', None) except TypeError: pass else: @@ -67,82 +65,53 @@ class OnionrCryptoTests(unittest.TestCase): # Small chance that the randomized list will be same. Rerun test a couple times if it fails startList = ['cat', 'dog', 'moose', 'rabbit', 'monkey', 'crab', 'human', 'dolphin', 'whale', 'etc'] * 10 - self.assertNotEqual(startList, list(crypto.randomShuffle(startList))) - self.assertTrue(len(list(crypto.randomShuffle(startList))) == len(startList)) + self.assertNotEqual(startList, list(crypto.cryptoutils.random_shuffle(startList))) + self.assertTrue(len(list(crypto.cryptoutils.random_shuffle(startList))) == len(startList)) def test_asymmetric(self): - keyPair = crypto.generatePubKey() - keyPair2 = crypto.generatePubKey() + keyPair = crypto.generate() + keyPair2 = crypto.generate() message = "hello world" - self.assertTrue(len(crypto.pubKeyEncrypt(message, keyPair2[0], encodedData=True)) > 0) - encrypted = crypto.pubKeyEncrypt(message, keyPair2[0], encodedData=False) - decrypted = crypto.pubKeyDecrypt(encrypted, privkey=keyPair2[1], encodedData=False) + self.assertTrue(len(crypto.encryption.pub_key_encrypt(message, keyPair2[0], encodedData=True)) > 0) + encrypted = crypto.encryption.pub_key_encrypt(message, keyPair2[0], encodedData=False) + decrypted = crypto.encryption.pub_key_decrypt(encrypted, privkey=keyPair2[1], encodedData=False) self.assertTrue(decrypted.decode() == message) try: - crypto.pubKeyEncrypt(None, keyPair2[0]) + crypto.encryption.pub_key_encrypt(None, keyPair2[0]) except TypeError: pass else: self.assertTrue(False) - blankMessage = crypto.pubKeyEncrypt('', keyPair2[0]) - self.assertTrue('' == crypto.pubKeyDecrypt(blankMessage, privkey=keyPair2[1], encodedData=False).decode()) + blankMessage = crypto.encryption.pub_key_encrypt('', keyPair2[0]) + self.assertTrue('' == crypto.encryption.pub_key_decrypt(blankMessage, privkey=keyPair2[1], encodedData=False).decode()) # Try to encrypt arbitrary bytes - crypto.pubKeyEncrypt(os.urandom(32), keyPair2[0]) - - def test_symmetric(self): - dataString = "this is a secret message" - dataBytes = dataString.encode() - key = b"tttttttttttttttttttttttttttttttt" - invalidKey = b'tttttttttttttttttttttttttttttttb' - encrypted = crypto.symmetricEncrypt(dataString, key, returnEncoded=True) - decrypted = crypto.symmetricDecrypt(encrypted, key, encodedMessage=True) - self.assertTrue(dataString == decrypted.decode()) - - try: - crypto.symmetricDecrypt(encrypted, invalidKey, encodedMessage=True) - except nacl.exceptions.CryptoError: - pass - else: - self.assertFalse(True) - try: - crypto.symmetricEncrypt(None, key, returnEncoded=True) - except AttributeError: - pass - else: - self.assertFalse(True) - crypto.symmetricEncrypt("string", key, returnEncoded=True) - try: - crypto.symmetricEncrypt("string", None, returnEncoded=True) - except nacl.exceptions.TypeError: - pass - else: - self.assertFalse(True) + crypto.encryption.pub_key_encrypt(os.urandom(32), keyPair2[0]) def test_deterministic(self): password = os.urandom(32) - gen = crypto.generateDeterministic(password) + gen = crypto.generate_deterministic(password) self.assertTrue(stringvalidators.validate_pub_key(gen[0])) try: - crypto.generateDeterministic('weakpassword') + crypto.generate_deterministic('weakpassword') except onionrexceptions.PasswordStrengthError: pass else: self.assertFalse(True) try: - crypto.generateDeterministic(None) + crypto.generate_deterministic(None) except TypeError: pass else: self.assertFalse(True) - gen = crypto.generateDeterministic('weakpassword', bypassCheck=True) + gen = crypto.generate_deterministic('weakpassword', bypassCheck=True) password = base64.b64encode(os.urandom(32)) - gen1 = crypto.generateDeterministic(password) - gen2 = crypto.generateDeterministic(password) + gen1 = crypto.generate_deterministic(password) + gen2 = crypto.generate_deterministic(password) self.assertFalse(gen == gen1) self.assertTrue(gen1 == gen2) self.assertTrue(stringvalidators.validate_pub_key(gen1[0])) diff --git a/onionr/tests/test_onionrusers.py b/onionr/tests/test_onionrusers.py index 5b98a2f5..fd880f15 100755 --- a/onionr/tests/test_onionrusers.py +++ b/onionr/tests/test_onionrusers.py @@ -6,37 +6,38 @@ import json TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' print("Test directory:", TEST_DIR) os.environ["ONIONR_HOME"] = TEST_DIR -import core, onionr -c = core.Core() import onionrexceptions from onionrusers import onionrusers from onionrusers import contactmanager - +import onionrcrypto as crypto +from coredb import keydb +from utils import identifyhome, createdirs +createdirs.create_dirs() class OnionrUserTests(unittest.TestCase): ''' Tests both the onionrusers class and the contactmanager (which inherits it) ''' def test_users(self): - keypair = c._crypto.generatePubKey() - onionrusers.OnionrUser(c, keypair[0]) + keypair = crypto.generate() + onionrusers.OnionrUser(keypair[0]) return def test_contact_init_no_save(self): - contact = c._crypto.generatePubKey()[0] - contact = contactmanager.ContactManager(c, contact) - self.assertFalse(contact.publicKey in c.listPeers()) + contact = crypto.generate()[0] + contact = contactmanager.ContactManager(contact) + self.assertFalse(contact.publicKey in keydb.listkeys.list_peers()) def test_contact_create(self): - contact = c._crypto.generatePubKey()[0] - contact = contactmanager.ContactManager(c, contact, saveUser=True) - self.assertTrue(contact.publicKey in c.listPeers()) + contact = crypto.generate()[0] + contact = contactmanager.ContactManager(contact, saveUser=True) + self.assertTrue(contact.publicKey in keydb.listkeys.list_peers()) def test_contact_set_info(self): - contact = c._crypto.generatePubKey()[0] - contact = contactmanager.ContactManager(c, contact, saveUser=True) - fileLocation = '%s/contacts/%s.json' % (c.dataDir, contact.publicKey) + contact = crypto.generate()[0] + contact = contactmanager.ContactManager(contact, saveUser=True) + fileLocation = '%s/contacts/%s.json' % (identifyhome.identify_home(), contact.publicKey) contact.set_info('alias', 'bob') self.assertTrue(os.path.exists(fileLocation)) @@ -47,9 +48,9 @@ class OnionrUserTests(unittest.TestCase): self.assertEqual(data['alias'], 'bob') def test_contact_get_info(self): - contact = c._crypto.generatePubKey()[0] - contact = contactmanager.ContactManager(c, contact, saveUser=True) - fileLocation = '%s/contacts/%s.json' % (c.dataDir, contact.publicKey) + contact = crypto.generate()[0] + contact = contactmanager.ContactManager(contact, saveUser=True) + fileLocation = '%s/contacts/%s.json' % (identifyhome.identify_home(), contact.publicKey) with open(fileLocation, 'w') as contactFile: contactFile.write('{"alias": "bob"}') @@ -59,16 +60,16 @@ class OnionrUserTests(unittest.TestCase): self.assertEqual(contact.get_info('fail'), None) def test_encrypt(self): - contactPair = c._crypto.generatePubKey() - contact = contactmanager.ContactManager(c, contactPair[0], saveUser=True) + contactPair = crypto.generate() + contact = contactmanager.ContactManager(contactPair[0], saveUser=True) encrypted = contact.encrypt('test') - decrypted = c._crypto.pubKeyDecrypt(encrypted, privkey=contactPair[1], encodedData=True).decode() + decrypted = crypto.encryption.pub_key_decrypt(encrypted, privkey=contactPair[1], encodedData=True).decode() self.assertEqual('test', decrypted) def test_delete_contact(self): - contact = c._crypto.generatePubKey()[0] - contact = contactmanager.ContactManager(c, contact, saveUser=True) - fileLocation = '%s/contacts/%s.json' % (c.dataDir, contact.publicKey) + contact = crypto.generate()[0] + contact = contactmanager.ContactManager(contact, saveUser=True) + fileLocation = '%s/contacts/%s.json' % (identifyhome.identify_home(), contact.publicKey) self.assertFalse(os.path.exists(fileLocation)) with open(fileLocation, 'w') as contactFile: contactFile.write('{"alias": "test"}') diff --git a/onionr/tests/test_stringvalidations.py b/onionr/tests/test_stringvalidations.py index eef418ba..9f23669a 100755 --- a/onionr/tests/test_stringvalidations.py +++ b/onionr/tests/test_stringvalidations.py @@ -5,11 +5,8 @@ import unittest, uuid TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' print("Test directory:", TEST_DIR) os.environ["ONIONR_HOME"] = TEST_DIR -import core, onionr from onionrutils import stringvalidators -core.Core() - class OnionrValidations(unittest.TestCase): def test_peer_validator(self): @@ -50,7 +47,6 @@ class OnionrValidations(unittest.TestCase): # Test ed25519 public key validity valids = ['JZ5VE72GUS3C7BOHDRIYZX4B5U5EJMCMLKHLYCVBQQF3UKHYIRRQ====', 'JZ5VE72GUS3C7BOHDRIYZX4B5U5EJMCMLKHLYCVBQQF3UKHYIRRQ'] invalid = [None, '', ' ', 'dfsg', '\n', 'JZ5VE72GUS3C7BOHDRIYZX4B5U5EJMCMLKHLYCVBQQF3UKHYIR$Q===='] - c = core.Core() for valid in valids: print('testing', valid) diff --git a/onionr/utils/createdirs.py b/onionr/utils/createdirs.py index 00b356bb..30342077 100644 --- a/onionr/utils/createdirs.py +++ b/onionr/utils/createdirs.py @@ -8,6 +8,8 @@ def create_dirs(): os.mkdir(home) if not os.path.exists(filepaths.block_data_location): os.mkdir(filepaths.block_data_location) + if not os.path.exists(filepaths.contacts_location): + os.mkdir(filepaths.contacts_location) for db in dbcreator.create_funcs: try: