Implemented server dandelion++ stem portion

This commit is contained in:
Kevin F 2022-03-04 18:05:12 -06:00
parent 17b268d9e4
commit 9d17c7bd64
8 changed files with 73 additions and 24 deletions

View File

@ -8,6 +8,8 @@ from typing import Set
from time import sleep from time import sleep
from queue import Queue from queue import Queue
from ..constants import DANDELION_EPOCH_LENGTH
from ..connectpeer import connect_peer from ..connectpeer import connect_peer
if TYPE_CHECKING: if TYPE_CHECKING:
@ -17,7 +19,7 @@ if TYPE_CHECKING:
import logger import logger
import onionrplugins import onionrplugins
from ..commands import GossipCommands from ..commands import GossipCommands
from gossip.phase import DandelionPhase from gossip.dandelion.phase import DandelionPhase
from onionrthreads import add_onionr_thread from onionrthreads import add_onionr_thread
from .announce import do_announce from .announce import do_announce
@ -57,7 +59,7 @@ def gossip_client(
get_new_peers, get_new_peers,
1200, peer_set, initial_sleep=5) 1200, peer_set, initial_sleep=5)
dandelion_phase = DandelionPhase(dandelion_seed, 30) dandelion_phase = DandelionPhase(dandelion_seed, DANDELION_EPOCH_LENGTH)
while True: while True:
while not len(peer_set): while not len(peer_set):

View File

@ -6,13 +6,13 @@ from typing import TYPE_CHECKING, Set
if TYPE_CHECKING: if TYPE_CHECKING:
from onionrblocks import Block from onionrblocks import Block
from ..peer import Peer from ..peer import Peer
from ..phase import DandelionPhase from ..dandelion.phase import DandelionPhase
def stem_out( def stem_out(
block_queue: Queue['Block'], block_queue: Queue['Block'],
peer_set: Set['Block'], peer_set: Set['Block'],
d_phase: DandelionPhase): d_phase: 'DandelionPhase'):
block = block_queue.get(block=True, timeout=time_remaining_secs) block = block_queue.get(block=True, timeout=5)
raw_block = block.raw raw_block = block.raw
block_size = len(block.raw) block_size = len(block.raw)
block_id = block.id block_id = block.id

View File

@ -2,4 +2,7 @@ BOOTSTRAP_ATTEMPTS = 5
PEER_AMOUNT_TO_ASK = 3 PEER_AMOUNT_TO_ASK = 3
TRANSPORT_SIZE_BYTES = 64 TRANSPORT_SIZE_BYTES = 64
BLOCK_MAX_SIZE = 1024 * 2000 BLOCK_MAX_SIZE = 1024 * 2000
BLOCK_ID_SIZE = 128 BLOCK_ID_SIZE = 128
DANDELION_EPOCH_LENGTH = 60
MAX_INBOUND_DANDELION_EDGE = 2
MAX_STEM_BLOCKS_PER_STREAM = 1000

View File

@ -0,0 +1,7 @@
from enum import Enum
from .phase import DandelionPhase
class StemAcceptResult:
DENY = int(0).to_bytes(1, 'big')
ALLOW = int(1).to_bytes(1, 'big')

View File

@ -1,2 +0,0 @@
class DandelionGraph:
def

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
import traceback
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Set, List from typing import Set, List
@ -8,6 +9,7 @@ from gossip import constants
from ..connectpeer import connect_peer from ..connectpeer import connect_peer
from onionrplugins import onionrevents from onionrplugins import onionrevents
import logger
if TYPE_CHECKING: if TYPE_CHECKING:
from onionrblocks import Block from onionrblocks import Block
@ -32,6 +34,8 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
inbound_dandelion_edge_count = [0]
def gossip_server( def gossip_server(
peer_set: Set['Peer'], peer_set: Set['Peer'],
@ -40,6 +44,7 @@ def gossip_server(
async def peer_connected( async def peer_connected(
reader: 'StreamReader', writer: 'StreamWriter'): reader: 'StreamReader', writer: 'StreamWriter'):
while True: while True:
try: try:
cmd = await asyncio.wait_for(reader.read(1), 60) cmd = await asyncio.wait_for(reader.read(1), 60)
@ -73,7 +78,19 @@ def gossip_server(
'utf-8').removesuffix(b'.onion')) 'utf-8').removesuffix(b'.onion'))
case GossipCommands.PUT_BLOCKS: case GossipCommands.PUT_BLOCKS:
# Create block queue & append stemmed blocks to it # Create block queue & append stemmed blocks to it
await accept_stem_blocks(block_queues, reader, writer)
try:
await accept_stem_blocks(
block_queues,
reader, writer,
inbound_dandelion_edge_count)
except Exception:
logger.warn(
f"Err getting\n{traceback.format_exc()}",
terminal=True)
# Subtract dandelion edge, make sure >=0
inbound_dandelion_edge_count[0] = \
max(inbound_dandelion_edge_count[0] - 1, 0)
break break
await writer.drain() await writer.drain()

View File

@ -4,45 +4,67 @@ from queue import Queue
from time import time from time import time
from asyncio import wait_for from asyncio import wait_for
from onionrblocks import Block
from ..dandelion import DandelionPhase, StemAcceptResult
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE
from ..constants import MAX_INBOUND_DANDELION_EDGE, MAX_STEM_BLOCKS_PER_STREAM
block_size_digits = len(str(BLOCK_MAX_SIZE)) block_size_digits = len(str(BLOCK_MAX_SIZE))
base_wait_timeout = 10 base_wait_timeout = 10
if TYPE_CHECKING: if TYPE_CHECKING:
from onionrblocks import Block
from asyncio import StreamWriter, StreamReader from asyncio import StreamWriter, StreamReader
async def accept_stem_blocks( async def accept_stem_blocks(
block_queues: List[Queue['Block']], block_queues: List[Queue['Block']],
reader: 'StreamReader', reader: 'StreamReader',
writer: 'StreamWriter'): writer: 'StreamWriter',
inbound_edge_count: List[int]):
if inbound_edge_count[0] >= MAX_INBOUND_DANDELION_EDGE:
writer.write(StemAcceptResult.DENY)
return
writer.write(StemAcceptResult.ALLOW)
inbound_edge_count[0] += 1
# Start getting the first block # Start getting the first block
read_routine = reader.read(BLOCK_ID_SIZE) read_routine = reader.read(BLOCK_ID_SIZE)
stream_start_time = int(time()) stream_start_time = int(time())
max_accept_blocks = 1000
q = Queue() q = Queue()
block_queues.append(q) appended_queue = False
#block_queues.append(q)
for _ in range(max_accept_blocks):
block_id = await wait_for(read_routine, base_wait_timeout)
block_size = int(
await wait_for(
reader.read(block_size_digits),
base_wait_timeout)).decode('utf-8')
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
block_size = (await wait_for(
reader.read(block_size_digits),
base_wait_timeout)).decode('utf-8')
if not all(c in "0123456789" for c in block_size): if not all(c in "0123456789" for c in block_size):
raise ValueError("Invalid block size data (non 0-9 char)") raise ValueError("Invalid block size data (non 0-9 char)")
block_size = int(block_size)
if block_size > BLOCK_MAX_SIZE: if block_size > BLOCK_MAX_SIZE:
raise ValueError("Max block size") raise ValueError("Max block size")
raw_block: bytes = await wait_for(
reader.read(block_size), base_wait_timeout * 6)
q.put(
Block(block_id, raw_block, auto_verify=True)
)
# Regardless of stem phase, we add to queue
# Client will decide if they are to be stemmed
if not appended_queue:
if len(block_queues) < MAX_INBOUND_DANDELION_EDGE:
block_queues.append(q)
appended_queue = True
read_routine = reader.read(BLOCK_ID_SIZE)