Peer announcing client side done
This commit is contained in:
parent
df3568fc15
commit
8d394c76a7
@ -1,9 +1,11 @@
|
|||||||
|
from time import sleep
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .. import Peer
|
from .. import Peer
|
||||||
|
|
||||||
|
import logger
|
||||||
from ..commands import GossipCommands, command_to_byte
|
from ..commands import GossipCommands, command_to_byte
|
||||||
import onionrplugins
|
import onionrplugins
|
||||||
|
|
||||||
@ -11,27 +13,38 @@ import onionrplugins
|
|||||||
def do_announce(peer_set):
|
def do_announce(peer_set):
|
||||||
"Announce with N peers of each identified transport"
|
"Announce with N peers of each identified transport"
|
||||||
def _announce(announce_peer: 'Peer', our_transport_address: str):
|
def _announce(announce_peer: 'Peer', our_transport_address: str):
|
||||||
|
try:
|
||||||
|
our_transport_address = our_transport_address.encode('utf-8')
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
sock = announce_peer.get_socket()
|
sock = announce_peer.get_socket()
|
||||||
sock.send(
|
sock.send(
|
||||||
command_to_byte(GossipCommands.ANNOUNCE) + our_transport_address)
|
command_to_byte(GossipCommands.ANNOUNCE) + our_transport_address)
|
||||||
if sock.dup
|
if int.from_bytes(sock.recv(1), 'big') != 1:
|
||||||
|
logger.warn(
|
||||||
|
f"Could not announce with {announce_peer.transport_address}")
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
while not len(peer_set):
|
||||||
|
sleep(1)
|
||||||
|
|
||||||
per_transport = 3
|
per_transport = 3
|
||||||
peer_types = {}
|
peer_types = {}
|
||||||
count_for_peer = 0
|
count_for_peer = 0
|
||||||
for peer in peer_set:
|
for peer in peer_set:
|
||||||
try:
|
try:
|
||||||
count_for_peer = peer_types[peer.__name__]
|
count_for_peer = peer_types[peer.__class__]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
peer_types[peer.__name__] = 0
|
count_for_peer = peer_types[peer.__class__] = 0
|
||||||
continue
|
|
||||||
|
|
||||||
if count_for_peer == per_transport:
|
if count_for_peer == per_transport:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Plugin for the transport associated with the peer will call _announce
|
||||||
|
# with the peer and *our* transport address
|
||||||
onionrplugins.events.event(
|
onionrplugins.events.event(
|
||||||
'get_our_transport',
|
'get_our_transport',
|
||||||
data={'callback': _announce, 'peer': peer})
|
data={'callback': _announce, 'peer': peer},
|
||||||
|
threaded=True)
|
||||||
|
|
||||||
peer_types[peer.__name__] += 1
|
peer_types[peer.__class__] += 1
|
||||||
|
@ -4,6 +4,8 @@ from typing import Set
|
|||||||
|
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
|
||||||
|
from onionrplugins import onionrevents
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
from peer import Peer
|
from peer import Peer
|
||||||
@ -46,6 +48,15 @@ def gossip_server(
|
|||||||
writer.write(b'PONG')
|
writer.write(b'PONG')
|
||||||
case GossipCommands.CLOSE:
|
case GossipCommands.CLOSE:
|
||||||
writer.close()
|
writer.close()
|
||||||
|
case GossipCommands.ANNOUNCE:
|
||||||
|
async def _read_announce():
|
||||||
|
address = await reader.read(56)
|
||||||
|
onionrevents.event(
|
||||||
|
'announce_rec',
|
||||||
|
data={'peer_set': peer_set, 'address': address},
|
||||||
|
threaded=False)
|
||||||
|
writer.write(int(1).to_bytes(1, 'big'))
|
||||||
|
await asyncio.wait_for(_read_announce(), 10)
|
||||||
|
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
|
|
||||||
|
@ -63,6 +63,17 @@ def on_init(api, data=None):
|
|||||||
f"Tor Transport Plugin v{PLUGIN_VERSION} enabled", terminal=True)
|
f"Tor Transport Plugin v{PLUGIN_VERSION} enabled", terminal=True)
|
||||||
|
|
||||||
|
|
||||||
|
def on_get_our_transport(api, data=None):
|
||||||
|
callback_func = data['callback']
|
||||||
|
for_peer = data['peer']
|
||||||
|
if data['peer'].__class__ == TorPeer:
|
||||||
|
callback_func(for_peer, config.get('tor.transport_address'))
|
||||||
|
|
||||||
|
|
||||||
|
def on_announce_rec(api, data=None):
|
||||||
|
print("got announce rec event")
|
||||||
|
|
||||||
|
|
||||||
def on_bootstrap(api, data: Set[Peer] = None):
|
def on_bootstrap(api, data: Set[Peer] = None):
|
||||||
bootstrap_nodes: Set[str]
|
bootstrap_nodes: Set[str]
|
||||||
peers = data
|
peers = data
|
||||||
@ -89,7 +100,7 @@ def on_bootstrap(api, data: Set[Peer] = None):
|
|||||||
tor_peer = TorPeer(socks_address, socks_port, transport_address)
|
tor_peer = TorPeer(socks_address, socks_port, transport_address)
|
||||||
try:
|
try:
|
||||||
tor_peer.get_socket()
|
tor_peer.get_socket()
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.warn(
|
logger.warn(
|
||||||
f"Could not connnect to Tor peer {transport_address} " +
|
f"Could not connnect to Tor peer {transport_address} " +
|
||||||
"see logs for more info",
|
"see logs for more info",
|
||||||
|
Loading…
Reference in New Issue
Block a user