async bug fixes

This commit is contained in:
Kevin F 2022-07-11 10:27:13 -05:00
parent 1eadb4bf6e
commit d11d12b67f
9 changed files with 25 additions and 22 deletions

View File

@ -87,10 +87,8 @@ def start_gossip_client():
""" """
bl: Block bl: Block
def _start_announce():
sleep(60) Thread(target=do_announce, daemon=True).start()
do_announce()
Thread(target=_start_announce, daemon=True).start()
# Start a thread that runs every 1200 secs to # Start a thread that runs every 1200 secs to
# Ask peers for a subset for their peer set # Ask peers for a subset for their peer set

View File

@ -14,11 +14,15 @@ from ..peerset import gossip_peer_set
def do_announce(): def do_announce():
"Announce with N peers of each identified transport" "Announce with N peers of each identified transport"
per_transport = 4
peer_types = {}
count_for_peer = 0
def _announce(announce_peer: 'Peer', our_transport_address: str): def _announce(announce_peer: 'Peer', our_transport_address: str):
assert our_transport_address
try: try:
our_transport_address = our_transport_address.encode('utf-8') + b"\n" our_transport_address = our_transport_address.encode('utf-8') + b"\n"
except AttributeError: except AttributeError:
pass our_transport_address = our_transport_address + b'\n'
sock = announce_peer.get_socket(12) sock = announce_peer.get_socket(12)
sock.sendall(command_to_byte(GossipCommands.ANNOUNCE)) sock.sendall(command_to_byte(GossipCommands.ANNOUNCE))
sock.sendall(our_transport_address) sock.sendall(our_transport_address)
@ -30,9 +34,7 @@ def do_announce():
while not len(gossip_peer_set): while not len(gossip_peer_set):
sleep(1) sleep(1)
per_transport = 3
peer_types = {}
count_for_peer = 0
for peer in gossip_peer_set: for peer in gossip_peer_set:
try: try:
count_for_peer = peer_types[peer.__class__] count_for_peer = peer_types[peer.__class__]

View File

@ -1,5 +1,4 @@
import asyncio import asyncio
from audioop import add
import traceback import traceback
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Set, Tuple from typing import Set, Tuple
@ -60,7 +59,6 @@ def gossip_server():
match GossipCommands(cmd): match GossipCommands(cmd):
case GossipCommands.PING: case GossipCommands.PING:
writer.write(b'PONG') writer.write(b'PONG')
break
case GossipCommands.ANNOUNCE: case GossipCommands.ANNOUNCE:
async def _read_announce(): async def _read_announce():
address = await reader.readuntil(b'\n') address = await reader.readuntil(b'\n')
@ -72,6 +70,7 @@ def gossip_server():
'callback': connect_peer}, 'callback': connect_peer},
threaded=True) threaded=True)
writer.write(int(1).to_bytes(1, 'big')) writer.write(int(1).to_bytes(1, 'big'))
await writer.drain()
await asyncio.wait_for(_read_announce(), 10) await asyncio.wait_for(_read_announce(), 10)
case GossipCommands.PEER_EXCHANGE: case GossipCommands.PEER_EXCHANGE:
@ -111,14 +110,12 @@ def gossip_server():
break break
await writer.drain() await writer.drain()
writer.close()
async def main(): async def main():
server = await asyncio.start_unix_server( server = await asyncio.start_unix_server(
peer_connected, gossip_server_socket_file peer_connected, gossip_server_socket_file
) )
async with server: async with server:
await server.serve_forever() await server.serve_forever()

View File

@ -6,9 +6,8 @@ doesn't apply for blocks in the gossip queue that are awaiting
descision to fluff or stem descision to fluff or stem
""" """
from asyncio import IncompleteReadError, wait_for from asyncio import IncompleteReadError, wait_for, Queue
import queue
import traceback import traceback
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from time import time from time import time
@ -54,7 +53,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
raise ValueError( raise ValueError(
"Peer's specified time offset skewed too far into the future") "Peer's specified time offset skewed too far into the future")
newly_stored_blocks = queue.Queue() newly_stored_blocks = Queue()
def _add_to_queue(bl): def _add_to_queue(bl):
newly_stored_blocks.put_nowait(bl) newly_stored_blocks.put_nowait(bl)
@ -70,7 +69,6 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
if int.from_bytes(await reader.readexactly(1), 'big') == 0: if int.from_bytes(await reader.readexactly(1), 'big') == 0:
return return
await writer.drain()
# write block size # write block size
writer.write( writer.write(
str(len(block.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8')) str(len(block.raw)).zfill(BLOCK_SIZE_LEN).encode('utf-8'))
@ -95,7 +93,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
# Diffuse blocks stored since we started this stream # Diffuse blocks stored since we started this stream
while keep_writing: while keep_writing:
await _send_block(newly_stored_blocks.get()) await _send_block(await newly_stored_blocks.get())
try: try:
keep_writing = bool( keep_writing = bool(
int.from_bytes(await reader.readexactly(1), 'big') int.from_bytes(await reader.readexactly(1), 'big')

View File

@ -1,19 +1,23 @@
import config import config
import logger import logger
from gossip.peerset import gossip_peer_set
from getsocks import get_socks from getsocks import get_socks
from torpeer import TorPeer from torpeer import TorPeer
MAX_TOR_PEERS = 20
def on_announce_rec(api, data=None): def on_announce_rec(api, data=None):
socks_address, socks_port = get_socks()[0]
announced: str = data['address'] announced: str = data['address']
try: try:
announced = announced.decode('utf-8') announced = announced.decode('utf-8')
except AttributeError: except AttributeError:
pass pass
announced = announced.strip() announced = announced.strip()
if not announced.endswith('.onion'):
return
socks_address, socks_port = get_socks()[0]
if announced.removesuffix('.onion') == config.get( if announced.removesuffix('.onion') == config.get(
'tor.transport_address', '').removesuffix('.onion'): 'tor.transport_address', '').removesuffix('.onion'):
@ -21,8 +25,6 @@ def on_announce_rec(api, data=None):
"Received announcement for our own node, which shouldn't happen") "Received announcement for our own node, which shouldn't happen")
return return
if not announced.endswith('.onion'):
announced += '.onion'
logger.info(f"Peer {announced} announced to us.", terminal=True) logger.info(f"Peer {announced} announced to us.", terminal=True)

View File

@ -13,6 +13,9 @@ def on_announce_rec(api, data=None):
except AttributeError: except AttributeError:
pass pass
announced = announced.strip() announced = announced.strip()
if not announced.endswith('.sock'):
return
if announced == gossip_server_socket_file: if announced == gossip_server_socket_file:
logger.warn( logger.warn(

View File

@ -65,4 +65,5 @@ def on_bootstrap(api, data):
target=callback_func, target=callback_func,
args=[UnixPeer(address)], args=[UnixPeer(address)],
daemon=True).start() daemon=True).start()
sleep(1)

View File

@ -1 +1 @@
/dev/shm/onionr655223043/gossip-server.sock /dev/shm/onionr442130151/gossip-server.sock

View File

@ -61,6 +61,8 @@ def on_init(api, data=None):
def on_get_our_transport(api, data=None): def on_get_our_transport(api, data=None):
callback_func = data['callback'] callback_func = data['callback']
for_peer = data['peer'] for_peer = data['peer']
if for_peer.transport_address == gossip_server_socket_file:
return
if data['peer'].__class__ == UnixPeer: if data['peer'].__class__ == UnixPeer:
callback_func(for_peer, gossip_server_socket_file) callback_func(for_peer, gossip_server_socket_file)