From d598d9b9c22c00f262649f0527944ad3b1938f96 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Mon, 16 Sep 2019 20:16:06 -0500 Subject: [PATCH 1/2] started work on upload sessions --- onionr/communicator/__init__.py | 4 +- .../uploadblocks/__init__.py | 19 ++++---- .../communicatorutils/uploadblocks/session.py | 1 + .../uploadblocks/sessionmanager.py | 37 +++++++++++++++ onionr/etc/waitforsetvar.py | 12 +++-- onionr/static-data/www/shared/sites.js | 4 +- tests/test_waitforsetvars.py | 45 +++++++++++++++++++ 7 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 tests/test_waitforsetvars.py diff --git a/onionr/communicator/__init__.py b/onionr/communicator/__init__.py index ff4d1853..5dfaf053 100755 --- a/onionr/communicator/__init__.py +++ b/onionr/communicator/__init__.py @@ -43,7 +43,7 @@ class OnionrCommunicatorDaemon: self.config = config self.storage_counter = storagecounter.StorageCounter() self.isOnline = True # Assume we're connected to the internet - self.shared_state = shared_state + self.shared_state = shared_state # TooManyObjects module # list of timer instances self.timers = [] @@ -53,6 +53,8 @@ class OnionrCommunicatorDaemon: # Upload information, list of blocks to upload self.blocksToUpload = [] + self.upload_session_manager = self.shared_state.get(uploadblocks.sessionmanager.BlockUploadSessionManager) + self.shared_state.share_object() # loop time.sleep delay in seconds self.delay = 1 diff --git a/onionr/communicatorutils/uploadblocks/__init__.py b/onionr/communicatorutils/uploadblocks/__init__.py index 19b43c9b..20005b49 100755 --- a/onionr/communicatorutils/uploadblocks/__init__.py +++ b/onionr/communicatorutils/uploadblocks/__init__.py @@ -3,6 +3,7 @@ Upload blocks in the upload queue to peers from the communicator ''' +from __future__ import annotations ''' This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -17,6 +18,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . ''' +from typing import Union import logger from communicatorutils import proxypicker import onionrexceptions @@ -25,12 +27,13 @@ from onionrutils import localcommand, stringvalidators, basicrequests from communicator import onlinepeers import onionrcrypto -from . import session - -def upload_blocks_from_communicator(comm_inst): - # when inserting a block, we try to upload it to a few peers to add some deniability +def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): + """Accepts a communicator instance and uploads blocks from its upload queue""" + """when inserting a block, we try to upload + it to a few peers to add some deniability & increase functionality""" TIMER_NAME = "upload_blocks_from_communicator" + session_manager = comm_inst.shared_state.get_by_string('BlockUploadSessionManager') triedPeers = [] finishedUploads = [] comm_inst.blocksToUpload = onionrcrypto.cryptoutils.random_shuffle(comm_inst.blocksToUpload) @@ -40,20 +43,20 @@ def upload_blocks_from_communicator(comm_inst): logger.warn('Requested to upload invalid block', terminal=True) comm_inst.decrementThreadCount(TIMER_NAME) return + session_manager.new_session(bl) for i in range(min(len(comm_inst.onlinePeers), 6)): peer = onlinepeers.pick_online_peer(comm_inst) if peer in triedPeers: continue triedPeers.append(peer) - url = 'http://%s/upload' % (peer,) + url = f'http://{peer}/upload' try: - #data = {'block': block.Block(bl).getRaw()} data = block.Block(bl).getRaw() except onionrexceptions.NoDataAvailable: finishedUploads.append(bl) break proxyType = proxypicker.pick_proxy(peer) - logger.info("Uploading block %s to %s" % (bl[:8], peer), terminal=True) + logger.info(f"Uploading block {bl:[:8]} to {peer}", terminal=True) resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream') if not resp == False: if resp == 'success': @@ -62,7 +65,7 @@ def upload_blocks_from_communicator(comm_inst): elif resp == 'exists': finishedUploads.append(bl) else: - logger.warn('Failed to upload %s, reason: %s' % (bl[:8], resp[:150]), terminal=True) + logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}'), terminal=True) for x in finishedUploads: try: comm_inst.blocksToUpload.remove(x) diff --git a/onionr/communicatorutils/uploadblocks/session.py b/onionr/communicatorutils/uploadblocks/session.py index 24415975..aaeabb0c 100644 --- a/onionr/communicatorutils/uploadblocks/session.py +++ b/onionr/communicatorutils/uploadblocks/session.py @@ -33,6 +33,7 @@ class UploadSession: block_hash = reconstructhash.reconstruct_hash(block_hash) if not stringvalidators.validate_hash(block_hash): raise ValueError + self.start_time = epoch.get_epoch() self.block_hash = reconstructhash.deconstruct_hash(block_hash) self.total_fail_count: int = 0 self.total_success_count: int = 0 diff --git a/onionr/communicatorutils/uploadblocks/sessionmanager.py b/onionr/communicatorutils/uploadblocks/sessionmanager.py index e69de29b..82cd6971 100644 --- a/onionr/communicatorutils/uploadblocks/sessionmanager.py +++ b/onionr/communicatorutils/uploadblocks/sessionmanager.py @@ -0,0 +1,37 @@ +""" + Onionr - Private P2P Communication + + Manager for upload 'sessions' +""" +from __future__ import annotations +""" + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +""" +from typing import Iterable, Union +from onionrutils import bytesconverter +class BlockUploadSessionManager: + def __init__(self, old_sessions:Iterable=None): + if old_session is None: + self.sessions = [] + else: + self.sessions = old_session + + def add_session(self, session_or_block: Union(str, bytes, UploadSession, Block)): + """Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession or Block object""" + if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block) + if isinstance(session_or_block, str): + self.sessions.append() + + def clean_session(self, specific_session: Union[str, UploadSession]): + return \ No newline at end of file diff --git a/onionr/etc/waitforsetvar.py b/onionr/etc/waitforsetvar.py index f2c023ac..60921499 100644 --- a/onionr/etc/waitforsetvar.py +++ b/onionr/etc/waitforsetvar.py @@ -1,4 +1,8 @@ -def wait_for_set_var(obj, attribute): - while True: - if hasattr(obj, attribute): - break \ No newline at end of file +from __future__ import annotations +from typing import Union, Generic +from gevent import sleep +def wait_for_set_var(obj, attribute, sleep_seconds: Union[int, float]=0): + """Wait for an object to get an attribute with an optional sleep time""" + while not hasattr(obj, attribute): + if hasattr(obj, attribute): break + if sleep_seconds > 0: sleep(sleep_seconds) \ No newline at end of file diff --git a/onionr/static-data/www/shared/sites.js b/onionr/static-data/www/shared/sites.js index ab5c01a8..689938d9 100755 --- a/onionr/static-data/www/shared/sites.js +++ b/onionr/static-data/www/shared/sites.js @@ -13,6 +13,8 @@ document.getElementById('openSite').onclick = function(){ window.location.href = '/site/' + hash } else{ - alert('Invalid site hash') + PNotify.error({ + text: 'Invalid site hash' + }) } } \ No newline at end of file diff --git a/tests/test_waitforsetvars.py b/tests/test_waitforsetvars.py new file mode 100644 index 00000000..76bbb724 --- /dev/null +++ b/tests/test_waitforsetvars.py @@ -0,0 +1,45 @@ +import sys, os +sys.path.append(".") +sys.path.append("onionr/") +import unittest, uuid, time, threading + +TEST_DIR = 'testdata/%s-%s' % (uuid.uuid4(), os.path.basename(__file__)) + '/' +os.environ["ONIONR_HOME"] = TEST_DIR +from etc import waitforsetvar + +def set_test_var_delay(obj, delay=0): + if delay > 0: time.sleep(delay) + obj.test_var = True + +class TestWaitForSetVar(unittest.TestCase): + def test_no_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_negative_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, -1]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_zero_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 0]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_one_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 1]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + + def test_three_wait(self): + test_object = lambda: None + threading.Thread(target=set_test_var_delay, args=[test_object, 3]).start() + waitforsetvar.wait_for_set_var(test_object, 'test_var') + self.assertTrue(test_object.test_var) + +unittest.main() From 1114db8a30585d2cbdbe726dfae3e801c3568a98 Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Tue, 17 Sep 2019 01:56:13 -0500 Subject: [PATCH 2/2] started work on upload sessions --- onionr/communicator/peeraction.py | 2 +- .../uploadblocks/__init__.py | 17 +++++++----- .../communicatorutils/uploadblocks/session.py | 1 + .../uploadblocks/sessionmanager.py | 26 ++++++++++++++++--- onionr/static-data/www/shared/configeditor.js | 4 ++- onionr/static-data/www/shared/sites.js | 2 +- 6 files changed, 39 insertions(+), 13 deletions(-) diff --git a/onionr/communicator/peeraction.py b/onionr/communicator/peeraction.py index f3b73cfe..ca715273 100644 --- a/onionr/communicator/peeraction.py +++ b/onionr/communicator/peeraction.py @@ -44,7 +44,7 @@ def peer_action(comm_inst, peer, action, returnHeaders=False, max_resp_size=5242 onlinepeers.remove_online_peer(comm_inst, peer) keydb.transportinfo.set_address_info(peer, 'lastConnectAttempt', epoch.get_epoch()) if action != 'ping' and not comm_inst.shutdown: - logger.warn('Lost connection to ' + peer, terminal=True) + logger.warn(f'Lost connection to {peer}', terminal=True) onlinepeers.get_online_peers(comm_inst) # Will only add a new peer to pool if needed except ValueError: pass diff --git a/onionr/communicatorutils/uploadblocks/__init__.py b/onionr/communicatorutils/uploadblocks/__init__.py index 20005b49..263b5c9d 100755 --- a/onionr/communicatorutils/uploadblocks/__init__.py +++ b/onionr/communicatorutils/uploadblocks/__init__.py @@ -18,7 +18,7 @@ from __future__ import annotations You should have received a copy of the GNU General Public License along with this program. If not, see . ''' -from typing import Union +from typing import Union, TYPE_CHECKING import logger from communicatorutils import proxypicker import onionrexceptions @@ -26,6 +26,7 @@ import onionrblockapi as block from onionrutils import localcommand, stringvalidators, basicrequests from communicator import onlinepeers import onionrcrypto +from . import sessionmanager def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): """Accepts a communicator instance and uploads blocks from its upload queue""" @@ -33,7 +34,7 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): it to a few peers to add some deniability & increase functionality""" TIMER_NAME = "upload_blocks_from_communicator" - session_manager = comm_inst.shared_state.get_by_string('BlockUploadSessionManager') + session_manager: sessionmanager.BlockUploadSessionManager = comm_inst.shared_state.get(sessionmanager.BlockUploadSessionManager) triedPeers = [] finishedUploads = [] comm_inst.blocksToUpload = onionrcrypto.cryptoutils.random_shuffle(comm_inst.blocksToUpload) @@ -43,11 +44,10 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): logger.warn('Requested to upload invalid block', terminal=True) comm_inst.decrementThreadCount(TIMER_NAME) return - session_manager.new_session(bl) + session_manager.add_session(bl) for i in range(min(len(comm_inst.onlinePeers), 6)): peer = onlinepeers.pick_online_peer(comm_inst) - if peer in triedPeers: - continue + if peer in triedPeers: continue triedPeers.append(peer) url = f'http://{peer}/upload' try: @@ -60,12 +60,15 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream') if not resp == False: if resp == 'success': - localcommand.local_command('waitforshare/' + bl, post=True) + session_manager.get + localcommand.local_command(f'waitforshare/{bl}', post=True) finishedUploads.append(bl) elif resp == 'exists': + comm_inst.getPeerProfileInstance(peer).addScore(-1) finishedUploads.append(bl) else: - logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}'), terminal=True) + comm_inst.getPeerProfileInstance(peer).addScore(-5) + logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}', terminal=True) for x in finishedUploads: try: comm_inst.blocksToUpload.remove(x) diff --git a/onionr/communicatorutils/uploadblocks/session.py b/onionr/communicatorutils/uploadblocks/session.py index aaeabb0c..3658e1ca 100644 --- a/onionr/communicatorutils/uploadblocks/session.py +++ b/onionr/communicatorutils/uploadblocks/session.py @@ -38,6 +38,7 @@ class UploadSession: self.total_fail_count: int = 0 self.total_success_count: int = 0 self.peer_fails = {} + self.peer_exists = {} def fail_peer(self, peer): try: diff --git a/onionr/communicatorutils/uploadblocks/sessionmanager.py b/onionr/communicatorutils/uploadblocks/sessionmanager.py index 82cd6971..3d188694 100644 --- a/onionr/communicatorutils/uploadblocks/sessionmanager.py +++ b/onionr/communicatorutils/uploadblocks/sessionmanager.py @@ -19,19 +19,39 @@ from __future__ import annotations along with this program. If not, see . """ from typing import Iterable, Union + from onionrutils import bytesconverter + +from . import session + class BlockUploadSessionManager: + """Holds block UploadSession instances. Optionally accepts iterable of sessions to added on init + + Arguments: old_session: iterable of old UploadSession objects""" def __init__(self, old_sessions:Iterable=None): - if old_session is None: + if old_sessions is None: self.sessions = [] else: self.sessions = old_session - def add_session(self, session_or_block: Union(str, bytes, UploadSession, Block)): + def add_session(self, session_or_block: Union(str, bytes, session.UploadSession, Block))->session.UploadSession: """Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession or Block object""" + if isinstance(session_or_block, session.UploadSession): + self.sessions.append(session_or_block) + return session_or_block + # convert Block to hash string + if hasattr(session_or_block, 'bheader') and hasattr(session_or_block, 'raw'): session_or_block = session_or_block.hash + # convert bytes hash to str if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block) + # intentionally not elif if isinstance(session_or_block, str): - self.sessions.append() + new_session = session.UploadSession(session_or_block) + self.sessions.append(new_session) + return new_session + + def get_session(self, block_hash: Union(str, bytes))->session.UploadSession: + block_hash = bytesconverter.bytes_to_str(block_hash).replace('=', '') + for session in self.sessions: if session.block_hash == block_hash: return session def clean_session(self, specific_session: Union[str, UploadSession]): return \ No newline at end of file diff --git a/onionr/static-data/www/shared/configeditor.js b/onionr/static-data/www/shared/configeditor.js index 172b3d5a..7c9068ce 100755 --- a/onionr/static-data/www/shared/configeditor.js +++ b/onionr/static-data/www/shared/configeditor.js @@ -49,6 +49,8 @@ saveBtn.onclick = function(){ }}) .then((resp) => resp.text()) // Transform the data into text .then(function(data) { - alert('Config saved') + PNotify.success({ + text: 'Config saved' + }) }) } \ No newline at end of file diff --git a/onionr/static-data/www/shared/sites.js b/onionr/static-data/www/shared/sites.js index 689938d9..b318fffc 100755 --- a/onionr/static-data/www/shared/sites.js +++ b/onionr/static-data/www/shared/sites.js @@ -13,7 +13,7 @@ document.getElementById('openSite').onclick = function(){ window.location.href = '/site/' + hash } else{ - PNotify.error({ + PNotify.notice({ text: 'Invalid site hash' }) }