Added a block store function, used when not in the stem phase

This commit is contained in:
Kevin F 2022-03-10 01:10:13 -06:00
parent 9d17c7bd64
commit fefec8bdc8
4 changed files with 63 additions and 10 deletions

View File

@ -4,16 +4,20 @@ Dandelion ++ Gossip client logic
""" """
import traceback import traceback
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Set from typing import Set, List
from time import sleep from time import sleep
from queue import Queue from queue import Queue
from onionrblocks import Block
from gossip.client.storeblocks import store_blocks
from ..constants import DANDELION_EPOCH_LENGTH from ..constants import DANDELION_EPOCH_LENGTH
from ..connectpeer import connect_peer from ..connectpeer import connect_peer
if TYPE_CHECKING: if TYPE_CHECKING:
from onionrblocks import Block
from ..peer import Peer from ..peer import Peer
import logger import logger
@ -21,6 +25,8 @@ import onionrplugins
from ..commands import GossipCommands from ..commands import GossipCommands
from gossip.dandelion.phase import DandelionPhase from gossip.dandelion.phase import DandelionPhase
from onionrthreads import add_onionr_thread from onionrthreads import add_onionr_thread
from blockdb import store_vdf_block
from .announce import do_announce from .announce import do_announce
from .dandelionstem import stem_out from .dandelionstem import stem_out
@ -44,7 +50,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
def gossip_client( def gossip_client(
peer_set: Set['Peer'], peer_set: Set['Peer'],
block_queue: Queue['Block'], block_queues: List[Queue['Block']],
dandelion_seed: bytes): dandelion_seed: bytes):
""" """
Gossip client does the following: Gossip client does the following:
@ -52,9 +58,14 @@ def gossip_client(
Stem new blocks we created or downloaded *during stem phase* Stem new blocks we created or downloaded *during stem phase*
Stream new blocks Stream new blocks
""" """
bl: Block
do_announce(peer_set) do_announce(peer_set)
# Start a thread that runs every 1200 secs to
# Ask peers for a subset for their peer set
# The transport addresses for said peers will
# be passed to a plugin event where the respective
# transport plugin handles the new peer
add_onionr_thread( add_onionr_thread(
get_new_peers, get_new_peers,
1200, peer_set, initial_sleep=5) 1200, peer_set, initial_sleep=5)
@ -64,14 +75,16 @@ def gossip_client(
while True: while True:
while not len(peer_set): while not len(peer_set):
sleep(0.2) sleep(0.2)
if dandelion_phase.remaining_time <= 10:
sleep(dandelion_phase.remaining_time)
if dandelion_phase.is_stem_phase(): if dandelion_phase.is_stem_phase():
try: try:
# Stem out blocks for (roughly) remaining epoch time # Stem out blocks for (roughly) remaining epoch time
stem_out( stem_out(
block_queue, peer_set, dandelion_phase) block_queues, peer_set, dandelion_phase)
except TimeoutError: except TimeoutError:
continue continue
continue continue
else: else:
pass # Add block to primary block db, where the diffuser can read it
store_blocks(block_queues, dandelion_phase)

View File

@ -12,7 +12,10 @@ def stem_out(
block_queue: Queue['Block'], block_queue: Queue['Block'],
peer_set: Set['Block'], peer_set: Set['Block'],
d_phase: 'DandelionPhase'): d_phase: 'DandelionPhase'):
block = block_queue.get(block=True, timeout=5) # Deep copy the block queues so that everything gets
# stemmed out if we run out of time in epoch
# Also spawn a thread with block set to add to db after time for black hole attack
block = block_queue.get(block=True, timeout=d_phase.remaining_time)
raw_block = block.raw raw_block = block.raw
block_size = len(block.raw) block_size = len(block.raw)
block_id = block.id block_id = block.id

View File

@ -0,0 +1,39 @@
from typing import TYPE_CHECKING, List
from threading import Thread
from queue import Empty, Queue

import blockdb
if TYPE_CHECKING:
from onionrblocks import Block
from ..dandelion.phase import DandelionPhase
def store_blocks(
        block_queues: List[Queue['Block']],
        dandelion_phase: 'DandelionPhase'):
    """Store incoming blocks into the block database during fluff phase.

    Spawns one daemon watcher thread per queue in block_queues; each
    watcher funnels blocks into a single internal queue, which this
    function drains into blockdb until the stem phase begins or the
    epoch is nearly over.

    Args:
        block_queues: queues of incoming Block objects to persist.
        dandelion_phase: current Dandelion++ phase; looping stops once
            it reports stem phase or <= 1 second remaining.
    """
    # NOTE(review): remaining_time is invoked as a method here but is
    # accessed as a plain attribute elsewhere in this commit — confirm
    # which form DandelionPhase actually provides.
    new_queue: Queue['Block'] = Queue()

    def _watch_queue(block_queue: Queue['Block']):
        # Copy all incoming blocks into 1 queue which gets processed to db
        while not dandelion_phase.is_stem_phase() \
                and dandelion_phase.remaining_time() > 1:
            try:
                new_queue.put(
                    block_queue.get(timeout=dandelion_phase.remaining_time()))
            except Empty:
                # Queue.get signals a timeout with queue.Empty, not
                # TimeoutError; swallow it and re-check the phase.
                pass

    for block_queue in block_queues:
        # args must be a tuple: Thread.run calls target(*args), and a bare
        # Queue is not iterable, which would kill the watcher with TypeError.
        Thread(target=_watch_queue, args=(block_queue,), daemon=True).start()

    # Add blocks to the primary block db, where the diffuser can read them
    while not dandelion_phase.is_stem_phase() \
            and dandelion_phase.remaining_time() > 1:
        try:
            blockdb.store_vdf_block(
                new_queue.get(timeout=dandelion_phase.remaining_time())
            )
        except Empty:
            pass

View File

@ -37,7 +37,6 @@ async def accept_stem_blocks(
q = Queue() q = Queue()
appended_queue = False appended_queue = False
#block_queues.append(q)
for _ in range(MAX_STEM_BLOCKS_PER_STREAM): for _ in range(MAX_STEM_BLOCKS_PER_STREAM):
block_id = ( block_id = (
@ -67,4 +66,3 @@ async def accept_stem_blocks(
appended_queue = True appended_queue = True
read_routine = reader.read(BLOCK_ID_SIZE) read_routine = reader.read(BLOCK_ID_SIZE)