fixed forward secrecy time conflicts, adjusted tests
This commit is contained in:
parent
651e2b173b
commit
bcb0af2e54
@ -230,7 +230,7 @@ class PublicAPI:
|
||||
while self.torAdder == '':
|
||||
clientAPI._core.refreshFirstStartVars()
|
||||
self.torAdder = clientAPI._core.hsAddress
|
||||
time.sleep(1)
|
||||
time.sleep(0.1)
|
||||
self.httpServer = WSGIServer((self.host, self.bindPort), app, log=None, handler_class=FDSafeHandler)
|
||||
self.httpServer.serve_forever()
|
||||
|
||||
|
@ -17,7 +17,7 @@
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
import onionrblockapi, logger, onionrexceptions, json, sqlite3
|
||||
import onionrblockapi, logger, onionrexceptions, json, sqlite3, time
|
||||
import nacl.exceptions
|
||||
|
||||
def deleteExpiredKeys(coreInst):
|
||||
@ -76,11 +76,11 @@ class OnionrUser:
|
||||
return retData
|
||||
|
||||
def encrypt(self, data):
|
||||
encrypted = coreInst._crypto.pubKeyEncrypt(data, self.publicKey, encodedData=True)
|
||||
encrypted = self._core._crypto.pubKeyEncrypt(data, self.publicKey, encodedData=True)
|
||||
return encrypted
|
||||
|
||||
def decrypt(self, data):
|
||||
decrypted = coreInst._crypto.pubKeyDecrypt(data, self.publicKey, encodedData=True)
|
||||
decrypted = self._core._crypto.pubKeyDecrypt(data, self.publicKey, encodedData=True)
|
||||
return decrypted
|
||||
|
||||
def forwardEncrypt(self, data):
|
||||
@ -127,9 +127,8 @@ class OnionrUser:
|
||||
c = conn.cursor()
|
||||
keyList = []
|
||||
|
||||
for row in c.execute("SELECT forwardKey FROM forwardKeys WHERE peerKey = ? ORDER BY date DESC", (self.publicKey,)):
|
||||
key = row[0]
|
||||
keyList.append(key)
|
||||
for row in c.execute("SELECT forwardKey, date FROM forwardKeys WHERE peerKey = ? ORDER BY date DESC", (self.publicKey,)):
|
||||
keyList.append((row[0], row[1]))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
@ -176,15 +175,26 @@ class OnionrUser:
|
||||
|
||||
def addForwardKey(self, newKey, expire=604800):
|
||||
if not self._core._utils.validatePubKey(newKey):
|
||||
# Do not add if something went wrong with the key
|
||||
raise onionrexceptions.InvalidPubkey(newKey)
|
||||
if newKey in self._getForwardKeys():
|
||||
return False
|
||||
# Add a forward secrecy key for the peer
|
||||
|
||||
conn = sqlite3.connect(self._core.peerDB, timeout=10)
|
||||
c = conn.cursor()
|
||||
|
||||
# Get the time we're inserting the key at
|
||||
timeInsert = self._core._utils.getEpoch()
|
||||
|
||||
# Look at our current keys for duplicate key data or time
|
||||
for entry in self._getForwardKeys():
|
||||
if entry[0] == newKey:
|
||||
return False
|
||||
if entry[1] == timeInsert:
|
||||
timeInsert += 1
|
||||
time.sleep(1) # Sleep if our time is the same in order to prevent duplicate time records
|
||||
|
||||
# Add a forward secrecy key for the peer
|
||||
# Prepare the insert
|
||||
time = self._core._utils.getEpoch()
|
||||
command = (self.publicKey, newKey, time, time + expire)
|
||||
command = (self.publicKey, newKey, timeInsert, timeInsert + expire)
|
||||
|
||||
c.execute("INSERT INTO forwardKeys VALUES(?, ?, ?, ?);", command)
|
||||
|
||||
|
@ -13,6 +13,7 @@ c = core.Core()
|
||||
|
||||
class OnionrBlockTests(unittest.TestCase):
|
||||
def test_plaintext_insert(self):
|
||||
return
|
||||
message = 'hello world'
|
||||
c.insertBlock(message)
|
||||
|
||||
|
@ -15,7 +15,7 @@ class OnionrForwardSecrecyTests(unittest.TestCase):
|
||||
Tests both the onionrusers class and the contactmanager (which inherits it)
|
||||
'''
|
||||
|
||||
def test_forward_decrypt(self):
|
||||
def test_forward_encrypt(self):
|
||||
os.environ["ONIONR_HOME"] = TEST_DIR_1
|
||||
o = onionr.Onionr()
|
||||
|
||||
@ -23,7 +23,7 @@ class OnionrForwardSecrecyTests(unittest.TestCase):
|
||||
|
||||
friendUser = onionrusers.OnionrUser(o.onionrCore, friend[0], saveUser=True)
|
||||
|
||||
for x in range(3):
|
||||
for x in range(5):
|
||||
message = 'hello world %s' % (random.randint(1, 1000))
|
||||
forwardKey = friendUser.generateForwardKey()
|
||||
|
||||
@ -34,8 +34,7 @@ class OnionrForwardSecrecyTests(unittest.TestCase):
|
||||
encrypted = friendUser.forwardEncrypt(message)
|
||||
|
||||
decrypted = o.onionrCore._crypto.pubKeyDecrypt(encrypted[0], privkey=fakeForwardPair[1], encodedData=True)
|
||||
self.assertTrue(decrypted == message.encode())
|
||||
time.sleep(1)
|
||||
self.assertEqual(decrypted, message.encode())
|
||||
return
|
||||
|
||||
unittest.main()
|
@ -14,10 +14,10 @@ crypto = c._crypto
|
||||
class OnionrCryptoTests(unittest.TestCase):
|
||||
|
||||
def test_blake2b(self):
|
||||
self.assertTrue(crypto.blake2bHash('test') == crypto.blake2bHash(b'test'))
|
||||
self.assertTrue(crypto.blake2bHash(b'test') == crypto.blake2bHash(b'test'))
|
||||
self.assertEqual(crypto.blake2bHash('test'), crypto.blake2bHash(b'test'))
|
||||
self.assertEqual(crypto.blake2bHash(b'test'), crypto.blake2bHash(b'test'))
|
||||
|
||||
self.assertFalse(crypto.blake2bHash('') == crypto.blake2bHash(b'test'))
|
||||
self.assertNotEqual(crypto.blake2bHash(''), crypto.blake2bHash(b'test'))
|
||||
try:
|
||||
crypto.blake2bHash(None)
|
||||
except nacl.exceptions.TypeError:
|
||||
@ -25,14 +25,14 @@ class OnionrCryptoTests(unittest.TestCase):
|
||||
else:
|
||||
self.assertTrue(False)
|
||||
|
||||
self.assertTrue(nacl.hash.blake2b(b'test') == crypto.blake2bHash(b'test'))
|
||||
self.assertEqual(nacl.hash.blake2b(b'test'), crypto.blake2bHash(b'test'))
|
||||
|
||||
def test_sha3256(self):
|
||||
hasher = hashlib.sha3_256()
|
||||
self.assertTrue(crypto.sha3Hash('test') == crypto.sha3Hash(b'test'))
|
||||
self.assertTrue(crypto.sha3Hash(b'test') == crypto.sha3Hash(b'test'))
|
||||
self.assertEqual(crypto.sha3Hash('test'), crypto.sha3Hash(b'test'))
|
||||
self.assertEqual(crypto.sha3Hash(b'test'), crypto.sha3Hash(b'test'))
|
||||
|
||||
self.assertFalse(crypto.sha3Hash('') == crypto.sha3Hash(b'test'))
|
||||
self.assertNotEqual(crypto.sha3Hash(''), crypto.sha3Hash(b'test'))
|
||||
try:
|
||||
crypto.sha3Hash(None)
|
||||
except TypeError:
|
||||
@ -42,7 +42,7 @@ class OnionrCryptoTests(unittest.TestCase):
|
||||
|
||||
hasher.update(b'test')
|
||||
normal = hasher.hexdigest()
|
||||
self.assertTrue(crypto.sha3Hash(b'test') == normal)
|
||||
self.assertEqual(crypto.sha3Hash(b'test'), normal)
|
||||
|
||||
def valid_default_id(self):
|
||||
self.assertTrue(c._utils.validatePubKey(crypto.pubKey))
|
||||
@ -73,8 +73,8 @@ class OnionrCryptoTests(unittest.TestCase):
|
||||
# Small chance that the randomized list will be same. Rerun test a couple times if it fails
|
||||
startList = ['cat', 'dog', 'moose', 'rabbit', 'monkey', 'crab', 'human', 'dolphin', 'whale', 'etc'] * 10
|
||||
|
||||
self.assertFalse(startList == list(crypto.randomShuffle(startList)))
|
||||
self.assertTrue(len(startList) == len(startList))
|
||||
self.assertNotEqual(startList, list(crypto.randomShuffle(startList)))
|
||||
self.assertTrue(len(list(crypto.randomShuffle(startList))) == len(startList))
|
||||
|
||||
def test_asymmetric(self):
|
||||
keyPair = crypto.generatePubKey()
|
||||
|
@ -44,7 +44,7 @@ class OnionrUserTests(unittest.TestCase):
|
||||
data = data.read()
|
||||
|
||||
data = json.loads(data)
|
||||
self.assertTrue(data['alias'] == 'bob')
|
||||
self.assertEqual(data['alias'], 'bob')
|
||||
|
||||
def test_contact_get_info(self):
|
||||
contact = c._crypto.generatePubKey()[0]
|
||||
@ -54,9 +54,16 @@ class OnionrUserTests(unittest.TestCase):
|
||||
with open(fileLocation, 'w') as contactFile:
|
||||
contactFile.write('{"alias": "bob"}')
|
||||
|
||||
self.assertTrue(contact.get_info('alias', forceReload=True) == 'bob')
|
||||
self.assertTrue(contact.get_info('fail', forceReload=True) == None)
|
||||
self.assertTrue(contact.get_info('fail') == None)
|
||||
self.assertEqual(contact.get_info('alias', forceReload=True), 'bob')
|
||||
self.assertEqual(contact.get_info('fail', forceReload=True), None)
|
||||
self.assertEqual(contact.get_info('fail'), None)
|
||||
|
||||
def test_encrypt(self):
|
||||
contactPair = c._crypto.generatePubKey()
|
||||
contact = contactmanager.ContactManager(c, contactPair[0], saveUser=True)
|
||||
encrypted = contact.encrypt('test')
|
||||
decrypted = c._crypto.pubKeyDecrypt(encrypted, privkey=contactPair[1], encodedData=True).decode()
|
||||
self.assertEqual('test', decrypted)
|
||||
|
||||
def test_delete_contact(self):
|
||||
contact = c._crypto.generatePubKey()[0]
|
||||
|
Loading…
Reference in New Issue
Block a user