refactoring communicator into module

This commit is contained in:
Kevin Froman 2019-07-10 17:38:20 -05:00
parent be318f2403
commit 9bf6c76557
12 changed files with 205 additions and 102 deletions

View File

@ -1,4 +1,3 @@
#!/usr/bin/env python3
''' '''
Onionr - Private P2P Communication Onionr - Private P2P Communication
@ -20,9 +19,9 @@
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
import sys, os, time import sys, os, time
import streamedrequests
import core, config, logger, onionr import core, config, logger, onionr
import onionrexceptions, onionrpeers, onionrevents as events, onionrplugins as plugins, onionrblockapi as block import onionrexceptions, onionrpeers, onionrevents as events, onionrplugins as plugins, onionrblockapi as block
from . import onlinepeers
from communicatorutils import servicecreator, onionrcommunicatortimers from communicatorutils import servicecreator, onionrcommunicatortimers
from communicatorutils import downloadblocks, lookupblocks, lookupadders from communicatorutils import downloadblocks, lookupblocks, lookupadders
from communicatorutils import servicecreator, connectnewpeers, uploadblocks from communicatorutils import servicecreator, connectnewpeers, uploadblocks
@ -31,7 +30,6 @@ from communicatorutils import cooldownpeer, housekeeping, netcheck
from onionrutils import localcommand, epoch, basicrequests from onionrutils import localcommand, epoch, basicrequests
from etc import humanreadabletime from etc import humanreadabletime
import onionrservices, onionr, onionrproofs import onionrservices, onionr, onionrproofs
OnionrCommunicatorTimers = onionrcommunicatortimers.OnionrCommunicatorTimers OnionrCommunicatorTimers = onionrcommunicatortimers.OnionrCommunicatorTimers
config.reload() config.reload()
@ -99,7 +97,7 @@ class OnionrCommunicatorDaemon:
# Set timers, function reference, seconds # Set timers, function reference, seconds
# requiresPeer True means the timer function won't fire if we have no connected peers # requiresPeer True means the timer function won't fire if we have no connected peers
peerPoolTimer = OnionrCommunicatorTimers(self, self.getOnlinePeers, 60, maxThreads=1) peerPoolTimer = OnionrCommunicatorTimers(self, onlinepeers.get_online_peers, 60, maxThreads=1, myArgs=[self])
OnionrCommunicatorTimers(self, self.runCheck, 2, maxThreads=1) OnionrCommunicatorTimers(self, self.runCheck, 2, maxThreads=1)
# Timers to periodically lookup new blocks and download them # Timers to periodically lookup new blocks and download them
@ -107,7 +105,7 @@ class OnionrCommunicatorDaemon:
OnionrCommunicatorTimers(self, self.getBlocks, self._core.config.get('timers.getBlocks', 30), requiresPeer=True, maxThreads=2) OnionrCommunicatorTimers(self, self.getBlocks, self._core.config.get('timers.getBlocks', 30), requiresPeer=True, maxThreads=2)
# Timer to reset the longest offline peer so contact can be attempted again # Timer to reset the longest offline peer so contact can be attempted again
OnionrCommunicatorTimers(self, self.clearOfflinePeer, 58) OnionrCommunicatorTimers(self, onlinepeers.clear_offline_peer, 58, myArgs=[self])
# Timer to cleanup old blocks # Timer to cleanup old blocks
blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 65, myArgs=[self]) blockCleanupTimer = OnionrCommunicatorTimers(self, housekeeping.clean_old_blocks, 65, myArgs=[self])
@ -209,56 +207,6 @@ class OnionrCommunicatorDaemon:
except KeyError: except KeyError:
pass pass
def pickOnlinePeer(self):
'''randomly picks peer from pool without bias (using secrets module)'''
retData = ''
while True:
peerLength = len(self.onlinePeers)
if peerLength <= 0:
break
try:
# get a random online peer, securely. May get stuck in loop if network is lost or if all peers in pool magically disconnect at once
retData = self.onlinePeers[self._core._crypto.secrets.randbelow(peerLength)]
except IndexError:
pass
else:
break
return retData
def clearOfflinePeer(self):
'''Removes the longest offline peer to retry later'''
try:
removed = self.offlinePeers.pop(0)
except IndexError:
pass
else:
logger.debug('Removed ' + removed + ' from offline list, will try them again.')
self.decrementThreadCount('clearOfflinePeer')
def getOnlinePeers(self):
'''
Manages the self.onlinePeers attribute list, connects to more peers if we have none connected
'''
logger.debug('Refreshing peer pool...')
maxPeers = int(config.get('peers.max_connect', 10))
needed = maxPeers - len(self.onlinePeers)
for i in range(needed):
if len(self.onlinePeers) == 0:
self.connectNewPeer(useBootstrap=True)
else:
self.connectNewPeer()
if self.shutdown:
break
else:
if len(self.onlinePeers) == 0:
logger.debug('Couldn\'t connect to any peers.' + (' Last node seen %s ago.' % humanreadabletime.human_readable_time(time.time() - self.lastNodeSeen) if not self.lastNodeSeen is None else ''), terminal=True)
else:
self.lastNodeSeen = time.time()
self.decrementThreadCount('getOnlinePeers')
def addBootstrapListToPeerList(self, peerList): def addBootstrapListToPeerList(self, peerList):
''' '''
Add the bootstrap list to the peer list (no duplicates) Add the bootstrap list to the peer list (no duplicates)
@ -302,37 +250,6 @@ class OnionrCommunicatorDaemon:
score = str(self.getPeerProfileInstance(i).score) score = str(self.getPeerProfileInstance(i).score)
logger.info(i + ', score: ' + score, terminal=True) logger.info(i + ', score: ' + score, terminal=True)
def peerAction(self, peer, action, data='', returnHeaders=False, max_resp_size=5242880):
'''Perform a get request to a peer'''
penalty_score = -10
if len(peer) == 0:
return False
#logger.debug('Performing ' + action + ' with ' + peer + ' on port ' + str(self.proxyPort))
url = 'http://%s/%s' % (peer, action)
if len(data) > 0:
url += '&data=' + data
self._core.setAddressInfo(peer, 'lastConnectAttempt', epoch.get_epoch()) # mark the time we're trying to request this peer
try:
retData = basicrequests.do_get_request(self._core, url, port=self.proxyPort, max_size=max_resp_size)
except streamedrequests.exceptions.ResponseLimitReached:
retData = False
penalty_score = -100
# if request failed, (error), mark peer offline
if retData == False:
try:
self.getPeerProfileInstance(peer).addScore(penalty_score)
self.removeOnlinePeer(peer)
if action != 'ping' and not self.shutdown:
logger.warn('Lost connection to ' + peer, terminal=True)
self.getOnlinePeers() # Will only add a new peer to pool if needed
except ValueError:
pass
else:
self._core.setAddressInfo(peer, 'lastConnect', epoch.get_epoch())
self.getPeerProfileInstance(peer).addScore(1)
return retData # If returnHeaders, returns tuple of data, headers. if not, just data string
def getPeerProfileInstance(self, peer): def getPeerProfileInstance(self, peer):
'''Gets a peer profile instance from the list of profiles, by address name''' '''Gets a peer profile instance from the list of profiles, by address name'''
for i in self.peerProfiles: for i in self.peerProfiles:

View File

@ -0,0 +1,25 @@
'''
Onionr - Private P2P Communication
interact with the peer pool in a communicator instance
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
from . import clearofflinepeer, onlinepeers, pickonlinepeers
clear_offline_peer = clearofflinepeer.clear_offline_peer
get_online_peers = onlinepeers.get_online_peers
pick_online_peer = pickonlinepeers.pick_online_peer

View File

@ -0,0 +1,29 @@
'''
Onionr - Private P2P Communication
clear offline peer in a communicator instance
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import logger
def clear_offline_peer(comm_inst):
'''Removes the longest offline peer to retry later'''
try:
removed = comm_inst.offlinePeers.pop(0)
except IndexError:
pass
else:
logger.debug('Removed ' + removed + ' from offline list, will try them again.')
comm_inst.decrementThreadCount('clear_offline_peer')

View File

@ -0,0 +1,45 @@
'''
Onionr - Private P2P Communication
get online peers in a communicator instance
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import time
from etc import humanreadabletime
import logger
def get_online_peers(comm_inst):
'''
Manages the comm_inst.onlinePeers attribute list, connects to more peers if we have none connected
'''
config = comm_inst._core.config
logger.debug('Refreshing peer pool...')
maxPeers = int(config.get('peers.max_connect', 10))
needed = maxPeers - len(comm_inst.onlinePeers)
for i in range(needed):
if len(comm_inst.onlinePeers) == 0:
comm_inst.connectNewPeer(useBootstrap=True)
else:
comm_inst.connectNewPeer()
if comm_inst.shutdown:
break
else:
if len(comm_inst.onlinePeers) == 0:
logger.debug('Couldn\'t connect to any peers.' + (' Last node seen %s ago.' % humanreadabletime.human_readable_time(time.time() - comm_inst.lastNodeSeen) if not comm_inst.lastNodeSeen is None else ''), terminal=True)
else:
comm_inst.lastNodeSeen = time.time()
comm_inst.decrementThreadCount('get_online_peers')

View File

@ -0,0 +1,34 @@
'''
Onionr - Private P2P Communication
pick online peers in a communicator instance
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
def pick_online_peer(comm_inst):
'''randomly picks peer from pool without bias (using secrets module)'''
retData = ''
while True:
peerLength = len(comm_inst.onlinePeers)
if peerLength <= 0:
break
try:
# get a random online peer, securely. May get stuck in loop if network is lost or if all peers in pool magically disconnect at once
retData = comm_inst.onlinePeers[comm_inst._core._crypto.secrets.randbelow(peerLength)]
except IndexError:
pass
else:
break
return retData

View File

@ -0,0 +1,52 @@
'''
Onionr - Private P2P Communication
This file implements logic for performing requests to Onionr peers
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import streamedrequests
import logger
from onionrutils import epoch, basicrequests
from . import onlinepeers
def peer_action(comm_inst, peer, action, data='', returnHeaders=False, max_resp_size=5242880):
'''Perform a get request to a peer'''
penalty_score = -10
if len(peer) == 0:
return False
url = 'http://%s/%s' % (peer, action)
if len(data) > 0:
url += '&data=' + data
comm_inst._core.setAddressInfo(peer, 'lastConnectAttempt', epoch.get_epoch()) # mark the time we're trying to request this peer
try:
retData = basicrequests.do_get_request(comm_inst._core, url, port=comm_inst.proxyPort, max_size=max_resp_size)
except streamedrequests.exceptions.ResponseLimitReached:
retData = False
penalty_score = -100
# if request failed, (error), mark peer offline
if retData == False:
try:
comm_inst.getPeerProfileInstance(peer).addScore(penalty_score)
comm_inst.removeOnlinePeer(peer)
if action != 'ping' and not comm_inst.shutdown:
logger.warn('Lost connection to ' + peer, terminal=True)
onlinepeers.get_online_peers(comm_inst) # Will only add a new peer to pool if needed
except ValueError:
pass
else:
comm_inst._core.setAddressInfo(peer, 'lastConnect', epoch.get_epoch())
comm_inst.getPeerProfileInstance(peer).addScore(1)
return retData # If returnHeaders, returns tuple of data, headers. if not, just data string

View File

@ -21,6 +21,7 @@ import base64
import onionrproofs, logger import onionrproofs, logger
from etc import onionrvalues from etc import onionrvalues
from onionrutils import basicrequests, bytesconverter from onionrutils import basicrequests, bytesconverter
from communicator import onlinepeers
def announce_node(daemon): def announce_node(daemon):
'''Announce our node to our peers''' '''Announce our node to our peers'''
@ -39,7 +40,7 @@ def announce_node(daemon):
peer = i peer = i
break break
else: else:
peer = daemon.pickOnlinePeer() peer = onlinepeers.pick_online_peer(daemon)
for x in range(1): for x in range(1):
if x == 1 and daemon._core.config.get('i2p.host'): if x == 1 and daemon._core.config.get('i2p.host'):

View File

@ -17,15 +17,12 @@
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
import time, sys import time, sys, secrets
import onionrexceptions, logger, onionrpeers import onionrexceptions, logger, onionrpeers
from utils import networkmerger from utils import networkmerger
from onionrutils import stringvalidators, epoch from onionrutils import stringvalidators, epoch
# secrets module was added into standard lib in 3.6+ from communicator import peeraction
if sys.version_info[0] == 3 and sys.version_info[1] < 6:
from dependencies import secrets
elif sys.version_info[0] == 3 and sys.version_info[1] >= 6:
import secrets
def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False): def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
config = comm_inst._core.config config = comm_inst._core.config
retData = False retData = False
@ -67,7 +64,7 @@ def connect_new_peer_to_communicator(comm_inst, peer='', useBootstrap=False):
if comm_inst.shutdown: if comm_inst.shutdown:
return return
# Ping a peer, # Ping a peer,
if comm_inst.peerAction(address, 'ping') == 'pong!': if peeraction.peer_action(comm_inst, address, 'ping') == 'pong!':
time.sleep(0.1) time.sleep(0.1)
if address not in mainPeerList: if address not in mainPeerList:
# Add a peer to our list if it isn't already since it successfully connected # Add a peer to our list if it isn't already since it successfully connected

View File

@ -21,6 +21,7 @@ import communicator, onionrexceptions
import logger, onionrpeers import logger, onionrpeers
from onionrutils import blockmetadata, stringvalidators, validatemetadata from onionrutils import blockmetadata, stringvalidators, validatemetadata
from . import shoulddownload from . import shoulddownload
from communicator import peeraction, onlinepeers
def download_blocks_from_communicator(comm_inst): def download_blocks_from_communicator(comm_inst):
assert isinstance(comm_inst, communicator.OnionrCommunicatorDaemon) assert isinstance(comm_inst, communicator.OnionrCommunicatorDaemon)
@ -47,14 +48,14 @@ def download_blocks_from_communicator(comm_inst):
comm_inst.currentDownloading.append(blockHash) # So we can avoid concurrent downloading in other threads of same block comm_inst.currentDownloading.append(blockHash) # So we can avoid concurrent downloading in other threads of same block
if len(blockPeers) == 0: if len(blockPeers) == 0:
peerUsed = comm_inst.pickOnlinePeer() peerUsed = onlinepeers.pick_online_peer(comm_inst)
else: else:
blockPeers = comm_inst._core._crypto.randomShuffle(blockPeers) blockPeers = comm_inst._core._crypto.randomShuffle(blockPeers)
peerUsed = blockPeers.pop(0) peerUsed = blockPeers.pop(0)
if not comm_inst.shutdown and peerUsed.strip() != '': if not comm_inst.shutdown and peerUsed.strip() != '':
logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed)) logger.info("Attempting to download %s from %s..." % (blockHash[:12], peerUsed))
content = comm_inst.peerAction(peerUsed, 'getdata/' + blockHash, max_resp_size=3000000) # block content from random peer (includes metadata) content = peeraction.peer_action(comm_inst, peerUsed, 'getdata/' + blockHash, max_resp_size=3000000) # block content from random peer (includes metadata)
if content != False and len(content) > 0: if content != False and len(content) > 0:
try: try:
content = content.encode() content = content.encode()

View File

@ -19,7 +19,7 @@
''' '''
import logger import logger
from onionrutils import stringvalidators from onionrutils import stringvalidators
from communicator import peeraction, onlinepeers
def lookup_new_peer_transports_with_communicator(comm_inst): def lookup_new_peer_transports_with_communicator(comm_inst):
logger.info('Looking up new addresses...') logger.info('Looking up new addresses...')
tryAmount = 1 tryAmount = 1
@ -29,8 +29,8 @@ def lookup_new_peer_transports_with_communicator(comm_inst):
if len(newPeers) > 10000: if len(newPeers) > 10000:
# Don't get new peers if we have too many queued up # Don't get new peers if we have too many queued up
break break
peer = comm_inst.pickOnlinePeer() peer = onlinepeers.pick_online_peer(comm_inst)
newAdders = comm_inst.peerAction(peer, action='pex') newAdders = peeraction.peer_action(comm_inst, peer, action='pex')
try: try:
newPeers = newAdders.split(',') newPeers = newAdders.split(',')
except AttributeError: except AttributeError:

View File

@ -19,6 +19,7 @@
''' '''
import logger, onionrproofs import logger, onionrproofs
from onionrutils import stringvalidators, epoch from onionrutils import stringvalidators, epoch
from communicator import peeraction, onlinepeers
def lookup_blocks_from_communicator(comm_inst): def lookup_blocks_from_communicator(comm_inst):
logger.info('Looking up new blocks...') logger.info('Looking up new blocks...')
@ -39,7 +40,7 @@ def lookup_blocks_from_communicator(comm_inst):
if comm_inst._core.storage_counter.isFull(): if comm_inst._core.storage_counter.isFull():
logger.debug('Not looking up new blocks due to maximum amount of allowed disk space used') logger.debug('Not looking up new blocks due to maximum amount of allowed disk space used')
break break
peer = comm_inst.pickOnlinePeer() # select random online peer peer = onlinepeers.pick_online_peer(comm_inst) # select random online peer
# if we've already tried all the online peers this time around, stop # if we've already tried all the online peers this time around, stop
if peer in triedPeers: if peer in triedPeers:
if len(comm_inst.onlinePeers) == len(triedPeers): if len(comm_inst.onlinePeers) == len(triedPeers):
@ -57,7 +58,7 @@ def lookup_blocks_from_communicator(comm_inst):
else: else:
listLookupCommand += '?date=%s' % (lastLookupTime,) listLookupCommand += '?date=%s' % (lastLookupTime,)
try: try:
newBlocks = comm_inst.peerAction(peer, listLookupCommand) # get list of new block hashes newBlocks = peeraction.peer_action(comm_inst, peer, listLookupCommand) # get list of new block hashes
except Exception as error: except Exception as error:
logger.warn('Could not get new blocks from %s.' % peer, error = error) logger.warn('Could not get new blocks from %s.' % peer, error = error)
newBlocks = False newBlocks = False

View File

@ -21,6 +21,7 @@ import logger
from communicatorutils import proxypicker from communicatorutils import proxypicker
import onionrblockapi as block import onionrblockapi as block
from onionrutils import localcommand, stringvalidators, basicrequests from onionrutils import localcommand, stringvalidators, basicrequests
from communicator import onlinepeers
def upload_blocks_from_communicator(comm_inst): def upload_blocks_from_communicator(comm_inst):
# when inserting a block, we try to upload it to a few peers to add some deniability # when inserting a block, we try to upload it to a few peers to add some deniability
@ -35,7 +36,7 @@ def upload_blocks_from_communicator(comm_inst):
comm_inst.decrementThreadCount('uploadBlock') comm_inst.decrementThreadCount('uploadBlock')
return return
for i in range(min(len(comm_inst.onlinePeers), 6)): for i in range(min(len(comm_inst.onlinePeers), 6)):
peer = comm_inst.pickOnlinePeer() peer = onlinepeers.pick_online_peer(comm_inst)
if peer in triedPeers: if peer in triedPeers:
continue continue
triedPeers.append(peer) triedPeers.append(peer)