79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
|
import unittest
|
||
|
import socket
|
||
|
from threading import Thread
|
||
|
import time
|
||
|
import stem
|
||
|
import stem.process
|
||
|
from stem.control import Controller
|
||
|
from youandme import server
|
||
|
|
||
|
# Ports for the Tor instance started below. They are stored as strings and
# handed to the Tor configuration as-is; test code converts control_port back
# to an int when it opens the controller connection.
control_port = '1353'
socks_port = '1354'

# Tor is launched as a side effect of loading this module, before any test
# runs. take_ownership=True is forwarded to stem unchanged; see stem's
# launch_tor_with_config documentation for how it ties the Tor process
# lifetime to this Python process.
_tor_config = {
    'ControlPort': control_port,
    'SocksPort': socks_port,
    'Log': ['NOTICE stdout'],
}
stem.process.launch_tor_with_config(config=_tor_config, take_ownership=True)
|
||
|
|
||
|
def send_test_data(ip, port, delay=2):
    """Connect to a listening TCP socket and stream test bytes to it.

    After waiting ``delay`` seconds, connects to ``(ip, port)`` and then, in
    an endless cycle, sends ``b"test"`` in a single call followed by each
    byte of ``b"test2"`` in its own call.

    Args:
        ip: Address of the listener to connect to.
        port: TCP port of the listener.
        delay: Seconds to sleep before connecting. Defaults to 2, the pause
            that used to be hard-coded.

    Returns:
        None, once a send fails (for example because the peer closed the
        connection). The socket is always closed on the way out.

    Raises:
        OSError: If the initial ``connect`` fails.
    """
    time.sleep(delay)
    with socket.socket() as s:
        s.connect((ip, port))
        try:
            while True:
                s.sendall(b"test")
                for c in b"test2":
                    # Iterating over bytes yields ints; wrap each one back
                    # into a one-byte payload so it goes out on its own.
                    s.sendall(bytes((c,)))
        except OSError:
            # A failed send means the receiver is gone. Stop feeding and let
            # the ``with`` block close the socket instead of letting the
            # thread die with an unhandled exception.
            return
|
||
|
|
||
|
class TestServer(unittest.TestCase):
    """Exercise ``youandme.server`` with a local socket and a Tor controller."""

    def test_server(self):
        """Feed test bytes to ``server`` and validate what lands in ``recv_data``.

        A local listener is bound on an OS-chosen port and registered as the
        target of an ephemeral hidden service. ``send_test_data`` connects to
        the listener from a background thread, and the accepted connection is
        handed to ``server`` in a second background thread together with the
        shared ``send_data`` / ``recv_data`` buffers.

        The test then polls ``recv_data`` and requires every byte other than
        a newline to be one of the bytes in ``b"testtest2"``. It fails if no
        such byte was ever seen.

        NOTE(review): presumably ``server`` appends bytes read from ``conn``
        to ``recv_data``; confirm against ``youandme.server``.
        """
        send_data = bytearray()
        recv_data = bytearray()
        with Controller.from_port(port=int(control_port)) as controller:
            controller.authenticate()
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                ip = '127.0.0.1'
                # Port 0 asks the OS for any free port; read it back below.
                s.bind((ip, 0))
                s.listen(1)
                port = s.getsockname()[1]
                print(port)
                # The returned service description is not needed here.
                controller.create_ephemeral_hidden_service(
                    {1337: f'{ip}:{port}'},
                    key_content='ED25519-V3',
                    await_publication=True,
                )
                Thread(
                    target=send_test_data,
                    args=[ip, port],
                    daemon=True,
                ).start()
                conn, _addr = s.accept()
                Thread(
                    target=server,
                    args=[0.1, controller, conn, send_data, recv_data],
                    daemon=True,
                ).start()
                time.sleep(1)

                # NOTE(review): the polling window is bounded by an iteration
                # count, not by wall-clock time, and spins without sleeping.
                # A time.monotonic() deadline would be more predictable, but
                # the intended window is not visible from this file.
                max_iters = 10000000
                tested = False
                # Same number of passes as the former manual counter, which
                # broke out once it reached max_iters.
                for _ in range(1, max_iters):
                    try:
                        value = recv_data.pop(0)
                    except IndexError:
                        # Nothing buffered right now; keep polling.
                        pass
                    else:
                        char = chr(value).encode('utf8')
                        if char != b'\n':
                            tested = True
                            self.assertIn(char, b"testtest2")
                    if send_data:
                        print(send_data)
                if not tested:
                    # Report a test failure rather than an unrelated error.
                    self.fail('not tested')
|
||
|
|
||
|
# Run the suite only when this file is executed as a script. Without the
# guard, merely importing the module (e.g. from a test runner doing discovery)
# would start a nested unittest run and exit the interpreter.
if __name__ == '__main__':
    unittest.main()
|