made communicator init have better lint compliance

This commit is contained in:
Kevin Froman 2019-12-20 02:59:27 -06:00
parent b5a8f295c9
commit c469c5d871
1 changed files with 115 additions and 55 deletions

View File

@ -1,9 +1,11 @@
''' """
Onionr - Private P2P Communication Onionr - Private P2P Communication
This file contains both the OnionrCommunicate class for communcating with peers This file contains both the OnionrCommunicate class for
and code to operate as a daemon, getting commands from the command queue database (see core.Core.daemonQueue) communcating with peers and code to operate as a daemon,
''' getting commands from the command queue database
(see core.Core.daemonQueue)
"""
import os import os
import time import time
@ -33,8 +35,8 @@ from onionrblocks import storagecounter
from coredb import daemonqueue from coredb import daemonqueue
from coredb import dbfiles from coredb import dbfiles
from netcontroller import NetController from netcontroller import NetController
from . import bootstrappeers
''' """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or the Free Software Foundation, either version 3 of the License, or
@ -47,11 +49,13 @@ from netcontroller import NetController
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' """
OnionrCommunicatorTimers = onionrcommunicatortimers.OnionrCommunicatorTimers OnionrCommunicatorTimers = onionrcommunicatortimers.OnionrCommunicatorTimers
config.reload() config.reload()
class OnionrCommunicatorDaemon: class OnionrCommunicatorDaemon:
def __init__(self, shared_state, developmentMode=None): def __init__(self, shared_state, developmentMode=None):
if developmentMode is None: if developmentMode is None:
@ -74,7 +78,8 @@ class OnionrCommunicatorDaemon:
# Upload information, list of blocks to upload # Upload information, list of blocks to upload
self.blocksToUpload = [] self.blocksToUpload = []
self.upload_session_manager = self.shared_state.get(uploadblocks.sessionmanager.BlockUploadSessionManager) self.upload_session_manager = self.shared_state.get(
uploadblocks.sessionmanager.BlockUploadSessionManager)
self.shared_state.share_object() self.shared_state.share_object()
# loop time.sleep delay in seconds # loop time.sleep delay in seconds
@ -100,7 +105,8 @@ class OnionrCommunicatorDaemon:
# set true when shutdown command received # set true when shutdown command received
self.shutdown = False self.shutdown = False
# list of new blocks to download, added to when new block lists are fetched from peers # list of new blocks to download
# added to when new block lists are fetched from peers
self.blockQueue = {} self.blockQueue = {}
# list of blocks currently downloading, avoid s # list of blocks currently downloading, avoid s
@ -109,7 +115,8 @@ class OnionrCommunicatorDaemon:
# timestamp when the last online node was seen # timestamp when the last online node was seen
self.lastNodeSeen = None self.lastNodeSeen = None
# Dict of time stamps for peer's block list lookup times, to avoid downloading full lists all the time # Dict of time stamps for peer's block list lookup times,
# to avoid downloading full lists all the time
self.dbTimestamps = {} self.dbTimestamps = {}
# Clear the daemon queue for any dead messages # Clear the daemon queue for any dead messages
@ -122,73 +129,111 @@ class OnionrCommunicatorDaemon:
# time app started running for info/statistics purposes # time app started running for info/statistics purposes
self.startTime = epoch.get_epoch() self.startTime = epoch.get_epoch()
uploadqueue.UploadQueue(self) # extends our upload list and saves our list when Onionr exits # extends our upload list and saves our list when Onionr exits
uploadqueue.UploadQueue(self)
if developmentMode: if developmentMode:
OnionrCommunicatorTimers(self, self.heartbeat, 30) OnionrCommunicatorTimers(self, self.heartbeat, 30)
# Set timers, function reference, seconds # Set timers, function reference, seconds
# requires_peer True means the timer function won't fire if we have no connected peers # requires_peer True means the timer function won't fire if we
peerPoolTimer = OnionrCommunicatorTimers(self, onlinepeers.get_online_peers, 60, max_threads=1, my_args=[self]) # have no connected peers
peerPoolTimer = OnionrCommunicatorTimers(
self, onlinepeers.get_online_peers, 60, max_threads=1,
my_args=[self])
OnionrCommunicatorTimers(self, self.runCheck, 2, max_threads=1) OnionrCommunicatorTimers(self, self.runCheck, 2, max_threads=1)
# Timers to periodically lookup new blocks and download them # Timers to periodically lookup new blocks and download them
lookup_blocks_timer = OnionrCommunicatorTimers(self, lookupblocks.lookup_blocks_from_communicator, config.get('timers.lookupBlocks', 25), my_args=[self], requires_peer=True, max_threads=1) lookup_blocks_timer = OnionrCommunicatorTimers(
# The block download timer is accessed by the block lookup function to trigger faster download starts self,
self.download_blocks_timer = OnionrCommunicatorTimers(self, self.getBlocks, config.get('timers.getBlocks', 10), requires_peer=True, max_threads=5) lookupblocks.lookup_blocks_from_communicator,
config.get('timers.lookupBlocks', 25),
my_args=[self], requires_peer=True, max_threads=1)
# Timer to reset the longest offline peer so contact can be attempted again """The block download timer is accessed by the block lookup function
OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, my_args=[self]) to trigger faster download starts"""
self.download_blocks_timer = OnionrCommunicatorTimers(
self, self.getBlocks, config.get('timers.getBlocks', 10),
requires_peer=True, max_threads=5)
# Timer to reset the longest offline peer
# so contact can be attempted again
OnionrCommunicatorTimers(
self, onlinepeers.clear_offline_peer, 58, my_args=[self])
# Timer to cleanup old blocks # Timer to cleanup old blocks
blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 20, my_args=[self]) blockCleanupTimer = OnionrCommunicatorTimers(
self, housekeeping.clean_old_blocks, 20, my_args=[self])
# Timer to discover new peers # Timer to discover new peers
OnionrCommunicatorTimers(self, lookupadders.lookup_new_peer_transports_with_communicator, 60, requires_peer=True, my_args=[self], max_threads=2) OnionrCommunicatorTimers(
self, lookupadders.lookup_new_peer_transports_with_communicator,
60, requires_peer=True, my_args=[self], max_threads=2)
# Timer for adjusting which peers we actively communicate to at any given time, to avoid over-using peers # Timer for adjusting which peers
OnionrCommunicatorTimers(self, cooldownpeer.cooldown_peer, 30, my_args=[self], requires_peer=True) # we actively communicate to at any given time,
# to avoid over-using peers
OnionrCommunicatorTimers(
self, cooldownpeer.cooldown_peer, 30,
my_args=[self], requires_peer=True)
# Timer to read the upload queue and upload the entries to peers # Timer to read the upload queue and upload the entries to peers
OnionrCommunicatorTimers(self, uploadblocks.upload_blocks_from_communicator, 5, my_args=[self], requires_peer=True, max_threads=1) OnionrCommunicatorTimers(
self, uploadblocks.upload_blocks_from_communicator,
5, my_args=[self], requires_peer=True, max_threads=1)
# Timer to process the daemon command queue # Timer to process the daemon command queue
OnionrCommunicatorTimers(self, daemonqueuehandler.handle_daemon_commands, 6, my_args=[self], max_threads=3) OnionrCommunicatorTimers(
self, daemonqueuehandler.handle_daemon_commands,
6, my_args=[self], max_threads=3)
# Setup direct connections # Setup direct connections
if config.get('general.socket_servers', False): if config.get('general.socket_servers', False):
self.services = onionrservices.OnionrServices() self.services = onionrservices.OnionrServices()
self.active_services = [] self.active_services = []
self.service_greenlets = [] self.service_greenlets = []
OnionrCommunicatorTimers(self, servicecreator.service_creator, 5, max_threads=50, my_args=[self]) OnionrCommunicatorTimers(
self, servicecreator.service_creator, 5,
max_threads=50, my_args=[self])
else: else:
self.services = None self.services = None
# {peer_pubkey: ephemeral_address}, the address to reach them # {peer_pubkey: ephemeral_address}, the address to reach them
self.direct_connection_clients = {} self.direct_connection_clients = {}
# This timer creates deniable blocks, in an attempt to further obfuscate block insertion metadata # This timer creates deniable blocks,
# in an attempt to further obfuscate block insertion metadata
if config.get('general.insert_deniable_blocks', True): if config.get('general.insert_deniable_blocks', True):
deniableBlockTimer = OnionrCommunicatorTimers(self, deniableinserts.insert_deniable_block, 180, my_args=[self], requires_peer=True, max_threads=1) deniableBlockTimer = OnionrCommunicatorTimers(
self, deniableinserts.insert_deniable_block,
180, my_args=[self], requires_peer=True, max_threads=1)
deniableBlockTimer.count = (deniableBlockTimer.frequency - 175) deniableBlockTimer.count = (deniableBlockTimer.frequency - 175)
# Timer to check for connectivity, through Tor to various high-profile onion services # Timer to check for connectivity,
# through Tor to various high-profile onion services
OnionrCommunicatorTimers(self, netcheck.net_check, 500, OnionrCommunicatorTimers(self, netcheck.net_check, 500,
my_args=[self], max_threads=1) my_args=[self], max_threads=1)
# Announce the public API server transport address to other nodes if security level allows # Announce the public API server transport address
if config.get('general.security_level', 1) == 0 and config.get('general.announce_node', True): # to other nodes if security level allows
if config.get('general.security_level', 1) == 0 \
and config.get('general.announce_node', True):
# Default to high security level incase config breaks # Default to high security level incase config breaks
announceTimer = OnionrCommunicatorTimers(self, announcenode.announce_node, 3600, my_args=[self], requires_peer=True, max_threads=1) announceTimer = OnionrCommunicatorTimers(
self,
announcenode.announce_node,
3600, my_args=[self], requires_peer=True, max_threads=1)
announceTimer.count = (announceTimer.frequency - 120) announceTimer.count = (announceTimer.frequency - 120)
else: else:
logger.debug('Will not announce node.') logger.debug('Will not announce node.')
# Timer to delete malfunctioning or long-dead peers
cleanupTimer = OnionrCommunicatorTimers(self, self.peerCleanup, 300, requires_peer=True)
# Timer to cleanup dead ephemeral forward secrecy keys # Timer to delete malfunctioning or long-dead peers
forwardSecrecyTimer = OnionrCommunicatorTimers(self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1) cleanupTimer = OnionrCommunicatorTimers(
self, self.peerCleanup, 300, requires_peer=True)
# Timer to cleanup dead ephemeral forward secrecy keys
OnionrCommunicatorTimers(
self, housekeeping.clean_keys, 15, my_args=[self], max_threads=1)
# Adjust initial timer triggers # Adjust initial timer triggers
peerPoolTimer.count = (peerPoolTimer.frequency - 1) peerPoolTimer.count = (peerPoolTimer.frequency - 1)
@ -197,17 +242,21 @@ class OnionrCommunicatorDaemon:
lookup_blocks_timer = (lookup_blocks_timer.frequency - 2) lookup_blocks_timer = (lookup_blocks_timer.frequency - 2)
shared_state.add(self) shared_state.add(self)
if config.get('general.use_bootstrap', True): if config.get('general.use_bootstrap', True):
bootstrappeers.add_bootstrap_list_to_peer_list(self, [], db_only=True) bootstrappeers.add_bootstrap_list_to_peer_list(
self, [], db_only=True)
if not config.get('onboarding.done', True): if not config.get('onboarding.done', True):
logger.info('First run detected. Run openhome to get setup.', terminal=True) logger.info(
'First run detected. Run openhome to get setup.',
terminal=True)
while not config.get('onboarding.done', True): while not config.get('onboarding.done', True):
time.sleep(5) time.sleep(5)
# Main daemon loop, mainly for calling timers, don't do any complex operations here to avoid locking # Main daemon loop, mainly for calling timers,
# don't do any complex operations here to avoid locking
try: try:
while not self.shutdown: while not self.shutdown:
for i in self.timers: for i in self.timers:
@ -217,9 +266,9 @@ class OnionrCommunicatorDaemon:
time.sleep(self.delay) time.sleep(self.delay)
except KeyboardInterrupt: except KeyboardInterrupt:
self.shutdown = True self.shutdown = True
pass
logger.info('Goodbye. (Onionr is cleaning up, and will exit)', terminal=True) logger.info(
'Goodbye. (Onionr is cleaning up, and will exit)', terminal=True)
try: try:
self.service_greenlets self.service_greenlets
except AttributeError: except AttributeError:
@ -228,18 +277,21 @@ class OnionrCommunicatorDaemon:
# Stop onionr direct connection services # Stop onionr direct connection services
for server in self.service_greenlets: for server in self.service_greenlets:
server.stop() server.stop()
localcommand.local_command('shutdown') # shutdown the api localcommand.local_command('shutdown') # shutdown the api
try: try:
time.sleep(0.5) time.sleep(0.5)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
def getBlocks(self): def getBlocks(self):
'''download new blocks in queue''' """Download new blocks in queue."""
downloadblocks.download_blocks_from_communicator(self) downloadblocks.download_blocks_from_communicator(self)
def decrementThreadCount(self, threadName): def decrementThreadCount(self, threadName):
'''Decrement amount of a thread name if more than zero, called when a function meant to be run in a thread ends''' """Decrement amount of a thread name if more than zero.
called when a function meant to be run in a thread ends
"""
try: try:
if self.threadCounts[threadName] > 0: if self.threadCounts[threadName] > 0:
self.threadCounts[threadName] -= 1 self.threadCounts[threadName] -= 1
@ -247,23 +299,27 @@ class OnionrCommunicatorDaemon:
pass pass
def connectNewPeer(self, peer='', useBootstrap=False): def connectNewPeer(self, peer='', useBootstrap=False):
'''Adds a new random online peer to self.onlinePeers''' """Adds a new random online peer to self.onlinePeers"""
connectnewpeers.connect_new_peer_to_communicator(self, peer, useBootstrap) connectnewpeers.connect_new_peer_to_communicator(
self, peer, useBootstrap)
def peerCleanup(self): def peerCleanup(self):
'''This just calls onionrpeers.cleanupPeers, which removes dead or bad peers (offline too long, too slow)''' """This just calls onionrpeers.cleanupPeers.
Remove dead or bad peers (offline too long, too slow)"""
onionrpeers.peer_cleanup() onionrpeers.peer_cleanup()
self.decrementThreadCount('peerCleanup') self.decrementThreadCount('peerCleanup')
def getPeerProfileInstance(self, peer): def getPeerProfileInstance(self, peer):
'''Gets a peer profile instance from the list of profiles, by address name''' """Gets a peer profile instance from the list of profiles"""
for i in self.peerProfiles: for i in self.peerProfiles:
# if the peer's profile is already loaded, return that # if the peer's profile is already loaded, return that
if i.address == peer: if i.address == peer:
retData = i retData = i
break break
else: else:
# if the peer's profile is not loaded, return a new one. connectNewPeer also adds it to the list on connect # if the peer's profile is not loaded, return a new one.
# connectNewPeer also adds it to the list on connect
retData = onionrpeers.PeerProfiles(peer) retData = onionrpeers.PeerProfiles(peer)
self.peerProfiles.append(retData) self.peerProfiles.append(retData)
return retData return retData
@ -272,21 +328,25 @@ class OnionrCommunicatorDaemon:
return epoch.get_epoch() - self.startTime return epoch.get_epoch() - self.startTime
def heartbeat(self): def heartbeat(self):
'''Show a heartbeat debug message''' """Show a heartbeat debug message."""
logger.debug('Heartbeat. Node running for %s.' % humanreadabletime.human_readable_time(self.getUptime())) logger.debug('Heartbeat. Node running for %s.' %
humanreadabletime.human_readable_time(self.getUptime()))
self.decrementThreadCount('heartbeat') self.decrementThreadCount('heartbeat')
def runCheck(self): def runCheck(self):
"""Show message if run file exists"""
if run_file_exists(self): if run_file_exists(self):
logger.debug('Status check; looks good.') logger.debug('Status check; looks good.')
self.decrementThreadCount('runCheck') self.decrementThreadCount('runCheck')
def startCommunicator(shared_state): def startCommunicator(shared_state):
OnionrCommunicatorDaemon(shared_state) OnionrCommunicatorDaemon(shared_state)
def run_file_exists(daemon): def run_file_exists(daemon):
if os.path.isfile(filepaths.run_check_file): if os.path.isfile(filepaths.run_check_file):
os.remove(filepaths.run_check_file) os.remove(filepaths.run_check_file)
return True return True
return False return False