lots of work on gossip
This commit is contained in:
parent
4f3da58a60
commit
17b268d9e4
@ -8,9 +8,10 @@ Run with 'help' for usage.
|
|||||||
# Enable pyjion if we can because why not
|
# Enable pyjion if we can because why not
|
||||||
pyjion_enabled = False
|
pyjion_enabled = False
|
||||||
try:
|
try:
|
||||||
import pyjion
|
pass
|
||||||
pyjion.enable()
|
#import pyjion
|
||||||
pyjion_enabled = True
|
#pyjion.enable()
|
||||||
|
#pyjion_enabled = True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import threading
|
import threading
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from typing import TYPE_CHECKING, Set
|
from typing import TYPE_CHECKING, Set, List
|
||||||
from os import urandom
|
from os import urandom
|
||||||
import queue
|
import queue
|
||||||
|
|
||||||
@ -17,6 +17,7 @@ from .connectpeer import connect_peer
|
|||||||
from .client import gossip_client
|
from .client import gossip_client
|
||||||
from .server import gossip_server
|
from .server import gossip_server
|
||||||
from .commands import GossipCommands
|
from .commands import GossipCommands
|
||||||
|
from .constants import BOOTSTRAP_ATTEMPTS
|
||||||
"""
|
"""
|
||||||
Onionr uses a flavor of Dandelion++ epidemic routing
|
Onionr uses a flavor of Dandelion++ epidemic routing
|
||||||
|
|
||||||
@ -39,19 +40,19 @@ In stem phase, server disables diffusion
|
|||||||
|
|
||||||
|
|
||||||
def start_gossip_threads(
|
def start_gossip_threads(
|
||||||
peer_set: Set['Peer'], block_queue: queue.Queue['Block']):
|
peer_set: Set['Peer'], block_queues: List[queue.Queue['Block']]):
|
||||||
# Peer set is largely handled by the transport plugins
|
# Peer set is largely handled by the transport plugins
|
||||||
# There is a unified set so gossip logic is not repeated
|
# There is a unified set so gossip logic is not repeated
|
||||||
seed = urandom(32)
|
seed = urandom(32)
|
||||||
|
|
||||||
add_onionr_thread(
|
add_onionr_thread(
|
||||||
gossip_server, 1, peer_set, block_queue, seed, initial_sleep=0.2)
|
gossip_server, 1, peer_set, block_queues, seed, initial_sleep=0.2)
|
||||||
|
|
||||||
threading.Thread(
|
threading.Thread(
|
||||||
target=gossip_client,
|
target=gossip_client,
|
||||||
args=[peer_set, block_queue, seed], daemon=True).start()
|
args=[peer_set, block_queues, seed], daemon=True).start()
|
||||||
onionrplugins.events.event('gossip_start', data=peer_set, threaded=True)
|
onionrplugins.events.event('gossip_start', data=peer_set, threaded=True)
|
||||||
for _ in range(2):
|
for _ in range(BOOTSTRAP_ATTEMPTS):
|
||||||
onionrplugins.events.event(
|
onionrplugins.events.event(
|
||||||
'bootstrap', data={'peer_set': peer_set, 'callback': connect_peer},
|
'bootstrap', data={'peer_set': peer_set, 'callback': connect_peer},
|
||||||
threaded=False)
|
threaded=False)
|
||||||
@ -59,5 +60,3 @@ def start_gossip_threads(
|
|||||||
if len(peer_set):
|
if len(peer_set):
|
||||||
return
|
return
|
||||||
logger.error("Could not connect to any peers :(", terminal=True)
|
logger.error("Could not connect to any peers :(", terminal=True)
|
||||||
|
|
||||||
|
|
||||||
|
@ -8,6 +8,7 @@ from typing import Set
|
|||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
from ..connectpeer import connect_peer
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
@ -20,7 +21,9 @@ from gossip.phase import DandelionPhase
|
|||||||
from onionrthreads import add_onionr_thread
|
from onionrthreads import add_onionr_thread
|
||||||
|
|
||||||
from .announce import do_announce
|
from .announce import do_announce
|
||||||
#from .peerexchange import get_new_peers
|
from .dandelionstem import stem_out
|
||||||
|
from .peerexchange import get_new_peers
|
||||||
|
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
@ -48,25 +51,25 @@ def gossip_client(
|
|||||||
Stream new blocks
|
Stream new blocks
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _trigger_new_peers_event(new_peer_set: Set['Peer']):
|
do_announce(peer_set)
|
||||||
onionrplugins.events.event(
|
|
||||||
'new_peer',
|
|
||||||
data={'peers': peer_set, 'new_peers': new_peer_set})
|
|
||||||
|
|
||||||
add_onionr_thread(do_announce, 3600, peer_set, initial_sleep=10)
|
|
||||||
"""
|
|
||||||
add_onionr_thread(
|
add_onionr_thread(
|
||||||
get_new_peers,
|
get_new_peers,
|
||||||
1200, peer_set, _trigger_new_peers_event, initial_sleep=5)
|
1200, peer_set, initial_sleep=5)
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
dandelion_phase = DandelionPhase(dandelion_seed, 30)
|
dandelion_phase = DandelionPhase(dandelion_seed, 30)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
while not len(peer_set):
|
while not len(peer_set):
|
||||||
sleep(0.2)
|
sleep(0.2)
|
||||||
if dandelion_phase.is_stem_phase():
|
if dandelion_phase.is_stem_phase():
|
||||||
pass
|
try:
|
||||||
|
# Stem out blocks for (roughly) remaining epoch time
|
||||||
|
stem_out(
|
||||||
|
block_queue, peer_set, dandelion_phase)
|
||||||
|
except TimeoutError:
|
||||||
|
continue
|
||||||
|
continue
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
20
src/gossip/client/dandelionstem.py
Normal file
20
src/gossip/client/dandelionstem.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from queue import Queue
|
||||||
|
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING, Set
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from onionrblocks import Block
|
||||||
|
from ..peer import Peer
|
||||||
|
from ..phase import DandelionPhase
|
||||||
|
|
||||||
|
def stem_out(
|
||||||
|
block_queue: Queue['Block'],
|
||||||
|
peer_set: Set['Block'],
|
||||||
|
d_phase: DandelionPhase):
|
||||||
|
block = block_queue.get(block=True, timeout=time_remaining_secs)
|
||||||
|
raw_block = block.raw
|
||||||
|
block_size = len(block.raw)
|
||||||
|
block_id = block.id
|
||||||
|
del block
|
||||||
|
|
@ -0,0 +1,64 @@
|
|||||||
|
from threading import Thread
|
||||||
|
from time import sleep
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from socket import socket
|
||||||
|
|
||||||
|
from onionrplugins import onionrevents
|
||||||
|
|
||||||
|
from ..peer import Peer
|
||||||
|
from ..commands import GossipCommands, command_to_byte
|
||||||
|
from ..constants import PEER_AMOUNT_TO_ASK, TRANSPORT_SIZE_BYTES
|
||||||
|
from .. import connectpeer
|
||||||
|
|
||||||
|
MAX_PEERS = 10
|
||||||
|
|
||||||
|
|
||||||
|
def _ask_peer(peer, peer_set):
|
||||||
|
s: 'socket' = peer.get_socket()
|
||||||
|
s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE))
|
||||||
|
# Get 10 max peers
|
||||||
|
for _ in range(MAX_PEERS):
|
||||||
|
peer = s.recv(TRANSPORT_SIZE_BYTES)
|
||||||
|
if not peer:
|
||||||
|
break
|
||||||
|
connect_data = {
|
||||||
|
'address': peer,
|
||||||
|
'callback': connectpeer.connect_peer,
|
||||||
|
'peer_set': peer_set
|
||||||
|
}
|
||||||
|
onionrevents.event('announce_rec', data=connect_data, threaded=True)
|
||||||
|
s.close()
|
||||||
|
|
||||||
|
|
||||||
|
def get_new_peers(peer_set):
|
||||||
|
while not len(peer_set):
|
||||||
|
sleep(0.5)
|
||||||
|
|
||||||
|
# Deep copy the peer list
|
||||||
|
peer_list: Peer = list(peer_set)
|
||||||
|
peers_we_ask: Peer = []
|
||||||
|
asked_count = 0
|
||||||
|
|
||||||
|
while asked_count < PEER_AMOUNT_TO_ASK:
|
||||||
|
try:
|
||||||
|
peers_we_ask.append(peer_list.pop())
|
||||||
|
except IndexError:
|
||||||
|
break
|
||||||
|
asked_count += 1
|
||||||
|
|
||||||
|
if not len(peers_we_ask):
|
||||||
|
raise ValueError("No peers present in pool during get_new_peers")
|
||||||
|
peer_list.clear() # Clear the deep copy so it doesn't occupy memory
|
||||||
|
|
||||||
|
# Start threads to ask the peers for more peers
|
||||||
|
threads = []
|
||||||
|
for peer in peers_we_ask:
|
||||||
|
t = Thread(target=_ask_peer, args=[peer, peer_set], daemon=True)
|
||||||
|
t.start()
|
||||||
|
threads.append(t)
|
||||||
|
peers_we_ask.clear()
|
||||||
|
# Wait for the threads to finish because this function is on a timer
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
|
5
src/gossip/constants.py
Normal file
5
src/gossip/constants.py
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
BOOTSTRAP_ATTEMPTS = 5
|
||||||
|
PEER_AMOUNT_TO_ASK = 3
|
||||||
|
TRANSPORT_SIZE_BYTES = 64
|
||||||
|
BLOCK_MAX_SIZE = 1024 * 2000
|
||||||
|
BLOCK_ID_SIZE = 128
|
2
src/gossip/graph.py
Normal file
2
src/gossip/graph.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
class DandelionGraph:
|
||||||
|
def
|
@ -26,6 +26,11 @@ class DandelionPhase:
|
|||||||
self._is_stem = False
|
self._is_stem = False
|
||||||
|
|
||||||
|
|
||||||
|
def remaining_time(self) -> int:
|
||||||
|
current_time = int(time())
|
||||||
|
return self.epoch_interval - (current_time - self.epoch)
|
||||||
|
|
||||||
|
|
||||||
def is_stem_phase(self) -> bool:
|
def is_stem_phase(self) -> bool:
|
||||||
current_time = int(time())
|
current_time = int(time())
|
||||||
if current_time - self.epoch >= self.epoch_interval:
|
if current_time - self.epoch >= self.epoch_interval:
|
||||||
|
@ -1,18 +1,22 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
from typing import Set
|
from typing import Set, List
|
||||||
|
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
from .connectpeer import connect_peer
|
|
||||||
|
from gossip import constants
|
||||||
|
from ..connectpeer import connect_peer
|
||||||
|
|
||||||
from onionrplugins import onionrevents
|
from onionrplugins import onionrevents
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
from peer import Peer
|
from peer import Peer
|
||||||
|
from asyncio import StreamReader, StreamWriter
|
||||||
|
|
||||||
from filepaths import gossip_server_socket_file
|
from filepaths import gossip_server_socket_file
|
||||||
from .commands import GossipCommands
|
from ..commands import GossipCommands
|
||||||
|
from .acceptstem import accept_stem_blocks
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
@ -31,10 +35,11 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|||||||
|
|
||||||
def gossip_server(
|
def gossip_server(
|
||||||
peer_set: Set['Peer'],
|
peer_set: Set['Peer'],
|
||||||
block_queue: Queue['Block'],
|
block_queues: List[Queue['Block']],
|
||||||
dandelion_seed: bytes):
|
dandelion_seed: bytes):
|
||||||
|
|
||||||
async def peer_connected(reader, writer):
|
async def peer_connected(
|
||||||
|
reader: 'StreamReader', writer: 'StreamWriter'):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
cmd = await asyncio.wait_for(reader.read(1), 60)
|
cmd = await asyncio.wait_for(reader.read(1), 60)
|
||||||
@ -51,7 +56,8 @@ def gossip_server(
|
|||||||
pass
|
pass
|
||||||
case GossipCommands.ANNOUNCE:
|
case GossipCommands.ANNOUNCE:
|
||||||
async def _read_announce():
|
async def _read_announce():
|
||||||
address = await reader.read(56)
|
address = await reader.read(
|
||||||
|
constants.TRANSPORT_SIZE_BYTES)
|
||||||
onionrevents.event(
|
onionrevents.event(
|
||||||
'announce_rec',
|
'announce_rec',
|
||||||
data={'peer_set': peer_set,
|
data={'peer_set': peer_set,
|
||||||
@ -60,6 +66,14 @@ def gossip_server(
|
|||||||
threaded=True)
|
threaded=True)
|
||||||
writer.write(int(1).to_bytes(1, 'big'))
|
writer.write(int(1).to_bytes(1, 'big'))
|
||||||
await asyncio.wait_for(_read_announce(), 10)
|
await asyncio.wait_for(_read_announce(), 10)
|
||||||
|
case GossipCommands.PEER_EXCHANGE:
|
||||||
|
for peer in peer_set:
|
||||||
|
writer.write(
|
||||||
|
peer.transport_address.encode(
|
||||||
|
'utf-8').removesuffix(b'.onion'))
|
||||||
|
case GossipCommands.PUT_BLOCKS:
|
||||||
|
# Create block queue & append stemmed blocks to it
|
||||||
|
await accept_stem_blocks(block_queues, reader, writer)
|
||||||
break
|
break
|
||||||
|
|
||||||
await writer.drain()
|
await writer.drain()
|
48
src/gossip/server/acceptstem.py
Normal file
48
src/gossip/server/acceptstem.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
from typing import TYPE_CHECKING
|
||||||
|
from typing import List
|
||||||
|
from queue import Queue
|
||||||
|
from time import time
|
||||||
|
from asyncio import wait_for
|
||||||
|
|
||||||
|
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE
|
||||||
|
|
||||||
|
|
||||||
|
block_size_digits = len(str(BLOCK_MAX_SIZE))
|
||||||
|
base_wait_timeout = 10
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from onionrblocks import Block
|
||||||
|
from asyncio import StreamWriter, StreamReader
|
||||||
|
|
||||||
|
|
||||||
|
async def accept_stem_blocks(
|
||||||
|
block_queues: List[Queue['Block']],
|
||||||
|
reader: 'StreamReader',
|
||||||
|
writer: 'StreamWriter'):
|
||||||
|
|
||||||
|
# Start getting the first block
|
||||||
|
read_routine = reader.read(BLOCK_ID_SIZE)
|
||||||
|
stream_start_time = int(time())
|
||||||
|
max_accept_blocks = 1000
|
||||||
|
|
||||||
|
q = Queue()
|
||||||
|
block_queues.append(q)
|
||||||
|
|
||||||
|
for _ in range(max_accept_blocks):
|
||||||
|
block_id = await wait_for(read_routine, base_wait_timeout)
|
||||||
|
block_size = int(
|
||||||
|
await wait_for(
|
||||||
|
reader.read(block_size_digits),
|
||||||
|
base_wait_timeout)).decode('utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
if not all(c in "0123456789" for c in block_size):
|
||||||
|
raise ValueError("Invalid block size data (non 0-9 char)")
|
||||||
|
if block_size > BLOCK_MAX_SIZE:
|
||||||
|
raise ValueError("Max block size")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -78,7 +78,7 @@ def on_gossip_start(api, data: Set[Peer] = None):
|
|||||||
controller.authenticate()
|
controller.authenticate()
|
||||||
logger.info(
|
logger.info(
|
||||||
"Tor socks is listening on " +
|
"Tor socks is listening on " +
|
||||||
f"{controller.get_listeners('SOCKS')}", terminal=True)
|
f"{controller.get_listeners('SOCKS')[0]}", terminal=True)
|
||||||
key = config.get('tor.key')
|
key = config.get('tor.key')
|
||||||
new_address = ''
|
new_address = ''
|
||||||
if not key:
|
if not key:
|
||||||
|
@ -4,6 +4,8 @@ import socks
|
|||||||
class TorPeer:
|
class TorPeer:
|
||||||
|
|
||||||
def __init__(self, socks_host, socks_port, onion_address):
|
def __init__(self, socks_host, socks_port, onion_address):
|
||||||
|
if not onion_address or onion_address == '.onion':
|
||||||
|
raise ValueError("Invalid transport address")
|
||||||
self.transport_address = self.onion_address = onion_address
|
self.transport_address = self.onion_address = onion_address
|
||||||
self.socks_host = socks_host
|
self.socks_host = socks_host
|
||||||
self.socks_port = socks_port
|
self.socks_port = socks_port
|
||||||
|
Loading…
Reference in New Issue
Block a user