work on fixing uploads

This commit is contained in:
Kevin Froman 2019-09-18 15:56:29 -05:00
parent 1114db8a30
commit af8d7a3e96
5 changed files with 54 additions and 16 deletions

View File

@ -19,6 +19,8 @@ from __future__ import annotations
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
from typing import Union, TYPE_CHECKING from typing import Union, TYPE_CHECKING
import threading
import logger import logger
from communicatorutils import proxypicker from communicatorutils import proxypicker
import onionrexceptions import onionrexceptions
@ -44,9 +46,18 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon):
logger.warn('Requested to upload invalid block', terminal=True) logger.warn('Requested to upload invalid block', terminal=True)
comm_inst.decrementThreadCount(TIMER_NAME) comm_inst.decrementThreadCount(TIMER_NAME)
return return
session_manager.add_session(bl) session = session_manager.add_session(bl)
for i in range(min(len(comm_inst.onlinePeers), 6)): for i in range(min(len(comm_inst.onlinePeers), 6)):
peer = onlinepeers.pick_online_peer(comm_inst) peer = onlinepeers.pick_online_peer(comm_inst)
try:
session.peer_exists[peer]
continue
except KeyError:
pass
try:
if session.peer_fails[peer] > 3: continue
except KeyError:
pass
if peer in triedPeers: continue if peer in triedPeers: continue
triedPeers.append(peer) triedPeers.append(peer)
url = f'http://{peer}/upload' url = f'http://{peer}/upload'
@ -56,19 +67,22 @@ def upload_blocks_from_communicator(comm_inst: OnionrCommunicatorDaemon):
finishedUploads.append(bl) finishedUploads.append(bl)
break break
proxyType = proxypicker.pick_proxy(peer) proxyType = proxypicker.pick_proxy(peer)
logger.info(f"Uploading block {bl:[:8]} to {peer}", terminal=True) logger.info(f"Uploading block {bl[:8]} to {peer}", terminal=True)
resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream') resp = basicrequests.do_post_request(url, data=data, proxyType=proxyType, content_type='application/octet-stream')
if not resp == False: if not resp == False:
if resp == 'success': if resp == 'success':
session_manager.get session.success()
localcommand.local_command(f'waitforshare/{bl}', post=True) session.peer_exists[peer] = True
finishedUploads.append(bl)
elif resp == 'exists': elif resp == 'exists':
comm_inst.getPeerProfileInstance(peer).addScore(-1) session.peer_exists[peer] = True
finishedUploads.append(bl)
else: else:
session.fail()
session.fail_peer(peer)
comm_inst.getPeerProfileInstance(peer).addScore(-5) comm_inst.getPeerProfileInstance(peer).addScore(-5)
logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}', terminal=True) logger.warn(f'Failed to upload {bl[:8]}, reason: {resp[:15]}', terminal=True)
else:
session.fail()
session_manager.clean_session()
for x in finishedUploads: for x in finishedUploads:
try: try:
comm_inst.blocksToUpload.remove(x) comm_inst.blocksToUpload.remove(x)

View File

@ -3,6 +3,7 @@
Virtual upload "sessions" for blocks Virtual upload "sessions" for blocks
""" """
from __future__ import annotations
""" """
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by

View File

@ -21,6 +21,10 @@ from __future__ import annotations
from typing import Iterable, Union from typing import Iterable, Union
from onionrutils import bytesconverter from onionrutils import bytesconverter
from onionrutils import localcommand
from etc import onionrvalues
from etc import waitforsetvar
from utils import reconstructhash
from . import session from . import session
@ -29,18 +33,22 @@ class BlockUploadSessionManager:
Arguments: old_session: iterable of old UploadSession objects""" Arguments: old_session: iterable of old UploadSession objects"""
def __init__(self, old_sessions:Iterable=None): def __init__(self, old_sessions:Iterable=None):
#self._too_many: TooMany = None
if old_sessions is None: if old_sessions is None:
self.sessions = [] self.sessions = []
else: else:
self.sessions = old_session self.sessions = old_session
def add_session(self, session_or_block: Union(str, bytes, session.UploadSession, Block))->session.UploadSession: def add_session(self, session_or_block: Union(str, bytes, session.UploadSession))->session.UploadSession:
"""Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession or Block object""" """Create (or add existing) block upload session from a str/bytes block hex hash, existing UploadSession"""
if isinstance(session_or_block, session.UploadSession): if isinstance(session_or_block, session.UploadSession):
self.sessions.append(session_or_block) if not session_or_block in self.sessions:
self.sessions.append(session_or_block)
return session_or_block return session_or_block
# convert Block to hash string try:
if hasattr(session_or_block, 'bheader') and hasattr(session_or_block, 'raw'): session_or_block = session_or_block.hash return self.get_session(session_or_block)
except KeyError:
pass
# convert bytes hash to str # convert bytes hash to str
if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block) if isinstance(session_or_block, bytes): session_or_block = bytesconverter.bytes_to_str(session_or_block)
# intentionally not elif # intentionally not elif
@ -50,8 +58,18 @@ class BlockUploadSessionManager:
return new_session return new_session
def get_session(self, block_hash: Union(str, bytes))->session.UploadSession: def get_session(self, block_hash: Union(str, bytes))->session.UploadSession:
block_hash = bytesconverter.bytes_to_str(block_hash).replace('=', '') block_hash = reconstructhash.deconstruct_hash(bytesconverter.bytes_to_str(block_hash))
for session in self.sessions: if session.block_hash == block_hash: return session for session in self.sessions:
if session.block_hash == block_hash: return session
raise KeyError
def clean_session(self, specific_session: Union[str, UploadSession]): def clean_session(self, specific_session: Union[str, UploadSession]=None):
return comm_inst: OnionrCommunicatorDaemon = self._too_many.get_by_string("OnionrCommunicatorDaemon")
sessions_to_delete = []
if comm_inst.getUptime() < 120: return
for session in self.sessions:
if (session.total_success_count / len(comm_inst.onlinePeers)) >= onionrvalues.MIN_BLOCK_UPLOAD_PEER_PERCENT:
sessions_to_delete.append(session)
for session in sessions_to_delete:
self.sessions.remove(session)
localcommand.local_command('waitforshare/{session.block_hash}')

View File

@ -31,6 +31,9 @@ MAX_BLOCK_CLOCK_SKEW = 120
MAIN_PUBLIC_KEY_SIZE = 32 MAIN_PUBLIC_KEY_SIZE = 32
ORIG_RUN_DIR_ENV_VAR = 'ORIG_ONIONR_RUN_DIR' ORIG_RUN_DIR_ENV_VAR = 'ORIG_ONIONR_RUN_DIR'
# Block creation anonymization requirements
MIN_BLOCK_UPLOAD_PEER_PERCENT = 0.1
# Begin OnionrValues migrated values # Begin OnionrValues migrated values
ANNOUNCE_POW = 5 ANNOUNCE_POW = 5
DEFAULT_EXPIRE = 2592000 DEFAULT_EXPIRE = 2592000

View File

@ -26,6 +26,7 @@ from netcontroller import NetController
from serializeddata import SerializedData from serializeddata import SerializedData
from onionrutils import mnemonickeys from onionrutils import mnemonickeys
from onionrutils import bytesconverter from onionrutils import bytesconverter
from utils import reconstructhash
pub_key = onionrcrypto.pub_key.replace('=', '') pub_key = onionrcrypto.pub_key.replace('=', '')
@ -78,6 +79,7 @@ class PrivateEndpoints:
def waitforshare(name): def waitforshare(name):
'''Used to prevent the **public** api from sharing blocks we just created''' '''Used to prevent the **public** api from sharing blocks we just created'''
if not name.isalnum(): raise ValueError('block hash needs to be alpha numeric') if not name.isalnum(): raise ValueError('block hash needs to be alpha numeric')
name = reconstructhash.reconstruct_hash(name)
if name in client_api.publicAPI.hideBlocks: if name in client_api.publicAPI.hideBlocks:
client_api.publicAPI.hideBlocks.remove(name) client_api.publicAPI.hideBlocks.remove(name)
return Response("removed") return Response("removed")