''' Onionr - P2P Microblogging Platform & Social network OnionrUtils offers various useful functions to Onionr. Relatively misc. ''' ''' This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . ''' # Misc functions that do not fit in the main api, but are useful import getpass, sys, requests, os, socket, hashlib, logger, sqlite3, config, binascii, time, base64 import nacl.signing, nacl.encoding if sys.version_info < (3, 6): try: import sha3 except ModuleNotFoundError: logger.fatal('On Python 3 versions prior to 3.6.x, you need the sha3 module') sys.exit(1) class OnionrUtils: ''' Various useful function ''' def __init__(self, coreInstance): self.fingerprintFile = 'data/own-fingerprint.txt' self._core = coreInstance self.timingToken = '' return def getTimeBypassToken(self): if os.path.exists('data/time-bypass.txt'): with open('data/time-bypass.txt', 'r') as bypass: self.timingToken = bypass.read() def sendPM(self, pubkey, message): ''' High level function to encrypt a message to a peer and insert it as a block ''' encrypted = self._core._crypto.pubKeyEncrypt(message, pubkey, anonymous=True, encodedData=True).decode() block = self._core.insertBlock(encrypted, header='pm') if block == '': logger.error('Could not send PM') else: logger.info('Sent PM, hash: ' + block) return def incrementAddressSuccess(self, address): ''' Increase the recorded sucesses for an address ''' increment = self._core.getAddressInfo(address, 'success') + 1 self._core.setAddressInfo(address, 'success', increment) return def decrementAddressSuccess(self, address): ''' Decrease the recorded sucesses for an address ''' increment = self._core.getAddressInfo(address, 'success') - 1 self._core.setAddressInfo(address, 'success', increment) return def mergeKeys(self, newKeyList): ''' Merge ed25519 key list to our database ''' retVal = False if newKeyList != False: for key in newKeyList.split(','): if not key in self._core.listPeers(randomOrder=False) and type(key) != None and key != self._core._crypto.pubKey: if self._core.addPeer(key): retVal = True return retVal def mergeAdders(self, newAdderList): ''' Merge peer adders list to our database ''' retVal = False if newAdderList != False: for adder in newAdderList.split(','): if not adder in self._core.listAdders(randomOrder=False) and adder.strip() != self.getMyAddress(): if self._core.addAddress(adder): logger.info('Added ' + adder + ' to db.', timestamp=True) input() retVal = True else: logger.debug(adder + " is either our address or already in our DB") return retVal def getMyAddress(self): myAddressFile = open("data/hs/hostname", 'r') myAddress = myAddressFile.read() myAddressFile.close() return myAddress.strip() def localCommand(self, command): ''' Send a command to the local http API server, securely. Intended for local clients, DO NOT USE for remote peers. ''' config.reload() self.getTimeBypassToken() # TODO: URL encode parameters, just as an extra measure. May not be needed, but should be added regardless. try: retData = requests.get('http://' + open('data/host.txt', 'r').read() + ':' + str(config.get('client')['port']) + '/client/?action=' + command + '&token=' + str(config.get('client')['client_hmac']) + '&timingToken=' + self.timingToken).text except requests.ConnectionError: retData = False return retData def getPassword(self, message='Enter password: ', confirm = True): ''' Get a password without showing the users typing and confirm the input ''' # Get a password safely with confirmation and return it while True: print(message) pass1 = getpass.getpass() if confirm: print('Confirm password: ') pass2 = getpass.getpass() if pass1 != pass2: logger.error("Passwords do not match.") logger.readline() else: break else: break return pass1 def checkPort(self, port, host=''): ''' Checks if a port is available, returns bool ''' # inspired by https://www.reddit.com/r/learnpython/comments/2i4qrj/how_to_write_a_python_script_that_checks_to_see/ckzarux/ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) retVal = False try: sock.bind((host, port)) except OSError as e: if e.errno is 98: retVal = True finally: sock.close() return retVal def checkIsIP(self, ip): ''' Check if a string is a valid IPv4 address ''' try: socket.inet_aton(ip) except: return False else: return True def getBlockDBHash(self): ''' Return a sha3_256 hash of the blocks DB ''' with open(self._core.blockDB, 'rb') as data: data = data.read() hasher = hashlib.sha3_256() hasher.update(data) dataHash = hasher.hexdigest() return dataHash def hasBlock(self, hash): ''' Check for new block in the list ''' conn = sqlite3.connect(self._core.blockDB) c = conn.cursor() if not self.validateHash(hash): raise Exception("Invalid hash") for result in c.execute("SELECT COUNT() FROM hashes where hash='" + hash + "'"): if result[0] >= 1: conn.commit() conn.close() return True else: conn.commit() conn.close() return False def validateHash(self, data, length=64): ''' Validate if a string is a valid hex formatted hash ''' retVal = True if data == False or data == True: return False data = data.strip() if len(data) != length: retVal = False else: try: int(data, 16) except ValueError: retVal = False return retVal def validatePubKey(self, key): '''Validate if a string is a valid base32 encoded Ed25519 key''' retVal = False try: nacl.signing.SigningKey(seed=key, encoder=nacl.encoding.Base32Encoder) except nacl.exceptions.ValueError: pass except base64.binascii.Error as err: pass else: retVal = True return retVal def validateID(self, id): ''' Validate if an address is a valid tor or i2p hidden service ''' idLength = len(id) retVal = True idNoDomain = '' peerType = '' # i2p b32 addresses are 60 characters long (including .b32.i2p) if idLength == 60: peerType = 'i2p' if not id.endswith('.b32.i2p'): retVal = False else: idNoDomain = id.split('.b32.i2p')[0] # Onion v2's are 22 (including .onion), v3's are 62 with .onion elif idLength == 22 or idLength == 62: peerType = 'onion' if not id.endswith('.onion'): retVal = False else: idNoDomain = id.split('.onion')[0] else: retVal = False if retVal: if peerType == 'i2p': try: id.split('.b32.i2p')[2] except: pass else: retVal = False elif peerType == 'onion': try: id.split('.onion')[2] except: pass else: retVal = False if not idNoDomain.isalnum(): retVal = False return retVal def loadPMs(self): ''' Find, decrypt, and return array of PMs (array of dictionary, {from, text}) ''' blocks = self._core.getBlockList().split('\n') message = '' sender = '' for i in blocks: if len (i) == 0: continue with open('data/blocks/' + i + '.dat', 'r') as potentialMessage: message = potentialMessage.read() if message.startswith('-pm-'): try: message = self._core._crypto.pubKeyDecrypt(message.replace('-pm-', ''), encodedData=True, anonymous=True) except nacl.exceptions.CryptoError as e: #logger.debug('Unable to decrypt ' + i) #logger.debug(str(e)) pass else: logger.info('Recieved message: ' + message.decode()) return