From af8d7a3e96f49a8f06b738f38bcbc46c87810ffa Mon Sep 17 00:00:00 2001 From: Kevin Froman Date: Wed, 18 Sep 2019 15:56:29 -0500 Subject: [PATCH] work on fixing uploads --- .../uploadblocks/__init__.py | 28 +++++++++++---- .../communicatorutils/uploadblocks/session.py | 1 + .../uploadblocks/sessionmanager.py | 36 ++++++++++++++----- onionr/etc/onionrvalues.py | 3 ++ onionr/httpapi/miscclientapi/endpoints.py | 2 ++ 5 files changed, 54 insertions(+), 16 deletions(-) diff --git a/onionr/communicatorutils/uploadblocks/__init__.py b/onionr/communicatorutils/uploadblocks/__init__.py index 263b5c9d..27f205a3 100755 --- a/onionr/communicatorutils/uploadblocks/__init__.py +++ b/onionr/communicatorutils/uploadblocks/__init__.py @@ -19,6 +19,8 @@ from __future__ import annotations along with this program. If not, see . ''' from typing import Union, TYPE_CHECKING +import threading + import logger from communicatorutils import proxypicker import onionrexceptions @@ -44,9 +46,18 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): logger.warn('Requested to upload invalid block', terminal=True) comm_inst.decrementThreadCount(TIMER_NAME) return - session_manager.add_session(bl) + session = session_manager.add_session(bl) for i in range(min(len(comm_inst.onlinePeers), 6)): peer = onlinepeers.pick_online_peer(comm_inst) + try: + session.peer_exists[peer] + continue + except KeyError: + pass + try: + if session.peer_fails[peer] > 3: continue + except KeyError: + pass if peer in triedPeers: continue triedPeers.append(peer) url = f'http://{peer}/upload' @@ -56,19 +67,22 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon): finishedUploads.append(bl) break proxyType = proxypicker.pick_proxy(peer) - logger.info(f"Uploading block {bl:[:8]} to {peer}", terminal=True) + logger.info(f"Uploading block {bl[:8]} to {peer}", terminal=True) resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream') if not resp == False: if resp == 'success': - session_manager.get - localcommand.local_command(f'waitforshare/{bl}', post=True) - finishedUploads.append(bl) + session.success() + session.peer_exists[peer] = True elif resp == 'exists': - comm_inst.getPeerProfileInstance(peer).addScore(-1) - finishedUploads.append(bl) + session.peer_exists[peer] = True else: + session.fail() + session.fail_peer(peer) comm_inst.getPeerProfileInstance(peer).addScore(-5) logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}', terminal=True) + else: + session.fail() + session_manager.clean_session() for x in finishedUploads: try: comm_inst.blocksToUpload.remove(x) diff --git a/onionr/communicatorutils/uploadblocks/session.py b/onionr/communicatorutils/uploadblocks/session.py index 3658e1ca..6c0b3472 100644 --- a/onionr/communicatorutils/uploadblocks/session.py +++ b/onionr/communicatorutils/uploadblocks/session.py @@ -3,6 +3,7 @@ Virtual upload "sessions" for blocks """ +from __future__ import annotations """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by diff --git a/onionr/communicatorutils/uploadblocks/sessionmanager.py b/onionr/communicatorutils/uploadblocks/sessionmanager.py index 3d188694..4e8f58d2 100644 --- a/onionr/communicatorutils/uploadblocks/sessionmanager.py +++ b/onionr/communicatorutils/uploadblocks/sessionmanager.py @@ -21,6 +21,10 @@ from __future__ import annotations from typing import Iterable, Union from onionrutils import bytesconverter +from onionrutils import localcommand +from etc import onionrvalues +from etc import waitforsetvar +from utils import reconstructhash from . import session @@ -29,18 +33,22 @@ class BlockUploadSessionManager: Arguments: old_session: iterable of old UploadSession objects""" def __init__(self, old_sessions:Iterable=None): + #self._too_many: TooMany = None if old_sessions is None: self.sessions = [] else: self.sessions = old_session - def add_session(self, session_or_block: Union(str, bytes, session.UploadSession, Block))->session.UploadSession: - """Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession or Block object""" + def add_session(self, session_or_block: Union(str, bytes, session.UploadSession))->session.UploadSession: + """Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession""" if isinstance(session_or_block, session.UploadSession): - self.sessions.append(session_or_block) + if not session_or_block in self.sessions: + self.sessions.append(session_or_block) return session_or_block - # convert Block to hash string - if hasattr(session_or_block, 'bheader') and hasattr(session_or_block, 'raw'): session_or_block = session_or_block.hash + try: + return self.get_session(session_or_block) + except KeyError: + pass # convert bytes hash to str if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block) # intentionally not elif @@ -50,8 +58,18 @@ class BlockUploadSessionManager: return new_session def get_session(self, block_hash: Union(str, bytes))->session.UploadSession: - block_hash = bytesconverter.bytes_to_str(block_hash).replace('=', '') - for session in self.sessions: if session.block_hash == block_hash: return session + block_hash = reconstructhash.deconstruct_hash(bytesconverter.bytes_to_str(block_hash)) + for session in self.sessions: + if session.block_hash == block_hash: return session + raise KeyError - def clean_session(self, specific_session: Union[str, UploadSession]): - return \ No newline at end of file + def clean_session(self, specific_session: Union[str, UploadSession]=None): + comm_inst: OnionrCommunicatorDaemon = self._too_many.get_by_string("OnionrCommunicatorDaemon") + sessions_to_delete = [] + if comm_inst.getUptime() < 120: return + for session in self.sessions: + if (session.total_success_count / len(comm_inst.onlinePeers)) >= onionrvalues.MIN_BLOCK_UPLOAD_PEER_PERCENT: + sessions_to_delete.append(session) + for session in sessions_to_delete: + self.sessions.remove(session) + localcommand.local_command('waitforshare/{session.block_hash}') \ No newline at end of file diff --git a/onionr/etc/onionrvalues.py b/onionr/etc/onionrvalues.py index 66075c0a..642fe4c7 100755 --- a/onionr/etc/onionrvalues.py +++ b/onionr/etc/onionrvalues.py @@ -31,6 +31,9 @@ MAX_BLOCK_CLOCK_SKEW = 120 MAIN_PUBLIC_KEY_SIZE = 32 ORIG_RUN_DIR_ENV_VAR = 'ORIG_ONIONR_RUN_DIR' +# Block creation anonymization requirements +MIN_BLOCK_UPLOAD_PEER_PERCENT = 0.1 + # Begin OnionrValues migrated values ANNOUNCE_POW = 5 DEFAULT_EXPIRE = 2592000 diff --git a/onionr/httpapi/miscclientapi/endpoints.py b/onionr/httpapi/miscclientapi/endpoints.py index 12082a1d..d4b903e6 100644 --- a/onionr/httpapi/miscclientapi/endpoints.py +++ b/onionr/httpapi/miscclientapi/endpoints.py @@ -26,6 +26,7 @@ from netcontroller import NetController from serializeddata import SerializedData from onionrutils import mnemonickeys from onionrutils import bytesconverter +from utils import reconstructhash pub_key = onionrcrypto.pub_key.replace('=', '') @@ -78,6 +79,7 @@ class PrivateEndpoints: def waitforshare(name): '''Used to prevent the **public** api from sharing blocks we just created''' if not name.isalnum(): raise ValueError('block hash needs to be alpha numeric') + name = reconstructhash.reconstruct_hash(name) if name in client_api.publicAPI.hideBlocks: client_api.publicAPI.hideBlocks.remove(name) return Response("removed")