Small gossip fixes

This commit is contained in:
Kevin F 2022-06-05 15:11:53 -05:00
parent ac17b53663
commit 911d8118bc
8 changed files with 37 additions and 14 deletions

View File

@@ -23,6 +23,7 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
untrusted_exec = True
def block_system(cmd): def block_system(cmd):
"""Prevent os.system except for whitelisted commands+contexts.""" """Prevent os.system except for whitelisted commands+contexts."""
@@ -36,6 +37,8 @@ def block_exec(event, info):
# because libraries have stupid amounts of compile/exec/eval, # because libraries have stupid amounts of compile/exec/eval,
# We have to use a whitelist where it can be tolerated # We have to use a whitelist where it can be tolerated
# Generally better than nothing, not a silver bullet # Generally better than nothing, not a silver bullet
if untrusted_exec:
return
whitelisted_code = [ whitelisted_code = [
'netrc.py', 'netrc.py',
'shlex.py', 'shlex.py',

View File

@@ -3,6 +3,7 @@
Dandelion ++ Gossip client logic Dandelion ++ Gossip client logic
""" """
import traceback import traceback
from threading import Thread
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from typing import Set, Tuple from typing import Set, Tuple
from time import sleep from time import sleep
@@ -85,7 +86,11 @@ def start_gossip_client():
Stream new blocks Stream new blocks
""" """
bl: Block bl: Block
do_announce()
def _start_announce():
sleep(60)
do_announce()
Thread(target=_start_announce, daemon=True).start()
# Start a thread that runs every 1200 secs to # Start a thread that runs every 1200 secs to
# Ask peers for a subset for their peer set # Ask peers for a subset for their peer set

View File

@@ -1,4 +1,4 @@
from queue import Queue from queue import Empty, Queue
from time import sleep from time import sleep
from secrets import choice from secrets import choice
import traceback import traceback
@@ -112,7 +112,9 @@ async def stem_out(d_phase: 'DandelionPhase'):
"Did not stem out any blocks in time, " + "Did not stem out any blocks in time, " +
"if this happens regularly you may be under attack", "if this happens regularly you may be under attack",
terminal=True) terminal=True)
list(map(lambda p: p.close(), peer_sockets)) for s in peer_sockets:
if s:
s.close()
peer_sockets.clear() peer_sockets.clear()
break break
# If above loop ran out of time or NotEnoughEdges, loops below will not execute # If above loop ran out of time or NotEnoughEdges, loops below will not execute
@@ -124,6 +126,8 @@ async def stem_out(d_phase: 'DandelionPhase'):
for routine in stream_routines: for routine in stream_routines:
try: try:
await routine await routine
except Empty:
pass
except Exception: except Exception:
logger.warn(traceback.format_exc()) logger.warn(traceback.format_exc())
else: else:

View File

@@ -8,6 +8,8 @@ if TYPE_CHECKING:
from onionrplugins import onionrevents from onionrplugins import onionrevents
import logger import logger
from socks import GeneralProxyError
from ..peer import Peer from ..peer import Peer
from ..commands import GossipCommands, command_to_byte from ..commands import GossipCommands, command_to_byte
from ..constants import PEER_AMOUNT_TO_ASK, TRANSPORT_SIZE_BYTES from ..constants import PEER_AMOUNT_TO_ASK, TRANSPORT_SIZE_BYTES
@@ -22,11 +24,15 @@ def _do_ask_peer(peer):
_ask_peer(peer) _ask_peer(peer)
except TimeoutError: except TimeoutError:
logger.debug("Timed out when asking for new peers") logger.debug("Timed out when asking for new peers")
except GeneralProxyError:
logger.debug("Proxy error")
logger.debug(format_exc(), terminal=True)
except Exception: except Exception:
logger.error(format_exc(), terminal=True) logger.error(format_exc(), terminal=True)
def _ask_peer(peer): def _ask_peer(peer):
s: 'socket' = peer.get_socket(12) s: 'socket' = peer.get_socket(12)
s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE)) s.sendall(command_to_byte(GossipCommands.PEER_EXCHANGE))
# Get 10 max peers # Get 10 max peers
for _ in range(MAX_PEERS): for _ in range(MAX_PEERS):
@@ -49,7 +55,8 @@ def _ask_peer(peer):
def get_new_peers(): def get_new_peers():
if not len(gossip_peer_set): if not len(gossip_peer_set):
raise ValueError("Peer set empty") logger.debug("Peer set empty, cannot get new peers")
return
# Deep copy the peer list # Deep copy the peer list
peer_list: Peer = list(gossip_peer_set) peer_list: Peer = list(gossip_peer_set)

View File

@@ -1,3 +1,4 @@
import traceback
from gossip.commands import GossipCommands, command_to_byte from gossip.commands import GossipCommands, command_to_byte
from .peerset import gossip_peer_set from .peerset import gossip_peer_set
@@ -8,12 +9,14 @@ def connect_peer(peer):
if peer in gossip_peer_set: if peer in gossip_peer_set:
return return
try: try:
s = peer.get_socket(15) s = peer.get_socket(120)
except Exception: except Exception:
logger.warn(f"Could not connect to {peer.transport_address}") logger.warn(f"Could not connect to {peer.transport_address}")
logger.warn(traceback.format_exc())
else: else:
s.sendall(command_to_byte(GossipCommands.PING)) with s:
if s.recv(5).decode('utf-8') == 'PONG': s.sendall(command_to_byte(GossipCommands.PING))
gossip_peer_set.add(peer)
logger.info(f"connected to {peer.transport_address}") if s.recv(4).decode('utf-8') == 'PONG':
s.close() gossip_peer_set.add(peer)
logger.info(f"connected to {peer.transport_address}", terminal=True)

View File

@@ -61,7 +61,7 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
_add_to_queue _add_to_queue
) )
async def _send_block(bl: 'Block'): async def _send_block(block: 'Block'):
writer.write(block.id) writer.write(block.id)
await writer.drain() await writer.drain()

View File

@@ -57,7 +57,7 @@ def show_stats():
# file and folder size stats # file and folder size stats
'div1': True, # this creates a solid line across the screen, a div 'div1': True, # this creates a solid line across the screen, a div
'Total Block Size': 'Total Block Size':
sizeutils.human_size(sizeutils.size(home + 'blocks/')), sizeutils.human_size(sizeutils.size(home + 'blocks.db')),
'Total Plugin Size': 'Total Plugin Size':
sizeutils.human_size(sizeutils.size(home + 'plugins/')), sizeutils.human_size(sizeutils.size(home + 'plugins/')),
'Log File Size': 'Log File Size':

View File

@@ -2,6 +2,7 @@ import shelve
from threading import Thread from threading import Thread
from time import sleep from time import sleep
import os import os
import dbm
import traceback import traceback
from typing import Callable from typing import Callable
@@ -39,14 +40,14 @@ def on_bootstrap(api, data):
try: try:
load_existing_peers(callback_func) load_existing_peers(callback_func)
except FileNotFoundError: except dbm.error:
try: try:
with open(bootstrap_file, 'r') as bootstrap_file_obj: with open(bootstrap_file, 'r') as bootstrap_file_obj:
bootstrap_nodes = set(bootstrap_file_obj.read().split(',')) bootstrap_nodes = set(bootstrap_file_obj.read().split(','))
except FileNotFoundError: except FileNotFoundError:
bootstrap_nodes = set() bootstrap_nodes = set()
except Exception as e: except Exception as e:
logger.warn(traceback.format_exc()) logger.warn(traceback.format_exc(), terminal=True)
return return
else: else:
return return