Compare commits

...

2 Commits

9 changed files with 45 additions and 41 deletions

View File

@@ -16,3 +16,4 @@ watchdog==2.1.6
ujson==5.1.0
cffi==1.14.4
onionrblocks==7.0.0
ordered_set==4.1.0

View File

@@ -25,6 +25,7 @@ cffi==1.14.4 \
--hash=sha256:6bc25fc545a6b3d57b5f8618e59fc13d3a3a68431e8ca5fd4c13241cd70d0009 \
--hash=sha256:798caa2a2384b1cbe8a2a139d80734c9db54f9cc155c99d7cc92441a23871c03 \
--hash=sha256:7c6b1dece89874d9541fc974917b631406233ea0440d0bdfbb8e03bf39a49b3b \
--hash=sha256:7ef7d4ced6b325e92eb4d3502946c78c5367bc416398d387b39591532536734e \
--hash=sha256:840793c68105fe031f34d6a086eaea153a0cd5c491cde82a74b420edd0a2b909 \
--hash=sha256:8d6603078baf4e11edc4168a514c5ce5b3ba6e3e9c374298cb88437957960a53 \
--hash=sha256:9cc46bc107224ff5b6d04369e7c595acb700c3613ad7bcf2e2012f62ece80c35 \
@@ -294,6 +295,10 @@ onionrblocks==7.0.0 \
--hash=sha256:53e90964371076d9daf2ed0790b21f174ef3321f4f1808209cc6dd9b7ff6d8ff \
--hash=sha256:54af28d0be856209525646c4ef9f977f95f0ae1329b2cc023b351317c9d0eef7
# via -r requirements.in
ordered_set==4.1.0 \
--hash=sha256:046e1132c71fcf3330438a539928932caf51ddbc582496833e23de611de14562 \
--hash=sha256:694a8e44c87657c59292ede72891eb91d34131f6531463aab3009191c77364a8
# via -r requirements.in
psutil==5.8.0 \
--hash=sha256:0066a82f7b1b37d334e68697faba68e5ad5e858279fd6351c8ca6024e8d6ba64 \
--hash=sha256:02b8292609b1f7fcb34173b25e48d0da8667bc85f81d7476584d889c6e0f2131 \
@@ -332,6 +337,7 @@ pynacl==1.4.0 \
--hash=sha256:11335f09060af52c97137d4ac54285bcb7df0cef29014a1a4efe64ac065434c4 \
--hash=sha256:2fe0fc5a2480361dcaf4e6e7cea00e078fcda07ba45f811b167e3f99e8cff574 \
--hash=sha256:30f9b96db44e09b3304f9ea95079b1b7316b2b4f3744fe3aaecccd95d547063d \
--hash=sha256:4e10569f8cbed81cb7526ae137049759d2a8d57726d52c1a000a3ce366779634 \
--hash=sha256:511d269ee845037b95c9781aa702f90ccc36036f95d0f31373a6a79bd8242e25 \
--hash=sha256:537a7ccbea22905a0ab36ea58577b39d1fa9b1884869d173b5cf111f006f689f \
--hash=sha256:54e9a2c849c742006516ad56a88f5c74bf2ce92c9f67435187c3c5953b346505 \
@@ -340,6 +346,7 @@ pynacl==1.4.0 \
--hash=sha256:7c6092102219f59ff29788860ccb021e80fffd953920c4a8653889c029b2d420 \
--hash=sha256:8122ba5f2a2169ca5da936b2e5a511740ffb73979381b4229d9188f6dcb22f1f \
--hash=sha256:9c4a7ea4fb81536c1b1f5cc44d54a296f96ae78c1ebd2311bd0b60be45a48d96 \
--hash=sha256:c914f78da4953b33d4685e3cdc7ce63401247a21425c16a39760e282075ac4a6 \
--hash=sha256:cd401ccbc2a249a47a3a1724c2918fcd04be1f7b54eb2a5a71ff915db0ac51c6 \
--hash=sha256:d452a6746f0a7e11121e64625109bc4468fc3100452817001dbe018bb8b08514 \
--hash=sha256:ea6841bc3a76fa4942ce00f3bda7d436fda21e2d91602b9e21b7ca9ecab8f3ff \

View File

@@ -1,10 +1,11 @@
import threading
from time import sleep
from typing import TYPE_CHECKING, Set, List
from typing import TYPE_CHECKING, Set, Tuple
from os import urandom
import queue
if TYPE_CHECKING:
from ordered_set import OrderedSet
from onionrblocks import Block
from .peer import Peer
@@ -40,7 +41,8 @@ In stem phase, server disables diffusion
def start_gossip_threads(
peer_set: Set['Peer'], block_queues: List[queue.Queue['Block']]):
peer_set: OrderedSet['Peer'],
block_queues: Tuple[queue.Queue['Block']]):
# Peer set is largely handled by the transport plugins
# There is a unified set so gossip logic is not repeated
seed = urandom(32)

View File

@@ -4,7 +4,7 @@ Dandelion ++ Gossip client logic
"""
import traceback
from typing import TYPE_CHECKING
from typing import Set, List
from typing import Set, Tuple
from time import sleep
from queue import Queue
@@ -19,6 +19,7 @@ from ..connectpeer import connect_peer
if TYPE_CHECKING:
from ..peer import Peer
from ordered_set import OrderedSet
import logger
import onionrplugins
@@ -49,8 +50,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
def gossip_client(
peer_set: Set['Peer'],
block_queues: List[Queue['Block']],
peer_set: OrderedSet['Peer'],
block_queues: Tuple[Queue['Block']],
dandelion_seed: bytes):
"""
Gossip client does the following:

View File

@@ -1,23 +1,23 @@
from queue import Queue
from time import sleep
from typing import TYPE_CHECKING, Set
from typing import TYPE_CHECKING, Tuple
if TYPE_CHECKING:
from ordered_set import OrderedSet
from onionrblocks import Block
from ..peer import Peer
from ..dandelion.phase import DandelionPhase
def stem_out(
block_queue: Queue['Block'],
peer_set: Set['Block'],
block_queues: Tuple[Queue['Block']],
peer_set: OrderedSet['Peer'],
d_phase: 'DandelionPhase'):
# Deep copy the block queues so that everything gets
# stemmed out if we run out of time in epoch
# Also spawn a thread with block set to add to db after time for black hole attack
block = block_queue.get(block=True, timeout=d_phase.remaining_time)
raw_block = block.raw
block_size = len(block.raw)
block_id = block.id
del block
# Spawn a thread with block set to add to db after time for black hole attack
if not len(peer_set):
sleep(1)
return

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING, Tuple
from threading import Thread
from queue import Queue
@@ -10,7 +10,7 @@ if TYPE_CHECKING:
def store_blocks(
block_queues: List[Queue['Block']],
block_queues: Tuple[Queue['Block']],
dandelion_phase: 'DandelionPhase'):
new_queue: Queue['Block'] = Queue()

View File

@@ -1,7 +1,7 @@
import asyncio
import traceback
from typing import TYPE_CHECKING
from typing import Set, List
from typing import Set, Tuple
from queue import Queue
@@ -14,6 +14,7 @@ import logger
if TYPE_CHECKING:
from onionrblocks import Block
from peer import Peer
from ordered_set import OrderedSet
from asyncio import StreamReader, StreamWriter
from filepaths import gossip_server_socket_file
@@ -38,8 +39,8 @@ inbound_dandelion_edge_count = [0]
def gossip_server(
peer_set: Set['Peer'],
block_queues: List[Queue['Block']],
peer_set: OrderedSet['Peer'],
block_queues: Tuple[Queue['Block']],
dandelion_seed: bytes):
async def peer_connected(

View File

@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING
from typing import List
from queue import Queue
from typing import List, Tuple
import secrets
from time import time
from asyncio import wait_for
@@ -15,12 +15,12 @@ block_size_digits = len(str(BLOCK_MAX_SIZE))
base_wait_timeout = 10
if TYPE_CHECKING:
from queue import Queue
from asyncio import StreamWriter, StreamReader
async def accept_stem_blocks(
block_queues: List[Queue['Block']],
block_queues: Tuple[Queue['Block']],
reader: 'StreamReader',
writer: 'StreamWriter',
inbound_edge_count: List[int]):
@@ -35,9 +35,6 @@ async def accept_stem_blocks(
read_routine = reader.read(BLOCK_ID_SIZE)
stream_start_time = int(time())
q = Queue()
appended_queue = False
for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
block_id = (
await wait_for(read_routine, base_wait_timeout)).decode('utf-8')
@@ -54,15 +51,11 @@ async def accept_stem_blocks(
raw_block: bytes = await wait_for(
reader.read(block_size), base_wait_timeout * 6)
q.put(
secrets.choice(block_queues).put(
Block(block_id, raw_block, auto_verify=True)
)
# Regardless of stem phase, we add to queue
# Client will decide if they are to be stemmed
if not appended_queue:
if len(block_queues) < MAX_INBOUND_DANDELION_EDGE:
block_queues.append(q)
appended_queue = True
read_routine = reader.read(BLOCK_ID_SIZE)

View File

@@ -7,14 +7,13 @@ import queue
import sys
import platform
import signal
from threading import Thread
from stem.connection import IncorrectPassword
import stem
import toomanyobjs
import filenuke
from deadsimplekv import DeadSimpleKV
import psutil
from ordered_set import OrderedSet
import config
@@ -31,7 +30,6 @@ from onionrcrypto import getourkeypair
import runtests
from httpapi import daemoneventsapi
from .. import version
from utils.bettersleep import better_sleep
from .killdaemon import kill_daemon # noqa
from .showlogo import show_logo
import gossip
@@ -119,12 +117,13 @@ def daemon():
events.event('init', threaded=False)
events.event('daemon_start')
shared_state.get(apiservers.ClientAPI).gossip_peer_set = set()
shared_state.get(apiservers.ClientAPI).gossip_block_queue = queue.Queue()
shared_state.get(apiservers.ClientAPI).gossip_peer_set = OrderedSet()
shared_state.get(apiservers.ClientAPI).gossip_block_queues = \
(queue.Queue(), queue.Queue())
gossip.start_gossip_threads(
shared_state.get(apiservers.ClientAPI).gossip_peer_set,
shared_state.get(apiservers.ClientAPI).gossip_block_queue)
shared_state.get(apiservers.ClientAPI).gossip_block_queues)
try:
shared_state.get(apiservers.ClientAPI).start()