Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pip install eric-sse
*Features*

* Send to one listener and broadcast
* SSE format was adopted by design, making the library suitable for such kind of model
* Callbacks and threading support
* Support to SSE and concurrency batch process implementation
* Sockets server prefab for offline inter process communication

*Possible applications*
Expand Down
25 changes: 24 additions & 1 deletion eric_sse/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import eric_sse
from eric_sse.listener import MessageQueueListener
from eric_sse.message import MessageContract
from eric_sse.queues import Queue, InMemoryQueue

from eric_sse.handlers import QueuingErrorHandler

class Connection:
"""
Expand All @@ -16,6 +17,8 @@ def __init__(self, listener: MessageQueueListener, queue: Queue, connection_id:
self.__listener = listener
self.__queue = queue
self.__id = connection_id or eric_sse.generate_uuid()
self.__queues_error_handlers: list[QueuingErrorHandler] = []


@property
def listener(self) -> MessageQueueListener:
Expand All @@ -29,6 +32,26 @@ def queue(self) -> Queue:
def id(self) -> str:
return self.__id

def send_message(self, msg: MessageContract):
try:
self.__queue.push(msg)
except Exception as e:
for handler in self.__queues_error_handlers:
handler.handle_push_error(msg=msg, exception=e)
raise


def fetch_message(self) -> MessageContract:
try:
return self.__queue.pop()
except Exception as e:
for handler in self.__queues_error_handlers:
handler.handle_pop_error(exception=e)
raise e

def register_queuing_error_handler(self, handler: QueuingErrorHandler):
self.__queues_error_handlers.append(handler)


class ConnectionsFactory(ABC):
@abstractmethod
Expand Down
66 changes: 33 additions & 33 deletions eric_sse/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,39 @@
from eric_sse.listener import MessageQueueListener
from eric_sse.connection import Connection, ConnectionsFactory, InMemoryConnectionsFactory
from eric_sse.message import MessageContract, Message
from eric_sse.queues import Queue
from eric_sse.handlers import ListenerErrorHandler

logger = eric_sse.get_logger()

MESSAGE_TYPE_CLOSED = '_eric_channel_closed'
MESSAGE_TYPE_END_OF_STREAM = '_eric_channel_eof'
MESSAGE_TYPE_INTERNAL_ERROR = '_eric_error'


class _ConnectionManager:
"""Maintains relationships between listeners and queues"""
"""Maintains relationships between listeners and connections."""
def __init__(self, channel_id: str):
self.__channel_id = channel_id
self.__listeners: dict[str, MessageQueueListener] = {}
self.__queues: dict[str, Queue] = {}
self.__connections: dict[str, Connection] = {}

def register_connection(self, connection: Connection):
self.__connections[connection.listener.id] = connection
self.__queues[connection.listener.id] = connection.queue
self.__listeners[connection.listener.id] = connection.listener

def remove_listener(self, listener_id: str):
try:
del self.__connections[listener_id]
del self.__queues[listener_id]
del self.__listeners[listener_id]
except KeyError:
raise InvalidListenerException(listener_id) from None

def get_queue(self, listener_id: str) -> Queue:
def get_listener(self, listener_id: str) -> MessageQueueListener:
try:
return self.__queues[listener_id]
return self.__listeners[listener_id]
except KeyError:
raise InvalidListenerException(f"Invalid listener {listener_id}") from None
raise InvalidListenerException(listener_id) from None

def get_listener(self, listener_id: str) -> MessageQueueListener:
def get_connection(self, listener_id: str) -> Connection:
try:
return self.__listeners[listener_id]
return self.__connections[listener_id]
except KeyError:
raise InvalidListenerException
raise InvalidListenerException(listener_id) from None

def get_listeners(self) -> dict[str, MessageQueueListener]:
"""Returns a dict mapping listener ids to listeners"""
Expand Down Expand Up @@ -82,6 +74,9 @@ def __init__(
self.__connection_manager: _ConnectionManager = _ConnectionManager(self.__id)
self.__connections_factory = connections_factory if connections_factory else InMemoryConnectionsFactory()

self.__listeners_error_handlers: list[ListenerErrorHandler] = []



@property
def id(self) -> str:
Expand All @@ -100,10 +95,9 @@ async def message_stream(self, listener: MessageQueueListener) -> AsyncIterable[

A message with type = 'error' is yield on invalid listener
"""
try:
self.__connection_manager.get_listener(listener.id)
except InvalidListenerException:
raise

# check that listener was registered
_ = self.__connection_manager.get_listener(listener.id)

Comment thread
laxertu marked this conversation as resolved.
async def new_messages():
try:
Expand Down Expand Up @@ -149,12 +143,15 @@ def register_listener(self, listener: MessageQueueListener):

def register_connection(self, connection: Connection):
"""
Register and existing connection.
Register an existing connection.

**Warning**: Listener and queue should belong to the same classes returned by connection factory to avoid compatibility issues with persistence layer
"""
self.__connection_manager.register_connection(connection)

def register_listener_error_handler(self, handler: ListenerErrorHandler):
self.__listeners_error_handlers.append(handler)

def remove_listener(self, listener_id: str):
self.__connection_manager.remove_listener(listener_id)

Expand All @@ -166,21 +163,29 @@ def deliver_next(self, listener_id: str) -> MessageContract:
"""
listener = self.get_listener(listener_id)
if listener.is_running():
queue = self.__connection_manager.get_queue(listener.id)
msg = queue.pop()
listener.on_message(msg)
msg = self._get_connection(listener.id).fetch_message()
try:
listener.on_message(msg)
except Exception as e:
for handler in self.__listeners_error_handlers:
handler.handle_on_message_error(msg=msg, exception=e)
raise
return msg

raise NoMessagesException

def _get_queue(self, listener_id: str) -> Queue:
return self.__connection_manager.get_queue(listener_id)
def _get_connection(self, listener_id: str) -> Connection:
return self.__connection_manager.get_connection(listener_id)

def dispatch(self, listener_id: str, msg: MessageContract):
"""Adds a message to listener's queue"""

queue = self._get_queue(listener_id)
queue.push(msg)
try:
self._get_connection(listener_id).send_message(msg)
except Exception:
logger.exception("Failed to dispatch message to listener_id=%s", listener_id)
raise

logger.debug(f"Dispatched {msg} to {listener_id}")

def broadcast(self, msg: MessageContract):
Expand All @@ -194,8 +199,3 @@ def get_listener(self, listener_id: str) -> MessageQueueListener:
def get_connections(self) -> Iterable[Connection]:
return self.__connection_manager.get_connections()

async def watch(self) -> AsyncIterable[Any]:
listener = self.add_listener()
listener.start()
return self.message_stream(listener)

17 changes: 17 additions & 0 deletions eric_sse/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
from eric_sse.message import MessageContract

from eric_sse import get_logger
logger = get_logger()

class QueuingErrorHandler:

def handle_push_error(self, msg: MessageContract, exception: Exception):
pass
def handle_pop_error(self, exception: Exception):
pass

class ListenerErrorHandler(ABC):
@abstractmethod
def handle_on_message_error(self, msg: MessageContract, exception: Exception):
pass
16 changes: 8 additions & 8 deletions eric_sse/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ def delete(self, connection_id: str):


class ConnectionRepositoryInterface(ABC):
@property
@abstractmethod
def connections_factory(self) -> ConnectionsFactory:
pass

@property
@abstractmethod
def queues_repository(self) -> QueueRepositoryInterface:
Expand All @@ -58,7 +63,7 @@ def load_all(self, channel_id: str) -> Iterable[Connection]:
pass

@abstractmethod
def load_one(self, channel_id: str, connection_id: str) -> Connection:
def load_one(self, connection_id: str) -> Connection:
"""Loads a connection given the connection and channel id it belongs to."""
pass

Expand All @@ -68,18 +73,13 @@ def persist(self, channel_id: str, connection: Connection):
pass

@abstractmethod
def delete(self, channel_id: str, connection_id: str):
"""Deletes a connection given the connection and channel id it belongs to."""
def delete(self, connection_id: str):
"""Deletes a connection given its id."""
pass



class ChannelRepositoryInterface(ABC):
@property
@abstractmethod
def connections_factory(self) -> ConnectionsFactory:
"""The connections factory that will be injected into concrete channel instances."""
pass

@property
@abstractmethod
Expand Down
18 changes: 18 additions & 0 deletions eric_sse/patterns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from eric_sse.handlers import QueuingErrorHandler
from eric_sse.message import MessageContract
from eric_sse.queues import Queue
from eric_sse import get_logger

logger = get_logger()

class DeadLetterQueueHandler(QueuingErrorHandler):
def __init__(self, queue: Queue):
self.__queue = queue

def handle_push_error(self, msg: MessageContract, exception: Exception):
try:
self.__queue.push(msg)
except Exception as e:
logger.exception(f"Dead-letter push failed. msg type: {msg.type} payload {msg.payload} {repr(e)}")


4 changes: 2 additions & 2 deletions eric_sse/prefabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def process_queue(self, listener: MessageQueueListener) -> AsyncIterable[d
loop = asyncio.get_running_loop()
while there_are_pending_messages:
try:
msg = self._get_queue(listener_id=listener.id).pop()
msg = self._get_connection(listener_id=listener.id).fetch_message()
tasks.append(loop.run_in_executor(e, DataProcessingChannel._invoke_callback_and_return, listener.on_message, msg))

except NoMessagesException:
Expand Down Expand Up @@ -177,7 +177,7 @@ def create(self, channel_data: dict) -> SSEChannel:
"""
:param dict channel_data: Fill it with SSEChannel constructor arguments, except for connections_factory that wil be injected by repository
"""
return SSEChannel(**channel_data, connections_factory=self.connections_factory)
return SSEChannel(**channel_data, connections_factory=self.connections_repository.connections_factory)

@staticmethod
def _channel_to_dict(channel: SSEChannel) -> dict:
Expand Down
Loading