Onionr/static-data/official-plugins/unixtransport/unixpeer.py

34 lines
863 B
Python
Raw Permalink Normal View History

2022-06-26 19:43:16 +00:00
from os.path import exists
from socket import AF_UNIX, SOCK_STREAM, socket
from gossip.peerset import gossip_peer_set
2022-06-26 19:43:16 +00:00
class UnixPeer:
    """A gossip peer reachable through a Unix domain socket file.

    Instances compare equal, and hash identically, when their
    ``transport_address`` values are equal, so they can be stored in
    sets and used as dict keys.
    """

    def __init__(self, socket_file):
        """Create a peer for the socket file at *socket_file*.

        Raises:
            FileExistsError: if *socket_file* does not exist.
        """
        if not exists(socket_file):
            # NOTE(review): FileNotFoundError would describe this condition
            # better, but the exception type is kept unchanged because
            # callers may already catch FileExistsError.
            raise FileExistsError("No such file " + socket_file)
        self.transport_address = socket_file

    def get_socket(self, connect_timeout) -> socket:
        """Open a stream socket connected to this peer's socket file.

        *connect_timeout* is accepted but currently not applied to the
        socket (the ``settimeout`` call below is disabled).

        Returns:
            The connected socket. If the socket file no longer exists,
            this peer is removed from ``gossip_peer_set`` and ``None`` is
            returned instead.

        Raises:
            Any exception other than FileNotFoundError raised by
            ``connect`` is propagated after the socket has been closed.
        """
        s = socket(AF_UNIX, SOCK_STREAM)
        #s.settimeout(connect_timeout)
        try:
            s.connect(self.transport_address)
        except FileNotFoundError:
            # Close first so the descriptor is released even if removing
            # the peer from the set raises.
            s.close()
            gossip_peer_set.remove(self)
            return None
        except Exception:
            # Do not leak the descriptor on any other connect failure.
            s.close()
            raise
        return s

    def __hash__(self):
        """Hash by transport address, consistent with ``__eq__``."""
        return hash(self.transport_address)

    def __eq__(self, other):
        """Return True when *other* has an equal ``transport_address``.

        Objects without a ``transport_address`` attribute compare unequal.
        """
        try:
            return self.transport_address == other.transport_address
        except AttributeError:
            return False