Fix tor peers not being removed from connection pool when they fail
This commit is contained in:
parent
69a31d1d83
commit
86615305d7
@ -101,7 +101,7 @@ def start_gossip_client():
|
|||||||
# transport plugin handles the new peer
|
# transport plugin handles the new peer
|
||||||
add_onionr_thread(
|
add_onionr_thread(
|
||||||
get_new_peers,
|
get_new_peers,
|
||||||
60, 'get_new_peers', initial_sleep=120)
|
120, 'get_new_peers', initial_sleep=120)
|
||||||
|
|
||||||
# Start a new thread to stream blocks from peers
|
# Start a new thread to stream blocks from peers
|
||||||
# These blocks are being diffused and are stored in
|
# These blocks are being diffused and are stored in
|
||||||
|
@ -24,9 +24,6 @@ def _do_ask_peer(peer):
|
|||||||
_ask_peer(peer)
|
_ask_peer(peer)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
logger.debug("Timed out when asking for new peers")
|
logger.debug("Timed out when asking for new peers")
|
||||||
except GeneralProxyError:
|
|
||||||
logger.debug("Proxy error")
|
|
||||||
logger.debug(format_exc(), terminal=True)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.error(format_exc(), terminal=True)
|
logger.error(format_exc(), terminal=True)
|
||||||
|
|
||||||
@ -49,7 +46,7 @@ def _ask_peer(peer):
|
|||||||
'address': peer,
|
'address': peer,
|
||||||
'callback': connectpeer.connect_peer
|
'callback': connectpeer.connect_peer
|
||||||
}
|
}
|
||||||
logger.info("Got new peer from exchange " + peer.decode('utf-8'), terminal=True)
|
#logger.info("Got new peer from exchange " + peer.decode('utf-8'), terminal=True)
|
||||||
onionrevents.event('announce_rec', data=connect_data, threaded=True)
|
onionrevents.event('announce_rec', data=connect_data, threaded=True)
|
||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
@ -79,7 +76,7 @@ def get_new_peers():
|
|||||||
threads = []
|
threads = []
|
||||||
for peer in peers_we_ask:
|
for peer in peers_we_ask:
|
||||||
t = Thread(
|
t = Thread(
|
||||||
target=_do_ask_peer,
|
target=_do_ask_peer,
|
||||||
args=[peer], daemon=True, name='_do_ask_peer')
|
args=[peer], daemon=True, name='_do_ask_peer')
|
||||||
t.start()
|
t.start()
|
||||||
threads.append(t)
|
threads.append(t)
|
||||||
|
@ -67,7 +67,7 @@ def stream_from_peers():
|
|||||||
stream_times = 100
|
stream_times = 100
|
||||||
try:
|
try:
|
||||||
sock = peer.get_socket(CONNECT_TIMEOUT)
|
sock = peer.get_socket(CONNECT_TIMEOUT)
|
||||||
except ConnectionRefusedError:
|
except (TimeoutError, ConnectionRefusedError) as _:
|
||||||
need_socket_lock.release()
|
need_socket_lock.release()
|
||||||
return
|
return
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -118,7 +118,7 @@ def stream_from_peers():
|
|||||||
raise
|
raise
|
||||||
# Tell them to keep streaming
|
# Tell them to keep streaming
|
||||||
sock.sendall(int(1).to_bytes(1, 'big'))
|
sock.sendall(int(1).to_bytes(1, 'big'))
|
||||||
except (BrokenPipeError, TimeoutError) as e:
|
except (BrokenPipeError, TimeoutError, ConnectionError) as e:
|
||||||
pass
|
pass
|
||||||
#logger.debug(f"{e} when streaming from peers", terminal=True)
|
#logger.debug(f"{e} when streaming from peers", terminal=True)
|
||||||
#logger.debug(traceback.format_exc())
|
#logger.debug(traceback.format_exc())
|
||||||
|
@ -3,12 +3,24 @@ import socks
|
|||||||
from gossip.peerset import gossip_peer_set
|
from gossip.peerset import gossip_peer_set
|
||||||
import logger
|
import logger
|
||||||
|
|
||||||
|
|
||||||
class HandleRevc:
|
class HandleRevc:
|
||||||
def __init__(self, sock):
|
def __init__(self, sock):
|
||||||
self.sock_recv = sock.recv
|
self.sock_recv = sock.recv
|
||||||
|
self.sock = sock
|
||||||
|
|
||||||
def recv(self, *args, **kwargs):
|
def recv(self, *args, **kwargs):
|
||||||
return self.sock_recv(*args, **kwargs)
|
try:
|
||||||
|
got_data = self.sock_recv(*args, **kwargs)
|
||||||
|
if not len(got_data):
|
||||||
|
raise ConnectionError("Peer socket returned empty value")
|
||||||
|
return got_data
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
gossip_peer_set.remove(self.sock)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
class TorPeer:
|
class TorPeer:
|
||||||
@ -37,7 +49,7 @@ class TorPeer:
|
|||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
logger.debug(f"Could not create socket to peer {self.transport_address}", terminal=True)
|
logger.debug(f"Could not create socket to peer {self.transport_address}", terminal=True)
|
||||||
raise
|
raise TimeoutError
|
||||||
mock_recv = HandleRevc(s)
|
mock_recv = HandleRevc(s)
|
||||||
s.recv = mock_recv.recv
|
s.recv = mock_recv.recv
|
||||||
return s
|
return s
|
||||||
|
Loading…
Reference in New Issue
Block a user