communicator refactoring and better documentation

refactorinsert
Kevin Froman 3 years ago
parent d17970b181
commit d4191b2cb5
  1. 2
      README.md
  2. 97
      onionr/communicator.py
  3. 55
      onionr/communicatorutils/daemonqueuehandler.py
  4. 48
      onionr/communicatorutils/lookupadders.py
  5. 3
      onionr/communicatorutils/onionrdaemontools.py
  6. 3
      onionr/netcontroller.py

@ -23,7 +23,7 @@ Onionr is a decentralized, peer-to-peer communication and storage network, desig
Onionr stores data in independent packages referred to as 'blocks'. The blocks are synced to all other nodes in the network. Blocks and user IDs cannot be easily proven to have been created by a particular user. Even if there is enough evidence to believe that a specific user created a block, nodes still operate behind Tor or I2P and as such cannot be trivially unmasked.
Users are identified by ed25519 public keys, which can be used to sign blocks or send encrypted data.
Users are identified by ed25519/curve25519 public keys, which can be used to sign blocks or send encrypted data.
Onionr can be used for mail, as a social network, instant messenger, file sharing software, or for encrypted group discussion.

@ -23,8 +23,9 @@ import sys, os, core, config, json, requests, time, logger, threading, base64, o
from dependencies import secrets
import onionrexceptions, onionrpeers, onionrevents as events, onionrplugins as plugins, onionrblockapi as block
from communicatorutils import onionrdaemontools, servicecreator, onionrcommunicatortimers
from communicatorutils import proxypicker, downloadblocks, lookupblocks
from communicatorutils import proxypicker, downloadblocks, lookupblocks, lookupadders
from communicatorutils import servicecreator, connectnewpeers, uploadblocks
from communicatorutils import daemonqueuehandler
import onionrservices, onionr, onionrproofs
OnionrCommunicatorTimers = onionrcommunicatortimers.OnionrCommunicatorTimers
@ -98,15 +99,33 @@ class OnionrCommunicatorDaemon:
# requiresPeer True means the timer function won't fire if we have no connected peers
peerPoolTimer = OnionrCommunicatorTimers(self, self.getOnlinePeers, 60, maxThreads=1)
OnionrCommunicatorTimers(self, self.runCheck, 2, maxThreads=1)
# Timers to periodically lookup new blocks and download them
OnionrCommunicatorTimers(self, self.lookupBlocks, self._core.config.get('timers.lookupBlocks'), requiresPeer=True, maxThreads=1)
OnionrCommunicatorTimers(self, self.getBlocks, self._core.config.get('timers.getBlocks'), requiresPeer=True, maxThreads=2)
# Timer to reset the longest offline peer so contact can be attempted again
OnionrCommunicatorTimers(self, self.clearOfflinePeer, 58)
# Timer to cleanup old blocks
blockCleanupTimer = OnionrCommunicatorTimers(self, self.daemonTools.cleanOldBlocks, 65)
# Timer to discover new peers
OnionrCommunicatorTimers(self, self.lookupAdders, 60, requiresPeer=True)
# Timer for adjusting which peers we actively communicate to at any given time, to avoid over-using peers
OnionrCommunicatorTimers(self, self.daemonTools.cooldownPeer, 30, requiresPeer=True)
# Timer to read the upload queue and upload the entries to peers
OnionrCommunicatorTimers(self, self.uploadBlock, 10, requiresPeer=True, maxThreads=1)
# Timer to process the daemon command queue
OnionrCommunicatorTimers(self, self.daemonCommands, 6, maxThreads=1)
# Timer that kills Onionr if the API server crashes
OnionrCommunicatorTimers(self, self.detectAPICrash, 30, maxThreads=1)
# Setup direct connections
if config.get('general.socket_servers', False):
self.services = onionrservices.OnionrServices(self._core)
self.active_services = []
@ -114,23 +133,32 @@ class OnionrCommunicatorDaemon:
OnionrCommunicatorTimers(self, servicecreator.service_creator, 5, maxThreads=50, myArgs=(self,))
else:
self.services = None
# This timer creates deniable blocks, in an attempt to further obfuscate block insertion metadata
deniableBlockTimer = OnionrCommunicatorTimers(self, self.daemonTools.insertDeniableBlock, 180, requiresPeer=True, maxThreads=1)
# Timer to check for connectivity, through Tor to various high-profile onion services
netCheckTimer = OnionrCommunicatorTimers(self, self.daemonTools.netCheck, 600)
if config.get('general.security_level') == 0:
# Announce the public API server transport address to other nodes if security level allows
if config.get('general.security_level', 1) == 0:
# Default to high security level incase config breaks
announceTimer = OnionrCommunicatorTimers(self, self.daemonTools.announceNode, 3600, requiresPeer=True, maxThreads=1)
announceTimer.count = (announceTimer.frequency - 120)
else:
logger.debug('Will not announce node.')
# Timer to delete malfunctioning or long-dead peers
cleanupTimer = OnionrCommunicatorTimers(self, self.peerCleanup, 300, requiresPeer=True)
# Timer to cleanup dead ephemeral forward secrecy keys
forwardSecrecyTimer = OnionrCommunicatorTimers(self, self.daemonTools.cleanKeys, 15, maxThreads=1)
# set loop to execute instantly to load up peer pool (replaced old pool init wait)
# Adjust initial timer triggers
peerPoolTimer.count = (peerPoolTimer.frequency - 1)
cleanupTimer.count = (cleanupTimer.frequency - 60)
deniableBlockTimer.count = (deniableBlockTimer.frequency - 175)
blockCleanupTimer.count = (blockCleanupTimer.frequency - 5)
#forwardSecrecyTimer.count = (forwardSecrecyTimer.frequency - 990)
# Main daemon loop, mainly for calling timers, don't do any complex operations here to avoid locking
try:
@ -160,32 +188,7 @@ class OnionrCommunicatorDaemon:
def lookupAdders(self):
'''Lookup new peer addresses'''
logger.info('Looking up new addresses...')
tryAmount = 1
newPeers = []
for i in range(tryAmount):
# Download new peer address list from random online peers
if len(newPeers) > 10000:
# Dont get new peers if we have too many queued up
break
peer = self.pickOnlinePeer()
newAdders = self.peerAction(peer, action='pex')
try:
newPeers = newAdders.split(',')
except AttributeError:
pass
else:
# Validate new peers are good format and not already in queue
invalid = []
for x in newPeers:
x = x.strip()
if not self._core._utils.validateID(x) or x in self.newPeers or x == self._core.hsAddress:
# avoid adding if its our address
invalid.append(x)
for x in invalid:
newPeers.remove(x)
self.newPeers.extend(newPeers)
self.decrementThreadCount('lookupAdders')
lookupadders.lookup_new_peer_transports_with_communicator(self)
def lookupBlocks(self):
'''Lookup new blocks & add them to download queue'''
@ -346,39 +349,7 @@ class OnionrCommunicatorDaemon:
'''
Process daemon commands from daemonQueue
'''
cmd = self._core.daemonQueue()
response = ''
if cmd is not False:
events.event('daemon_command', onionr = None, data = {'cmd' : cmd})
if cmd[0] == 'shutdown':
self.shutdown = True
elif cmd[0] == 'announceNode':
if len(self.onlinePeers) > 0:
self.announce(cmd[1])
else:
logger.debug("No nodes connected. Will not introduce node.")
elif cmd[0] == 'runCheck': # deprecated
logger.debug('Status check; looks good.')
open(self._core.dataDir + '.runcheck', 'w+').close()
elif cmd[0] == 'connectedPeers':
response = '\n'.join(list(self.onlinePeers)).strip()
if response == '':
response = 'none'
elif cmd[0] == 'localCommand':
response = self._core._utils.localCommand(cmd[1])
elif cmd[0] == 'pex':
for i in self.timers:
if i.timerFunction.__name__ == 'lookupAdders':
i.count = (i.frequency - 1)
elif cmd[0] == 'uploadBlock':
self.blocksToUpload.append(cmd[1])
if cmd[0] not in ('', None):
if response != '':
self._core._utils.localCommand('queueResponseAdd/' + cmd[4], post=True, postData={'data': response})
response = ''
self.decrementThreadCount('daemonCommands')
daemonqueuehandler.handle_daemon_commands(self)
def uploadBlock(self):
'''Upload our block to a few peers'''

@ -0,0 +1,55 @@
'''
Onionr - P2P Microblogging Platform & Social network
Handle daemon queue commands in the communicator
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import logger
import onionrevents as events
def handle_daemon_commands(comm_inst):
cmd = comm_inst._core.daemonQueue()
response = ''
if cmd is not False:
events.event('daemon_command', onionr = None, data = {'cmd' : cmd})
if cmd[0] == 'shutdown':
comm_inst.shutdown = True
elif cmd[0] == 'announceNode':
if len(comm_inst.onlinePeers) > 0:
comm_inst.announce(cmd[1])
else:
logger.debug("No nodes connected. Will not introduce node.")
elif cmd[0] == 'runCheck': # deprecated
logger.debug('Status check; looks good.')
open(comm_inst._core.dataDir + '.runcheck', 'w+').close()
elif cmd[0] == 'connectedPeers':
response = '\n'.join(list(comm_inst.onlinePeers)).strip()
if response == '':
response = 'none'
elif cmd[0] == 'localCommand':
response = comm_inst._core._utils.localCommand(cmd[1])
elif cmd[0] == 'pex':
for i in comm_inst.timers:
if i.timerFunction.__name__ == 'lookupAdders':
i.count = (i.frequency - 1)
elif cmd[0] == 'uploadBlock':
comm_inst.blocksToUpload.append(cmd[1])
if cmd[0] not in ('', None):
if response != '':
comm_inst._core._utils.localCommand('queueResponseAdd/' + cmd[4], post=True, postData={'data': response})
response = ''
comm_inst.decrementThreadCount('daemonCommands')

@ -0,0 +1,48 @@
'''
Onionr - P2P Microblogging Platform & Social network
Lookup new peer transport addresses using the communicator
'''
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import logger
def lookup_new_peer_transports_with_communicator(comm_inst):
logger.info('Looking up new addresses...')
tryAmount = 1
newPeers = []
for i in range(tryAmount):
# Download new peer address list from random online peers
if len(newPeers) > 10000:
# Dont get new peers if we have too many queued up
break
peer = comm_inst.pickOnlinePeer()
newAdders = comm_inst.peerAction(peer, action='pex')
try:
newPeers = newAdders.split(',')
except AttributeError:
pass
else:
# Validate new peers are good format and not already in queue
invalid = []
for x in newPeers:
x = x.strip()
if not comm_inst._core._utils.validateID(x) or x in comm_inst.newPeers or x == comm_inst._core.hsAddress:
# avoid adding if its our address
invalid.append(x)
for x in invalid:
newPeers.remove(x)
comm_inst.newPeers.extend(newPeers)
comm_inst.decrementThreadCount('lookupAdders')

@ -210,8 +210,9 @@ class DaemonTools:
fakePeer = ''
chance = 10
if secrets.randbelow(chance) == (chance - 1):
# This assumes on the libsodium primitives to have key-privacy
fakePeer = 'OVPCZLOXD6DC5JHX4EQ3PSOGAZ3T24F75HQLIUZSDSMYPEOXCPFA===='
data = secrets.token_hex(secrets.randbelow(500) + 1)
data = secrets.token_hex(secrets.randbelow(1024) + 1)
self.daemon._core.insertBlock(data, header='pm', encryptType='asym', asymPeer=fakePeer, meta={'subject': 'foo'})
self.daemon.decrementThreadCount('insertDeniableBlock')
return

@ -24,7 +24,8 @@ from onionrblockapi import Block
from dependencies import secrets
def getOpenPort():
# taken from (but modified) https://stackoverflow.com/a/2838309
# taken from (but modified) https://stackoverflow.com/a/2838309 by https://stackoverflow.com/users/133374/albert ccy-by-sa-3 https://creativecommons.org/licenses/by-sa/3.0/
# changes from source: import moved to top of file, bind specifically to localhost
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1",0))
s.listen(1)

Loading…
Cancel
Save