Use tuple of queues for block queues instead of list for clarity that queues cannot be added

This commit is contained in:
Kevin F 2022-03-11 11:15:18 -06:00
parent 747b7d70a8
commit 4c54677387
7 changed files with 36 additions and 40 deletions

View File

@ -1,10 +1,11 @@
import threading import threading
from time import sleep from time import sleep
from typing import TYPE_CHECKING, Set, List from typing import TYPE_CHECKING, Set, Tuple
from os import urandom from os import urandom
import queue import queue
if TYPE_CHECKING: if TYPE_CHECKING:
from ordered_set import OrderedSet
from onionrblocks import Block from onionrblocks import Block
from .peer import Peer from .peer import Peer
@ -40,7 +41,8 @@ In stem phase, server disables diffusion
def start_gossip_threads( def start_gossip_threads(
peer_set: Set['Peer'], block_queues: List[queue.Queue['Block']]): peer_set: OrderedSet['Peer'],
block_queues: Tuple[queue.Queue['Block']]):
# Peer set is largely handled by the transport plugins # Peer set is largely handled by the transport plugins
# There is a unified set so gossip logic is not repeated # There is a unified set so gossip logic is not repeated
seed = urandom(32) seed = urandom(32)

View File

@ -4,7 +4,7 @@ Dandelion ++ Gossip client logic
""" """
import traceback import traceback
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Set, List from typing import Set, Tuple
from time import sleep from time import sleep
from queue import Queue from queue import Queue
@ -19,6 +19,7 @@ from ..connectpeer import connect_peer
if TYPE_CHECKING: if TYPE_CHECKING:
from ..peer import Peer from ..peer import Peer
from ordered_set import OrderedSet
import logger import logger
import onionrplugins import onionrplugins
@ -49,8 +50,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
def gossip_client( def gossip_client(
peer_set: Set['Peer'], peer_set: OrderedSet['Peer'],
block_queues: List[Queue['Block']], block_queues: Tuple[Queue['Block']],
dandelion_seed: bytes): dandelion_seed: bytes):
""" """
Gossip client does the following: Gossip client does the following:

View File

@ -1,23 +1,23 @@
from queue import Queue from queue import Queue
from time import sleep
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Set
if TYPE_CHECKING: if TYPE_CHECKING:
from ordered_set import OrderedSet
from onionrblocks import Block from onionrblocks import Block
from ..peer import Peer from ..peer import Peer
from ..dandelion.phase import DandelionPhase from ..dandelion.phase import DandelionPhase
def stem_out( def stem_out(
block_queue: Queue['Block'], block_queues: Tuple[Queue['Block']],
peer_set: Set['Block'], peer_set: OrderedSet['Peer'],
d_phase: 'DandelionPhase'): d_phase: 'DandelionPhase'):
# Deep copy the block queues so that everything gets
# stemmed out if we run out of time in epoch # Spawn a thread with block set to add to db after time for black hole attack
# Also spawn a thread with block set to add to db after time for black hole attack
block = block_queue.get(block=True, timeout=d_phase.remaining_time) if not len(peer_set):
raw_block = block.raw sleep(1)
block_size = len(block.raw) return
block_id = block.id
del block

View File

@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List from typing import TYPE_CHECKING, Tuple
from threading import Thread from threading import Thread
from queue import Queue from queue import Queue
@ -10,7 +10,7 @@ if TYPE_CHECKING:
def store_blocks( def store_blocks(
block_queues: List[Queue['Block']], block_queues: Tuple[Queue['Block']],
dandelion_phase: 'DandelionPhase'): dandelion_phase: 'DandelionPhase'):
new_queue: Queue['Block'] = Queue() new_queue: Queue['Block'] = Queue()

View File

@ -1,7 +1,7 @@
import asyncio import asyncio
import traceback import traceback
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Set, List from typing import Set, Tuple
from queue import Queue from queue import Queue
@ -14,6 +14,7 @@ import logger
if TYPE_CHECKING: if TYPE_CHECKING:
from onionrblocks import Block from onionrblocks import Block
from peer import Peer from peer import Peer
from ordered_set import OrderedSet
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
from filepaths import gossip_server_socket_file from filepaths import gossip_server_socket_file
@ -38,8 +39,8 @@ inbound_dandelion_edge_count = [0]
def gossip_server( def gossip_server(
peer_set: Set['Peer'], peer_set: OrderedSet['Peer'],
block_queues: List[Queue['Block']], block_queues: Tuple[Queue['Block']],
dandelion_seed: bytes): dandelion_seed: bytes):
async def peer_connected( async def peer_connected(

View File

@ -1,6 +1,6 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import List from typing import List, Tuple
from queue import Queue import secrets
from time import time from time import time
from asyncio import wait_for from asyncio import wait_for
@ -15,12 +15,12 @@ block_size_digits = len(str(BLOCK_MAX_SIZE))
base_wait_timeout = 10 base_wait_timeout = 10
if TYPE_CHECKING: if TYPE_CHECKING:
from queue import Queue
from asyncio import StreamWriter, StreamReader from asyncio import StreamWriter, StreamReader
async def accept_stem_blocks( async def accept_stem_blocks(
block_queues: List[Queue['Block']], block_queues: Tuple[Queue['Block']],
reader: 'StreamReader', reader: 'StreamReader',
writer: 'StreamWriter', writer: 'StreamWriter',
inbound_edge_count: List[int]): inbound_edge_count: List[int]):
@ -35,9 +35,6 @@ async def accept_stem_blocks(
read_routine = reader.read(BLOCK_ID_SIZE) read_routine = reader.read(BLOCK_ID_SIZE)
stream_start_time = int(time()) stream_start_time = int(time())
q = Queue()
appended_queue = False
for _ in range(MAX_STEM_BLOCKS_PER_STREAM): for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
block_id = ( block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8') await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
@ -54,15 +51,11 @@ async def accept_stem_blocks(
raw_block: bytes = await wait_for( raw_block: bytes = await wait_for(
reader.read(block_size), base_wait_timeout * 6) reader.read(block_size), base_wait_timeout * 6)
q.put( secrets.choice(block_queues).put(
Block(block_id, raw_block, auto_verify=True) Block(block_id, raw_block, auto_verify=True)
) )
# Regardless of stem phase, we add to queue # Regardless of stem phase, we add to queue
# Client will decide if they are to be stemmed # Client will decide if they are to be stemmed
if not appended_queue:
if len(block_queues) < MAX_INBOUND_DANDELION_EDGE:
block_queues.append(q)
appended_queue = True
read_routine = reader.read(BLOCK_ID_SIZE) read_routine = reader.read(BLOCK_ID_SIZE)

View File

@ -7,14 +7,13 @@ import queue
import sys import sys
import platform import platform
import signal import signal
from threading import Thread
from stem.connection import IncorrectPassword from stem.connection import IncorrectPassword
import stem
import toomanyobjs import toomanyobjs
import filenuke import filenuke
from deadsimplekv import DeadSimpleKV from deadsimplekv import DeadSimpleKV
import psutil import psutil
from ordered_set import OrderedSet
import config import config
@ -31,7 +30,6 @@ from onionrcrypto import getourkeypair
import runtests import runtests
from httpapi import daemoneventsapi from httpapi import daemoneventsapi
from .. import version from .. import version
from utils.bettersleep import better_sleep
from .killdaemon import kill_daemon # noqa from .killdaemon import kill_daemon # noqa
from .showlogo import show_logo from .showlogo import show_logo
import gossip import gossip
@ -119,12 +117,13 @@ def daemon():
events.event('init', threaded=False) events.event('init', threaded=False)
events.event('daemon_start') events.event('daemon_start')
shared_state.get(apiservers.ClientAPI).gossip_peer_set = set() shared_state.get(apiservers.ClientAPI).gossip_peer_set = OrderedSet()
shared_state.get(apiservers.ClientAPI).gossip_block_queue = queue.Queue() shared_state.get(apiservers.ClientAPI).gossip_block_queues = \
(queue.Queue(), queue.Queue())
gossip.start_gossip_threads( gossip.start_gossip_threads(
shared_state.get(apiservers.ClientAPI).gossip_peer_set, shared_state.get(apiservers.ClientAPI).gossip_peer_set,
shared_state.get(apiservers.ClientAPI).gossip_block_queue) shared_state.get(apiservers.ClientAPI).gossip_block_queues)
try: try:
shared_state.get(apiservers.ClientAPI).start() shared_state.get(apiservers.ClientAPI).start()