Moved communicator shutdown over to KV model for more decoupling

This commit is contained in:
Kevin 2020-07-25 21:36:48 -05:00
parent 0460d3380f
commit 0e4e7bb050
8 changed files with 30 additions and 22 deletions

View File

@ -61,6 +61,7 @@ class OnionrCommunicatorDaemon:
# populate kv values # populate kv values
self.shared_state.get_by_string('DeadSimpleKV').put('blockQueue', {}) self.shared_state.get_by_string('DeadSimpleKV').put('blockQueue', {})
self.shared_state.get_by_string('DeadSimpleKV').put('shutdown', False)
if config.get('general.offline_mode', False): if config.get('general.offline_mode', False):
self.isOnline = False self.isOnline = False
@ -97,9 +98,6 @@ class OnionrCommunicatorDaemon:
# amount of threads running by name, used to prevent too many # amount of threads running by name, used to prevent too many
self.threadCounts = {} self.threadCounts = {}
# set true when shutdown command received
self.shutdown = False
# list of blocks currently downloading # list of blocks currently downloading
self.currentDownloading = [] self.currentDownloading = []
@ -239,23 +237,28 @@ class OnionrCommunicatorDaemon:
get_url() get_url()
while not config.get('onboarding.done', True) and \ while not config.get('onboarding.done', True) and \
not self.shutdown: not self.shared_state.get_by_string(
'DeadSimpleKV').get('shutdown'):
try: try:
time.sleep(2) time.sleep(2)
except KeyboardInterrupt: except KeyboardInterrupt:
self.shutdown = True self.shared_state.get_by_string(
'DeadSimpleKV').put('shutdown', True)
# Main daemon loop, mainly for calling timers, # Main daemon loop, mainly for calling timers,
# don't do any complex operations here to avoid locking # don't do any complex operations here to avoid locking
try: try:
while not self.shutdown: while not self.shared_state.get_by_string(
'DeadSimpleKV').get('shutdown'):
for i in self.timers: for i in self.timers:
if self.shutdown: if self.shared_state.get_by_string(
'DeadSimpleKV').get('shutdown'):
break break
i.processTimer() i.processTimer()
time.sleep(self.delay) time.sleep(self.delay)
except KeyboardInterrupt: except KeyboardInterrupt:
self.shutdown = True self.shared_state.get_by_string(
'DeadSimpleKV').put('shutdown', True)
logger.info( logger.info(
'Goodbye. (Onionr is cleaning up, and will exit)', terminal=True) 'Goodbye. (Onionr is cleaning up, and will exit)', terminal=True)

View File

@ -31,6 +31,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
Connect to more peers if we have none connected Connect to more peers if we have none connected
""" """
config = comm_inst.config config = comm_inst.config
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
if config.get('general.offline_mode', False): if config.get('general.offline_mode', False):
comm_inst.decrementThreadCount('get_online_peers') comm_inst.decrementThreadCount('get_online_peers')
return return
@ -49,7 +50,7 @@ def get_online_peers(comm_inst: 'OnionrCommunicatorDaemon'):
else: else:
comm_inst.connectNewPeer() comm_inst.connectNewPeer()
if comm_inst.shutdown: if kv.get('shutdown'):
break break
else: else:
if len(comm_inst.onlinePeers) == 0: if len(comm_inst.onlinePeers) == 0:

View File

@ -27,6 +27,7 @@ def peer_action(comm_inst, peer, action,
returnHeaders=False, max_resp_size=5242880): returnHeaders=False, max_resp_size=5242880):
"""Perform a get request to a peer.""" """Perform a get request to a peer."""
penalty_score = -10 penalty_score = -10
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
if len(peer) == 0: if len(peer) == 0:
return False return False
url = 'http://%s/%s' % (peer, action) url = 'http://%s/%s' % (peer, action)
@ -47,7 +48,7 @@ def peer_action(comm_inst, peer, action,
onlinepeers.remove_online_peer(comm_inst, peer) onlinepeers.remove_online_peer(comm_inst, peer)
keydb.transportinfo.set_address_info( keydb.transportinfo.set_address_info(
peer, 'lastConnectAttempt', epoch.get_epoch()) peer, 'lastConnectAttempt', epoch.get_epoch())
if action != 'ping' and not comm_inst.shutdown: if action != 'ping' and not kv.get('shutdown'):
logger.warn(f'Lost connection to {peer}', terminal=True) logger.warn(f'Lost connection to {peer}', terminal=True)
# Will only add a new peer to pool if needed # Will only add a new peer to pool if needed
onlinepeers.get_online_peers(comm_inst) onlinepeers.get_online_peers(comm_inst)

View File

@ -33,6 +33,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
config = comm_inst.config config = comm_inst.config
retData = False retData = False
tried = comm_inst.offlinePeers tried = comm_inst.offlinePeers
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
transports = gettransports.get() transports = gettransports.get()
if peer != '': if peer != '':
if stringvalidators.validate_transport(peer): if stringvalidators.validate_transport(peer):
@ -77,7 +78,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
or address in comm_inst.onlinePeers \ or address in comm_inst.onlinePeers \
or address in comm_inst.cooldownPeer: or address in comm_inst.cooldownPeer:
continue continue
if comm_inst.shutdown: if kv.get('shutdown'):
return return
# Ping a peer, # Ping a peer,
ret = peeraction.peer_action(comm_inst, address, 'ping') ret = peeraction.peer_action(comm_inst, address, 'ping')

View File

@ -63,7 +63,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
if not shoulddownload.should_download(comm_inst, blockHash): if not shoulddownload.should_download(comm_inst, blockHash):
continue continue
if comm_inst.shutdown or not comm_inst.isOnline or \ if kv.get('shutdown') or not comm_inst.isOnline or \
storage_counter.is_full(): storage_counter.is_full():
# Exit loop if shutting down or offline, or disk allocation reached # Exit loop if shutting down or offline, or disk allocation reached
break break
@ -84,7 +84,7 @@ def download_blocks_from_communicator(comm_inst: "OnionrCommunicatorDaemon"):
blockPeers = onionrcrypto.cryptoutils.random_shuffle(blockPeers) blockPeers = onionrcrypto.cryptoutils.random_shuffle(blockPeers)
peerUsed = blockPeers.pop(0) peerUsed = blockPeers.pop(0)
if not comm_inst.shutdown and peerUsed.strip() != '': if not kv.get('shutdown') and peerUsed.strip() != '':
logger.info( logger.info(
f"Attempting to download %s from {peerUsed}..." % (blockHash[:12],)) f"Attempting to download %s from {peerUsed}..." % (blockHash[:12],))
content = peeraction.peer_action( content = peeraction.peer_action(

View File

@ -32,6 +32,7 @@ def net_check(comm_inst):
""" """
# for detecting if we have received incoming connections recently # for detecting if we have received incoming connections recently
rec = False rec = False
kv: "DeadSimpleKV" = comm_inst.shared_state.get_by_string("DeadSimpleKV")
if len(comm_inst.onlinePeers) == 0: if len(comm_inst.onlinePeers) == 0:
try: try:
if (epoch.get_epoch() - int(localcommand.local_command if (epoch.get_epoch() - int(localcommand.local_command
@ -41,7 +42,7 @@ def net_check(comm_inst):
except ValueError: except ValueError:
pass pass
if not rec and not netutils.checkNetwork(torPort=comm_inst.proxyPort): if not rec and not netutils.checkNetwork(torPort=comm_inst.proxyPort):
if not comm_inst.shutdown: if not kv.get('shutdown'):
if not comm_inst.config.get('general.offline_mode', False): if not comm_inst.config.get('general.offline_mode', False):
logger.warn('Network check failed, are you connected to ' + logger.warn('Network check failed, are you connected to ' +
'the Internet, and is Tor working? ' + 'the Internet, and is Tor working? ' +

View File

@ -13,6 +13,7 @@ import logger
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Callable, NewType, Iterable from typing import Callable, NewType, Iterable
if TYPE_CHECKING: if TYPE_CHECKING:
from deadsimplekv import DeadSimpleKV
from communicator import OnionrCommunicatorDaemon from communicator import OnionrCommunicatorDaemon
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
@ -47,6 +48,8 @@ class OnionrCommunicatorTimers:
self.daemon_inst = daemon_inst self.daemon_inst = daemon_inst
self.max_threads = max_threads self.max_threads = max_threads
self.args = my_args self.args = my_args
self.kv: "DeadSimpleKV" = daemon_inst.shared_state.get_by_string(
"DeadSimpleKV")
self.daemon_inst.timers.append(self) self.daemon_inst.timers.append(self)
self.count = 0 self.count = 0
@ -60,7 +63,7 @@ class OnionrCommunicatorTimers:
self.daemon_inst.threadCounts[self.timer_function.__name__] = 0 self.daemon_inst.threadCounts[self.timer_function.__name__] = 0
# execute timer's func, if we are not missing *required* online peer # execute timer's func, if we are not missing *required* online peer
if self.count == self.frequency and not self.daemon_inst.shutdown: if self.count == self.frequency and not self.kv.get('shutdown'):
try: try:
if self.requires_peer and \ if self.requires_peer and \
len(self.daemon_inst.onlinePeers) == 0: len(self.daemon_inst.onlinePeers) == 0:

View File

@ -1,13 +1,9 @@
""" """Onionr - Private P2P Communication.
Onionr - Private P2P Communication
Shutdown the node either hard or cleanly Shutdown the node either hard or cleanly
""" """
from flask import Blueprint, Response from flask import Blueprint, Response
from flask import g from flask import g
from onionrblocks import onionrblockapi
import onionrexceptions
from onionrutils import stringvalidators
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -25,6 +21,7 @@ from onionrutils import stringvalidators
shutdown_bp = Blueprint('shutdown', __name__) shutdown_bp = Blueprint('shutdown', __name__)
def shutdown(client_api_inst): def shutdown(client_api_inst):
try: try:
client_api_inst.publicAPI.httpServer.stop() client_api_inst.publicAPI.httpServer.stop()
@ -33,8 +30,9 @@ def shutdown(client_api_inst):
pass pass
return Response("bye") return Response("bye")
@shutdown_bp.route('/shutdownclean') @shutdown_bp.route('/shutdownclean')
def shutdown_clean(): def shutdown_clean():
# good for calling from other clients # good for calling from other clients
g.too_many.get_by_string("OnionrCommunicatorDaemon").shutdown = True g.too_many.get_by_string("DeadSimpleKV").put('shutdown', True)
return Response("bye") return Response("bye")