Moved all communicator ext vars to KV

This commit is contained in:
Kevin 2020-07-29 03:57:06 -05:00
parent 080f33bf1f
commit f28d469e56
7 changed files with 34 additions and 35 deletions

View File

@ -10,7 +10,8 @@ import config
import logger import logger
import onionrpeers import onionrpeers
import onionrplugins as plugins import onionrplugins as plugins
from . import onlinepeers, uploadqueue from . import onlinepeers
from . import uploadqueue
from communicatorutils import servicecreator from communicatorutils import servicecreator
from communicatorutils import onionrcommunicatortimers from communicatorutils import onionrcommunicatortimers
from communicatorutils import downloadblocks from communicatorutils import downloadblocks
@ -65,6 +66,8 @@ class OnionrCommunicatorDaemon:
self.kv.put('shutdown', False) self.kv.put('shutdown', False)
self.kv.put('onlinePeers', []) self.kv.put('onlinePeers', [])
self.kv.put('offlinePeers', []) self.kv.put('offlinePeers', [])
self.kv.put('peerProfiles', [])
self.kv.put('connectTimes', {})
self.kv.put('currentDownloading', []) self.kv.put('currentDownloading', [])
self.kv.put('announceCache', {}) self.kv.put('announceCache', {})
self.kv.put('newPeers', []) self.kv.put('newPeers', [])
@ -72,6 +75,8 @@ class OnionrCommunicatorDaemon:
self.kv.put('blocksToUpload', []) self.kv.put('blocksToUpload', [])
self.kv.put('cooldownPeer', {}) self.kv.put('cooldownPeer', {})
self.kv.put('generating_blocks', []) self.kv.put('generating_blocks', [])
self.kv.put('lastNodeSeen', None)
self.kv.put('startTime', epoch.get_epoch())
if config.get('general.offline_mode', False): if config.get('general.offline_mode', False):
self.isOnline = False self.isOnline = False
@ -89,23 +94,12 @@ class OnionrCommunicatorDaemon:
# loop time.sleep delay in seconds # loop time.sleep delay in seconds
self.delay = 1 self.delay = 1
# lists of connected peers and peers we know we can't reach currently
self.connectTimes = {}
# list of peer's profiles (onionrpeers.PeerProfile instances)
self.peerProfiles = []
# amount of threads running by name, used to prevent too many # amount of threads running by name, used to prevent too many
self.threadCounts = {} self.threadCounts = {}
# timestamp when the last online node was seen
self.lastNodeSeen = None
# Loads in and starts the enabled plugins # Loads in and starts the enabled plugins
plugins.reload() plugins.reload()
# time app started running for info/statistics purposes
self.startTime = epoch.get_epoch()
# extends our upload list and saves our list when Onionr exits # extends our upload list and saves our list when Onionr exits
uploadqueue.UploadQueue(self) uploadqueue.UploadQueue(self)
@ -296,7 +290,7 @@ class OnionrCommunicatorDaemon:
def getPeerProfileInstance(self, peer): def getPeerProfileInstance(self, peer):
"""Gets a peer profile instance from the list of profiles""" """Gets a peer profile instance from the list of profiles"""
for i in self.peerProfiles: for i in self.kv.get('peerProfiles'):
# if the peer's profile is already loaded, return that # if the peer's profile is already loaded, return that
if i.address == peer: if i.address == peer:
retData = i retData = i
@ -305,19 +299,16 @@ class OnionrCommunicatorDaemon:
# if the peer's profile is not loaded, return a new one. # if the peer's profile is not loaded, return a new one.
# connectNewPeer also adds it to the list on connect # connectNewPeer also adds it to the list on connect
retData = onionrpeers.PeerProfiles(peer) retData = onionrpeers.PeerProfiles(peer)
self.peerProfiles.append(retData) self.kv.get('peerProfiles').append(retData)
return retData return retData
def getUptime(self):
return epoch.get_epoch() - self.startTime
def heartbeat(self): def heartbeat(self):
"""Show a heartbeat debug message.""" """Show a heartbeat debug message."""
logger.debug('Heartbeat. Node running for %s.' % logger.debug('Heartbeat. Node running for %s.' %
humanreadabletime.human_readable_time(self.getUptime())) humanreadabletime.human_readable_time(
self.kv.get('startTime')))
self.decrementThreadCount('heartbeat') self.decrementThreadCount('heartbeat')
def startCommunicator(shared_state): def startCommunicator(shared_state):
OnionrCommunicatorDaemon(shared_state) OnionrCommunicatorDaemon(shared_state)

View File

@ -5,7 +5,7 @@ get online peers in a communicator instance
import time import time
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from etc import humanreadabletime from etc.humanreadabletime import human_readable_time
import logger import logger
if TYPE_CHECKING: if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV from deadsimplekv import DeadSimpleKV
@ -41,9 +41,8 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
needed = max_peers - len(kv.get('onlinePeers')) needed = max_peers - len(kv.get('onlinePeers'))
last_seen = 'never' last_seen = 'never'
if not isinstance(comm_inst.lastNodeSeen, type(None)): if not isinstance(kv.get('lastNodeSeen'), type(None)):
last_seen = humanreadabletime.human_readable_time( last_seen = human_readable_time(kv.get('lastNodeSeen'))
comm_inst.lastNodeSeen)
for _ in range(needed): for _ in range(needed):
if len(kv.get('onlinePeers')) == 0: if len(kv.get('onlinePeers')) == 0:
@ -62,5 +61,5 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
except RecursionError: except RecursionError:
pass pass
else: else:
comm_inst.lastNodeSeen = time.time() kv.put('lastNodeSeen', time.time())
comm_inst.decrementThreadCount('get_online_peers') comm_inst.decrementThreadCount('get_online_peers')

View File

@ -26,7 +26,7 @@ def remove_online_peer(comm_inst, peer):
"""Remove an online peer.""" """Remove an online peer."""
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
try: try:
del comm_inst.connectTimes[peer] del kv.get('connectTimes')[peer]
except KeyError: except KeyError:
pass pass
try: try:

View File

@ -90,15 +90,15 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
if address not in kv.get('onlinePeers'): if address not in kv.get('onlinePeers'):
logger.info('Connected to ' + address, terminal=True) logger.info('Connected to ' + address, terminal=True)
kv.get('onlinePeers').append(address) kv.get('onlinePeers').append(address)
comm_inst.connectTimes[address] = epoch.get_epoch() kv.get('connectTimes')[address] = epoch.get_epoch()
retData = address retData = address
# add peer to profile list if they're not in it # add peer to profile list if they're not in it
for profile in comm_inst.peerProfiles: for profile in kv.get('peerProfiles'):
if profile.address == address: if profile.address == address:
break break
else: else:
comm_inst.peerProfiles.append( kv.get('peerProfiles').append(
onionrpeers.PeerProfiles(address)) onionrpeers.PeerProfiles(address))
break break
else: else:

View File

@ -33,7 +33,7 @@ def cooldown_peer(comm_inst):
minTime = 300 minTime = 300
cooldown_time = 600 cooldown_time = 600
to_cool = '' to_cool = ''
tempConnectTimes = dict(comm_inst.connectTimes) tempConnectTimes = dict(kv.get('connectTimes'))
# Remove peers from cooldown that have been there long enough # Remove peers from cooldown that have been there long enough
tempCooldown = dict(kv.get('cooldownPeer')) tempCooldown = dict(kv.get('cooldownPeer'))

View File

@ -88,7 +88,7 @@ class BlockUploadSessionManager:
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string( kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string(
"DeadSimpleKV") "DeadSimpleKV")
sessions_to_delete = [] sessions_to_delete = []
if comm_inst.getUptime() < 120: if kv.get('startTime') < 120:
return return
onlinePeerCount = len(kv.get('onlinePeers')) onlinePeerCount = len(kv.get('onlinePeers'))
@ -105,7 +105,8 @@ class BlockUploadSessionManager:
# Clean sessions if they have uploaded to enough online peers # Clean sessions if they have uploaded to enough online peers
if sess.total_success_count <= 0: if sess.total_success_count <= 0:
continue continue
if (sess.total_success_count / onlinePeerCount) >= onionrvalues.MIN_BLOCK_UPLOAD_PEER_PERCENT: if (sess.total_success_count / onlinePeerCount) >= \
onionrvalues.MIN_BLOCK_UPLOAD_PEER_PERCENT:
sessions_to_delete.append(sess) sessions_to_delete.append(sess)
for sess in sessions_to_delete: for sess in sessions_to_delete:
try: try:

View File

@ -2,6 +2,8 @@
Serialize various node information Serialize various node information
""" """
from typing import TYPE_CHECKING
from gevent import sleep from gevent import sleep
from psutil import Process, WINDOWS from psutil import Process, WINDOWS
@ -11,6 +13,9 @@ from coredb import blockmetadb
from utils.sizeutils import size, human_size from utils.sizeutils import size, human_size
from utils.identifyhome import identify_home from utils.identifyhome import identify_home
import communicator import communicator
if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -44,18 +49,21 @@ class SerializedData:
proc = Process() proc = Process()
def get_open_files(): def get_open_files():
if WINDOWS: return proc.num_handles() if WINDOWS:
return proc.num_handles()
return proc.num_fds() return proc.num_fds()
try: try:
self._too_many self._too_many
except AttributeError: except AttributeError:
sleep(1) sleep(1)
comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon, args=(self._too_many,)) comm_inst = self._too_many.get(communicator.OnionrCommunicatorDaemon,
args=(self._too_many,))
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV") kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
connected = [] connected = []
[connected.append(x) for x in kv.get('onlinePeers') if x not in connected] [connected.append(x)
stats['uptime'] = comm_inst.getUptime() for x in kv.get('onlinePeers') if x not in connected]
stats['uptime'] = kv.get('getUptime')
stats['connectedNodes'] = '\n'.join(connected) stats['connectedNodes'] = '\n'.join(connected)
stats['blockCount'] = len(blockmetadb.get_block_list()) stats['blockCount'] = len(blockmetadb.get_block_list())
stats['blockQueueCount'] = len(kv.get('blockQueue')) stats['blockQueueCount'] = len(kv.get('blockQueue'))