Added peer exchange test (passing)
This commit is contained in:
parent
6a6460ef31
commit
237cdde4e5
@ -76,6 +76,8 @@ def gossip_client():
|
||||
dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH)
|
||||
|
||||
while True:
|
||||
sleep(5)
|
||||
continue
|
||||
while not len(gossip_peer_set):
|
||||
sleep(0.2)
|
||||
if dandelion_phase.remaining_time() <= 10:
|
||||
@ -84,7 +86,7 @@ def gossip_client():
|
||||
logger.debug("Entering stem phase", terminal=True)
|
||||
try:
|
||||
# Stem out blocks for (roughly) remaining epoch time
|
||||
asyncio.run(stem_out())
|
||||
asyncio.run(stem_out(dandelion_phase))
|
||||
except TimeoutError:
|
||||
continue
|
||||
except Exception:
|
||||
|
@ -16,7 +16,7 @@ def do_announce():
|
||||
"Announce with N peers of each identified transport"
|
||||
def _announce(announce_peer: 'Peer', our_transport_address: str):
|
||||
try:
|
||||
our_transport_address = our_transport_address.encode('utf-8')
|
||||
our_transport_address = our_transport_address.encode('utf-8') + b"\n"
|
||||
except AttributeError:
|
||||
pass
|
||||
sock = announce_peer.get_socket(12)
|
||||
|
@ -20,7 +20,11 @@ def _ask_peer(peer):
|
||||
s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE))
|
||||
# Get 10 max peers
|
||||
for _ in range(MAX_PEERS):
|
||||
peer = s.recv(TRANSPORT_SIZE_BYTES)
|
||||
peer = b''
|
||||
c = b''
|
||||
while c != b'\n':
|
||||
c = s.recv(1)
|
||||
peer += c
|
||||
if not peer:
|
||||
break
|
||||
connect_data = {
|
||||
@ -54,7 +58,7 @@ def get_new_peers():
|
||||
# Start threads to ask the peers for more peers
|
||||
threads = []
|
||||
for peer in peers_we_ask:
|
||||
t = Thread(target=_ask_peer, args=[peer, gossip_peer_set], daemon=True)
|
||||
t = Thread(target=_ask_peer, args=[peer], daemon=True)
|
||||
t.start()
|
||||
threads.append(t)
|
||||
peers_we_ask.clear()
|
||||
|
@ -59,6 +59,7 @@ def stream_from_peers():
|
||||
need_socket_lock = Semaphore(MAX_STREAMS)
|
||||
offset = 0
|
||||
|
||||
|
||||
def _stream_from_peer(peer: Peer):
|
||||
|
||||
try:
|
||||
@ -83,11 +84,11 @@ def stream_from_peers():
|
||||
"reported block size out of range")
|
||||
break
|
||||
block_data = sock.recv(block_size)
|
||||
|
||||
try:
|
||||
blockdb.add_block_to_db(
|
||||
onionrblocks.Block(
|
||||
block_id, block_data, auto_verify=True)
|
||||
)
|
||||
block_id, block_data, auto_verify=True))
|
||||
except Exception:
|
||||
# They gave us a bad block, kill the stream
|
||||
# Could be corruption or malice
|
||||
|
@ -8,7 +8,7 @@ def connect_peer(peer):
|
||||
if peer in gossip_peer_set:
|
||||
return
|
||||
try:
|
||||
s = peer.get_socket(12)
|
||||
s = peer.get_socket(15)
|
||||
except Exception:
|
||||
logger.warn(f"Could not connect to {peer.transport_address}")
|
||||
else:
|
||||
|
@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
from audioop import add
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Set, Tuple
|
||||
@ -63,8 +64,8 @@ def gossip_server():
|
||||
pass
|
||||
case GossipCommands.ANNOUNCE:
|
||||
async def _read_announce():
|
||||
address = await reader.readexactly(
|
||||
constants.TRANSPORT_SIZE_BYTES)
|
||||
address = await reader.readuntil(b'\n')
|
||||
|
||||
if address:
|
||||
onionrevents.event(
|
||||
'announce_rec',
|
||||
@ -74,10 +75,10 @@ def gossip_server():
|
||||
writer.write(int(1).to_bytes(1, 'big'))
|
||||
await asyncio.wait_for(_read_announce(), 10)
|
||||
case GossipCommands.PEER_EXCHANGE:
|
||||
|
||||
for peer in gossip_peer_set:
|
||||
writer.write(
|
||||
peer.transport_address.encode(
|
||||
'utf-8').removesuffix(b'.onion'))
|
||||
writer.write(peer.transport_address.encode('utf-8') + b'\n')
|
||||
await writer.drain()
|
||||
case GossipCommands.STREAM_BLOCKS:
|
||||
try:
|
||||
await diffuse_blocks(reader, writer)
|
||||
|
@ -70,6 +70,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
||||
return
|
||||
|
||||
await writer.drain()
|
||||
# write block size
|
||||
writer.write(
|
||||
str(len(block.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
|
||||
await writer.drain()
|
||||
|
@ -3,7 +3,6 @@ import logger
|
||||
|
||||
from getsocks import get_socks
|
||||
from torpeer import TorPeer
|
||||
from gossip.peerset import gossip_peer_set
|
||||
|
||||
|
||||
def on_announce_rec(api, data=None):
|
||||
@ -16,11 +15,11 @@ def on_announce_rec(api, data=None):
|
||||
pass
|
||||
|
||||
if announced == config.get('tor.transport_address'):
|
||||
logger.warn("Recieved announcement for our own node, which shouldnt happen")
|
||||
logger.warn(
|
||||
"Received announcement for our own node, which shouldn't happen")
|
||||
return
|
||||
|
||||
announced = announced.strip()
|
||||
announced += '.onion'
|
||||
|
||||
data['callback'](
|
||||
gossip_peer_set,
|
||||
TorPeer(socks_address, socks_port, announced))
|
||||
data['callback'](TorPeer(socks_address, socks_port, announced))
|
||||
|
@ -92,7 +92,7 @@ def on_gossip_start(api, data: Set[Peer] = None):
|
||||
try:
|
||||
add_onion_resp = controller.create_ephemeral_hidden_service(
|
||||
{'80': f'unix:{gossip_server_socket_file}'},
|
||||
key_content=key, key_type='ED25519-V3', detached=True)
|
||||
key_content=key, key_type='ED25519-V3', detached=True, await_publication=True)
|
||||
except stem.ProtocolError:
|
||||
logger.error(
|
||||
"Could not start Tor transport. Try restarting Onionr",
|
||||
|
@ -6,11 +6,15 @@ class TorPeer:
|
||||
def __init__(self, socks_host, socks_port, onion_address):
|
||||
if not onion_address or onion_address == '.onion':
|
||||
raise ValueError("Invalid transport address")
|
||||
if not onion_address.endswith('.onion'):
|
||||
self.onion_address = onion_address.strip() + '.onion'
|
||||
|
||||
self.transport_address = self.onion_address = onion_address
|
||||
self.socks_host = socks_host
|
||||
self.socks_port = socks_port
|
||||
|
||||
def get_socket(self, connect_timeout) -> socks.socksocket:
|
||||
|
||||
s = socks.socksocket()
|
||||
s.set_proxy(socks.SOCKS4, self.socks_host, self.socks_port, rdns=True)
|
||||
s.settimeout(connect_timeout)
|
||||
|
Loading…
Reference in New Issue
Block a user