Block diffusal mostly done
This commit is contained in:
parent
996bff267b
commit
9d2c2641f4
@ -38,3 +38,15 @@ def get_blocks_after_timestamp(
|
|||||||
yield block
|
yield block
|
||||||
else:
|
else:
|
||||||
yield block
|
yield block
|
||||||
|
|
||||||
|
|
||||||
|
def has_block(block_hash):
|
||||||
|
return block_hash in db.list_keys()
|
||||||
|
|
||||||
|
|
||||||
|
def get_block(block_hash) -> Block:
|
||||||
|
return Block(
|
||||||
|
block_hash,
|
||||||
|
db.get_db_obj(block_db_path, 'u').get(block_hash),
|
||||||
|
auto_verify=False)
|
||||||
|
|
||||||
|
@ -11,6 +11,10 @@ from random import SystemRandom
|
|||||||
from time import sleep
|
from time import sleep
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
import blockdb
|
||||||
|
|
||||||
|
from ..constants import BLOCK_ID_SIZE, BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from socket import socket
|
from socket import socket
|
||||||
from typing import TYPE_CHECKING, List
|
from typing import TYPE_CHECKING, List
|
||||||
@ -20,6 +24,7 @@ from ordered_set import OrderedSet
|
|||||||
|
|
||||||
import logger
|
import logger
|
||||||
|
|
||||||
|
import onionrblocks
|
||||||
from ..peerset import gossip_peer_set
|
from ..peerset import gossip_peer_set
|
||||||
from ..commands import GossipCommands, command_to_byte
|
from ..commands import GossipCommands, command_to_byte
|
||||||
|
|
||||||
@ -53,6 +58,7 @@ def stream_from_peers():
|
|||||||
sys_rand = SystemRandom()
|
sys_rand = SystemRandom()
|
||||||
|
|
||||||
need_socket_lock = Semaphore(MAX_STREAMS)
|
need_socket_lock = Semaphore(MAX_STREAMS)
|
||||||
|
offset = 0
|
||||||
|
|
||||||
def _stream_from_peer(peer: Peer):
|
def _stream_from_peer(peer: Peer):
|
||||||
|
|
||||||
@ -61,8 +67,35 @@ def stream_from_peers():
|
|||||||
sock.sendall(
|
sock.sendall(
|
||||||
command_to_byte(GossipCommands.STREAM_BLOCKS)
|
command_to_byte(GossipCommands.STREAM_BLOCKS)
|
||||||
)
|
)
|
||||||
|
sock.sendall(
|
||||||
|
str(offset).zfill(BLOCK_STREAM_OFFSET_DIGITS).encode('utf-8'))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
block_id = sock.recv(BLOCK_ID_SIZE)
|
||||||
|
if blockdb.has_block(block_id):
|
||||||
|
sock.sendall(int(0).to_bytes(1, 'big'))
|
||||||
|
continue
|
||||||
|
block_size = int(sock.recv(BLOCK_MAX_SIZE_LEN))
|
||||||
|
if block_size > BLOCK_MAX_SIZE or block_size <= 0:
|
||||||
|
logger.warn(
|
||||||
|
f"Peer {peer.transport_address} " +
|
||||||
|
"reported block size out of range")
|
||||||
|
break
|
||||||
|
block_data = sock.recv(block_size)
|
||||||
|
try:
|
||||||
|
blockdb.add_block_to_db(
|
||||||
|
onionrblocks.Block(
|
||||||
|
block_id, block_data, auto_verify=True)
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
sock.sendall(int(0).to_bytes(1, 'big'))
|
||||||
|
raise
|
||||||
|
sock.sendall(int(1).to_bytes(1, 'big'))
|
||||||
|
|
||||||
|
sock.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warn(traceback.format_exc())
|
logger.warn(traceback.format_exc())
|
||||||
|
finally:
|
||||||
sock.close()
|
sock.close()
|
||||||
need_socket_lock.release()
|
need_socket_lock.release()
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ TRANSPORT_SIZE_BYTES = 64
|
|||||||
BLOCK_MAX_SIZE = 1024 * 2000
|
BLOCK_MAX_SIZE = 1024 * 2000
|
||||||
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
|
BLOCK_MAX_SIZE_LEN = len(str(BLOCK_MAX_SIZE))
|
||||||
BLOCK_ID_SIZE = 128
|
BLOCK_ID_SIZE = 128
|
||||||
|
BLOCK_STREAM_OFFSET_DIGITS = 8
|
||||||
DANDELION_EPOCH_LENGTH = 60
|
DANDELION_EPOCH_LENGTH = 60
|
||||||
|
|
||||||
# Magic number i made up, not really specified in dandelion++ paper
|
# Magic number i made up, not really specified in dandelion++ paper
|
||||||
|
@ -17,7 +17,7 @@ if TYPE_CHECKING:
|
|||||||
from asyncio import StreamWriter, StreamReader
|
from asyncio import StreamWriter, StreamReader
|
||||||
from onionrblocks import Block
|
from onionrblocks import Block
|
||||||
|
|
||||||
from ..constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN
|
from ..constants import BLOCK_MAX_SIZE, BLOCK_MAX_SIZE_LEN, BLOCK_STREAM_OFFSET_DIGITS
|
||||||
|
|
||||||
import logger
|
import logger
|
||||||
from blockdb import get_blocks_after_timestamp, block_storage_observers
|
from blockdb import get_blocks_after_timestamp, block_storage_observers
|
||||||
@ -40,7 +40,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|||||||
async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
||||||
"""stream blocks to a peer created since an offset
|
"""stream blocks to a peer created since an offset
|
||||||
"""
|
"""
|
||||||
time_offset = await wait_for(reader.read(8), 12)
|
time_offset = await wait_for(reader.readexactly(BLOCK_STREAM_OFFSET_DIGITS), 12)
|
||||||
time_offset = time_offset.decode('utf-8')
|
time_offset = time_offset.decode('utf-8')
|
||||||
keep_writing = True
|
keep_writing = True
|
||||||
|
|
||||||
@ -63,6 +63,11 @@ async def diffuse_blocks(reader: 'StreamReader', writer: 'StreamWriter'):
|
|||||||
|
|
||||||
async def _send_block(bl: 'Block'):
|
async def _send_block(bl: 'Block'):
|
||||||
writer.write(block.id)
|
writer.write(block.id)
|
||||||
|
|
||||||
|
# we tell id above, they tell if they want the block
|
||||||
|
if int.from_bytes(await reader.readexactly(1), 'big') == 0:
|
||||||
|
return
|
||||||
|
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
writer.write(
|
writer.write(
|
||||||
str(len(block.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
|
str(len(block.raw)).zfill(BLOCK_MAX_SIZE_LEN).encode('utf-8'))
|
||||||
|
Loading…
Reference in New Issue
Block a user