Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 77 additions & 77 deletions libgstc/python/pygstc/gstc.py

Large diffs are not rendered by default.

100 changes: 29 additions & 71 deletions libgstc/python/pygstc/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
# OF THE POSSIBILITY OF SUCH DAMAGE.

import json
import select
import asyncio
import socket
from contextlib import asynccontextmanager

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature has a minimum python version requirement of 3.7. I can probably rework this to not require it but it makes socket cleanup a bit cleaner IMO.


"""
GstClient - Ipc Class
Expand All @@ -54,7 +54,7 @@ def __init__(
ip,
port,
maxsize=None,
terminator='\x00'.encode('utf-8'),
terminator=b'\x00',
):
"""
Initialize new Ipc
Expand All @@ -81,7 +81,23 @@ def __init__(
self._maxsize = maxsize
self._terminator = terminator

def send(self, line, timeout=None):
@asynccontextmanager
async def gstd_conn(self):
kwargs = {
'host': self._ip,
'port': self._port
}
if self._maxsize is not None:
kwargs['limit'] = self._maxsize
reader, writer = await asyncio.open_connection(**kwargs)
try:
yield reader, writer
finally:
if not writer.is_closing():
writer.close()
await writer.wait_closed()

async def send(self, line, timeout=None):
"""
Create a socket and sends a message through it

Expand All @@ -103,87 +119,29 @@ def send(self, line, timeout=None):
data : string
Decoded JSON string with the response
"""
data = None
self._logger.debug('GSTD socket sending line: {}'.format(line))
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.connect((self._ip, self._port))
s.sendall(' '.join(line).encode('utf-8'))
data = self._recvall(s, timeout)
if not data:
raise socket.error("Socket read error happened")
data = data.decode('utf-8')
s.close()
return data

async with self.gstd_conn() as (reader, writer):
writer.write(' '.join(line).encode('utf-8'))
await writer.drain()
fut = reader.readuntil(separator=self._terminator)
data = await asyncio.wait_for(fut, timeout=timeout)
if not data:
raise socket.error("Socket read error happened")
data = data[:-1].decode('utf-8')
return data
except BufferError as e:
s.close()
error_msg = 'Server response too long'
self._logger.error(error_msg)
raise BufferError(error_msg)\
from e
except TimeoutError as e:
s.close()
error_msg = 'Server took too long to respond'
self._logger.error(error_msg)
raise TimeoutError(error_msg)\
from e
except socket.error as e:
s.close()
error_msg = 'Server did not respond. Is it up?'
self._logger.error(error_msg)
raise ConnectionRefusedError(error_msg)\
from e

def _recvall(self, sock, timeout):
"""
Wait for a response message from the socket

Parameters
----------
sock : string
The socket to poll
timeout : float
Timeout in seconds to wait for a response. 0: non-blocking, None: blocking

Raises
------
socket.error
Error is triggered when Gstd IPC fails
BufferError
When the incoming buffer is too big.

Returns
-------
buf : string
Raw socket response
"""
buf = b''
newbuf = ''
try:
sock.settimeout(timeout)
except socket.error as e:
raise TimeoutError from e

while True:
if (self._maxsize and self._maxsize > len(newbuf)):
raise BufferError

try:
newbuf = sock.recv(self._socket_read_size)
# Raise an exception timeout
except socket.error as e:
raise TimeoutError from e

# When a connection dies, the socket does not close properly and it
# returns immediately with an empty string. So, check that first.
if len(newbuf) == 0:
break

if self._terminator in newbuf:
buf += newbuf[:newbuf.find(self._terminator)]
break
else:
buf += newbuf
return buf
12 changes: 8 additions & 4 deletions libgstc/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@
'tests']),
scripts=[],
classifiers=['Development Status :: 3 - Alpha',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7'],
python_requires='>=3.5',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13'],
python_requires='>=3.7',
install_requires=[],
command_options={},
extras_require={},
Expand Down
38 changes: 22 additions & 16 deletions tests/libgstc/python/gstd_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
# OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import pathlib
import socket
import subprocess
import unittest


DEFAULT_TEAR_DOWN_TIMEOUT = 1

class GstdTestRunner(unittest.TestCase):
class GstdTestRunner(unittest.IsolatedAsyncioTestCase):

def get_open_port(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -48,24 +48,30 @@ def get_open_port(self):
s.close()
return port

def setUp(self):
async def asyncSetUp(self):
self.port = self.get_open_port()
self.gstd_path = (pathlib.Path(__file__).parent.parent.parent.parent
.joinpath('gstd').joinpath('gstd').resolve())
self.gstd = subprocess.Popen([self.gstd_path, '-p', str(self.port)])
connected = -1
while connected != 0:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connected = sock.connect_ex(("", self.port))
sock.close()
self.gstd = await asyncio.create_subprocess_exec(self.gstd_path, '-p', str(self.port))
asyncio.get_event_loop().call_later(5, self.gstd.kill)
connected = False
while not connected:
try:
reader, writer = await asyncio.open_connection(port=self.port)
writer.close()
await writer.wait_closed()
connected = True
except OSError:
pass

def tearDown(self):
self.gstd.terminate()
try:
self.gstd.wait(DEFAULT_TEAR_DOWN_TIMEOUT)
except subprocess.TimeoutExpired:
self.gstd.kill()
self.gstd.wait()
async def asyncTearDown(self):
if self.gstd.returncode is None:
self.gstd.terminate()
try:
await asyncio.wait_for(self.gstd.wait(), timeout=DEFAULT_TEAR_DOWN_TIMEOUT)
except asyncio.TimeoutError:
self.gstd.kill()
await self.gstd.wait()


if __name__ == '__main__':
Expand Down
16 changes: 8 additions & 8 deletions tests/libgstc/python/test_libgstc_python_bus_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@

class TestGstcBusFilterMethods(GstdTestRunner):

def test_bus_filter_eos(self):
async def test_bus_filter_eos(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.pipeline_create('p0', pipeline)
self.gstd_client.pipeline_play('p0')
self.gstd_client.event_eos('p0')
self.gstd_client.bus_filter('p0', 'eos')
ret = self.gstd_client.bus_read('p0')
await self.gstd_client.pipeline_create('p0', pipeline)
await self.gstd_client.pipeline_play('p0')
await self.gstd_client.event_eos('p0')
await self.gstd_client.bus_filter('p0', 'eos')
ret = await self.gstd_client.bus_read('p0')
self.assertEqual(ret['type'], 'eos')
self.gstd_client.pipeline_stop('p0')
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_stop('p0')
await self.gstd_client.pipeline_delete('p0')


if __name__ == '__main__':
Expand Down
32 changes: 16 additions & 16 deletions tests/libgstc/python/test_libgstc_python_bus_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,32 @@

class TestGstcBusTimeoutMethods(GstdTestRunner):

def test_bus_timeout_eos(self):
async def test_bus_timeout_eos(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.pipeline_create('p0', pipeline)
self.gstd_client.pipeline_play('p0')
self.gstd_client.event_eos('p0')
self.gstd_client.bus_filter('p0', 'eos')
self.gstd_client.bus_timeout('p0', 1000)
ret = self.gstd_client.bus_read('p0')
await self.gstd_client.pipeline_create('p0', pipeline)
await self.gstd_client.pipeline_play('p0')
await self.gstd_client.event_eos('p0')
await self.gstd_client.bus_filter('p0', 'eos')
await self.gstd_client.bus_timeout('p0', 1000)
ret = await self.gstd_client.bus_read('p0')
if ret:
self.assertEqual(ret['type'], 'eos')
self.gstd_client.pipeline_stop('p0')
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_stop('p0')
await self.gstd_client.pipeline_delete('p0')

def test_bus_timeout_no_response(self):
async def test_bus_timeout_no_response(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.pipeline_create('p0', pipeline)
self.gstd_client.pipeline_play('p0')
self.gstd_client.bus_timeout('p0', 1000)
ret = self.gstd_client.bus_read('p0')
await self.gstd_client.pipeline_create('p0', pipeline)
await self.gstd_client.pipeline_play('p0')
await self.gstd_client.bus_timeout('p0', 1000)
ret = await self.gstd_client.bus_read('p0')
self.assertEqual(ret, None)
self.gstd_client.pipeline_stop('p0')
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_stop('p0')
await self.gstd_client.pipeline_delete('p0')


if __name__ == '__main__':
Expand Down
14 changes: 7 additions & 7 deletions tests/libgstc/python/test_libgstc_python_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,24 @@

class TestGstcCreateMethods(GstdTestRunner):

def test_create_pipeline(self):
async def test_create_pipeline(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
ret = self.gstd_client.read('pipelines')
ret = await self.gstd_client.read('pipelines')
initial_n_pipes = len(ret['nodes'])
self.gstd_client.create('pipelines', 'p0', pipeline)
ret = self.gstd_client.read('pipelines')
await self.gstd_client.create('pipelines', 'p0', pipeline)
ret = await self.gstd_client.read('pipelines')
final_n_pipes = len(ret['nodes'])
self.assertEqual(final_n_pipes, initial_n_pipes + 1)
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_delete('p0')

def test_create_bad_pipeline(self):
async def test_create_bad_pipeline(self):
pipeline = 'source sink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
with self.assertRaises(GstdError):
self.gstd_client.create('pipelines', 'p0', pipeline)
await self.gstd_client.create('pipelines', 'p0', pipeline)


if __name__ == '__main__':
Expand Down
8 changes: 4 additions & 4 deletions tests/libgstc/python/test_libgstc_python_debug_color.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@

class TestGstcDebugColorMethods(GstdTestRunner):

def test_debug_color_true(self):
async def test_debug_color_true(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_color(True)
await self.gstd_client.debug_color(True)

def test_debug_color_false(self):
async def test_debug_color_false(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_color(False)
await self.gstd_client.debug_color(False)


if __name__ == '__main__':
Expand Down
8 changes: 4 additions & 4 deletions tests/libgstc/python/test_libgstc_python_debug_enable.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@

class TestGstcDebugEnableMethods(GstdTestRunner):

def test_debug_enable_true(self):
async def test_debug_enable_true(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_enable(True)
await self.gstd_client.debug_enable(True)

def test_debug_enable_false(self):
async def test_debug_enable_false(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_enable(False)
await self.gstd_client.debug_enable(False)


if __name__ == '__main__':
Expand Down
8 changes: 4 additions & 4 deletions tests/libgstc/python/test_libgstc_python_debug_reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@

class TestGstcDebugResetMethods(GstdTestRunner):

def test_debug_reset_true(self):
async def test_debug_reset_true(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_reset(True)
await self.gstd_client.debug_reset(True)

def test_debug_reset_false(self):
async def test_debug_reset_false(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_reset(False)
await self.gstd_client.debug_reset(False)


if __name__ == '__main__':
Expand Down
Loading