Onionr/src/gossip/client/streamblocks/streamfrom.py

160 lines
5.1 KiB
Python
Raw Normal View History

2022-03-28 05:13:36 +00:00
"""Onionr - Private P2P Communication.
Download blocks that are being diffused.
Doesn't apply to blocks in the gossip queue that are awaiting a
decision to fluff or stem
"""
2022-07-19 05:32:54 +00:00
from ast import Index
2022-03-30 06:13:42 +00:00
from threading import Thread, Semaphore
2022-03-28 05:13:36 +00:00
from random import SystemRandom
2022-03-30 06:13:42 +00:00
from time import sleep
import traceback
2022-04-04 05:48:30 +00:00
from typing import TYPE_CHECKING, List
2022-03-28 05:13:36 +00:00
2022-04-03 06:16:58 +00:00
import blockdb
from ...constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS
2022-04-03 06:16:58 +00:00
2022-03-28 05:13:36 +00:00
if TYPE_CHECKING:
from socket import socket
2022-03-30 06:13:42 +00:00
from gossip.peer import Peer
from ordered_set import OrderedSet
import logger
2022-03-28 05:13:36 +00:00
2022-04-03 06:16:58 +00:00
import onionrblocks
from ...peerset import gossip_peer_set
from ...commands import GossipCommands, command_to_byte
2022-03-28 05:13:36 +00:00
"""
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
# Upper bound on concurrent peer streams: sizes the semaphore in
# stream_from_peers and caps how many peers are sampled per round.
MAX_STREAMS = 6
# Timeout passed to Peer.get_socket when opening a stream connection
# (presumably seconds -- TODO confirm against Peer.get_socket).
CONNECT_TIMEOUT = 45
2022-03-30 06:13:42 +00:00
# Once the set of already-tried peers reaches this many entries,
# stream_from_peers drops one entry per scheduling round.
MAX_TRIED_PEERS = 10_000
2022-03-28 05:13:36 +00:00
def stream_from_peers():
    """Stream diffused blocks from randomly chosen gossip peers, forever.

    Each scheduling round samples up to MAX_STREAMS peers from
    ``gossip_peer_set`` that have not yet been tried in the current cycle
    and starts one daemon thread per sampled peer. A semaphore with
    MAX_STREAMS slots caps how many stream threads run at the same time.

    Every stream thread sends the STREAM_BLOCKS command followed by a
    zero-padded offset, then repeatedly reads a block id, answers with a
    single byte (0 = already have it, 1 = send it), reads the block size
    and block data, and stores the block with ``blockdb.add_block_to_db``.

    This function does not return normally.
    """
    tried_peers: OrderedSet['Peer'] = OrderedSet()
    sys_rand = SystemRandom()
    need_socket_lock = Semaphore(MAX_STREAMS)
    offset = 0
    stream_times = 100  # per-connection iteration budget (see loop below)
    read_timeout = 5  # value handed to sock.settimeout for stream I/O
    idle_sleep = 1  # pause between rounds when there is no peer to try

    def _recv_exact(sock: 'socket', size: int) -> bytes:
        """Read exactly ``size`` bytes from ``sock``.

        A single ``recv`` may return fewer bytes than requested, so keep
        reading until the full field has arrived.

        Raises:
            ConnectionError: the peer closed the connection before
                ``size`` bytes were received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                # recv returning b'' means the other side closed the socket
                raise ConnectionError(
                    "peer closed connection mid-stream")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _stream_from_peer(peer: 'Peer'):
        """Download blocks from one peer, then free its semaphore slot."""
        try:
            sock = peer.get_socket(CONNECT_TIMEOUT)
        except ConnectionRefusedError:
            # Peer is simply not reachable; not worth a log entry
            need_socket_lock.release()
            return
        except Exception:
            logger.warn(traceback.format_exc(), terminal=True)
            need_socket_lock.release()
            return

        try:
            sock.sendall(
                command_to_byte(GossipCommands.STREAM_BLOCKS)
            )
            sock.sendall(
                str(offset).zfill(BLOCK_STREAM_OFFSET_DIGITS).encode('utf-8'))
            # The timeout persists on the socket, so set it once for all
            # of the stream reads and acknowledgements that follow
            sock.settimeout(read_timeout)

            # Inclusive bound kept from the original counter loop; every
            # iteration counts, including ones skipped via `continue`
            for _ in range(stream_times + 1):
                logger.debug(
                    "Reading block id in stream with " +
                    peer.transport_address, terminal=True)
                block_id = _recv_exact(sock, BLOCK_ID_SIZE)
                if blockdb.has_block(block_id):
                    # Already stored: decline and wait for the next id
                    sock.sendall(int(0).to_bytes(1, 'big'))
                    continue
                sock.sendall(int(1).to_bytes(1, 'big'))

                logger.debug("Reading block size in stream", terminal=True)
                block_size = int(_recv_exact(sock, BLOCK_SIZE_LEN))
                if block_size > BLOCK_MAX_SIZE or block_size <= 0:
                    logger.warn(
                        f"Peer {peer.transport_address} " +
                        "reported block size out of range")
                    break
                block_data = _recv_exact(sock, block_size)

                logger.debug(
                    "We got a block from stream, assuming it is valid",
                    terminal=True)
                try:
                    blockdb.add_block_to_db(
                        onionrblocks.Block(
                            block_id, block_data, auto_verify=True))
                except Exception:
                    # They gave us a bad block, kill the stream
                    # Could be corruption or malice
                    sock.sendall(int(0).to_bytes(1, 'big'))
                    raise
                # Tell them to keep streaming
                sock.sendall(int(1).to_bytes(1, 'big'))
        except (ConnectionError, TimeoutError) as e:
            # Expected network conditions (peer went away, stalled, or
            # finished streaming); ConnectionError covers BrokenPipeError
            logger.debug(f"{e} when streaming peers", terminal=True)
        except Exception:
            logger.warn(traceback.format_exc(), terminal=True)
        finally:
            sock.close()
            need_socket_lock.release()

    # spawn stream threads infinitely
    while True:
        available_set = gossip_peer_set - tried_peers
        if not available_set and tried_peers:
            # Every known peer has been tried: start a new cycle
            try:
                tried_peers.clear()
            except IndexError:
                # NOTE(review): guard kept from the original code; it is
                # unclear which OrderedSet version raises here -- confirm
                pass
            available_set = gossip_peer_set.copy()

        if not available_set:
            # No peers known at all; without this pause the loop would
            # spin at full CPU because nothing below blocks
            sleep(idle_sleep)
            continue

        # random.sample needs a sequence (sets are rejected on 3.11+),
        # so materialise the candidates first
        candidates = list(available_set)
        peers = sys_rand.sample(
            candidates,
            min(MAX_STREAMS, len(candidates)))

        tried_peers.update(peers)
        if len(tried_peers) >= MAX_TRIED_PEERS:
            # Keep the tried set from growing without bound
            tried_peers.pop()

        while peers:
            # Blocks until one of the MAX_STREAMS slots is free; the
            # slot is released by the stream thread when it finishes
            need_socket_lock.acquire()
            Thread(
                target=_stream_from_peer,
                args=[peers.pop()],
                daemon=True,
                name="_stream_from_peer").start()
2022-07-19 05:32:54 +00:00