Add direct connection feature and multithreading to events

Add direct connection feature and multithreading to events
This commit is contained in:
Arinerron 2018-05-03 03:22:07 +00:00 committed by GitHub
parent 1a02124122
commit c3d86eafa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 306 additions and 14 deletions

View File

@ -198,6 +198,8 @@ class API:
resp = Response(self._utils.getBlockDBHash())
elif action == 'getBlockHashes':
resp = Response(self._core.getBlockList())
elif action == 'directMessage':
resp = Response(self._core.handle_direct_connection(data))
elif action == 'announce':
if data != '':
# TODO: require POW for this

View File

@ -97,13 +97,13 @@ class OnionrCommunicate:
blockProcessTimer = 0
if command != False:
if command[0] == 'shutdown':
logger.info('Daemon recieved exit command.', timestamp=True)
logger.info('Daemon received exit command.', timestamp=True)
break
elif command[0] == 'announceNode':
announceAttempts = 3
announceAttemptCount = 0
announceVal = False
logger.info('Announcing our node to ' + command[1], timestamp=True)
logger.info('Announcing node to ' + command[1], timestamp=True)
while not announceVal:
announceAttemptCount += 1
announceVal = self.performGet('announce', command[1], data=self._core.hsAdder.replace('\n', ''), skipHighFailureAddress=True)
@ -114,6 +114,31 @@ class OnionrCommunicate:
elif command[0] == 'runCheck':
logger.info('Status check; looks good.')
open('data/.runcheck', 'w+').close()
elif command[0] == 'event':
# todo
pass
elif command[0] == 'checkCallbacks':
try:
data = json.loads(command[1])
logger.info('Checking for callbacks with connection %s...' % data['id'])
self.check_callbacks(data, config.get('dc_execcallbacks', True))
events.event('incoming_direct_connection', data = {'callback' : True, 'communicator' : self, 'data' : data})
except Exception as e:
logger.error('Failed to interpret callbacks for checking', e)
elif command[0] == 'incomingDirectConnection':
try:
data = json.loads(command[1])
logger.info('Handling incoming connection %s...' % data['id'])
self.incoming_direct_connection(data)
events.event('incoming_direct_connection', data = {'callback' : False, 'communicator' : self, 'data' : data})
except Exception as e:
logger.error('Failed to handle callbacks for checking', e)
apiRunningCheckCount += 1
@ -127,7 +152,7 @@ class OnionrCommunicate:
time.sleep(1)
else:
# This executes if the api is NOT detected to be running
logger.error('Daemon detected API crash (or otherwise unable to reach API after long time, stopping)')
logger.error('Daemon detected API crash (or otherwise unable to reach API after long time), stopping...')
break # break main daemon loop
apiRunningCheckCount = 0
@ -136,10 +161,208 @@ class OnionrCommunicate:
self._netController.killTor()
return
future_callbacks = {}
connection_handlers = {}
id_peer_cache = {}
def get_connection_handlers(self, name = None):
'''
Returns a list of callback handlers by name, or, if name is None, it returns all handlers.
'''
if name is None:
return self.connection_handlers
elif name in self.connection_handlers:
return self.connection_handlers[name]
else
return list()
def add_connection_handler(self, name, handler):
'''
Adds a function to be called when an connection that is NOT a callback is received.
Takes in the name of the communication type and the handler as input
'''
if not name in self.connection_handlers:
self.connection_handlers[name] = list()
self.connection_handlers[name].append(handler)
return
def remove_connection_handler(self, name, handler = None):
'''
Removes a connection handler if specified, or removes all by name
'''
if handler is None:
if name in self.connection_handlers:
self.connection_handlers[name].remove(handler)
elif name in self.connection_handlers:
del self.connection_handlers[name]
return
def set_callback(self, identifier, callback):
'''
(Over)writes a callback by communication identifier
'''
if not callback is None:
self.future_callbacks[identifier] = callback
return True
return False
def unset_callback(self, identifier):
'''
Unsets a callback by communication identifier, if set
'''
if identifier in future_callbacks:
del self.future_callbacks[identifier]
return True
return False
def get_callback(self, identifier):
'''
Returns a callback by communication identifier if set, or None
'''
if identifier in self.future_callbacks:
return self.future_callbacks[id]
return None
def direct_connect(self, peer, data = None, callback = None, log = True):
'''
Communicates something directly with the client
- `peer` should obviously be the peer id to request.
- `data` should be a dict (NOT str), with the parameter "type"
ex. {'type': 'sendMessage', 'content': 'hey, this is a dm'}
In that dict, the key 'token' must NEVER be set. If it is, it will
be overwritten.
- if `callback` is set to a function, it will call that function
back if/when the client the request is sent to decides to respond.
Do NOT depend on a response, because users can configure their
clients not to respond to this type of request.
- `log` is set to True by default-- what this does is log the
request for debug purposes. Should be False for sensitive actions.
'''
# TODO: Timing attack prevention
try:
# does not need to be secure random, only used for keeping track of async responses
# Actually, on second thought, it does need to be secure random. Otherwise, if it is predictable, someone could trigger arbitrary callbacks that have been saved on the local node, wrecking all kinds of havoc. Better just to keep it secure random.
identifier = self._utils.token(32)
if 'id' in data:
identifier = data['id']
if not identifier in id_peer_cache:
id_peer_cache[identifier] = peer
if type(data) == str:
# if someone inputs a string instead of a dict, it will assume it's the type
data = {'type' : data}
data['id'] = identifier
data['token'] = '' # later put PoW stuff here or whatever is needed
data_str = json.dumps(data)
events.event('outgoing_direct_connection', data = {'callback' : True, 'communicator' : self, 'data' : data, 'id' : identifier, 'token' : token, 'peer' : peer, 'callback' : callback, 'log' : log})
logger.debug('Direct connection (identifier: "%s"): %s' + (identifier, data_str))
try:
self.performGet('directMessage', peer, data_str)
except:
logger.warn('Failed to connect to peer: "%s".' % str(peer))
return False
if not callback is None:
self.set_callback(identifier, callback)
return True
except Exception as e:
logger.warn('Unknown error, failed to execute direct connect (peer: "%s").' % str(peer), e)
return False
def direct_connect_response(self, identifier, data, peer = None, callback = None, log = True):
'''
Responds to a previous connection. Hostname will be pulled from id_peer_cache if not specified in `peer` parameter.
If yet another callback is requested, it can be put in the `callback` parameter.
'''
if config.get('dc_response', True):
data['id'] = identifier
data['sender'] = open('data/hs/hostname').read()
data['callback'] = True
if (origin is None) and (identifier in id_peer_cache):
origin = id_peer_cache[identifier]
if not identifier in id_peer_cache:
id_peer_cache[identifier] = peer
if origin is None:
logger.warn('Failed to identify peer for connection %s' % str(identifier))
return False
else:
return self.direct_connect(peer, data = data, callback = callback, log = log)
else:
logger.warn('Node tried to respond to direct connection id %s, but it was rejected due to `dc_response` restriction.' % str(identifier))
return False
def check_callbacks(self, data, execute = True, remove = True):
'''
Check if a callback is set, and if so, execute it
'''
try:
if type(data) is str:
data = json.loads(data)
if 'id' in data: # TODO: prevent enumeration, require extra PoW
identifier = data['id']
if identifier in self.future_callbacks:
if execute:
self.get_callback(identifier)(data)
logger.debug('Request callback "%s" executed.' % str(identifier))
if remove:
self.unset_callback(identifier)
return True
logger.warn('Unable to find request callback for ID "%s".' % str(identifier))
else:
logger.warn('Unable to identify callback request, `id` parameter missing: %s' % json.dumps(data))
except Exception as e:
logger.warn('Unknown error, failed to execute direct connection callback (peer: "%s").' % str(peer), e)
return False
def incoming_direct_connection(self, data):
'''
This code is run whenever there is a new incoming connection.
'''
if 'type' in data and data['type'] in self.connection_handlers:
for handler in self.get_connection_handlers(name):
handler(data)
return
def getNewPeers(self):
'''
Get new peers and keys
'''
peersCheck = 5 # Amount of peers to ask for new peers + keys
peersChecked = 0
peerList = list(self._core.listAdders()) # random ordered list of peers
@ -158,6 +381,7 @@ class OnionrCommunicate:
#i = secrets.randbelow(maxN) # cant use prior to 3.6
i = random.randint(0, maxN)
logger.info('Using ' + peerList[i] + ' to find new peers', timestamp=True)
try:
newAdders = self.performGet('pex', peerList[i], skipHighFailureAddress=True)
logger.debug('Attempting to merge address: ')
@ -185,24 +409,31 @@ class OnionrCommunicate:
'''
Lookup blocks and merge new ones
'''
peerList = self._core.listAdders()
blocks = ''
for i in peerList:
try:
if self.peerData[i]['failCount'] >= self.highFailureAmount:
continue
except KeyError:
pass
lastDB = self._core.getAddressInfo(i, 'DBHash')
if lastDB == None:
logger.debug('Fetching hash from ' + i + ' No previous known.')
logger.debug('Fetching hash from ' + str(i) + ', no previous known.')
else:
logger.debug('Fetching hash from ' + str(i) + ', ' + lastDB + ' last known')
logger.debug('Fetching hash from ' + str(i) + ', ' + str(lastDB) + ' last known')
currentDB = self.performGet('getDBHash', i)
if currentDB != False:
logger.debug(i + " hash db (from request): " + currentDB)
else:
logger.warn("Error getting hash db status for " + i)
if currentDB != False:
if lastDB != currentDB:
logger.debug('Fetching hash from ' + i + ' - ' + currentDB + ' current hash.')
@ -213,10 +444,13 @@ class OnionrCommunicate:
self.peerData[i]['failCount'] -= 1
if self._utils.validateHash(currentDB):
self._core.setAddressInfo(i, "DBHash", currentDB)
if len(blocks.strip()) != 0:
pass
#logger.debug('BLOCKS:' + blocks)
blockList = blocks.split('\n')
for i in blockList:
if len(i.strip()) == 0:
continue
@ -224,6 +458,7 @@ class OnionrCommunicate:
continue
if i in self.ignoredHashes:
continue
#logger.debug('Exchanged block (blockList): ' + i)
if not self._utils.validateHash(i):
# skip hash if it isn't valid
@ -247,10 +482,12 @@ class OnionrCommunicate:
if i != "":
if i in self.ignoredHashes:
continue
try:
self.newHashes[i]
except KeyError:
self.newHashes[i] = 0
# check if a new hash has been around too long, delete it from database and add it to ignore list
if self.newHashes[i] >= self.keepNewHash:
logger.warn('Ignoring block ' + i + ' because it took to long to get valid data.')
@ -258,6 +495,7 @@ class OnionrCommunicate:
self._core.removeBlock(i)
self.ignoredHashes.append(i)
continue
self.newHashes[i] += 1
logger.warn('UNSAVED BLOCK: ' + i)
data = self.downloadBlock(i)
@ -275,18 +513,22 @@ class OnionrCommunicate:
blockMetadata = blockMetadata.decode()
except AttributeError:
pass
blockMetadata = json.loads(blockMetadata + '}')
try:
blockMetadata['sig']
blockMetadata['id']
except KeyError:
pass
else:
creator = self._utils.getPeerByHashId(blockMetadata['id'])
try:
creator = creator.decode()
except AttributeError:
pass
if self._core._crypto.edVerify(blockContent.split(b'}')[1], creator, blockMetadata['sig'], encodedData=True):
self._core.updateBlockInfo(i, 'sig', 'true')
else:
@ -300,34 +542,43 @@ class OnionrCommunicate:
except json.decoder.JSONDecodeError:
logger.warn('Could not decode block metadata')
pass
return
def downloadBlock(self, hash, peerTries=3):
'''
Download a block from random order of peers
'''
retVal = False
peerList = self._core.listAdders()
blocks = ''
peerTryCount = 0
for i in peerList:
if self.peerData[i]['failCount'] >= self.highFailureAmount:
continue
if peerTryCount >= peerTries:
break
hasher = hashlib.sha3_256()
data = self.performGet('getData', i, hash, skipHighFailureAddress=True)
if data == False or len(data) > 10000000 or data == '':
peerTryCount += 1
continue
try:
data = base64.b64decode(data)
except binascii.Error:
data = b''
hasher.update(data)
digest = hasher.hexdigest()
if type(digest) is bytes:
digest = digest.decode()
if digest == hash.strip():
self._core.setData(data)
logger.info('Successfully obtained data for ' + hash, timestamp=True)

View File

@ -17,10 +17,9 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import sqlite3, os, sys, time, math, base64, tarfile, getpass, simplecrypt, hashlib, nacl, logger, json
import sqlite3, os, sys, time, math, base64, tarfile, getpass, simplecrypt, hashlib, nacl, logger, json, netcontroller
#from Crypto.Cipher import AES
#from Crypto import Random
import netcontroller
import onionrutils, onionrcrypto, btc, onionrevents as events
@ -547,7 +546,26 @@ class Core:
conn.close()
return
def getBlockList(self, unsaved=False):
def handle_direct_connection(self, data):
'''
Handles direct messages
'''
try:
data = json.loads(data)
# TODO: Determine the sender, verify, etc
if ('callback' in data) and (data['callback'] is True):
# then this is a response to the message we sent earlier
self.daemonQueueAdd('checkCallbacks', json.dumps(data))
else:
# then we should handle it and respond accordingly
self.daemonQueueAdd('incomingDirectConnection', json.dumps(data))
except Exception as e:
logger.warn('Failed to handle incoming direct message: %s' % str(e))
return
def getBlockList(self, unsaved = False):
'''
Get list of our blocks
'''

View File

@ -19,15 +19,16 @@
'''
import config, logger, onionrplugins as plugins, onionrpluginapi as pluginapi
from threading import Thread
def get_pluginapi(onionr, data):
return pluginapi.pluginapi(onionr, data)
def event(event_name, data = {}, onionr = None):
def __event_caller(event_name, data = {}, onionr = None):
'''
Calls an event on all plugins (if defined)
DO NOT call this function, this is for threading code only.
Instead, call onionrevents.event
'''
for plugin in plugins.get_enabled_plugins():
try:
call(plugins.get_plugin(plugin), event_name, data, get_pluginapi(onionr, data))
@ -38,6 +39,19 @@ def event(event_name, data = {}, onionr = None):
logger.warn('Event \"' + event_name + '\" failed for plugin \"' + plugin + '\".')
logger.debug(str(e))
def event(event_name, data = {}, onionr = None, threaded = True):
'''
Calls an event on all plugins (if defined)
'''
if threaded:
thread = Thread(target = __event_caller, args = (event_name, data, onionr))
thread.start()
return thread
else:
__event_caller(event_name, data, onionr)
def call(plugin, event_name, data = None, pluginapi = None):
'''
Calls an event on a plugin if one is defined

View File

@ -421,3 +421,6 @@ class OnionrUtils:
return False
except:
return False
def token(self, size = 32):
return binascii.hexlify(os.urandom(size))

View File

@ -171,15 +171,19 @@ class OnionrTests(unittest.TestCase):
if not plugins.exists('test'):
os.makedirs(plugins.get_plugins_folder('test'))
with open(plugins.get_plugins_folder('test') + '/main.py', 'a') as main:
main.write("print('Running')\n\ndef on_test(pluginapi, data = None):\n print('received test event!')\n return True\n\ndef on_start(pluginapi, data = None):\n print('start event called')\n\ndef on_stop(pluginapi, data = None):\n print('stop event called')\n\ndef on_enable(pluginapi, data = None):\n print('enable event called')\n\ndef on_disable(pluginapi, data = None):\n print('disable event called')\n")
main.write("print('Running')\n\ndef on_test(pluginapi, data = None):\n print('received test event!')\n print('thread test started...')\n import time\n time.sleep(1)\n \n return True\n\ndef on_start(pluginapi, data = None):\n print('start event called')\n\ndef on_stop(pluginapi, data = None):\n print('stop event called')\n\ndef on_enable(pluginapi, data = None):\n print('enable event called')\n\ndef on_disable(pluginapi, data = None):\n print('disable event called')\n")
plugins.enable('test')
plugins.start('test')
if not events.call(plugins.get_plugin('test'), 'test'):
if not events.call(plugins.get_plugin('test'), 'enable'):
self.assertTrue(False)
events.event('test', data = {'tests': self})
logger.debug('preparing to start thread', timestamp = False)
thread = events.event('test', data = {'tests': self})
logger.debug('thread running...', timestamp = False)
thread.join()
logger.debug('thread finished.', timestamp = False)
self.assertTrue(True)