diff --git a/python/pyoprf/multiplexer.py b/python/pyoprf/multiplexer.py index 5449b8a..ec2a2d1 100755 --- a/python/pyoprf/multiplexer.py +++ b/python/pyoprf/multiplexer.py @@ -16,6 +16,14 @@ def split_by_n(iterable, n): return list(zip_longest(*[iter(iterable)]*n)) +def get_event_loop(): + try: + return asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + class Peer: def __init__(self, name, addr, type = "SSL", ssl_cert=None, timeout=5, alpn_proto=None): self.name = name @@ -78,7 +86,7 @@ async def read_async(self,size): return b''.join(res) def read(self, *args, **kwargs): - return asyncio.get_event_loop().run_until_complete(self.read_async(*args, **kwargs)) + return get_event_loop().run_until_complete(self.read_async(*args, **kwargs)) def send(self, msg): if not self.connected(): @@ -178,8 +186,8 @@ async def _disconnect(self): self.state == "disconnected" def connect(self): - #asyncio.get_event_loop().set_debug(True) - asyncio.get_event_loop().run_until_complete(self._connect()) + #get_event_loop().set_debug(True) + get_event_loop().run_until_complete(self._connect()) while not self.connected(): time.sleep(0.001) def connected(self): @@ -189,7 +197,7 @@ def read(self,size): if not self.connected(): return None #raise ValueError(f"{self.name} cannot read, is not connected") - ct = asyncio.get_event_loop().run_until_complete(self.read_raw(size+16)) + ct = get_event_loop().run_until_complete(self.read_raw(size+16)) return noisexk.read_msg(self.session, ct) async def read_async(self, size): @@ -205,14 +213,14 @@ def send(self, msg): #raise ValueError(f"{self.name} cannot write, is not connected") ct = noisexk.send_msg(self.session, msg) header = struct.pack(">H",len(ct)) - asyncio.get_event_loop().run_until_complete(self._send(header+ct)) + get_event_loop().run_until_complete(self._send(header+ct)) def close(self): if self.state == "closed": return if not self.connected(): return #raise ValueError(f"{self.name} cannot close, is not connected") - asyncio.get_event_loop().run_until_complete(self._disconnect()) + get_event_loop().run_until_complete(self._disconnect()) class Serial(asyncio.Protocol): def __init__(self, *args, **kwargs): @@ -240,7 +248,7 @@ def data_received(self, data): def connection_lost(self, exc): print('port closed', file=sys.stderr) self.rx_available.set() - #asyncio.get_event_loop().stop() + #get_event_loop().stop() async def read_raw(self,size): #print(f"read_raw({size})",file=sys.stderr) @@ -312,7 +320,7 @@ async def _connect(self): def connect(self): self.path = self.find_usb_port() #print(f"connecting to {self.path}",file=sys.stderr) - loop = asyncio.get_event_loop() + loop = get_event_loop() #loop.set_debug(True) coro = create_serial_connection(loop, Serial, self.path, baudrate=115200) self.transport, self.protocol = loop.run_until_complete(coro) @@ -334,7 +342,7 @@ async def read_async(self, size): return noisexk.read_msg(self.session, ct) def read(self, *args, **kwargs): - return asyncio.get_event_loop().run_until_complete(self.read_async(*args, **kwargs)) + return get_event_loop().run_until_complete(self.read_async(*args, **kwargs)) def send(self, msg): if not self.connected(): @@ -436,7 +444,7 @@ async def gather_async(self, expected_msg_len, n=None, proc=None): return results def gather(self, *args, **kwargs): - return asyncio.get_event_loop().run_until_complete(self.gather_async(*args, **kwargs)) + return get_event_loop().run_until_complete(self.gather_async(*args, **kwargs)) def close(self): for p in self.peers: