From 6cb69c7187c73137857019ee63de93b08e0d11df Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Wed, 13 Jun 2018 17:22:48 -0500 Subject: [PATCH] work on new peer pool system in new communicator --- onionr/communicator2.py | 84 +++++++++++++++++++++++++++++++++++--- onionr/onionr.py | 5 +++ onionr/onionrexceptions.py | 2 + onionr/onionrutils.py | 10 ++++- 4 files changed, 93 insertions(+), 8 deletions(-) diff --git a/onionr/communicator2.py b/onionr/communicator2.py index 4e3f5cdc..54698ce4 100755 --- a/onionr/communicator2.py +++ b/onionr/communicator2.py @@ -20,6 +20,7 @@ along with this program. If not, see . ''' import sys, os, core, config, onionrblockapi as block, requests, time, logger, threading, onionrplugins as plugins +import onionrexceptions from defusedxml import minidom class OnionrCommunicatorDaemon: @@ -31,6 +32,8 @@ class OnionrCommunicatorDaemon: self.delay = 1 self.proxyPort = sys.argv[2] + self.onlinePeers = [] + self.threadCounts = {} self.shutdown = False @@ -49,8 +52,10 @@ class OnionrCommunicatorDaemon: if debug or developmentMode: OnionrCommunicatorTimers(self, self.heartbeat, 10) + self.getOnlinePeers() OnionrCommunicatorTimers(self, self.daemonCommands, 5) OnionrCommunicatorTimers(self, self.detectAPICrash, 12) + OnionrCommunicatorTimers(self, self.getOnlinePeers, 60) # Main daemon loop, mainly for calling timers, do not do any complex operations here while not self.shutdown: @@ -58,11 +63,51 @@ class OnionrCommunicatorDaemon: i.processTimer() time.sleep(self.delay) logger.info('Goodbye.') + + def decrementThreadCount(self, threadName): + if self.threadCounts[threadName] > 0: + self.threadCounts[threadName] -= 1 + + def getOnlinePeers(self): + '''Manages the self.onlinePeers attribute list''' + logger.info('Refreshing peer pool.') + maxPeers = 4 + needed = maxPeers - len(self.onlinePeers) + + for i in range(needed): + self.connectNewPeer() + self.decrementThreadCount('getOnlinePeers') + + def connectNewPeer(self, peer=''): + '''Adds a new random online peer to self.onlinePeers''' + retData = False + if peer != '': + if self._core._utils.validateID(peer): + peerList = [peer] + else: + raise onionrexceptions.InvalidAddress('Will not attempt connection test to invalid address') + else: + peerList = self._core.listAdders() + + if len(peerList) == 0: + peerList.extend(self._core.bootstrapList) + + for address in peerList: + if self.peerAction(address, 'ping') == 'pong!': + logger.info('connected to ' + address) + self.onlinePeers.append(address) + retData = address + break + else: + logger.debug('failed to connect to ' + address) + else: + logger.warn('Could not connect to any peer') + return retData def heartbeat(self): '''Show a heartbeat debug message''' logger.debug('Communicator heartbeat') - self.threadCounts['heartbeat'] -= 1 + self.decrementThreadCount('heartbeat') def daemonCommands(self): '''process daemon commands from daemonQueue''' @@ -71,20 +116,47 @@ class OnionrCommunicatorDaemon: if cmd is not False: if cmd[0] == 'shutdown': self.shutdown = True - elif cmd[0] == 'announce': + elif cmd[0] == 'announceNode': self.announce(cmd[1]) + elif cmd[0] == 'runCheck': + logger.debug('Status check; looks good.') + open('data/.runcheck', 'w+').close() + elif cmd[0] == 'connectedPeers': + self.printOnlinePeers() else: logger.info('Recieved daemonQueue command:' + cmd[0]) - self.threadCounts['daemonCommands'] -= 1 + self.decrementThreadCount('daemonCommands') + def printOnlinePeers(self): + '''logs online peer list''' + if len(self.onlinePeers) == 0: + logger.warn('No online peers') + return + for i in self.onlinePeers: + logger.info(self.onlinePeers[i]) + def announce(self, peer): '''Announce to peers''' + announceCount = 0 + announceAmount = 2 for peer in self._core.listAdders(): - self.peerAction(peer, 'announce', self._core.hsAdder) + announceCount += 1 + if self.peerAction(peer, 'announce', self._core.hsAdder) == 'Success': + logger.info('Successfully introduced node to ' + peer) + break + else: + if announceCount == announceAmount: + logger.warn('Could not introduce node. Try again soon') + break def peerAction(self, peer, action, data=''): '''Perform a get request to a peer''' - return self._core._utils.doGetRequest('http://' + peer + '/public/?action=' + action + '&data=' + data, port=self.proxyPort) + logger.info('Performing ' + action + ' with ' + peer + ' on port ' + str(self.proxyPort)) + retData = self._core._utils.doGetRequest('http://' + peer + '/public/?action=' + action + '&data=' + data, port=self.proxyPort) + if retData == False: + self.onlinePeers.remove(peer) + self.getOnlinePeers() # Will only add a new peer to pool if needed + return retData def detectAPICrash(self): '''exit if the api server crashes/stops''' @@ -97,7 +169,7 @@ class OnionrCommunicatorDaemon: # This executes if the api is NOT detected to be running logger.error('Daemon detected API crash (or otherwise unable to reach API after long time), stopping...') self.shutdown = True - self.threadCounts['detectAPICrash'] -= 1 + self.decrementThreadCount('detectAPICrash') def header(self, message = logger.colors.fg.pink + logger.colors.bold + 'Onionr' + logger.colors.reset + logger.colors.fg.pink + ' has started.'): if os.path.exists('static-data/header.txt'): diff --git a/onionr/onionr.py b/onionr/onionr.py index 688193ed..db2825a7 100755 --- a/onionr/onionr.py +++ b/onionr/onionr.py @@ -197,6 +197,7 @@ class Onionr: 'add-file': self.addFile, 'addfile': self.addFile, + 'listconn': self.listConn, 'import-blocks': self.onionrUtils.importNewBlocks, 'importblocks': self.onionrUtils.importNewBlocks, @@ -226,6 +227,7 @@ class Onionr: 'get-pms': 'Shows private messages sent to you', 'add-file': 'Create an Onionr block from a file', 'import-blocks': 'import blocks from the disk (Onionr is transport-agnostic!)', + 'listconn': 'list connected peers', 'introduce': 'Introduce your node to the public Onionr network', } @@ -254,6 +256,9 @@ class Onionr: def getCommands(self): return self.cmds + def listConn(self): + self.onionrCore.daemonQueueAdd('connectedPeers') + def getWebPassword(self): return config.get('client')['client_hmac'] diff --git a/onionr/onionrexceptions.py b/onionr/onionrexceptions.py index c3975e63..3f201b80 100644 --- a/onionr/onionrexceptions.py +++ b/onionr/onionrexceptions.py @@ -19,4 +19,6 @@ ''' class MissingPort(Exception): + pass +class InvalidAddress(Exception): pass \ No newline at end of file diff --git a/onionr/onionrutils.py b/onionr/onionrutils.py index e43e9ad6..de32e415 100644 --- a/onionr/onionrutils.py +++ b/onionr/onionrutils.py @@ -498,8 +498,14 @@ class OnionrUtils: else: return headers = {'user-agent': 'PyOnionr'} - r = requests.get(url, headers=headers, proxies=proxies, allow_redirects=False, timeout=(15, 30)) - return r.text + try: + proxies = {'http': 'socks5h://127.0.0.1:' + str(port), 'https': 'socks5h://127.0.0.1:' + str(port)} + r = requests.get(url, headers=headers, proxies=proxies, allow_redirects=False, timeout=(15, 30)) + retData = r.text + except requests.exceptions.RequestException as e: + logger.debug('Error: %s' % str(e)) + retData = False + return retData def getNistBeaconSalt(self, torPort=0): '''