Onionr/onionr/communicator.py

395 lines
16 KiB
Python
Raw Normal View History

2018-01-07 08:55:44 +00:00
#!/usr/bin/env python3
'''
Onionr - P2P Microblogging Platform & Social network.
This file contains both the OnionrCommunicate class for communcating with peers
and code to operate as a daemon, getting commands from the command queue database (see core.Core.daemonQueue)
'''
'''
2018-01-07 08:55:44 +00:00
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
2018-04-26 07:40:39 +00:00
import sqlite3, requests, hmac, hashlib, time, sys, os, math, logger, urllib.parse, base64, binascii, random, json
import core, onionrutils, onionrcrypto, netcontroller, onionrproofs, btc, config, onionrplugins as plugins
2018-01-07 08:55:44 +00:00
class OnionrCommunicate:
2018-01-15 08:03:13 +00:00
def __init__(self, debug, developmentMode):
'''
OnionrCommunicate
This class handles communication with nodes in the Onionr network.
'''
2018-03-03 04:19:01 +00:00
self._core = core.Core()
2018-01-27 01:16:15 +00:00
self._utils = onionrutils.OnionrUtils(self._core)
self._crypto = onionrcrypto.OnionrCrypto(self._core)
self._netController = netcontroller.NetController(0) # arg is the HS port but not needed rn in this file
self.newHashes = {} # use this to not keep hashes around too long if we cant get their data
2018-04-25 22:42:42 +00:00
self.keepNewHash = 12
self.ignoredHashes = []
self.highFailureAmount = 7
2018-03-04 02:28:17 +00:00
'''
2018-04-19 01:47:35 +00:00
logger.info('Starting Bitcoin Node... with Tor socks port:' + str(sys.argv[2]), timestamp=True)
2018-03-03 04:19:01 +00:00
try:
self.bitcoin = btc.OnionrBTC(torP=int(sys.argv[2]))
2018-03-04 02:28:17 +00:00
except _gdbm.error:
pass
2018-04-19 01:47:35 +00:00
logger.info('Bitcoin Node started, on block: ' + self.bitcoin.node.getBlockHash(self.bitcoin.node.getLastBlockHeight()), timestamp=True)
2018-03-04 02:28:17 +00:00
'''
#except:
#logger.fatal('Failed to start Bitcoin Node, exiting...')
#exit(1)
2018-01-25 22:39:09 +00:00
blockProcessTimer = 0
2018-01-26 06:28:11 +00:00
blockProcessAmount = 5
highFailureTimer = 0
highFailureRate = 10
2018-01-27 21:49:48 +00:00
heartBeatTimer = 0
heartBeatRate = 0
2018-04-03 21:47:48 +00:00
pexTimer = 25 # How often we should check for new peers
2018-02-28 09:06:02 +00:00
pexCount = 0
2018-01-26 07:22:48 +00:00
logger.debug('Communicator debugging enabled.')
2018-01-20 07:23:09 +00:00
torID = open('data/hs/hostname').read()
apiRunningCheckRate = 10
apiRunningCheckCount = 0
self.peerData = {} # Session data for peers (recent reachability, speed, etc)
if os.path.exists(self._core.queueDB):
self._core.clearDaemonQueue()
2018-03-03 04:19:01 +00:00
# Loads in and starts the enabled plugins
plugins.reload()
while True:
command = self._core.daemonQueue()
2018-01-25 22:39:09 +00:00
# Process blocks based on a timer
blockProcessTimer += 1
2018-01-27 21:49:48 +00:00
heartBeatTimer += 1
2018-03-01 09:20:57 +00:00
pexCount += 1
if highFailureTimer == highFailureRate:
highFailureTimer = 0
for i in self.peerData:
if self.peerData[i]['failCount'] >= self.highFailureAmount:
self.peerData[i]['failCount'] -= 1
2018-03-01 09:20:57 +00:00
if pexTimer == pexCount:
self.getNewPeers()
2018-04-03 21:47:48 +00:00
pexCount = 0 # TODO: do not reset timer if low peer count
2018-01-27 21:49:48 +00:00
if heartBeatRate == heartBeatTimer:
logger.debug('Communicator heartbeat')
heartBeatTimer = 0
2018-01-25 22:39:09 +00:00
if blockProcessTimer == blockProcessAmount:
2018-01-26 06:28:11 +00:00
self.lookupBlocks()
self.processBlocks()
2018-01-25 22:39:09 +00:00
blockProcessTimer = 0
if command != False:
if command[0] == 'shutdown':
2018-04-19 01:47:35 +00:00
logger.info('Daemon recieved exit command.', timestamp=True)
break
2018-04-19 02:56:25 +00:00
elif command[0] == 'announceNode':
announceAttempts = 3
announceAttemptCount = 0
2018-04-19 01:17:47 +00:00
announceVal = False
2018-04-19 02:56:25 +00:00
logger.info('Announcing our node to ' + command[1], timestamp=True)
while not announceVal:
announceAttemptCount += 1
announceVal = self.performGet('announce', command[1], data=self._core.hsAdder.replace('\n', ''), skipHighFailureAddress=True)
logger.info(announceVal)
if announceAttemptCount >= announceAttempts:
logger.warn('Unable to announce to ' + command[1])
break
apiRunningCheckCount += 1
# check if local API is up
if apiRunningCheckCount > apiRunningCheckRate:
if self._core._utils.localCommand('ping') != 'pong':
for i in range(4):
if self._utils.localCommand('ping') == 'pong':
apiRunningCheckCount = 0
break # break for loop
time.sleep(1)
else:
# This executes if the api is NOT detected to be running
logger.error('Daemon detected API crash (or otherwise unable to reach API after long time, stopping)')
break # break main daemon loop
apiRunningCheckCount = 0
2018-04-19 01:47:35 +00:00
time.sleep(1)
self._netController.killTor()
2018-01-07 08:55:44 +00:00
return
2018-03-03 04:19:01 +00:00
2018-03-01 09:20:57 +00:00
def getNewPeers(self):
'''
2018-04-03 21:47:48 +00:00
Get new peers and keys
2018-03-01 09:20:57 +00:00
'''
2018-03-16 15:35:37 +00:00
peersCheck = 5 # Amount of peers to ask for new peers + keys
peersChecked = 0
peerList = list(self._core.listAdders()) # random ordered list of peers
newKeys = []
newAdders = []
2018-03-16 20:38:33 +00:00
if len(peerList) > 0:
maxN = len(peerList) - 1
else:
peersCheck = 0
maxN = 0
2018-03-16 15:35:37 +00:00
if len(peerList) > peersCheck:
peersCheck = len(peerList)
while peersCheck > peersChecked:
2018-04-25 07:09:28 +00:00
#i = secrets.randbelow(maxN) # cant use prior to 3.6
i = random.randint(0, maxN)
2018-04-19 01:47:35 +00:00
logger.info('Using ' + peerList[i] + ' to find new peers', timestamp=True)
2018-03-16 15:35:37 +00:00
try:
newAdders = self.performGet('pex', peerList[i], skipHighFailureAddress=True)
2018-04-03 21:47:48 +00:00
logger.debug('Attempting to merge address: ')
logger.debug(newAdders)
2018-03-16 15:35:37 +00:00
self._utils.mergeAdders(newAdders)
except requests.exceptions.ConnectionError:
2018-04-19 01:47:35 +00:00
logger.info(peerList[i] + ' connection failed', timestamp=True)
2018-03-16 15:35:37 +00:00
continue
else:
try:
logger.info('Using ' + peerList[i] + ' to find new keys')
newKeys = self.performGet('kex', peerList[i], skipHighFailureAddress=True)
2018-04-03 21:47:48 +00:00
logger.debug('Attempting to merge pubkey: ')
logger.debug(newKeys)
2018-03-16 15:35:37 +00:00
# TODO: Require keys to come with POW token (very large amount of POW)
self._utils.mergeKeys(newKeys)
except requests.exceptions.ConnectionError:
2018-04-19 01:47:35 +00:00
logger.info(peerList[i] + ' connection failed', timestamp=True)
2018-03-16 15:35:37 +00:00
continue
else:
peersChecked += 1
2018-03-01 09:20:57 +00:00
return
2018-01-26 06:28:11 +00:00
def lookupBlocks(self):
'''
Lookup blocks and merge new ones
'''
peerList = self._core.listAdders()
2018-01-26 06:28:11 +00:00
blocks = ''
for i in peerList:
2018-04-25 22:42:42 +00:00
try:
if self.peerData[i]['failCount'] >= self.highFailureAmount:
continue
except KeyError:
pass
2018-02-28 00:00:37 +00:00
lastDB = self._core.getAddressInfo(i, 'DBHash')
if lastDB == None:
logger.debug('Fetching hash from ' + i + ' No previous known.')
else:
2018-02-28 00:00:37 +00:00
logger.debug('Fetching hash from ' + str(i) + ', ' + lastDB + ' last known')
2018-01-26 06:28:11 +00:00
currentDB = self.performGet('getDBHash', i)
2018-01-28 22:29:16 +00:00
if currentDB != False:
logger.debug(i + " hash db (from request): " + currentDB)
else:
logger.warn("Error getting hash db status for " + i)
2018-01-27 01:16:15 +00:00
if currentDB != False:
if lastDB != currentDB:
logger.debug('Fetching hash from ' + i + ' - ' + currentDB + ' current hash.')
try:
blocks += self.performGet('getBlockHashes', i)
except TypeError:
logger.warn('Failed to get data hash from ' + i)
self.peerData[i]['failCount'] -= 1
2018-01-28 01:53:24 +00:00
if self._utils.validateHash(currentDB):
2018-02-28 00:00:37 +00:00
self._core.setAddressInfo(i, "DBHash", currentDB)
if len(blocks.strip()) != 0:
logger.debug('BLOCKS:' + blocks)
2018-01-26 06:28:11 +00:00
blockList = blocks.split('\n')
for i in blockList:
2018-01-29 02:54:39 +00:00
if len(i.strip()) == 0:
continue
2018-01-29 02:52:48 +00:00
if self._utils.hasBlock(i):
continue
2018-04-23 06:14:49 +00:00
if i in self.ignoredHashes:
continue
#logger.debug('Exchanged block (blockList): ' + i)
2018-01-27 01:16:15 +00:00
if not self._utils.validateHash(i):
2018-01-26 06:28:11 +00:00
# skip hash if it isn't valid
2018-01-29 02:54:39 +00:00
logger.warn('Hash ' + i + ' is not valid')
2018-01-26 06:28:11 +00:00
continue
else:
self.newHashes[i] = 0
2018-01-26 07:22:48 +00:00
logger.debug('Adding ' + i + ' to hash database...')
2018-01-26 06:28:11 +00:00
self._core.addToBlockDB(i)
2018-01-26 06:28:11 +00:00
return
def processBlocks(self):
'''
Work with the block database and download any missing blocks
This is meant to be called from the communicator daemon on its timer.
'''
2018-03-03 04:19:01 +00:00
for i in self._core.getBlockList(unsaved=True).split("\n"):
if i != "":
if i in self.ignoredHashes:
continue
try:
self.newHashes[i]
except KeyError:
self.newHashes[i] = 0
# check if a new hash has been around too long, delete it from database and add it to ignore list
if self.newHashes[i] >= self.keepNewHash:
logger.warn('Ignoring block ' + i + ' because it took to long to get valid data.')
del self.newHashes[i]
self._core.removeBlock(i)
self.ignoredHashes.append(i)
continue
self.newHashes[i] += 1
2018-01-28 22:15:41 +00:00
logger.warn('UNSAVED BLOCK: ' + i)
2018-01-28 22:14:19 +00:00
data = self.downloadBlock(i)
2018-04-26 07:40:39 +00:00
# if block was successfull gotten (hash already verified)
if data:
2018-04-26 07:40:39 +00:00
del self.newHashes[i] # remove from probation list
# deal with block metadata
blockContent = self._core.getData(i)
try:
blockMetadata = json.loads(self._core.getData(i)).split('}')[0] + '}'
try:
blockMetadata['sig']
blockMetadata['id']
except KeyError:
pass
else:
creator = self._utils.getPeerByHashId(blockMetadata['id'])
if self._crypto.edVerify(blockContent, creator):
self._core.updateBlockInfo(i, 'sig', 'true')
else:
self._core.updateBlockInfo(i, 'sig', 'false')
try:
blockMetadata['type']
except KeyError:
pass
except json.decoder.JSONDecodeError:
pass
return
2018-04-25 22:42:42 +00:00
def downloadBlock(self, hash, peerTries=3):
'''
Download a block from random order of peers
'''
retVal = False
peerList = self._core.listAdders()
2018-01-28 22:14:19 +00:00
blocks = ''
2018-04-25 22:42:42 +00:00
peerTryCount = 0
2018-01-28 22:14:19 +00:00
for i in peerList:
2018-04-25 22:42:42 +00:00
if self.peerData[i]['failCount'] >= self.highFailureAmount:
continue
if peerTryCount >= peerTries:
break
2018-01-28 22:14:19 +00:00
hasher = hashlib.sha3_256()
data = self.performGet('getData', i, hash, skipHighFailureAddress=True)
if data == False or len(data) > 10000000 or data == '':
2018-04-25 22:42:42 +00:00
peerTryCount += 1
2018-04-23 01:59:44 +00:00
continue
2018-04-23 01:43:17 +00:00
try:
2018-04-23 02:13:55 +00:00
data = base64.b64decode(data)
2018-04-23 01:43:17 +00:00
except binascii.Error:
data = b''
hasher.update(data)
digest = hasher.hexdigest()
if type(digest) is bytes:
digest = digest.decode()
if digest == hash.strip():
2018-01-28 22:14:19 +00:00
self._core.setData(data)
2018-04-19 01:47:35 +00:00
logger.info('Successfully obtained data for ' + hash, timestamp=True)
retVal = True
2018-04-26 07:40:39 +00:00
'''
if data.startswith(b'-txt-'):
self._core.setBlockType(hash, 'txt')
if len(data) < 120:
logger.debug('Block text:\n' + data.decode())
2018-04-26 07:40:39 +00:00
'''
else:
logger.warn("Failed to validate " + hash + " " + " hash calculated was " + digest)
2018-04-25 22:42:42 +00:00
peerTryCount += 1
2018-01-26 07:22:48 +00:00
return retVal
def urlencode(self, data):
'''
URL encodes the data
'''
return urllib.parse.quote_plus(data)
2018-04-22 23:35:00 +00:00
def performGet(self, action, peer, data=None, skipHighFailureAddress=False, peerType='tor', selfCheck=True):
'''
Performs a request to a peer through Tor or i2p (currently only Tor)
'''
2018-03-03 04:19:01 +00:00
2018-01-21 01:02:56 +00:00
if not peer.endswith('.onion') and not peer.endswith('.onion/'):
raise PeerError('Currently only Tor .onion peers are supported. You must manually specify .onion')
2018-04-22 23:35:00 +00:00
if len(self._core.hsAdder.strip()) == 0:
raise Exception("Could not perform self address check in performGet due to not knowing our address")
if selfCheck:
if peer.replace('/', '') == self._core.hsAdder:
logger.warn('Tried to performget to own hidden service, but selfCheck was not set to false')
return
# Store peer in peerData dictionary (non permanent)
if not peer in self.peerData:
self.peerData[peer] = {'connectCount': 0, 'failCount': 0, 'lastConnectTime': math.floor(time.time())}
2018-01-21 01:02:56 +00:00
socksPort = sys.argv[2]
2018-01-28 01:53:24 +00:00
'''We use socks5h to use tor as DNS'''
2018-02-28 00:00:37 +00:00
proxies = {'http': 'socks5://127.0.0.1:' + str(socksPort), 'https': 'socks5://127.0.0.1:' + str(socksPort)}
2018-01-21 01:02:56 +00:00
headers = {'user-agent': 'PyOnionr'}
2018-02-04 05:22:34 +00:00
url = 'http://' + peer + '/public/?action=' + self.urlencode(action)
2018-01-21 01:02:56 +00:00
if data != None:
2018-02-04 05:22:34 +00:00
url = url + '&data=' + self.urlencode(data)
try:
if skipHighFailureAddress and self.peerData[peer]['failCount'] > self.highFailureAmount:
retData = False
logger.debug('Skipping ' + peer + ' because of high failure rate')
else:
logger.debug('Contacting ' + peer + ' on port ' + socksPort)
r = requests.get(url, headers=headers, proxies=proxies, timeout=(15, 30))
retData = r.text
2018-01-28 01:53:24 +00:00
except requests.exceptions.RequestException as e:
2018-01-28 01:58:30 +00:00
logger.warn(action + " failed with peer " + peer + ": " + str(e))
retData = False
2018-02-22 06:42:02 +00:00
if not retData:
self.peerData[peer]['failCount'] += 1
else:
self.peerData[peer]['connectCount'] += 1
self.peerData[peer]['failCount'] -= 1
self.peerData[peer]['lastConnectTime'] = math.floor(time.time())
return retData
2018-01-26 07:22:48 +00:00
2018-01-20 00:59:05 +00:00
shouldRun = False
2018-01-27 01:16:15 +00:00
debug = True
developmentMode = False
2018-03-03 09:18:53 +00:00
if config.get('devmode', True):
developmentMode = True
try:
if sys.argv[1] == 'run':
shouldRun = True
except IndexError:
pass
if shouldRun:
2018-01-15 08:03:13 +00:00
try:
OnionrCommunicate(debug, developmentMode)
except KeyboardInterrupt:
2018-03-04 02:28:17 +00:00
sys.exit(1)
pass