diff --git a/onionr/communicator/__init__.py b/onionr/communicator/__init__.py index ff4d1853..5dfaf053 100755 --- a/onionr/communicator/__init__.py +++ b/onionr/communicator/__init__.py @@ -43,7 +43,7 @@ class OnionrCommunicatorDaemon: self.config = config self.storage_counter = storagecounter.StorageCounter() self.isOnline = True # Assume we're connected to the internet - self.shared_state = shared_state + self.shared_state = shared_state # TooManyObjects module # list of timer instances self.timers = [] @@ -53,6 +53,8 @@ class OnionrCommunicatorDaemon: # Upload information, list of blocks to upload self.blocksToUpload = [] + self.upload_session_manager = self.shared_state.get(uploadblocks.sessionmanager.BlockUploadSessionManager) + self.shared_state.share_object() # loop time.sleep delay in seconds self.delay = 1 diff --git a/onionr/communicator/peeraction.py b/onionr/communicator/peeraction.py index f3b73cfe..ca715273 100644 --- a/onionr/communicator/peeraction.py +++ b/onionr/communicator/peeraction.py @@ -44,7 +44,7 @@ def peer_action(comm_inst, peer, action, returnHeaders=False, max_resp_size=5242 onlinepeers.remove_online_peer(comm_inst, peer) keydb.transportinfo.set_address_info(peer, 'lastConnectAttempt', epoch.get_epoch()) if action != 'ping' and not comm_inst.shutdown: - logger.warn('Lost connection to ' + peer, terminal=True) + logger.warn(f'Lost connection to {peer}', terminal=True) onlinepeers.get_online_peers(comm_inst) # Will only add a new peer to pool if needed except ValueError: pass diff --git a/onionr/communicatorutils/downloadblocks/__init__.py b/onionr/communicatorutils/downloadblocks/__init__.py index 7abf4c9e..cf6b5998 100755 --- a/onionr/communicatorutils/downloadblocks/__init__.py +++ b/onionr/communicatorutils/downloadblocks/__init__.py @@ -30,7 +30,8 @@ def download_blocks_from_communicator(comm_inst): blacklist = onionrblacklist.OnionrBlackList() storage_counter = storagecounter.StorageCounter() LOG_SKIP_COUNT = 50 # for how many iterations we skip logging the counter - count = 0 + count: int = 0 + metadata_validation_result: bool = False # Iterate the block queue in the communicator for blockHash in list(comm_inst.blockQueue): count += 1 @@ -80,7 +81,11 @@ def download_blocks_from_communicator(comm_inst): #content = content.decode() # decode here because sha3Hash needs bytes above metas = blockmetadata.get_block_metadata_from_data(content) # returns tuple(metadata, meta), meta is also in metadata metadata = metas[0] - if validatemetadata.validate_metadata(metadata, metas[2]): # check if metadata is valid, and verify nonce + try: + metadata_validation_result = validatemetadata.validate_metadata(metadata, metas[2]) + except onionrexceptions.DataExists: + metadata_validation_result = False + if metadata_validation_result: # check if metadata is valid, and verify nonce if onionrcrypto.cryptoutils.verify_POW(content): # check if POW is enough/correct logger.info('Attempting to save block %s...' % blockHash[:12]) try: diff --git a/onionr/communicatorutils/uploadblocks/__init__.py b/onionr/communicatorutils/uploadblocks/__init__.py index 19b43c9b..fd776573 100755 --- a/onionr/communicatorutils/uploadblocks/__init__.py +++ b/onionr/communicatorutils/uploadblocks/__init__.py @@ -3,6 +3,7 @@ Upload blocks in the upload queue to peers from the communicator ''' +from __future__ import annotations ''' This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -17,6 +18,9 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . ''' +from typing import Union, TYPE_CHECKING +import threading + import logger from communicatorutils import proxypicker import onionrexceptions @@ -24,13 +28,15 @@ import onionrblockapi as block from onionrutils import localcommand, stringvalidators, basicrequests from communicator import onlinepeers import onionrcrypto +from . import sessionmanager -from . import session - -def upload_blocks_from_communicator(comm_inst): - # when inserting a block, we try to upload it to a few peers to add some deniability +def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): + """Accepts a communicator instance and uploads blocks from its upload queue""" + """when inserting a block, we try to upload + it to a few peers to add some deniability & increase functionality""" TIMER_NAME = "upload_blocks_from_communicator" + session_manager: sessionmanager.BlockUploadSessionManager = comm_inst.shared_state.get(sessionmanager.BlockUploadSessionManager) triedPeers = [] finishedUploads = [] comm_inst.blocksToUpload = onionrcrypto.cryptoutils.random_shuffle(comm_inst.blocksToUpload) @@ -40,29 +46,44 @@ def upload_blocks_from_communicator(comm_inst): logger.warn('Requested to upload invalid block', terminal=True) comm_inst.decrementThreadCount(TIMER_NAME) return + session = session_manager.add_session(bl) for i in range(min(len(comm_inst.onlinePeers), 6)): peer = onlinepeers.pick_online_peer(comm_inst) - if peer in triedPeers: - continue - triedPeers.append(peer) - url = 'http://%s/upload' % (peer,) try: - #data = {'block': block.Block(bl).getRaw()} + session.peer_exists[peer] + continue + except KeyError: + pass + try: + if session.peer_fails[peer] > 3: continue + except KeyError: + pass + if peer in triedPeers: continue + triedPeers.append(peer) + url = f'http://{peer}/upload' + try: data = block.Block(bl).getRaw() except onionrexceptions.NoDataAvailable: finishedUploads.append(bl) break proxyType = proxypicker.pick_proxy(peer) - logger.info("Uploading block %s to %s" % (bl[:8], peer), terminal=True) + logger.info(f"Uploading block {bl[:8]} to {peer}", terminal=True) resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream') if not resp == False: if resp == 'success': - localcommand.local_command('waitforshare/' + bl, post=True) - finishedUploads.append(bl) + session.success() + session.peer_exists[peer] = True elif resp == 'exists': - finishedUploads.append(bl) + session.success() + session.peer_exists[peer] = True else: - logger.warn('Failed to upload %s, reason: %s' % (bl[:8], resp[:150]), terminal=True) + session.fail() + session.fail_peer(peer) + comm_inst.getPeerProfileInstance(peer).addScore(-5) + logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}', terminal=True) + else: + session.fail() + session_manager.clean_session() for x in finishedUploads: try: comm_inst.blocksToUpload.remove(x) diff --git a/onionr/communicatorutils/uploadblocks/session.py b/onionr/communicatorutils/uploadblocks/session.py index 24415975..6c0b3472 100644 --- a/onionr/communicatorutils/uploadblocks/session.py +++ b/onionr/communicatorutils/uploadblocks/session.py @@ -3,6 +3,7 @@ Virtual upload "sessions" for blocks """ +from __future__ import annotations """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -33,10 +34,12 @@ class UploadSession: block_hash = reconstructhash.reconstruct_hash(block_hash) if not stringvalidators.validate_hash(block_hash): raise ValueError + self.start_time = epoch.get_epoch() self.block_hash = reconstructhash.deconstruct_hash(block_hash) self.total_fail_count: int = 0 self.total_success_count: int = 0 self.peer_fails = {} + self.peer_exists = {} def fail_peer(self, peer): try: diff --git a/onionr/communicatorutils/uploadblocks/sessionmanager.py b/onionr/communicatorutils/uploadblocks/sessionmanager.py index e69de29b..36fa5f49 100644 --- a/onionr/communicatorutils/uploadblocks/sessionmanager.py +++ b/onionr/communicatorutils/uploadblocks/sessionmanager.py @@ -0,0 +1,85 @@ +""" + Onionr - Private P2P Communication + + Manager for upload 'sessions' +""" +from __future__ import annotations +""" + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +""" +from typing import Iterable, Union + +from onionrutils import bytesconverter +from onionrutils import localcommand +from etc import onionrvalues +from etc import waitforsetvar +from utils import reconstructhash + +from . import session + +class BlockUploadSessionManager: + """Holds block UploadSession instances. Optionally accepts iterable of sessions to added on init + + Arguments: old_session: iterable of old UploadSession objects""" + def __init__(self, old_sessions:Iterable=None): + #self._too_many: TooMany = None + if old_sessions is None: + self.sessions = [] + else: + self.sessions = old_session + + def add_session(self, session_or_block: Union(str, bytes, session.UploadSession))->session.UploadSession: + """Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession""" + if isinstance(session_or_block, session.UploadSession): + if not session_or_block in self.sessions: + self.sessions.append(session_or_block) + return session_or_block + try: + return self.get_session(session_or_block) + except KeyError: + pass + # convert bytes hash to str + if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block) + # intentionally not elif + if isinstance(session_or_block, str): + new_session = session.UploadSession(session_or_block) + self.sessions.append(new_session) + return new_session + + def get_session(self, block_hash: Union(str, bytes))->session.UploadSession: + block_hash = reconstructhash.deconstruct_hash(bytesconverter.bytes_to_str(block_hash)) + for session in self.sessions: + if session.block_hash == block_hash: return session + raise KeyError + + def clean_session(self, specific_session: Union[str, UploadSession]=None): + comm_inst: OnionrCommunicatorDaemon = self._too_many.get_by_string("OnionrCommunicatorDaemon") + sessions_to_delete = [] + if comm_inst.getUptime() < 120: return + for session in self.sessions: + if (session.total_success_count / len(comm_inst.onlinePeers)) >= onionrvalues.MIN_BLOCK_UPLOAD_PEER_PERCENT: + sessions_to_delete.append(session) + for session in sessions_to_delete: + self.sessions.remove(session) + # TODO cleanup to one round of search + # Remove the blocks from the sessions, upload list, and waitforshare list + try: + comm_inst.blocksToUpload.remove(reconstructhash.reconstruct_hash(session.block_hash)) + except ValueError: + pass + try: + comm_inst.blocksToUpload.remove(session.block_hash) + except ValueError: + pass + localcommand.local_command('waitforshare/{session.block_hash}') \ No newline at end of file diff --git a/onionr/etc/onionrvalues.py b/onionr/etc/onionrvalues.py index 66075c0a..642fe4c7 100755 --- a/onionr/etc/onionrvalues.py +++ b/onionr/etc/onionrvalues.py @@ -31,6 +31,9 @@ MAX_BLOCK_CLOCK_SKEW = 120 MAIN_PUBLIC_KEY_SIZE = 32 ORIG_RUN_DIR_ENV_VAR = 'ORIG_ONIONR_RUN_DIR' +# Block creation anonymization requirements +MIN_BLOCK_UPLOAD_PEER_PERCENT = 0.1 + # Begin OnionrValues migrated values ANNOUNCE_POW = 5 DEFAULT_EXPIRE = 2592000 diff --git a/onionr/etc/waitforsetvar.py b/onionr/etc/waitforsetvar.py index f2c023ac..60921499 100644 --- a/onionr/etc/waitforsetvar.py +++ b/onionr/etc/waitforsetvar.py @@ -1,4 +1,8 @@ -def wait_for_set_var(obj, attribute): - while True: - if hasattr(obj, attribute): - break \ No newline at end of file +from __future__ import annotations +from typing import Union, Generic +from gevent import sleep +def wait_for_set_var(obj, attribute, sleep_seconds: Union[int, float]=0): + """Wait for an object to get an attribute with an optional sleep time""" + while not hasattr(obj, attribute): + if hasattr(obj, attribute): break + if sleep_seconds > 0: sleep(sleep_seconds) \ No newline at end of file diff --git a/onionr/httpapi/miscclientapi/endpoints.py b/onionr/httpapi/miscclientapi/endpoints.py index 12082a1d..d4b903e6 100644 --- a/onionr/httpapi/miscclientapi/endpoints.py +++ b/onionr/httpapi/miscclientapi/endpoints.py @@ -26,6 +26,7 @@ from netcontroller import NetController from serializeddata import SerializedData from onionrutils import mnemonickeys from onionrutils import bytesconverter +from utils import reconstructhash pub_key = onionrcrypto.pub_key.replace('=', '') @@ -78,6 +79,7 @@ class PrivateEndpoints: def waitforshare(name): '''Used to prevent the **public** api from sharing blocks we just created''' if not name.isalnum(): raise ValueError('block hash needs to be alpha numeric') + name = reconstructhash.reconstruct_hash(name) if name in client_api.publicAPI.hideBlocks: client_api.publicAPI.hideBlocks.remove(name) return Response("removed") diff --git a/onionr/onionrcommands/version.py b/onionr/onionrcommands/version.py index 8162f321..f2141352 100644 --- a/onionr/onionrcommands/version.py +++ b/onionr/onionrcommands/version.py @@ -11,5 +11,11 @@ def version(verbosity = 5, function = logger.info): if verbosity >= 1: function(onionrvalues.ONIONR_TAGLINE, terminal=True) if verbosity >= 2: - function('Running on %s %s' % (platform.platform(), platform.release()), terminal=True) - function('Onionr data dir: %s' % identifyhome.identify_home(), terminal=True) \ No newline at end of file + pf = platform.platform() + release = platform.release() + python_imp = platform.python_implementation() + python_version = platform.python_version() + function(f'{python_imp} {python_version} on {pf} {release}', terminal=True) + function('Onionr data dir: %s' % identifyhome.identify_home(), terminal=True) + +version.onionr_help = 'Shows environment details including Onionr version & data directory, OS and Python version' diff --git a/onionr/onionrutils/validatemetadata.py b/onionr/onionrutils/validatemetadata.py index ddb74455..405bf799 100644 --- a/onionr/onionrutils/validatemetadata.py +++ b/onionr/onionrutils/validatemetadata.py @@ -93,13 +93,14 @@ def validate_metadata(metadata, block_data) -> bool: try: with open(filepaths.data_nonce_file, 'r') as nonceFile: if nonce in nonceFile.read(): - ret_data = False # we've seen that nonce before, so we can't pass metadata + # we've seen that nonce before, so we can't pass metadata raise onionrexceptions.DataExists except FileNotFoundError: ret_data = True except onionrexceptions.DataExists: - # do not set ret_data to True, because nonce has been seen before - pass + # do not set ret_data to True, because data has been seen before + logger.warn(f'{nonce} seen before') + raise onionrexceptions.DataExists else: ret_data = True else: diff --git a/onionr/static-data/www/shared/configeditor.js b/onionr/static-data/www/shared/configeditor.js index 172b3d5a..7c9068ce 100755 --- a/onionr/static-data/www/shared/configeditor.js +++ b/onionr/static-data/www/shared/configeditor.js @@ -49,6 +49,8 @@ saveBtn.onclick = function(){ }}) .then((resp) => resp.text()) // Transform the data into text .then(function(data) { - alert('Config saved') + PNotify.success({ + text: 'Config saved' + }) }) } \ No newline at end of file diff --git a/onionr/static-data/www/shared/sites.js b/onionr/static-data/www/shared/sites.js index ab5c01a8..b318fffc 100755 --- a/onionr/static-data/www/shared/sites.js +++ b/onionr/static-data/www/shared/sites.js @@ -13,6 +13,8 @@ document.getElementById('openSite').onclick = function(){ window.location.href = '/site/' + hash } else{ - alert('Invalid site hash') + PNotify.notice({ + text: 'Invalid site hash' + }) } } \ No newline at end of file diff --git a/tests/test_waitforsetvars.py b/tests/test_waitforsetvars.py new file mode 100644 index 00000000..76bbb724 --- /dev/null +++ b/tests/test_waitforsetvars.py @@ -0,0 +1,45 @@ +import sys, os +sys.path.append(".") +sys.path.append("onionr/") +import unittest, uuid, time, threading + +TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' +os.environ["ONIONR_HOME"] = TEST_DIR +from etc import waitforsetvar + +def set_test_var_delay(obj, delay=0): + if delay > 0: time.sleep(delay) + obj.test_var = True + +class TestWaitForSetVar(unittest.TestCase): + def test_no_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_negative_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, -1]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_zero_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 0]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_one_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 1]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_three_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 3]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + +unittest.main()