work on new peer pool system in new communicator

This commit is contained in:
Kevin Froman 2018-06-13 17:22:48 -05:00
parent effeddc536
commit 6cb69c7187
No known key found for this signature in database
GPG Key ID: 0D414D0FE405B63B
4 changed files with 93 additions and 8 deletions

View File

@ -20,6 +20,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
import sys, os, core, config, onionrblockapi as block, requests, time, logger, threading, onionrplugins as plugins import sys, os, core, config, onionrblockapi as block, requests, time, logger, threading, onionrplugins as plugins
import onionrexceptions
from defusedxml import minidom from defusedxml import minidom
class OnionrCommunicatorDaemon: class OnionrCommunicatorDaemon:
@ -31,6 +32,8 @@ class OnionrCommunicatorDaemon:
self.delay = 1 self.delay = 1
self.proxyPort = sys.argv[2] self.proxyPort = sys.argv[2]
self.onlinePeers = []
self.threadCounts = {} self.threadCounts = {}
self.shutdown = False self.shutdown = False
@ -49,8 +52,10 @@ class OnionrCommunicatorDaemon:
if debug or developmentMode: if debug or developmentMode:
OnionrCommunicatorTimers(self, self.heartbeat, 10) OnionrCommunicatorTimers(self, self.heartbeat, 10)
self.getOnlinePeers()
OnionrCommunicatorTimers(self, self.daemonCommands, 5) OnionrCommunicatorTimers(self, self.daemonCommands, 5)
OnionrCommunicatorTimers(self, self.detectAPICrash, 12) OnionrCommunicatorTimers(self, self.detectAPICrash, 12)
OnionrCommunicatorTimers(self, self.getOnlinePeers, 60)
# Main daemon loop, mainly for calling timers, do not do any complex operations here # Main daemon loop, mainly for calling timers, do not do any complex operations here
while not self.shutdown: while not self.shutdown:
@ -58,11 +63,51 @@ class OnionrCommunicatorDaemon:
i.processTimer() i.processTimer()
time.sleep(self.delay) time.sleep(self.delay)
logger.info('Goodbye.') logger.info('Goodbye.')
def decrementThreadCount(self, threadName):
if self.threadCounts[threadName] > 0:
self.threadCounts[threadName] -= 1
def getOnlinePeers(self):
'''Manages the self.onlinePeers attribute list'''
logger.info('Refreshing peer pool.')
maxPeers = 4
needed = maxPeers - len(self.onlinePeers)
for i in range(needed):
self.connectNewPeer()
self.decrementThreadCount('getOnlinePeers')
def connectNewPeer(self, peer=''):
'''Adds a new random online peer to self.onlinePeers'''
retData = False
if peer != '':
if self._core._utils.validateID(peer):
peerList = [peer]
else:
raise onionrexceptions.InvalidAddress('Will not attempt connection test to invalid address')
else:
peerList = self._core.listAdders()
if len(peerList) == 0:
peerList.extend(self._core.bootstrapList)
for address in peerList:
if self.peerAction(address, 'ping') == 'pong!':
logger.info('connected to ' + address)
self.onlinePeers.append(address)
retData = address
break
else:
logger.debug('failed to connect to ' + address)
else:
logger.warn('Could not connect to any peer')
return retData
def heartbeat(self): def heartbeat(self):
'''Show a heartbeat debug message''' '''Show a heartbeat debug message'''
logger.debug('Communicator heartbeat') logger.debug('Communicator heartbeat')
self.threadCounts['heartbeat'] -= 1 self.decrementThreadCount('heartbeat')
def daemonCommands(self): def daemonCommands(self):
'''process daemon commands from daemonQueue''' '''process daemon commands from daemonQueue'''
@ -71,20 +116,47 @@ class OnionrCommunicatorDaemon:
if cmd is not False: if cmd is not False:
if cmd[0] == 'shutdown': if cmd[0] == 'shutdown':
self.shutdown = True self.shutdown = True
elif cmd[0] == 'announce': elif cmd[0] == 'announceNode':
self.announce(cmd[1]) self.announce(cmd[1])
elif cmd[0] == 'runCheck':
logger.debug('Status check; looks good.')
open('data/.runcheck', 'w+').close()
elif cmd[0] == 'connectedPeers':
self.printOnlinePeers()
else: else:
logger.info('Recieved daemonQueue command:' + cmd[0]) logger.info('Recieved daemonQueue command:' + cmd[0])
self.threadCounts['daemonCommands'] -= 1 self.decrementThreadCount('daemonCommands')
def printOnlinePeers(self):
'''logs online peer list'''
if len(self.onlinePeers) == 0:
logger.warn('No online peers')
return
for i in self.onlinePeers:
logger.info(self.onlinePeers[i])
def announce(self, peer): def announce(self, peer):
'''Announce to peers''' '''Announce to peers'''
announceCount = 0
announceAmount = 2
for peer in self._core.listAdders(): for peer in self._core.listAdders():
self.peerAction(peer, 'announce', self._core.hsAdder) announceCount += 1
if self.peerAction(peer, 'announce', self._core.hsAdder) == 'Success':
logger.info('Successfully introduced node to ' + peer)
break
else:
if announceCount == announceAmount:
logger.warn('Could not introduce node. Try again soon')
break
def peerAction(self, peer, action, data=''): def peerAction(self, peer, action, data=''):
'''Perform a get request to a peer''' '''Perform a get request to a peer'''
return self._core._utils.doGetRequest('http://' + peer + '/public/?action=' + action + '&data=' + data, port=self.proxyPort) logger.info('Performing ' + action + ' with ' + peer + ' on port ' + str(self.proxyPort))
retData = self._core._utils.doGetRequest('http://' + peer + '/public/?action=' + action + '&data=' + data, port=self.proxyPort)
if retData == False:
self.onlinePeers.remove(peer)
self.getOnlinePeers() # Will only add a new peer to pool if needed
return retData
def detectAPICrash(self): def detectAPICrash(self):
'''exit if the api server crashes/stops''' '''exit if the api server crashes/stops'''
@ -97,7 +169,7 @@ class OnionrCommunicatorDaemon:
# This executes if the api is NOT detected to be running # This executes if the api is NOT detected to be running
logger.error('Daemon detected API crash (or otherwise unable to reach API after long time), stopping...') logger.error('Daemon detected API crash (or otherwise unable to reach API after long time), stopping...')
self.shutdown = True self.shutdown = True
self.threadCounts['detectAPICrash'] -= 1 self.decrementThreadCount('detectAPICrash')
def header(self, message = logger.colors.fg.pink + logger.colors.bold + 'Onionr' + logger.colors.reset + logger.colors.fg.pink + ' has started.'): def header(self, message = logger.colors.fg.pink + logger.colors.bold + 'Onionr' + logger.colors.reset + logger.colors.fg.pink + ' has started.'):
if os.path.exists('static-data/header.txt'): if os.path.exists('static-data/header.txt'):

View File

@ -197,6 +197,7 @@ class Onionr:
'add-file': self.addFile, 'add-file': self.addFile,
'addfile': self.addFile, 'addfile': self.addFile,
'listconn': self.listConn,
'import-blocks': self.onionrUtils.importNewBlocks, 'import-blocks': self.onionrUtils.importNewBlocks,
'importblocks': self.onionrUtils.importNewBlocks, 'importblocks': self.onionrUtils.importNewBlocks,
@ -226,6 +227,7 @@ class Onionr:
'get-pms': 'Shows private messages sent to you', 'get-pms': 'Shows private messages sent to you',
'add-file': 'Create an Onionr block from a file', 'add-file': 'Create an Onionr block from a file',
'import-blocks': 'import blocks from the disk (Onionr is transport-agnostic!)', 'import-blocks': 'import blocks from the disk (Onionr is transport-agnostic!)',
'listconn': 'list connected peers',
'introduce': 'Introduce your node to the public Onionr network', 'introduce': 'Introduce your node to the public Onionr network',
} }
@ -254,6 +256,9 @@ class Onionr:
def getCommands(self): def getCommands(self):
return self.cmds return self.cmds
def listConn(self):
self.onionrCore.daemonQueueAdd('connectedPeers')
def getWebPassword(self): def getWebPassword(self):
return config.get('client')['client_hmac'] return config.get('client')['client_hmac']

View File

@ -19,4 +19,6 @@
''' '''
class MissingPort(Exception): class MissingPort(Exception):
pass
class InvalidAddress(Exception):
pass pass

View File

@ -498,8 +498,14 @@ class OnionrUtils:
else: else:
return return
headers = {'user-agent': 'PyOnionr'} headers = {'user-agent': 'PyOnionr'}
r = requests.get(url, headers=headers, proxies=proxies, allow_redirects=False, timeout=(15, 30)) try:
return r.text proxies = {'http': 'socks5h://127.0.0.1:' + str(port), 'https': 'socks5h://127.0.0.1:' + str(port)}
r = requests.get(url, headers=headers, proxies=proxies, allow_redirects=False, timeout=(15, 30))
retData = r.text
except requests.exceptions.RequestException as e:
logger.debug('Error: %s' % str(e))
retData = False
return retData
def getNistBeaconSalt(self, torPort=0): def getNistBeaconSalt(self, torPort=0):
''' '''