Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions python/pyoprf/multiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
def split_by_n(iterable, n):
return list(zip_longest(*[iter(iterable)]*n))

def get_event_loop():
try:
return asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop

class Peer:
def __init__(self, name, addr, type = "SSL", ssl_cert=None, timeout=5, alpn_proto=None):
self.name = name
Expand Down Expand Up @@ -78,7 +86,7 @@ async def read_async(self,size):
return b''.join(res)

def read(self, *args, **kwargs):
return asyncio.get_event_loop().run_until_complete(self.read_async(*args, **kwargs))
return get_event_loop().run_until_complete(self.read_async(*args, **kwargs))

def send(self, msg):
if not self.connected():
Expand Down Expand Up @@ -178,8 +186,8 @@ async def _disconnect(self):
self.state == "disconnected"

def connect(self):
#asyncio.get_event_loop().set_debug(True)
asyncio.get_event_loop().run_until_complete(self._connect())
#get_event_loop().set_debug(True)
get_event_loop().run_until_complete(self._connect())
while not self.connected(): time.sleep(0.001)

def connected(self):
Expand All @@ -189,7 +197,7 @@ def read(self,size):
if not self.connected():
return None
#raise ValueError(f"{self.name} cannot read, is not connected")
ct = asyncio.get_event_loop().run_until_complete(self.read_raw(size+16))
ct = get_event_loop().run_until_complete(self.read_raw(size+16))
return noisexk.read_msg(self.session, ct)

async def read_async(self, size):
Expand All @@ -205,14 +213,14 @@ def send(self, msg):
#raise ValueError(f"{self.name} cannot write, is not connected")
ct = noisexk.send_msg(self.session, msg)
header = struct.pack(">H",len(ct))
asyncio.get_event_loop().run_until_complete(self._send(header+ct))
get_event_loop().run_until_complete(self._send(header+ct))

def close(self):
if self.state == "closed": return
if not self.connected():
return
#raise ValueError(f"{self.name} cannot close, is not connected")
asyncio.get_event_loop().run_until_complete(self._disconnect())
get_event_loop().run_until_complete(self._disconnect())

class Serial(asyncio.Protocol):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -240,7 +248,7 @@ def data_received(self, data):
def connection_lost(self, exc):
print('port closed', file=sys.stderr)
self.rx_available.set()
#asyncio.get_event_loop().stop()
#get_event_loop().stop()

async def read_raw(self,size):
#print(f"read_raw({size})",file=sys.stderr)
Expand Down Expand Up @@ -312,7 +320,7 @@ async def _connect(self):
def connect(self):
self.path = self.find_usb_port()
#print(f"connecting to {self.path}",file=sys.stderr)
loop = asyncio.get_event_loop()
loop = get_event_loop()
#loop.set_debug(True)
coro = create_serial_connection(loop, Serial, self.path, baudrate=115200)
self.transport, self.protocol = loop.run_until_complete(coro)
Expand All @@ -334,7 +342,7 @@ async def read_async(self, size):
return noisexk.read_msg(self.session, ct)

def read(self, *args, **kwargs):
return asyncio.get_event_loop().run_until_complete(self.read_async(*args, **kwargs))
return get_event_loop().run_until_complete(self.read_async(*args, **kwargs))

def send(self, msg):
if not self.connected():
Expand Down Expand Up @@ -436,7 +444,7 @@ async def gather_async(self, expected_msg_len, n=None, proc=None):
return results

def gather(self, *args, **kwargs):
return asyncio.get_event_loop().run_until_complete(self.gather_async(*args, **kwargs))
return get_event_loop().run_until_complete(self.gather_async(*args, **kwargs))

def close(self):
for p in self.peers:
Expand Down
Loading