Refactored gossip peer set, gossip block queues, and dandelionphase to use singleton/module instances instead of being passed around
This commit is contained in:
parent
9c44069248
commit
b07c176f5c
@ -53,10 +53,6 @@ class PrivateAPI:
|
|||||||
self.startTime = epoch.get_epoch()
|
self.startTime = epoch.get_epoch()
|
||||||
app = flask.Flask(__name__)
|
app = flask.Flask(__name__)
|
||||||
|
|
||||||
self.gossip_block_queue: 'queue.Queue' = None
|
|
||||||
self.gossip_peer_set: Set['Peer'] = None
|
|
||||||
|
|
||||||
|
|
||||||
bind_port = int(config.get('client.client.port', 59496))
|
bind_port = int(config.get('client.client.port', 59496))
|
||||||
self.bindPort = bind_port
|
self.bindPort = bind_port
|
||||||
|
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
import threading
|
import threading
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from typing import TYPE_CHECKING, Set, Tuple
|
from typing import TYPE_CHECKING, Set, Tuple
|
||||||
from os import urandom
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ordered_set import OrderedSet
|
from ordered_set import OrderedSet
|
||||||
@ -17,8 +16,8 @@ import logger
|
|||||||
from .connectpeer import connect_peer
|
from .connectpeer import connect_peer
|
||||||
from .client import gossip_client
|
from .client import gossip_client
|
||||||
from .server import gossip_server
|
from .server import gossip_server
|
||||||
from .commands import GossipCommands
|
|
||||||
from .constants import BOOTSTRAP_ATTEMPTS
|
from .constants import BOOTSTRAP_ATTEMPTS
|
||||||
|
from .peerset import gossip_peer_set
|
||||||
"""
|
"""
|
||||||
Onionr uses a flavor of Dandelion++ epidemic routing
|
Onionr uses a flavor of Dandelion++ epidemic routing
|
||||||
|
|
||||||
@ -40,25 +39,21 @@ In stem phase, server disables diffusion
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def start_gossip_threads(
|
def start_gossip_threads():
|
||||||
peer_set: "OrderedSet[Peer]",
|
|
||||||
block_queues: Tuple["Queue[Block]"]):
|
|
||||||
# Peer set is largely handled by the transport plugins
|
# Peer set is largely handled by the transport plugins
|
||||||
# There is a unified set so gossip logic is not repeated
|
# There is a unified set so gossip logic is not repeated
|
||||||
seed = urandom(32)
|
|
||||||
|
|
||||||
add_onionr_thread(
|
add_onionr_thread(
|
||||||
gossip_server, 1, peer_set, block_queues, seed, initial_sleep=0.2)
|
gossip_server, 1, initial_sleep=0.2)
|
||||||
|
|
||||||
threading.Thread(
|
threading.Thread(
|
||||||
target=gossip_client,
|
target=gossip_client, daemon=True).start()
|
||||||
args=[peer_set, block_queues, seed], daemon=True).start()
|
onionrplugins.events.event('gossip_start', data=None, threaded=True)
|
||||||
onionrplugins.events.event('gossip_start', data=peer_set, threaded=True)
|
|
||||||
for _ in range(BOOTSTRAP_ATTEMPTS):
|
for _ in range(BOOTSTRAP_ATTEMPTS):
|
||||||
onionrplugins.events.event(
|
onionrplugins.events.event(
|
||||||
'bootstrap', data={'peer_set': peer_set, 'callback': connect_peer},
|
'bootstrap', data={'callback': connect_peer},
|
||||||
threaded=False)
|
threaded=False)
|
||||||
sleep(60)
|
sleep(60)
|
||||||
if len(peer_set):
|
if len(gossip_peer_set):
|
||||||
return
|
return
|
||||||
logger.error("Could not connect to any peers :(", terminal=True)
|
logger.error("Could not connect to any peers :(", terminal=True)
|
||||||
|
30
src/gossip/blockqueues.py
Normal file
30
src/gossip/blockqueues.py
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
from queue import Queue
|
||||||
|
"""Onionr - Private P2P Communication.
|
||||||
|
|
||||||
|
block_queues where all received or created blocks are placed
|
||||||
|
|
||||||
|
Blocks are placed here before being sent to the network, the reason they are in
|
||||||
|
2 queues is for dandelion++ implementation.
|
||||||
|
|
||||||
|
The queues are drained randomly to incoming or outgoing edges depending on
|
||||||
|
dandelion++ phase
|
||||||
|
"""
|
||||||
|
from typing import Tuple, TYPE_CHECKING
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from onionrblocks import Block
|
||||||
|
"""
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""
|
||||||
|
|
||||||
|
gossip_block_queues: Tuple["Queue[Block]", "Queue[Block]"] = (Queue(), Queue())
|
@ -33,6 +33,7 @@ from blockdb import add_block_to_db
|
|||||||
from .announce import do_announce
|
from .announce import do_announce
|
||||||
from .dandelionstem import stem_out
|
from .dandelionstem import stem_out
|
||||||
from .peerexchange import get_new_peers
|
from .peerexchange import get_new_peers
|
||||||
|
from ..peerset import gossip_peer_set
|
||||||
|
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
@ -50,10 +51,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def gossip_client(
|
def gossip_client():
|
||||||
peer_set: "OrderedSet[Peer]",
|
|
||||||
block_queues: Tuple["Queue[Block]", "Queue[Block]"],
|
|
||||||
dandelion_seed: bytes):
|
|
||||||
"""
|
"""
|
||||||
Gossip client does the following:
|
Gossip client does the following:
|
||||||
|
|
||||||
@ -61,7 +59,7 @@ def gossip_client(
|
|||||||
Stream new blocks
|
Stream new blocks
|
||||||
"""
|
"""
|
||||||
bl: Block
|
bl: Block
|
||||||
do_announce(peer_set)
|
do_announce()
|
||||||
|
|
||||||
# Start a thread that runs every 1200 secs to
|
# Start a thread that runs every 1200 secs to
|
||||||
# Ask peers for a subset for their peer set
|
# Ask peers for a subset for their peer set
|
||||||
@ -70,12 +68,12 @@ def gossip_client(
|
|||||||
# transport plugin handles the new peer
|
# transport plugin handles the new peer
|
||||||
add_onionr_thread(
|
add_onionr_thread(
|
||||||
get_new_peers,
|
get_new_peers,
|
||||||
1200, peer_set, initial_sleep=5)
|
1200, initial_sleep=5)
|
||||||
|
|
||||||
dandelion_phase = DandelionPhase(dandelion_seed, DANDELION_EPOCH_LENGTH)
|
dandelion_phase = DandelionPhase(DANDELION_EPOCH_LENGTH)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
while not len(peer_set):
|
while not len(gossip_peer_set):
|
||||||
sleep(0.2)
|
sleep(0.2)
|
||||||
if dandelion_phase.remaining_time() <= 10:
|
if dandelion_phase.remaining_time() <= 10:
|
||||||
sleep(dandelion_phase.remaining_time())
|
sleep(dandelion_phase.remaining_time())
|
||||||
@ -83,8 +81,7 @@ def gossip_client(
|
|||||||
logger.debug("Entering stem phase", terminal=True)
|
logger.debug("Entering stem phase", terminal=True)
|
||||||
try:
|
try:
|
||||||
# Stem out blocks for (roughly) remaining epoch time
|
# Stem out blocks for (roughly) remaining epoch time
|
||||||
asyncio.run(stem_out(
|
asyncio.run(stem_out())
|
||||||
block_queues, peer_set, dandelion_phase))
|
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
continue
|
continue
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -93,4 +90,4 @@ def gossip_client(
|
|||||||
else:
|
else:
|
||||||
logger.debug("Entering fluff phase", terminal=True)
|
logger.debug("Entering fluff phase", terminal=True)
|
||||||
# Add block to primary block db, where the diffuser can read it
|
# Add block to primary block db, where the diffuser can read it
|
||||||
store_blocks(block_queues, dandelion_phase)
|
store_blocks(dandelion_phase)
|
||||||
|
@ -6,11 +6,13 @@ if TYPE_CHECKING:
|
|||||||
from .. import Peer
|
from .. import Peer
|
||||||
|
|
||||||
import logger
|
import logger
|
||||||
from ..commands import GossipCommands, command_to_byte
|
|
||||||
import onionrplugins
|
import onionrplugins
|
||||||
|
|
||||||
|
from ..commands import GossipCommands, command_to_byte
|
||||||
|
from ..peerset import gossip_peer_set
|
||||||
|
|
||||||
def do_announce(peer_set):
|
|
||||||
|
def do_announce():
|
||||||
"Announce with N peers of each identified transport"
|
"Announce with N peers of each identified transport"
|
||||||
def _announce(announce_peer: 'Peer', our_transport_address: str):
|
def _announce(announce_peer: 'Peer', our_transport_address: str):
|
||||||
try:
|
try:
|
||||||
@ -25,13 +27,13 @@ def do_announce(peer_set):
|
|||||||
f"Could not announce with {announce_peer.transport_address}")
|
f"Could not announce with {announce_peer.transport_address}")
|
||||||
sock.close()
|
sock.close()
|
||||||
|
|
||||||
while not len(peer_set):
|
while not len(gossip_peer_set):
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
|
||||||
per_transport = 3
|
per_transport = 3
|
||||||
peer_types = {}
|
peer_types = {}
|
||||||
count_for_peer = 0
|
count_for_peer = 0
|
||||||
for peer in peer_set:
|
for peer in gossip_peer_set:
|
||||||
try:
|
try:
|
||||||
count_for_peer = peer_types[peer.__class__]
|
count_for_peer = peer_types[peer.__class__]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -14,6 +14,8 @@ import logger
|
|||||||
from ...constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES
|
from ...constants import BLACKHOLE_EVADE_TIMER_SECS, OUTBOUND_DANDELION_EDGES
|
||||||
from ...commands import GossipCommands, command_to_byte
|
from ...commands import GossipCommands, command_to_byte
|
||||||
from ... import dandelion
|
from ... import dandelion
|
||||||
|
from ...blockqueues import gossip_block_queues
|
||||||
|
from ...peerset import gossip_peer_set
|
||||||
|
|
||||||
from .stemstream import do_stem_stream
|
from .stemstream import do_stem_stream
|
||||||
|
|
||||||
@ -69,19 +71,16 @@ async def _setup_edge(
|
|||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
|
|
||||||
async def stem_out(
|
async def stem_out(d_phase: 'DandelionPhase'):
|
||||||
block_queues: Tuple["Queue[Block]", "Queue[Block]"],
|
|
||||||
peer_set: "OrderedSet[Peer]",
|
|
||||||
d_phase: 'DandelionPhase'):
|
|
||||||
|
|
||||||
# don't bother if there are no possible outbound edges
|
# don't bother if there are no possible outbound edges
|
||||||
if not len(peer_set):
|
if not len(gossip_peer_set):
|
||||||
sleep(1)
|
sleep(1)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Spawn threads with deep copied block queue to add to db after time
|
# Spawn threads with deep copied block queue to add to db after time
|
||||||
# for black hole attack
|
# for black hole attack
|
||||||
for block_q in block_queues:
|
for block_q in gossip_block_queues:
|
||||||
add_delayed_thread(
|
add_delayed_thread(
|
||||||
lambda q: set(map(add_block_to_db, q)),
|
lambda q: set(map(add_block_to_db, q)),
|
||||||
BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue))
|
BLACKHOLE_EVADE_TIMER_SECS, list(block_q.queue))
|
||||||
@ -96,7 +95,7 @@ async def stem_out(
|
|||||||
while len(peer_sockets) < OUTBOUND_DANDELION_EDGES:
|
while len(peer_sockets) < OUTBOUND_DANDELION_EDGES:
|
||||||
try:
|
try:
|
||||||
# Get a socket for stem out (makes sure they accept)
|
# Get a socket for stem out (makes sure they accept)
|
||||||
peer_sockets.append(await _setup_edge(peer_set, tried_edges))
|
peer_sockets.append(await _setup_edge(gossip_peer_set, tried_edges))
|
||||||
except NotEnoughEdges:
|
except NotEnoughEdges:
|
||||||
# No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE)
|
# No possible edges at this point (edges < OUTBOUND_DANDELION_EDGE)
|
||||||
logger.warn("Not able to build enough peers for stemout.",
|
logger.warn("Not able to build enough peers for stemout.",
|
||||||
@ -116,7 +115,7 @@ async def stem_out(
|
|||||||
|
|
||||||
for count, peer_socket in enumerate(peer_sockets):
|
for count, peer_socket in enumerate(peer_sockets):
|
||||||
stream_routines.append(
|
stream_routines.append(
|
||||||
do_stem_stream(peer_socket, block_queues[count], d_phase))
|
do_stem_stream(peer_socket, gossip_block_queues[count], d_phase))
|
||||||
|
|
||||||
for routine in stream_routines:
|
for routine in stream_routines:
|
||||||
try:
|
try:
|
||||||
|
@ -10,11 +10,12 @@ from ..peer import Peer
|
|||||||
from ..commands import GossipCommands, command_to_byte
|
from ..commands import GossipCommands, command_to_byte
|
||||||
from ..constants import PEER_AMOUNT_TO_ASK, TRANSPORT_SIZE_BYTES
|
from ..constants import PEER_AMOUNT_TO_ASK, TRANSPORT_SIZE_BYTES
|
||||||
from .. import connectpeer
|
from .. import connectpeer
|
||||||
|
from ..peerset import gossip_peer_set
|
||||||
|
|
||||||
MAX_PEERS = 10
|
MAX_PEERS = 10
|
||||||
|
|
||||||
|
|
||||||
def _ask_peer(peer, peer_set):
|
def _ask_peer(peer):
|
||||||
s: 'socket' = peer.get_socket(12)
|
s: 'socket' = peer.get_socket(12)
|
||||||
s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE))
|
s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE))
|
||||||
# Get 10 max peers
|
# Get 10 max peers
|
||||||
@ -24,19 +25,18 @@ def _ask_peer(peer, peer_set):
|
|||||||
break
|
break
|
||||||
connect_data = {
|
connect_data = {
|
||||||
'address': peer,
|
'address': peer,
|
||||||
'callback': connectpeer.connect_peer,
|
'callback': connectpeer.connect_peer
|
||||||
'peer_set': peer_set
|
|
||||||
}
|
}
|
||||||
onionrevents.event('announce_rec', data=connect_data, threaded=True)
|
onionrevents.event('announce_rec', data=connect_data, threaded=True)
|
||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
|
|
||||||
def get_new_peers(peer_set):
|
def get_new_peers():
|
||||||
while not len(peer_set):
|
while not len(gossip_peer_set):
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
|
|
||||||
# Deep copy the peer list
|
# Deep copy the peer list
|
||||||
peer_list: Peer = list(peer_set)
|
peer_list: Peer = list(gossip_peer_set)
|
||||||
peers_we_ask: Peer = []
|
peers_we_ask: Peer = []
|
||||||
asked_count = 0
|
asked_count = 0
|
||||||
|
|
||||||
@ -54,7 +54,7 @@ def get_new_peers(peer_set):
|
|||||||
# Start threads to ask the peers for more peers
|
# Start threads to ask the peers for more peers
|
||||||
threads = []
|
threads = []
|
||||||
for peer in peers_we_ask:
|
for peer in peers_we_ask:
|
||||||
t = Thread(target=_ask_peer, args=[peer, peer_set], daemon=True)
|
t = Thread(target=_ask_peer, args=[peer, gossip_peer_set], daemon=True)
|
||||||
t.start()
|
t.start()
|
||||||
threads.append(t)
|
threads.append(t)
|
||||||
peers_we_ask.clear()
|
peers_we_ask.clear()
|
||||||
|
@ -9,10 +9,10 @@ if TYPE_CHECKING:
|
|||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
from ..dandelion.phase import DandelionPhase
|
from ..dandelion.phase import DandelionPhase
|
||||||
|
|
||||||
|
from ..blockqueues import gossip_block_queues
|
||||||
|
|
||||||
def store_blocks(
|
|
||||||
block_queues: Tuple["Queue[Block]", "Queue[Block]"],
|
def store_blocks(dandelion_phase: 'DandelionPhase'):
|
||||||
dandelion_phase: 'DandelionPhase'):
|
|
||||||
|
|
||||||
new_queue: "Queue[Block]" = Queue()
|
new_queue: "Queue[Block]" = Queue()
|
||||||
|
|
||||||
@ -26,7 +26,7 @@ def store_blocks(
|
|||||||
except Empty:
|
except Empty:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
for block_queue in block_queues:
|
for block_queue in gossip_block_queues:
|
||||||
Thread(target=_watch_queue, args=[block_queue], daemon=True).start()
|
Thread(target=_watch_queue, args=[block_queue], daemon=True).start()
|
||||||
|
|
||||||
while not dandelion_phase.is_stem_phase() \
|
while not dandelion_phase.is_stem_phase() \
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
from gossip.commands import GossipCommands, command_to_byte
|
from gossip.commands import GossipCommands, command_to_byte
|
||||||
|
from .peerset import gossip_peer_set
|
||||||
|
|
||||||
import logger
|
import logger
|
||||||
|
|
||||||
|
|
||||||
def connect_peer(peer_set, peer):
|
def connect_peer(peer):
|
||||||
if peer in peer_set:
|
if peer in gossip_peer_set:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
s = peer.get_socket(12)
|
s = peer.get_socket(12)
|
||||||
@ -12,5 +14,5 @@ def connect_peer(peer_set, peer):
|
|||||||
else:
|
else:
|
||||||
s.sendall(command_to_byte(GossipCommands.CLOSE))
|
s.sendall(command_to_byte(GossipCommands.CLOSE))
|
||||||
s.close()
|
s.close()
|
||||||
peer_set.add(peer)
|
gossip_peer_set.add(peer)
|
||||||
logger.info(f"connected to {peer.transport_address}")
|
logger.info(f"connected to {peer.transport_address}")
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
from time import time
|
from time import time
|
||||||
from hashlib import shake_128
|
from hashlib import shake_128
|
||||||
from secrets import randbits
|
from secrets import randbits
|
||||||
|
import secrets
|
||||||
|
|
||||||
|
seed = secrets.token_bytes(32)
|
||||||
|
|
||||||
|
|
||||||
class DandelionPhase:
|
class DandelionPhase:
|
||||||
def __init__(self, seed: bytes, epoch_interval_secs: int):
|
def __init__(self, epoch_interval_secs: int):
|
||||||
self.seed = seed # Seed intended to be from good random source like urandom
|
assert len(seed) == 32
|
||||||
assert len(self.seed) == 32
|
|
||||||
self.epoch = int(time())
|
self.epoch = int(time())
|
||||||
self.epoch_interval = epoch_interval_secs
|
self.epoch_interval = epoch_interval_secs
|
||||||
self._is_stem = bool(randbits(1))
|
self._is_stem = bool(randbits(1))
|
||||||
@ -18,7 +20,7 @@ class DandelionPhase:
|
|||||||
# Hash the seed with the time stamp to produce 8 pseudorandom bytes
|
# Hash the seed with the time stamp to produce 8 pseudorandom bytes
|
||||||
# Produce an len(8) byte string for time as well for year 2038 problem
|
# Produce an len(8) byte string for time as well for year 2038 problem
|
||||||
self.phase_id = shake_128(
|
self.phase_id = shake_128(
|
||||||
self.seed +
|
seed +
|
||||||
int.to_bytes(cur_time, 8, 'big')).digest(8)
|
int.to_bytes(cur_time, 8, 'big')).digest(8)
|
||||||
|
|
||||||
# Use first byte of phase id as random source for stem phase picking
|
# Use first byte of phase id as random source for stem phase picking
|
||||||
|
25
src/gossip/peerset.py
Normal file
25
src/gossip/peerset.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
"""Onionr - Private P2P Communication.
|
||||||
|
|
||||||
|
singleton set of gossip peers
|
||||||
|
"""
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .peer import Peer
|
||||||
|
from ordered_set import OrderedSet
|
||||||
|
"""
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""
|
||||||
|
|
||||||
|
gossip_peer_set: OrderedSet['Peer'] = OrderedSet()
|
@ -19,6 +19,7 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
from filepaths import gossip_server_socket_file
|
from filepaths import gossip_server_socket_file
|
||||||
from ..commands import GossipCommands
|
from ..commands import GossipCommands
|
||||||
|
from ..peerset import gossip_peer_set
|
||||||
from .acceptstem import accept_stem_blocks
|
from .acceptstem import accept_stem_blocks
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
@ -38,10 +39,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|||||||
inbound_dandelion_edge_count = [0]
|
inbound_dandelion_edge_count = [0]
|
||||||
|
|
||||||
|
|
||||||
def gossip_server(
|
def gossip_server():
|
||||||
peer_set: "OrderedSet[Peer]",
|
|
||||||
block_queues: Tuple["Queue[Block]", "Queue[Block]"],
|
|
||||||
dandelion_seed: bytes):
|
|
||||||
|
|
||||||
async def peer_connected(
|
async def peer_connected(
|
||||||
reader: 'StreamReader', writer: 'StreamWriter'):
|
reader: 'StreamReader', writer: 'StreamWriter'):
|
||||||
@ -66,14 +64,13 @@ def gossip_server(
|
|||||||
constants.TRANSPORT_SIZE_BYTES)
|
constants.TRANSPORT_SIZE_BYTES)
|
||||||
onionrevents.event(
|
onionrevents.event(
|
||||||
'announce_rec',
|
'announce_rec',
|
||||||
data={'peer_set': peer_set,
|
data={'address': address,
|
||||||
'address': address,
|
|
||||||
'callback': connect_peer},
|
'callback': connect_peer},
|
||||||
threaded=True)
|
threaded=True)
|
||||||
writer.write(int(1).to_bytes(1, 'big'))
|
writer.write(int(1).to_bytes(1, 'big'))
|
||||||
await asyncio.wait_for(_read_announce(), 10)
|
await asyncio.wait_for(_read_announce(), 10)
|
||||||
case GossipCommands.PEER_EXCHANGE:
|
case GossipCommands.PEER_EXCHANGE:
|
||||||
for peer in peer_set:
|
for peer in gossip_peer_set:
|
||||||
writer.write(
|
writer.write(
|
||||||
peer.transport_address.encode(
|
peer.transport_address.encode(
|
||||||
'utf-8').removesuffix(b'.onion'))
|
'utf-8').removesuffix(b'.onion'))
|
||||||
@ -82,7 +79,6 @@ def gossip_server(
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
await accept_stem_blocks(
|
await accept_stem_blocks(
|
||||||
block_queues,
|
|
||||||
reader, writer,
|
reader, writer,
|
||||||
inbound_dandelion_edge_count)
|
inbound_dandelion_edge_count)
|
||||||
except asyncio.exceptions.TimeoutError:
|
except asyncio.exceptions.TimeoutError:
|
||||||
|
@ -9,6 +9,7 @@ from onionrblocks import Block
|
|||||||
from ..dandelion import DandelionPhase, StemAcceptResult
|
from ..dandelion import DandelionPhase, StemAcceptResult
|
||||||
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE
|
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE
|
||||||
from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM
|
from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM
|
||||||
|
from ..blockqueues import gossip_block_queues
|
||||||
|
|
||||||
|
|
||||||
block_size_digits = len(str(BLOCK_MAX_SIZE))
|
block_size_digits = len(str(BLOCK_MAX_SIZE))
|
||||||
@ -20,7 +21,6 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
|
|
||||||
async def accept_stem_blocks(
|
async def accept_stem_blocks(
|
||||||
block_queues: Tuple["Queue[Block]", "Queue[Block]"],
|
|
||||||
reader: 'StreamReader',
|
reader: 'StreamReader',
|
||||||
writer: 'StreamWriter',
|
writer: 'StreamWriter',
|
||||||
inbound_edge_count: List[int]):
|
inbound_edge_count: List[int]):
|
||||||
@ -35,7 +35,7 @@ async def accept_stem_blocks(
|
|||||||
read_routine = reader.read(BLOCK_ID_SIZE)
|
read_routine = reader.read(BLOCK_ID_SIZE)
|
||||||
stream_start_time = int(time())
|
stream_start_time = int(time())
|
||||||
|
|
||||||
block_queue_to_use = secrets.choice(block_queues)
|
block_queue_to_use = secrets.choice(gossip_block_queues)
|
||||||
|
|
||||||
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
|
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
|
||||||
block_id = (
|
block_id = (
|
||||||
|
@ -117,13 +117,7 @@ def daemon():
|
|||||||
events.event('init', threaded=False)
|
events.event('init', threaded=False)
|
||||||
events.event('daemon_start')
|
events.event('daemon_start')
|
||||||
|
|
||||||
shared_state.get(apiservers.ClientAPI).gossip_peer_set = OrderedSet()
|
gossip.start_gossip_threads()
|
||||||
shared_state.get(apiservers.ClientAPI).gossip_block_queues = \
|
|
||||||
(queue.Queue(), queue.Queue())
|
|
||||||
|
|
||||||
gossip.start_gossip_threads(
|
|
||||||
shared_state.get(apiservers.ClientAPI).gossip_peer_set,
|
|
||||||
shared_state.get(apiservers.ClientAPI).gossip_block_queues)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
shared_state.get(apiservers.ClientAPI).start()
|
shared_state.get(apiservers.ClientAPI).start()
|
||||||
|
@ -3,6 +3,7 @@ import logger
|
|||||||
|
|
||||||
from getsocks import get_socks
|
from getsocks import get_socks
|
||||||
from torpeer import TorPeer
|
from torpeer import TorPeer
|
||||||
|
from gossip.peerset import gossip_peer_set
|
||||||
|
|
||||||
|
|
||||||
def on_announce_rec(api, data=None):
|
def on_announce_rec(api, data=None):
|
||||||
@ -21,5 +22,5 @@ def on_announce_rec(api, data=None):
|
|||||||
announced += '.onion'
|
announced += '.onion'
|
||||||
|
|
||||||
data['callback'](
|
data['callback'](
|
||||||
data['peer_set'],
|
gossip_peer_set,
|
||||||
TorPeer(socks_address, socks_port, announced))
|
TorPeer(socks_address, socks_port, announced))
|
||||||
|
@ -3,6 +3,7 @@ from time import sleep
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
from gossip.peer import Peer
|
from gossip.peer import Peer
|
||||||
|
from gossip.peerset import gossip_peer_set
|
||||||
import logger
|
import logger
|
||||||
import config
|
import config
|
||||||
from getsocks import get_socks
|
from getsocks import get_socks
|
||||||
@ -39,7 +40,7 @@ def on_bootstrap(api, data):
|
|||||||
# it will add it to data['peer_set'] if it responds to ping
|
# it will add it to data['peer_set'] if it responds to ping
|
||||||
Thread(
|
Thread(
|
||||||
target=data['callback'],
|
target=data['callback'],
|
||||||
args=[data['peer_set'], TorPeer(socks_address, socks_port, address)],
|
args=[TorPeer(socks_address, socks_port, address)],
|
||||||
daemon=True).start()
|
daemon=True).start()
|
||||||
|
|
||||||
|
|
||||||
|
0
static-data/default-plugins/tor/bootstrap.txt
Normal file
0
static-data/default-plugins/tor/bootstrap.txt
Normal file
@ -1,7 +1,6 @@
|
|||||||
"""Onionr - Private P2P Communication.
|
"""Onionr - Private P2P Communication.
|
||||||
|
|
||||||
This default plugin handles "flow" messages
|
Tor transport plugin
|
||||||
(global chatroom style communication)
|
|
||||||
"""
|
"""
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
Loading…
Reference in New Issue
Block a user