diff --git a/onionr/communicator/__init__.py b/onionr/communicator/__init__.py index ff4d1853..5dfaf053 100755 --- a/onionr/communicator/__init__.py +++ b/onionr/communicator/__init__.py @@ -43,7 +43,7 @@ class OnionrCommunicatorDaemon: self.config = config self.storage_counter = storagecounter.StorageCounter() self.isOnline = True # Assume we're connected to the internet - self.shared_state = shared_state + self.shared_state = shared_state # TooManyObjects module # list of timer instances self.timers = [] @@ -53,6 +53,8 @@ class OnionrCommunicatorDaemon: # Upload information, list of blocks to upload self.blocksToUpload = [] + self.upload_session_manager = self.shared_state.get(uploadblocks.sessionmanager.BlockUploadSessionManager) + self.shared_state.share_object() # loop time.sleep delay in seconds self.delay = 1 diff --git a/onionr/communicatorutils/uploadblocks/__init__.py b/onionr/communicatorutils/uploadblocks/__init__.py index 19b43c9b..20005b49 100755 --- a/onionr/communicatorutils/uploadblocks/__init__.py +++ b/onionr/communicatorutils/uploadblocks/__init__.py @@ -3,6 +3,7 @@ Upload blocks in the upload queue to peers from the communicator ''' +from __future__ import annotations ''' This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -17,6 +18,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . ''' +from typing import Union import logger from communicatorutils import proxypicker import onionrexceptions @@ -25,12 +27,13 @@ from onionrutils import localcommand, stringvalidators, basicrequests from communicator import onlinepeers import onionrcrypto -from . import session - -def upload_blocks_from_communicator(comm_inst): - # when inserting a block, we try to upload it to a few peers to add some deniability +def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): + """Accepts a communicator instance and uploads blocks from its upload queue""" + """when inserting a block, we try to upload + it to a few peers to add some deniability & increase functionality""" TIMER_NAME = "upload_blocks_from_communicator" + session_manager = comm_inst.shared_state.get_by_string('BlockUploadSessionManager') triedPeers = [] finishedUploads = [] comm_inst.blocksToUpload = onionrcrypto.cryptoutils.random_shuffle(comm_inst.blocksToUpload) @@ -40,20 +43,20 @@ def upload_blocks_from_communicator(comm_inst): logger.warn('Requested to upload invalid block', terminal=True) comm_inst.decrementThreadCount(TIMER_NAME) return + session_manager.new_session(bl) for i in range(min(len(comm_inst.onlinePeers), 6)): peer = onlinepeers.pick_online_peer(comm_inst) if peer in triedPeers: continue triedPeers.append(peer) - url = 'http://%s/upload' % (peer,) + url = f'http://{peer}/upload' try: - #data = {'block': block.Block(bl).getRaw()} data = block.Block(bl).getRaw() except onionrexceptions.NoDataAvailable: finishedUploads.append(bl) break proxyType = proxypicker.pick_proxy(peer) - logger.info("Uploading block %s to %s" % (bl[:8], peer), terminal=True) + logger.info(f"Uploading block {bl:[:8]} to {peer}", terminal=True) resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream') if not resp == False: if resp == 'success': @@ -62,7 +65,7 @@ def upload_blocks_from_communicator(comm_inst): elif resp == 'exists': finishedUploads.append(bl) else: - logger.warn('Failed to upload %s, reason: %s' % (bl[:8], resp[:150]), terminal=True) + logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}'), terminal=True) for x in finishedUploads: try: comm_inst.blocksToUpload.remove(x) diff --git a/onionr/communicatorutils/uploadblocks/session.py b/onionr/communicatorutils/uploadblocks/session.py index 24415975..aaeabb0c 100644 --- a/onionr/communicatorutils/uploadblocks/session.py +++ b/onionr/communicatorutils/uploadblocks/session.py @@ -33,6 +33,7 @@ class UploadSession: block_hash = reconstructhash.reconstruct_hash(block_hash) if not stringvalidators.validate_hash(block_hash): raise ValueError + self.start_time = epoch.get_epoch() self.block_hash = reconstructhash.deconstruct_hash(block_hash) self.total_fail_count: int = 0 self.total_success_count: int = 0 diff --git a/onionr/communicatorutils/uploadblocks/sessionmanager.py b/onionr/communicatorutils/uploadblocks/sessionmanager.py index e69de29b..82cd6971 100644 --- a/onionr/communicatorutils/uploadblocks/sessionmanager.py +++ b/onionr/communicatorutils/uploadblocks/sessionmanager.py @@ -0,0 +1,37 @@ +""" + Onionr - Private P2P Communication + + Manager for upload 'sessions' +""" +from __future__ import annotations +""" + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +""" +from typing import Iterable, Union +from onionrutils import bytesconverter +class BlockUploadSessionManager: + def __init__(self, old_sessions:Iterable=None): + if old_session is None: + self.sessions = [] + else: + self.sessions = old_session + + def add_session(self, session_or_block: Union(str, bytes, UploadSession, Block)): + """Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession or Block object""" + if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block) + if isinstance(session_or_block, str): + self.sessions.append() + + def clean_session(self, specific_session: Union[str, UploadSession]): + return \ No newline at end of file diff --git a/onionr/etc/waitforsetvar.py b/onionr/etc/waitforsetvar.py index f2c023ac..60921499 100644 --- a/onionr/etc/waitforsetvar.py +++ b/onionr/etc/waitforsetvar.py @@ -1,4 +1,8 @@ -def wait_for_set_var(obj, attribute): - while True: - if hasattr(obj, attribute): - break \ No newline at end of file +from __future__ import annotations +from typing import Union, Generic +from gevent import sleep +def wait_for_set_var(obj, attribute, sleep_seconds: Union[int, float]=0): + """Wait for an object to get an attribute with an optional sleep time""" + while not hasattr(obj, attribute): + if hasattr(obj, attribute): break + if sleep_seconds > 0: sleep(sleep_seconds) \ No newline at end of file diff --git a/onionr/static-data/www/shared/sites.js b/onionr/static-data/www/shared/sites.js index ab5c01a8..689938d9 100755 --- a/onionr/static-data/www/shared/sites.js +++ b/onionr/static-data/www/shared/sites.js @@ -13,6 +13,8 @@ document.getElementById('openSite').onclick = function(){ window.location.href = '/site/' + hash } else{ - alert('Invalid site hash') + PNotify.error({ + text: 'Invalid site hash' + }) } } \ No newline at end of file diff --git a/tests/test_waitforsetvars.py b/tests/test_waitforsetvars.py new file mode 100644 index 00000000..76bbb724 --- /dev/null +++ b/tests/test_waitforsetvars.py @@ -0,0 +1,45 @@ +import sys, os +sys.path.append(".") +sys.path.append("onionr/") +import unittest, uuid, time, threading + +TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' +os.environ["ONIONR_HOME"] = TEST_DIR +from etc import waitforsetvar + +def set_test_var_delay(obj, delay=0): + if delay > 0: time.sleep(delay) + obj.test_var = True + +class TestWaitForSetVar(unittest.TestCase): + def test_no_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_negative_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, -1]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_zero_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 0]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_one_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 1]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_three_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 3]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + +unittest.main()