diff --git a/onionr/api.py b/onionr/api.py index 34a05419..6cb94104 100755 --- a/onionr/api.py +++ b/onionr/api.py @@ -178,6 +178,7 @@ class API: @app.route('/public/') def public_handler(): # Public means it is publicly network accessible + # TODO, stop hard coding endpoints, use whitelist and serializer. self.validateHost('public') action = request.args.get('action') requestingPeer = request.args.get('myID') diff --git a/onionr/communicator2.py b/onionr/communicator2.py index a27591f4..4e3f5cdc 100755 --- a/onionr/communicator2.py +++ b/onionr/communicator2.py @@ -19,7 +19,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . ''' -import sys, core, config, onionrblockapi as block, requests, time, logger, threading +import sys, os, core, config, onionrblockapi as block, requests, time, logger, threading, onionrplugins as plugins from defusedxml import minidom class OnionrCommunicatorDaemon: @@ -29,30 +29,41 @@ class OnionrCommunicatorDaemon: self.nistSaltTimestamp = 0 self.powSalt = 0 self.delay = 1 + self.proxyPort = sys.argv[2] + + self.threadCounts = {} self.shutdown = False # Clear the daemon queue for any dead messages - self._core.clearDaemonQueue() + if os.path.exists(self._core.queueDB): + self._core.clearDaemonQueue() + + # Loads in and starts the enabled plugins + plugins.reload() + + # Print nice header thing :) + if config.get('display_header', True): + self.header() if debug or developmentMode: OnionrCommunicatorTimers(self, self.heartbeat, 10) OnionrCommunicatorTimers(self, self.daemonCommands, 5) - OnionrCommunicatorTimers(self, self.detectAPICrash, 5) + OnionrCommunicatorTimers(self, self.detectAPICrash, 12) # Main daemon loop, mainly for calling timers, do not do any complex operations here while not self.shutdown: - time.sleep(self.delay) for i in self.timers: i.processTimer() + time.sleep(self.delay) logger.info('Goodbye.') - def heartbeat(self): '''Show a heartbeat debug message''' logger.debug('Communicator heartbeat') - + self.threadCounts['heartbeat'] -= 1 + def daemonCommands(self): '''process daemon commands from daemonQueue''' cmd = self._core.daemonQueue() @@ -60,9 +71,21 @@ class OnionrCommunicatorDaemon: if cmd is not False: if cmd[0] == 'shutdown': self.shutdown = True + elif cmd[0] == 'announce': + self.announce(cmd[1]) else: logger.info('Recieved daemonQueue command:' + cmd[0]) + self.threadCounts['daemonCommands'] -= 1 + def announce(self, peer): + '''Announce to peers''' + for peer in self._core.listAdders(): + self.peerAction(peer, 'announce', self._core.hsAdder) + + def peerAction(self, peer, action, data=''): + '''Perform a get request to a peer''' + return self._core._utils.doGetRequest('http://' + peer + '/public/?action=' + action + '&data=' + data, port=self.proxyPort) + def detectAPICrash(self): '''exit if the api server crashes/stops''' if self._core._utils.localCommand('ping') != 'pong': @@ -74,24 +97,44 @@ class OnionrCommunicatorDaemon: # This executes if the api is NOT detected to be running logger.error('Daemon detected API crash (or otherwise unable to reach API after long time), stopping...') self.shutdown = True + self.threadCounts['detectAPICrash'] -= 1 + + def header(self, message = logger.colors.fg.pink + logger.colors.bold + 'Onionr' + logger.colors.reset + logger.colors.fg.pink + ' has started.'): + if os.path.exists('static-data/header.txt'): + with open('static-data/header.txt', 'rb') as file: + # only to stdout, not file or log or anything + print(file.read().decode().replace('P', logger.colors.fg.pink).replace('W', logger.colors.reset + logger.colors.bold).replace('G', logger.colors.fg.green).replace('\n', logger.colors.reset + '\n')) + logger.info(logger.colors.fg.lightgreen + '-> ' + str(message) + logger.colors.reset + logger.colors.fg.lightgreen + ' <-\n') class OnionrCommunicatorTimers: - def __init__(self, daemonInstance, timerFunction, frequency, makeThread=True, threadAmount=1): + def __init__(self, daemonInstance, timerFunction, frequency, makeThread=True, threadAmount=1, maxThreads=5): self.timerFunction = timerFunction self.frequency = frequency self.threadAmount = threadAmount self.makeThread = makeThread self.daemonInstance = daemonInstance + self.maxThreads = maxThreads self._core = self.daemonInstance._core self.daemonInstance.timers.append(self) self.count = 0 + def processTimer(self): self.count += 1 + try: + self.daemonInstance.threadCounts[self.timerFunction.__name__] + except KeyError: + self.daemonInstance.threadCounts[self.timerFunction.__name__] = 0 + if self.count == self.frequency: if self.makeThread: for i in range(self.threadAmount): - threading.Thread(target=self.timerFunction).run() + if self.daemonInstance.threadCounts[self.timerFunction.__name__] >= self.maxThreads: + logger.warn(self.timerFunction.__name__ + ' has too many current threads to start anymore.') + else: + self.daemonInstance.threadCounts[self.timerFunction.__name__] += 1 + newThread = threading.Thread(target=self.timerFunction) + newThread.start() else: self.timerFunction() self.count = 0 diff --git a/onionr/core.py b/onionr/core.py index 92849d3a..ef133bec 100644 --- a/onionr/core.py +++ b/onionr/core.py @@ -194,6 +194,7 @@ class Core: speed int, success int, DBHash text, + powValue text, failure int, lastConnect int );