From a053821f86ed9b5a30edb0de29e90553ad5fc526 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Wed, 11 Jun 2025 23:20:17 +0100 Subject: [PATCH 01/10] Implement a Redis Delayed Queue to support sorted sets --- bin/mark_as_ready | 82 +++++++++++++++++++++++++++++++++ examples/book/benchmark.py | 9 +++- pgsync/base.py | 14 +++++- pgsync/redisqueue.py | 92 +++++++++++++++++++++++++++----------- pgsync/sync.py | 78 +++++++++++++++++++++----------- pgsync/trigger.py | 7 ++- 6 files changed, 228 insertions(+), 54 deletions(-) create mode 100755 bin/mark_as_ready diff --git a/bin/mark_as_ready b/bin/mark_as_ready new file mode 100755 index 00000000..0b378497 --- /dev/null +++ b/bin/mark_as_ready @@ -0,0 +1,82 @@ +#!/usr/bin/env python + +"""PGSync Mark As Ready.""" +import logging + +import click + +from pgsync.sync import Sync +from pgsync.utils import config_loader, get_config, show_settings + +logger = logging.getLogger(__name__) + + +@click.command() +@click.option( + "--config", + "-c", + help="Schema config", + type=click.Path(exists=True), +) +@click.option("--host", "-h", help="PG_HOST override") +@click.option("--password", is_flag=True, help="Prompt for database password") +@click.option("--port", "-p", help="PG_PORT override", type=int) +@click.option( + "--teardown", + "-t", + is_flag=True, + help="Teardown database triggers and replication slots", +) +@click.option("--user", "-u", help="PG_USER override") +@click.option( + "--verbose", + "-v", + is_flag=True, + default=False, + help="Turn on verbosity", +) +def main( + teardown: bool, + config: str, + user: str, + password: bool, + host: str, + port: int, + verbose: bool, +) -> None: + kwargs: dict = { + "user": user, + "host": host, + "port": port, + } + if password: + kwargs["password"] = click.prompt( + "Password", + type=str, + hide_input=True, + ) + kwargs = {key: value for key, value in kwargs.items() if value is not None} + + config: str = get_config(config) + + show_settings(config) + + validate: bool = False if teardown else True + + for doc in config_loader(config): + sync: Sync = Sync( + doc, + verbose=verbose, + validate=validate, + repl_slots=False, + **kwargs, + ) + if teardown: + sync.teardown() + continue + sync.redis.mark_all_ready() + logger.info("Marked all items as ready in Redis") + + +if __name__ == "__main__": + main() diff --git a/examples/book/benchmark.py b/examples/book/benchmark.py index f804d780..835fcb4b 100644 --- a/examples/book/benchmark.py +++ b/examples/book/benchmark.py @@ -134,7 +134,11 @@ def truncate_op(session: sessionmaker, model, nsize: int) -> None: case_sensitive=False, ), ) -def main(config: str, nsize: int, daemon: bool, tg_op: str): +@click.option("--delayed", is_flag=True, help="Delay persist to Redis") +def main( + config: str, nsize: int, daemon: bool, tg_op: str, delayed: bool +) -> None: + """Benchmarking script for Book model operations.""" show_settings(config) config: str = get_config(config) @@ -144,6 +148,9 @@ def main(config: str, nsize: int, daemon: bool, tg_op: str): Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) session = Session() + if delayed: + session.execute(sa.text("SET pgsync.delayed = True")) + model = Book func: dict = { INSERT: insert_op, diff --git a/pgsync/base.py b/pgsync/base.py index 2eb27788..f0b342e1 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -73,9 +73,19 @@ class Payload(object): new (dict): The new values of the row that was affected by the event (for INSERT and UPDATE operations). xmin (int): The transaction ID of the event. indices (List[str]): The indices of the affected rows (for UPDATE and DELETE operations). + delayed (bool): Whether the event was delayed or not. """ - __slots__ = ("tg_op", "table", "schema", "old", "new", "xmin", "indices") + __slots__ = ( + "tg_op", + "table", + "schema", + "old", + "new", + "xmin", + "indices", + "delayed", + ) def __init__( self, @@ -86,6 +96,7 @@ def __init__( new: t.Optional[t.Dict[str, t.Any]] = None, xmin: t.Optional[int] = None, indices: t.Optional[t.List[str]] = None, + delayed: bool = False, ): self.tg_op: t.Optional[str] = tg_op self.table: t.Optional[str] = table @@ -94,6 +105,7 @@ def __init__( self.new: t.Dict[str, t.Any] = new or {} self.xmin: t.Optional[int] = xmin self.indices: t.List[str] = indices + self.delayed: bool = delayed @property def data(self) -> dict: diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py index 6948d238..5660eea6 100644 --- a/pgsync/redisqueue.py +++ b/pgsync/redisqueue.py @@ -2,6 +2,7 @@ import json import logging +import time import typing as t from redis import Redis @@ -13,18 +14,20 @@ logger = logging.getLogger(__name__) -class RedisQueue(object): - """Simple Queue with Redis Backend.""" +# sentinel 300 years in ms +_FAR_FUTURE = int((time.time() + 300 * 365 * 24 * 3600) * 1_000) + + +class RedisQueue: + """A Redis‐backed queue where items become poppable only once ready is True.""" def __init__(self, name: str, namespace: str = "queue", **kwargs): - """Init Simple Queue with Redis Backend.""" url: str = get_redis_url(**kwargs) self.key: str = f"{namespace}:{name}" self._meta_key: str = f"{self.key}:meta" try: self.__db: Redis = Redis.from_url( - url, - socket_timeout=REDIS_SOCKET_TIMEOUT, + url, socket_timeout=REDIS_SOCKET_TIMEOUT ) self.__db.ping() except ConnectionError as e: @@ -33,34 +36,71 @@ def __init__(self, name: str, namespace: str = "queue", **kwargs): @property def qsize(self) -> int: - """Return the approximate size of the queue.""" - return self.__db.llen(self.key) - - def pop(self, chunk_size: t.Optional[int] = None) -> t.List[dict]: - """Remove and return multiple items from the queue.""" - chunk_size = chunk_size or REDIS_READ_CHUNK_SIZE - if self.qsize > 0: - pipeline = self.__db.pipeline() - pipeline.lrange(self.key, 0, chunk_size - 1) - pipeline.ltrim(self.key, chunk_size, -1) - items: t.List = pipeline.execute() - logger.debug(f"pop size: {len(items[0])}") - return list(map(lambda value: json.loads(value), items[0])) - - def push(self, items: t.List) -> None: - """Push multiple items onto the queue.""" - self.__db.rpush(self.key, *map(json.dumps, items)) + """Number of items currently in the ZSET (regardless of ready/not).""" + return self.__db.zcard(self.key) + + def push(self, items: t.List[dict], ready: bool = True) -> None: + """ + Push a batch of items. + If ready=True score = now (ms), so pop_ready() can retrieve immediately. + If ready=False score = FAR_FUTURE, so pop_ready() ignores it until mark_ready(). + """ + now_ms: int = int(time.time() * 1_000) + score: int = now_ms if ready else _FAR_FUTURE + mapping: dict = {json.dumps(item): score for item in items} + self.__db.zadd(self.key, mapping) + + def pop(self, chunk_size: int = REDIS_READ_CHUNK_SIZE) -> t.List[dict]: + """ + Atomically pull up to chunk_size items whose score ≤ now_ms. + These are the ready items. + """ + now_ms: int = int(time.time() * 1_000) + # fetch members ready to run + values = self.__db.zrangebyscore( + self.key, 0, now_ms, start=0, num=chunk_size + ) + if not values: + return [] + # remove them in one pipeline + pipeline = self.__db.pipeline() + pipeline.zrem(self.key, *values) + pipeline.execute() + return [json.loads(value) for value in values] + + def mark_ready(self, items: t.List[dict]) -> None: + """ + Flip previously-pushed, delayed items to ready by updating their score to now_ms. + """ + now_ms: int = int(time.time() * 1_000) + mapping: dict = {json.dumps(item): now_ms for item in items} + self.__db.zadd(self.key, mapping) + + def mark_all_ready(self) -> None: + """ + Find every queue member whose score is still in the future + and set its score to now, so pop_ready() will pick it up. + """ + now_ms: int = int(time.time() * 1_000) + # grab everything with score > now + pending = self.__db.zrangebyscore(self.key, now_ms + 1, "+inf") + if not pending: + return + + # map each member to new score + update: dict = {member: now_ms for member in pending} + self.__db.zadd(self.key, update) def delete(self) -> None: - """Delete all items from the named queue.""" - logger.info(f"Deleting redis key: {self.key}") + """Delte all items from the named queue including its metadata.""" self.__db.delete(self.key) + self.__db.delete(self._meta_key) def set_meta(self, value: t.Any) -> None: - """Store an arbitrary JSON-serialisable value in a dedicated key.""" + """Store an arbitrary JSON‐serializable value in a dedicated key.""" self.__db.set(self._meta_key, json.dumps(value)) def get_meta(self, default: t.Any = None) -> t.Any: - """Retrieve the stored value (or *default* if nothing is set).""" + """Retrieve the stored metadata (or *default* if nothing is set).""" raw = self.__db.get(self._meta_key) return json.loads(raw) if raw is not None else default diff --git a/pgsync/sync.py b/pgsync/sync.py index 1af83d95..400fba1f 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -1171,6 +1171,28 @@ def poll_db(self) -> None: ) payloads: list = [] + def _flush_payloads(): + nonlocal payloads + if not payloads: + return + # split into ready vs delayed items + ready_items = [ + payload + for payload in payloads + if not payload.get("delayed", False) + ] + delayed_items = [ + payload + for payload in payloads + if payload.get("delayed", False) + ] + # push each batch with appropriate flag + if ready_items: + self.redis.push(ready_items, ready=True) + if delayed_items: + self.redis.push(delayed_items, ready=False) + payloads = [] + while True: # NB: consider reducing POLL_TIMEOUT to increase throughput if select.select([conn], [], [], settings.POLL_TIMEOUT) == ( @@ -1178,10 +1200,8 @@ def poll_db(self) -> None: [], [], ): - # Catch any hanging items from the last poll - if payloads: - self.redis.push(payloads) - payloads = [] + # flush anything hanging from the last poll + _flush_payloads() continue try: @@ -1192,28 +1212,33 @@ def poll_db(self) -> None: while conn.notifies: if len(payloads) >= settings.REDIS_WRITE_CHUNK_SIZE: - self.redis.push(payloads) - payloads = [] - notification: t.AnyStr = conn.notifies.pop(0) - if notification.channel == self.database: + _flush_payloads() - try: - payload = json.loads(notification.payload) - except json.JSONDecodeError as e: - logger.exception( - f"Error decoding JSON payload: {e}\n" - f"Payload: {notification.payload}" - ) - continue - if ( - payload["indices"] - and self.index in payload["indices"] - and payload["schema"] in self.tree.schemas - ): - payloads.append(payload) - logger.debug(f"poll_db: {payload}") - with self.lock: - self.count["db"] += 1 + notification = conn.notifies.pop(0) + if notification.channel != self.database: + continue + + try: + payload = json.loads(notification.payload) + except json.JSONDecodeError as e: + logger.exception( + f"Error decoding JSON payload: {e}\n" + f"Payload: {notification.payload}" + ) + continue + + if ( + payload.get("indices") + and self.index in payload["indices"] + and payload.get("schema") in self.tree.schemas + ): + payloads.append(payload) + logger.debug(f"poll_db: {payload}") + with self.lock: + self.count["db"] += 1 + + # after draining notifies, flush any remaining + _flush_payloads() @exception def async_poll_db(self) -> None: @@ -1222,6 +1247,9 @@ def async_poll_db(self) -> None: Receive a notification message from the channel we are listening on """ + raise NotImplementedError( + "async_poll_db is not implemented. Use poll_db instead." + ) try: self.conn.poll() except OperationalError as e: diff --git a/pgsync/trigger.py b/pgsync/trigger.py index 4e690ae2..b5736a61 100644 --- a/pgsync/trigger.py +++ b/pgsync/trigger.py @@ -18,11 +18,15 @@ _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; + delayed BOOLEAN := FALSE; BEGIN -- database is also the channel name. channel := CURRENT_DATABASE(); + -- If the pgsync.delayed setting is true, we delay the notification. + delayed := CURRENT_SETTING('pgsync.delayed', true)::BOOLEAN; + IF TG_OP = 'DELETE' THEN SELECT primary_keys, indices @@ -71,7 +75,8 @@ 'indices', _indices, 'tg_op', TG_OP, 'table', TG_TABLE_NAME, - 'schema', TG_TABLE_SCHEMA + 'schema', TG_TABLE_SCHEMA, + 'delayed', delayed ); -- Notify/Listen updates occur asynchronously, From 09cee5070bd10a67c63b6f5a5944eefb4c3d87f9 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Fri, 13 Jun 2025 19:08:37 +0100 Subject: [PATCH 02/10] Auto flush the ready items if no tasks in flight --- .env.sample | 2 +- pgsync/redisqueue.py | 38 ++++++++++++++++++++++++++++++++++++-- pgsync/settings.py | 3 +++ pgsync/sync.py | 11 +++++------ 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/.env.sample b/.env.sample index e0d6a68e..e7f69a30 100644 --- a/.env.sample +++ b/.env.sample @@ -101,7 +101,7 @@ # REDIS_SCHEME=redis # REDIS_URL takes precedence over the above variables # REDIS_URL=redis://megatron:PLEASE_REPLACE_ME@localhost:6379/0 - +# REDIS_AUTO_POP_READY_STATE=True # Logging # CRITICAL - 50 # ERROR - 40 diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py index 5660eea6..6381ec47 100644 --- a/pgsync/redisqueue.py +++ b/pgsync/redisqueue.py @@ -50,7 +50,9 @@ def push(self, items: t.List[dict], ready: bool = True) -> None: mapping: dict = {json.dumps(item): score for item in items} self.__db.zadd(self.key, mapping) - def pop(self, chunk_size: int = REDIS_READ_CHUNK_SIZE) -> t.List[dict]: + def pop_ready( + self, chunk_size: int = REDIS_READ_CHUNK_SIZE + ) -> t.List[dict]: """ Atomically pull up to chunk_size items whose score ≤ now_ms. These are the ready items. @@ -68,9 +70,25 @@ def pop(self, chunk_size: int = REDIS_READ_CHUNK_SIZE) -> t.List[dict]: pipeline.execute() return [json.loads(value) for value in values] + def pop( + self, chunk_size: int = REDIS_READ_CHUNK_SIZE, auto_ready: bool = False + ) -> t.List[dict]: + """ + Pop up to chunk_size ready items. + If auto_ready=True and none are ready, will flip up + to chunk_size delayed items to ready and retry once. + """ + items = self.pop_ready(chunk_size) + if not items and auto_ready: + flipped = self._mark_next_n_ready(chunk_size) + if flipped: + items = self.pop_ready(chunk_size) + return items + def mark_ready(self, items: t.List[dict]) -> None: """ - Flip previously-pushed, delayed items to ready by updating their score to now_ms. + Flip previously-pushed, delayed items to ready by + updating their score to now_ms. """ now_ms: int = int(time.time() * 1_000) mapping: dict = {json.dumps(item): now_ms for item in items} @@ -91,6 +109,22 @@ def mark_all_ready(self) -> None: update: dict = {member: now_ms for member in pending} self.__db.zadd(self.key, update) + def _mark_next_n_ready(self, nsize: int) -> int: + """ + Find up to nsize members whose score > now and set their score to now. + Returns how many were flipped. + """ + now_ms: int = int(time.time() * 1_000) + # get at most nsize pending items (score > now) + pending = self.__db.zrangebyscore( + self.key, now_ms + 1, "+inf", start=0, num=nsize + ) + if not pending: + return 0 + update: dict = {member: now_ms for member in pending} + self.__db.zadd(self.key, update) + return len(pending) + def delete(self) -> None: """Delte all items from the named queue including its metadata.""" self.__db.delete(self.key) diff --git a/pgsync/settings.py b/pgsync/settings.py index 2d123462..6ee4279b 100644 --- a/pgsync/settings.py +++ b/pgsync/settings.py @@ -191,6 +191,9 @@ # number of items to write to Redis at a time REDIS_WRITE_CHUNK_SIZE = env.int("REDIS_WRITE_CHUNK_SIZE", default=500) REDIS_URL = env.str("REDIS_URL", default=None) +REDIS_AUTO_POP_READY_STATE = env.bool( + "REDIS_AUTO_POP_READY_STATE", default=True +) # Logging: diff --git a/pgsync/sync.py b/pgsync/sync.py index 400fba1f..4caa967f 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -1119,7 +1119,9 @@ def checkpoint(self, value: t.Optional[str] = None) -> None: self._checkpoint = value def _poll_redis(self) -> None: - payloads: list = self.redis.pop() + payloads: list = self.redis.pop( + auto_ready=settings.REDIS_AUTO_POP_READY_STATE + ) if payloads: logger.debug(f"_poll_redis: {payloads}") with self.lock: @@ -1138,7 +1140,7 @@ def poll_redis(self) -> None: self._poll_redis() async def _async_poll_redis(self) -> None: - payloads: list = self.redis.pop() + payloads: list = self.redis.pop(settings.REDIS_AUTO_POP_READY_STATE) if payloads: logger.debug(f"_async_poll_redis: {payloads}") self.count["redis"] += len(payloads) @@ -1247,9 +1249,6 @@ def async_poll_db(self) -> None: Receive a notification message from the channel we are listening on """ - raise NotImplementedError( - "async_poll_db is not implemented. Use poll_db instead." - ) try: self.conn.poll() except OperationalError as e: @@ -1265,7 +1264,7 @@ def async_poll_db(self) -> None: and self.index in payload["indices"] and payload["schema"] in self.tree.schemas ): - self.redis.push([payload]) + self.redis.push([payload], ready=True) logger.debug(f"async_poll: {payload}") self.count["db"] += 1 From 5371ed5d5186872e40ecffa0fed9c5208c5b5c01 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 14 Jun 2025 16:41:31 +0100 Subject: [PATCH 03/10] fixed failing tests --- pgsync/redisqueue.py | 4 +++- tests/test_redisqueue.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py index 6381ec47..12fb67c0 100644 --- a/pgsync/redisqueue.py +++ b/pgsync/redisqueue.py @@ -78,7 +78,8 @@ def pop( If auto_ready=True and none are ready, will flip up to chunk_size delayed items to ready and retry once. """ - items = self.pop_ready(chunk_size) + items: t.List[dict] = self.pop_ready(chunk_size) + logger.debug(f"pop size: {len(items[0])}") if not items and auto_ready: flipped = self._mark_next_n_ready(chunk_size) if flipped: @@ -127,6 +128,7 @@ def _mark_next_n_ready(self, nsize: int) -> int: def delete(self) -> None: """Delte all items from the named queue including its metadata.""" + logger.info(f"Deleting redis key: {self.key} and {self._meta_key}") self.__db.delete(self.key) self.__db.delete(self._meta_key) diff --git a/tests/test_redisqueue.py b/tests/test_redisqueue.py index 39cf1c5e..4537a646 100644 --- a/tests/test_redisqueue.py +++ b/tests/test_redisqueue.py @@ -86,6 +86,6 @@ def test_delete(self, mock_logger): assert queue.qsize == 6 queue.delete() mock_logger.info.assert_called_once_with( - "Deleting redis key: queue:something" + "Deleting redis key: queue:something and queue:something:meta" ) assert queue.qsize == 0 From 5d2a8bc3c57600b1aecadeeb2a0e3421d83fb394 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sun, 15 Jun 2025 18:48:19 +0100 Subject: [PATCH 04/10] reraise with exception --- pgsync/sync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pgsync/sync.py b/pgsync/sync.py index 4caa967f..66e990ba 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -1368,7 +1368,9 @@ def pull(self, polling: bool = False) -> None: if polling: return else: - raise + raise Exception( + f"Error while pulling logical slot changes: {e}" + ) from e self.checkpoint: int = txmax or self.txid_current self._truncate = True From f656d832d004d8ec7223339b000c12b3b615e24d Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Tue, 17 Jun 2025 22:47:58 +0100 Subject: [PATCH 05/10] acquire advisory_lock on can_create_replication_slot --- pgsync/base.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/pgsync/base.py b/pgsync/base.py index f0b342e1..5240e24c 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -238,22 +238,27 @@ def pg_settings(self, column: str) -> t.Optional[str]: def _can_create_replication_slot(self, slot_name: str) -> None: """Check if the given user can create and destroy replication slots.""" - if self.replication_slots(slot_name): - logger.exception(f"Replication slot {slot_name} already exists") - self.drop_replication_slot(slot_name) + with self.advisory_lock( + slot_name, max_retries=None, retry_interval=0.1 + ): + if self.replication_slots(slot_name): + logger.exception( + f"Replication slot {slot_name} already exists" + ) + self.drop_replication_slot(slot_name) - try: - self.create_replication_slot(slot_name) + try: + self.create_replication_slot(slot_name) - except Exception as e: - logger.exception(f"{e}") - raise ReplicationSlotError( - f'PG_USER "{self.engine.url.username}" needs to be ' - f"superuser or have permission to read, create and destroy " - f"replication slots to perform this action.\n{e}" - ) - else: - self.drop_replication_slot(slot_name) + except Exception as e: + logger.exception(f"{e}") + raise ReplicationSlotError( + f'PG_USER "{self.engine.url.username}" needs to be ' + f"superuser or have permission to read, create and destroy " + f"replication slots to perform this action.\n{e}" + ) + else: + self.drop_replication_slot(slot_name) # Tables... def models(self, table: str, schema: str) -> sa.sql.Alias: From e62a7536506af9f8d3b194d3c96a022d6aca9206 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Wed, 25 Jun 2025 22:15:34 +0100 Subject: [PATCH 06/10] implement priority as weight; remove delayed --- bin/mark_as_ready | 82 --------------------------- examples/book/benchmark.py | 10 ++-- pgsync/base.py | 8 +-- pgsync/redisqueue.py | 106 +++++++++-------------------------- pgsync/sync.py | 110 ++++++++++++++++++------------------- pgsync/trigger.py | 14 +++-- requirements/base.txt | 22 +++++--- requirements/dev.txt | 75 +++++++++++++++---------- tests/test_redisqueue.py | 28 +++++++++- tests/test_sync.py | 34 ++++++++++++ tests/test_trigger.py | 13 ++++- 11 files changed, 230 insertions(+), 272 deletions(-) delete mode 100755 bin/mark_as_ready diff --git a/bin/mark_as_ready b/bin/mark_as_ready deleted file mode 100755 index 0b378497..00000000 --- a/bin/mark_as_ready +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env python - -"""PGSync Mark As Ready.""" -import logging - -import click - -from pgsync.sync import Sync -from pgsync.utils import config_loader, get_config, show_settings - -logger = logging.getLogger(__name__) - - -@click.command() -@click.option( - "--config", - "-c", - help="Schema config", - type=click.Path(exists=True), -) -@click.option("--host", "-h", help="PG_HOST override") -@click.option("--password", is_flag=True, help="Prompt for database password") -@click.option("--port", "-p", help="PG_PORT override", type=int) -@click.option( - "--teardown", - "-t", - is_flag=True, - help="Teardown database triggers and replication slots", -) -@click.option("--user", "-u", help="PG_USER override") -@click.option( - "--verbose", - "-v", - is_flag=True, - default=False, - help="Turn on verbosity", -) -def main( - teardown: bool, - config: str, - user: str, - password: bool, - host: str, - port: int, - verbose: bool, -) -> None: - kwargs: dict = { - "user": user, - "host": host, - "port": port, - } - if password: - kwargs["password"] = click.prompt( - "Password", - type=str, - hide_input=True, - ) - kwargs = {key: value for key, value in kwargs.items() if value is not None} - - config: str = get_config(config) - - show_settings(config) - - validate: bool = False if teardown else True - - for doc in config_loader(config): - sync: Sync = Sync( - doc, - verbose=verbose, - validate=validate, - repl_slots=False, - **kwargs, - ) - if teardown: - sync.teardown() - continue - sync.redis.mark_all_ready() - logger.info("Marked all items as ready in Redis") - - -if __name__ == "__main__": - main() diff --git a/examples/book/benchmark.py b/examples/book/benchmark.py index 835fcb4b..0dd5f1aa 100644 --- a/examples/book/benchmark.py +++ b/examples/book/benchmark.py @@ -134,9 +134,11 @@ def truncate_op(session: sessionmaker, model, nsize: int) -> None: case_sensitive=False, ), ) -@click.option("--delayed", is_flag=True, help="Delay persist to Redis") +@click.option( + "--weight", "-w", default=0.0, help="Weight for pgsync operations" +) def main( - config: str, nsize: int, daemon: bool, tg_op: str, delayed: bool + config: str, nsize: int, daemon: bool, tg_op: str, weight: float ) -> None: """Benchmarking script for Book model operations.""" show_settings(config) @@ -148,8 +150,8 @@ def main( Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) session = Session() - if delayed: - session.execute(sa.text("SET pgsync.delayed = True")) + if weight: + session.execute(sa.text(f"SET pgsync.weight = {weight}")) model = Book func: dict = { diff --git a/pgsync/base.py b/pgsync/base.py index 5240e24c..b0ef9774 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -73,7 +73,7 @@ class Payload(object): new (dict): The new values of the row that was affected by the event (for INSERT and UPDATE operations). xmin (int): The transaction ID of the event. indices (List[str]): The indices of the affected rows (for UPDATE and DELETE operations). - delayed (bool): Whether the event was delayed or not. + weight (int): The weight of the event. """ __slots__ = ( @@ -84,7 +84,7 @@ class Payload(object): "new", "xmin", "indices", - "delayed", + "weight", ) def __init__( @@ -96,7 +96,7 @@ def __init__( new: t.Optional[t.Dict[str, t.Any]] = None, xmin: t.Optional[int] = None, indices: t.Optional[t.List[str]] = None, - delayed: bool = False, + weight: t.Optional[int] = None, ): self.tg_op: t.Optional[str] = tg_op self.table: t.Optional[str] = table @@ -105,7 +105,7 @@ def __init__( self.new: t.Dict[str, t.Any] = new or {} self.xmin: t.Optional[int] = xmin self.indices: t.List[str] = indices - self.delayed: bool = delayed + self.weight: bool = weight @property def data(self) -> dict: diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py index 12fb67c0..195a7c08 100644 --- a/pgsync/redisqueue.py +++ b/pgsync/redisqueue.py @@ -11,11 +11,12 @@ from .settings import REDIS_READ_CHUNK_SIZE, REDIS_SOCKET_TIMEOUT from .urls import get_redis_url -logger = logging.getLogger(__name__) +# Pick a MULTIPLIER > max timestamp_ms (~1.7e12). +# 10**13 is safe for now. +_MULTIPLIER = 10**13 -# sentinel 300 years in ms -_FAR_FUTURE = int((time.time() + 300 * 365 * 24 * 3600) * 1_000) +logger = logging.getLogger(__name__) class RedisQueue: @@ -39,95 +40,38 @@ def qsize(self) -> int: """Number of items currently in the ZSET (regardless of ready/not).""" return self.__db.zcard(self.key) - def push(self, items: t.List[dict], ready: bool = True) -> None: - """ - Push a batch of items. - If ready=True score = now (ms), so pop_ready() can retrieve immediately. - If ready=False score = FAR_FUTURE, so pop_ready() ignores it until mark_ready(). + def push(self, items: t.List[dict], weight: float = 0.0) -> None: """ - now_ms: int = int(time.time() * 1_000) - score: int = now_ms if ready else _FAR_FUTURE - mapping: dict = {json.dumps(item): score for item in items} - self.__db.zadd(self.key, mapping) + Push a batch of items with the given numeric weight. - def pop_ready( - self, chunk_size: int = REDIS_READ_CHUNK_SIZE - ) -> t.List[dict]: - """ - Atomically pull up to chunk_size items whose score ≤ now_ms. - These are the ready items. - """ - now_ms: int = int(time.time() * 1_000) - # fetch members ready to run - values = self.__db.zrangebyscore( - self.key, 0, now_ms, start=0, num=chunk_size - ) - if not values: - return [] - # remove them in one pipeline - pipeline = self.__db.pipeline() - pipeline.zrem(self.key, *values) - pipeline.execute() - return [json.loads(value) for value in values] - - def pop( - self, chunk_size: int = REDIS_READ_CHUNK_SIZE, auto_ready: bool = False - ) -> t.List[dict]: - """ - Pop up to chunk_size ready items. - If auto_ready=True and none are ready, will flip up - to chunk_size delayed items to ready and retry once. - """ - items: t.List[dict] = self.pop_ready(chunk_size) - logger.debug(f"pop size: {len(items[0])}") - if not items and auto_ready: - flipped = self._mark_next_n_ready(chunk_size) - if flipped: - items = self.pop_ready(chunk_size) - return items - - def mark_ready(self, items: t.List[dict]) -> None: - """ - Flip previously-pushed, delayed items to ready by - updating their score to now_ms. + - Higher weight -> higher priority. + - Among equal weight, FIFO order. """ now_ms: int = int(time.time() * 1_000) - mapping: dict = {json.dumps(item): now_ms for item in items} + mapping: dict = {} + for item in items: + # score = -weight*M + timestamp + score = -weight * _MULTIPLIER + now_ms + mapping[json.dumps(item)] = score + # ZADD will add/update each member's score self.__db.zadd(self.key, mapping) - def mark_all_ready(self) -> None: + def pop(self, chunk_size: int = REDIS_READ_CHUNK_SIZE) -> t.List[dict]: """ - Find every queue member whose score is still in the future - and set its score to now, so pop_ready() will pick it up. + Pop up to chunk_size highest priority items (by weight, then FIFO). """ - now_ms: int = int(time.time() * 1_000) - # grab everything with score > now - pending = self.__db.zrangebyscore(self.key, now_ms + 1, "+inf") - if not pending: - return - - # map each member to new score - update: dict = {member: now_ms for member in pending} - self.__db.zadd(self.key, update) - - def _mark_next_n_ready(self, nsize: int) -> int: - """ - Find up to nsize members whose score > now and set their score to now. - Returns how many were flipped. - """ - now_ms: int = int(time.time() * 1_000) - # get at most nsize pending items (score > now) - pending = self.__db.zrangebyscore( - self.key, now_ms + 1, "+inf", start=0, num=nsize + # ZPOPMIN pulls the entries with the smallest score first + popped: t.List[t.Tuple[bytes, float]] = self.__db.zpopmin( + self.key, chunk_size ) - if not pending: - return 0 - update: dict = {member: now_ms for member in pending} - self.__db.zadd(self.key, update) - return len(pending) + results: t.List[dict] = [ + json.loads(member) for member, score in popped + ] + logger.debug(f"popped {len(results)} items (by priority)") + return results def delete(self) -> None: - """Delte all items from the named queue including its metadata.""" + """Delete all items from the named queue including its metadata.""" logger.info(f"Deleting redis key: {self.key} and {self._meta_key}") self.__db.delete(self.key) self.__db.delete(self._meta_key) diff --git a/pgsync/sync.py b/pgsync/sync.py index 66e990ba..fef90f12 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -12,6 +12,7 @@ import time import typing as t from collections import defaultdict +from math import inf from pathlib import Path import click @@ -1119,9 +1120,7 @@ def checkpoint(self, value: t.Optional[str] = None) -> None: self._checkpoint = value def _poll_redis(self) -> None: - payloads: list = self.redis.pop( - auto_ready=settings.REDIS_AUTO_POP_READY_STATE - ) + payloads: list = self.redis.pop() if payloads: logger.debug(f"_poll_redis: {payloads}") with self.lock: @@ -1140,7 +1139,9 @@ def poll_redis(self) -> None: self._poll_redis() async def _async_poll_redis(self) -> None: - payloads: list = self.redis.pop(settings.REDIS_AUTO_POP_READY_STATE) + payloads: t.List[t.Dict] = self.redis.pop( + settings.REDIS_AUTO_POP_READY_STATE + ) if payloads: logger.debug(f"_async_poll_redis: {payloads}") self.count["redis"] += len(payloads) @@ -1156,54 +1157,43 @@ async def async_poll_redis(self) -> None: while True: await self._async_poll_redis() + def _flush_payloads(self, payloads: list[dict]) -> None: + if not payloads: + return + + # group by weight, default=+inf so missing weights pop first + weight_buckets: t.Dict[float, t.List[t.Dict]] = {} + for payload in payloads: + raw = payload.get("weight") + weight: float = float(raw) if raw is not None else inf + weight_buckets.setdefault(weight, []).append(payload) + + # push each bucket in descending weight order (highest first) + for weight, items in sorted( + weight_buckets.items(), key=lambda kv: -kv[0] + ): + logger.debug(f"Pushing {len(items)} items with weight={weight}") + self.redis.push(items, weight=weight) + @threaded @exception def poll_db(self) -> None: - """ - Producer which polls Postgres continuously. - - Receive a notification message from the channel we are listening on - """ conn = self.engine.connect().connection conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - cursor = conn.cursor() - cursor.execute(f'LISTEN "{self.database}"') + conn.cursor().execute(f'LISTEN "{self.database}"') logger.debug( f'Listening to notifications on channel "{self.database}"' ) - payloads: list = [] - def _flush_payloads(): - nonlocal payloads - if not payloads: - return - # split into ready vs delayed items - ready_items = [ - payload - for payload in payloads - if not payload.get("delayed", False) - ] - delayed_items = [ - payload - for payload in payloads - if payload.get("delayed", False) - ] - # push each batch with appropriate flag - if ready_items: - self.redis.push(ready_items, ready=True) - if delayed_items: - self.redis.push(delayed_items, ready=False) - payloads = [] + payloads: t.List[t.Dict] = [] while True: - # NB: consider reducing POLL_TIMEOUT to increase throughput if select.select([conn], [], [], settings.POLL_TIMEOUT) == ( [], [], [], ): - # flush anything hanging from the last poll - _flush_payloads() + self._flush_payloads(payloads) continue try: @@ -1214,19 +1204,16 @@ def _flush_payloads(): while conn.notifies: if len(payloads) >= settings.REDIS_WRITE_CHUNK_SIZE: - _flush_payloads() + self._flush_payloads(payloads) - notification = conn.notifies.pop(0) + notification: t.AnyStr = conn.notifies.pop(0) if notification.channel != self.database: continue try: payload = json.loads(notification.payload) - except json.JSONDecodeError as e: - logger.exception( - f"Error decoding JSON payload: {e}\n" - f"Payload: {notification.payload}" - ) + except json.JSONDecodeError: + logger.exception("Invalid JSON in notification, skipping") continue if ( @@ -1235,12 +1222,12 @@ def _flush_payloads(): and payload.get("schema") in self.tree.schemas ): payloads.append(payload) - logger.debug(f"poll_db: {payload}") + logger.debug(f"Queued payload: {payload}") with self.lock: self.count["db"] += 1 - # after draining notifies, flush any remaining - _flush_payloads() + # flush anything left after draining notifications + self._flush_payloads(payloads) @exception def async_poll_db(self) -> None: @@ -1257,16 +1244,29 @@ def async_poll_db(self) -> None: while self.conn.notifies: notification: t.AnyStr = self.conn.notifies.pop(0) - if notification.channel == self.database: + if notification.channel != self.database: + continue + + try: payload = json.loads(notification.payload) - if ( - payload["indices"] - and self.index in payload["indices"] - and payload["schema"] in self.tree.schemas - ): - self.redis.push([payload], ready=True) - logger.debug(f"async_poll: {payload}") - self.count["db"] += 1 + except json.JSONDecodeError as e: + logger.exception(f"Error decoding JSON payload: {e}") + continue + + if ( + payload.get("indices") + and self.index in payload["indices"] + and payload.get("schema") in self.tree.schemas + ): + # extract numeric weight (missing +inf for highest priority) + raw_w = payload.get("weight") + weight = float(raw_w) if raw_w is not None else inf + + # push via priority queue + self.redis.push([payload], weight=weight) + + logger.debug(f"async_poll: {payload} (weight={weight})") + self.count["db"] += 1 def refresh_views(self) -> None: self._refresh_views() diff --git a/pgsync/trigger.py b/pgsync/trigger.py index b5736a61..b3a33efc 100644 --- a/pgsync/trigger.py +++ b/pgsync/trigger.py @@ -18,14 +18,18 @@ _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; - delayed BOOLEAN := FALSE; - + weight NUMERIC := 0; BEGIN -- database is also the channel name. channel := CURRENT_DATABASE(); - -- If the pgsync.delayed setting is true, we delay the notification. - delayed := CURRENT_SETTING('pgsync.delayed', true)::BOOLEAN; + -- load your numeric weight (default 0 if unset) + BEGIN + weight := CURRENT_SETTING('pgsync.weight', true)::NUMERIC; + EXCEPTION WHEN undefined_object THEN + -- setting not defined leave weight = 0 + NULL; + END; IF TG_OP = 'DELETE' THEN @@ -76,7 +80,7 @@ 'tg_op', TG_OP, 'table', TG_TABLE_NAME, 'schema', TG_TABLE_SCHEMA, - 'delayed', delayed + 'weight', weight ); -- Notify/Listen updates occur asynchronously, diff --git a/requirements/base.txt b/requirements/base.txt index 1496201d..c74cc47e 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,23 +1,27 @@ # -# This file is autogenerated by pip-compile with Python 3.11 +# This file is autogenerated by pip-compile with Python 3.9 # by the following command: # # pip-compile --output-file=requirements/base.txt requirements/base.in # -boto3==1.38.34 +async-timeout==5.0.1 + # via redis +backports-datetime-fromisoformat==2.0.3 + # via marshmallow +boto3==1.38.43 # via -r requirements/base.in -botocore==1.38.34 +botocore==1.38.43 # via # boto3 # s3transfer -certifi==2025.4.26 +certifi==2025.6.15 # via # elastic-transport # opensearch-py # requests charset-normalizer==3.4.2 # via requests -click==8.2.1 +click==8.1.8 # via -r requirements/base.in elastic-transport==8.17.1 # via @@ -45,7 +49,7 @@ marshmallow==4.0.0 # via environs opensearch-dsl==2.1.0 # via -r requirements/base.in -opensearch-py==2.8.0 +opensearch-py==3.0.0 # via opensearch-dsl psycopg2-binary==2.9.10 # via -r requirements/base.in @@ -56,7 +60,7 @@ python-dateutil==2.9.0.post0 # elasticsearch-dsl # opensearch-dsl # opensearch-py -python-dotenv==1.1.0 +python-dotenv==1.1.1 # via # -r requirements/base.in # environs @@ -82,8 +86,10 @@ typing-extensions==4.14.0 # via # elasticsearch # elasticsearch-dsl + # environs + # marshmallow # sqlalchemy -urllib3==2.4.0 +urllib3==1.26.20 # via # botocore # elastic-transport diff --git a/requirements/dev.txt b/requirements/dev.txt index 0c700071..29787843 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -1,18 +1,22 @@ # -# This file is autogenerated by pip-compile with Python 3.11 +# This file is autogenerated by pip-compile with Python 3.9 # by the following command: # # pip-compile --output-file=requirements/dev.txt requirements/dev.in # +async-timeout==5.0.1 + # via redis +backports-datetime-fromisoformat==2.0.3 + # via marshmallow black==25.1.0 # via -r requirements/dev.in -boto3==1.38.34 - # via -r requirements/base.in -botocore==1.38.34 +boto3==1.38.43 + # via -r /Users/tolu/pgsync/requirements/base.in +botocore==1.38.43 # via # boto3 # s3transfer -certifi==2025.4.26 +certifi==2025.6.15 # via # elastic-transport # opensearch-py @@ -21,11 +25,11 @@ cfgv==3.4.0 # via pre-commit charset-normalizer==3.4.2 # via requests -click==8.2.1 +click==8.1.8 # via - # -r requirements/base.in + # -r /Users/tolu/pgsync/requirements/base.in # black -coverage[toml]==7.8.2 +coverage[toml]==7.9.1 # via # -r requirements/dev.in # pytest-cov @@ -37,19 +41,21 @@ elastic-transport==8.17.1 # elasticsearch-dsl elasticsearch==8.18.1 # via - # -r requirements/base.in + # -r /Users/tolu/pgsync/requirements/base.in # elasticsearch-dsl elasticsearch-dsl==8.18.0 - # via -r requirements/base.in + # via -r /Users/tolu/pgsync/requirements/base.in environs==14.2.0 - # via -r requirements/base.in + # via -r /Users/tolu/pgsync/requirements/base.in events==0.5 # via opensearch-py -faker==37.3.0 +exceptiongroup==1.3.0 + # via pytest +faker==37.4.0 # via -r requirements/dev.in filelock==3.18.0 # via virtualenv -flake8==7.2.0 +flake8==7.3.0 # via -r requirements/dev.in freezegun==1.5.2 # via -r requirements/dev.in @@ -78,8 +84,8 @@ mypy-extensions==1.1.0 nodeenv==1.9.1 # via pre-commit opensearch-dsl==2.1.0 - # via -r requirements/base.in -opensearch-py==2.8.0 + # via -r /Users/tolu/pgsync/requirements/base.in +opensearch-py==3.0.0 # via opensearch-dsl packaging==25.0 # via @@ -92,23 +98,25 @@ platformdirs==4.3.8 # black # virtualenv pluggy==1.6.0 - # via pytest + # via + # pytest + # pytest-cov pre-commit==4.2.0 # via -r requirements/dev.in psycopg2-binary==2.9.10 - # via -r requirements/base.in -pycodestyle==2.13.0 + # via -r /Users/tolu/pgsync/requirements/base.in +pycodestyle==2.14.0 # via flake8 -pyflakes==3.3.2 +pyflakes==3.4.0 # via flake8 -pygments==2.19.1 +pygments==2.19.2 # via pytest -pytest==8.4.0 +pytest==8.4.1 # via # -r requirements/dev.in # pytest-cov # pytest-mock -pytest-cov==6.1.1 +pytest-cov==6.2.1 # via -r requirements/dev.in pytest-mock==3.14.1 # via -r requirements/dev.in @@ -120,20 +128,20 @@ python-dateutil==2.9.0.post0 # freezegun # opensearch-dsl # opensearch-py -python-dotenv==1.1.0 +python-dotenv==1.1.1 # via - # -r requirements/base.in + # -r /Users/tolu/pgsync/requirements/base.in # environs pyyaml==6.0.2 # via pre-commit redis==6.2.0 - # via -r requirements/base.in + # via -r /Users/tolu/pgsync/requirements/base.in requests==2.32.4 # via # opensearch-py # requests-aws4auth requests-aws4auth==1.3.1 - # via -r requirements/base.in + # via -r /Users/tolu/pgsync/requirements/base.in s3transfer==0.13.0 # via boto3 six==1.17.0 @@ -141,17 +149,26 @@ six==1.17.0 # opensearch-dsl # python-dateutil sqlalchemy==2.0.41 - # via -r requirements/base.in + # via -r /Users/tolu/pgsync/requirements/base.in sqlparse==0.5.3 - # via -r requirements/base.in + # via -r /Users/tolu/pgsync/requirements/base.in +tomli==2.2.1 + # via + # black + # coverage + # pytest typing-extensions==4.14.0 # via + # black # elasticsearch # elasticsearch-dsl + # environs + # exceptiongroup + # marshmallow # sqlalchemy tzdata==2025.2 # via faker -urllib3==2.4.0 +urllib3==1.26.20 # via # botocore # elastic-transport diff --git a/tests/test_redisqueue.py b/tests/test_redisqueue.py index 4537a646..22af0d2f 100644 --- a/tests/test_redisqueue.py +++ b/tests/test_redisqueue.py @@ -1,6 +1,9 @@ """RedisQueues tests.""" +import time + import pytest +from freezegun import freeze_time from mock import patch from redis.exceptions import ConnectionError @@ -69,11 +72,13 @@ def test_pop(self, mock_logger): queue.delete() queue.push([1, 2]) items = queue.pop() - mock_logger.debug.assert_called_once_with("pop size: 2") + mock_logger.debug.assert_called_once_with( + "popped 2 items (by priority)" + ) assert items == [1, 2] queue.push([3, 4, 5]) items = queue.pop() - mock_logger.debug.assert_any_call("pop size: 3") + mock_logger.debug.assert_any_call("popped 3 items (by priority)") assert items == [3, 4, 5] queue.delete() @@ -89,3 +94,22 @@ def test_delete(self, mock_logger): "Deleting redis key: queue:something and queue:something:meta" ) assert queue.qsize == 0 + + @freeze_time("2025-06-25T12:00:00Z") + def test_push_and_pop_respects_weight_and_fifo(self): + queue = RedisQueue("test") + a = {"id": "A"} + b = {"id": "B"} + c = {"id": "C"} + # A has no explicit weight → default 0.0 + queue.push([a]) + # wait a millisecond for a different timestamp + time.sleep(0.001) + # B and C both weight=5 + queue.push([b], weight=5) + time.sleep(0.001) + queue.push([c], weight=5) + # popping 3 items + out = queue.pop(3) + # B then C (both weight=5, FIFO), then A (weight=0) + assert [x["id"] for x in out] == ["B", "C", "A"] diff --git a/tests/test_sync.py b/tests/test_sync.py index 2a6dabdf..1674ae1a 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -4,6 +4,7 @@ import os import typing as t from collections import namedtuple +from math import inf import pytest from mock import ANY, call, patch @@ -948,3 +949,36 @@ def test_poll_redis( mock_logger.debug.assert_called_once_with(f"_poll_redis: {items}") mock_time.sleep.assert_called_once_with(settings.REDIS_POLL_INTERVAL) assert sync.count["redis"] == 2 + + def test_flush_groups_and_orders(self, sync): + + class DummyRedis: + def __init__(self): + self.calls = [] + + def push(self, items, weight): + self.calls.append((tuple(items), weight)) + + sync.redis = DummyRedis() + + # four payloads; one has no weight + payloads = [ + {"id": 1, "indices": ["x"], "schema": "public"}, + {"id": 2, "weight": 1, "indices": ["x"], "schema": "public"}, + {"id": 3, "weight": 2, "indices": ["x"], "schema": "public"}, + {"id": 4, "weight": 1, "indices": ["x"], "schema": "public"}, + ] + + sync._flush_payloads(payloads) + + # expect three pushes, in this order: + # 1) id=1 (no weight → inf) + # 2) id=3 (weight=2) + # 3) ids=2 and 4 (weight=1) + calls = sync.redis.calls + assert calls[0][1] == inf + assert calls[0][0] == (payloads[0],) + assert calls[1][1] == 2.0 + assert calls[1][0] == (payloads[2],) + assert calls[2][1] == 1.0 + assert set(item["id"] for item in calls[2][0]) == {2, 4} diff --git a/tests/test_trigger.py b/tests/test_trigger.py index e3f8c90c..e1984121 100644 --- a/tests/test_trigger.py +++ b/tests/test_trigger.py @@ -23,11 +23,19 @@ def test_trigger_template(self): _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; - + weight NUMERIC := 0; BEGIN -- database is also the channel name. channel := CURRENT_DATABASE(); + -- load your numeric weight (default 0 if unset) + BEGIN + weight := CURRENT_SETTING('pgsync.weight', true)::NUMERIC; + EXCEPTION WHEN undefined_object THEN + -- setting not defined leave weight = 0 + NULL; + END; + IF TG_OP = 'DELETE' THEN SELECT primary_keys, indices @@ -76,7 +84,8 @@ def test_trigger_template(self): 'indices', _indices, 'tg_op', TG_OP, 'table', TG_TABLE_NAME, - 'schema', TG_TABLE_SCHEMA + 'schema', TG_TABLE_SCHEMA, + 'weight', weight ); -- Notify/Listen updates occur asynchronously, From 87402615b037aff0b5dd579930c672b898961df6 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 26 Jun 2025 19:02:55 +0100 Subject: [PATCH 07/10] prevent same timestamp for all items --- pgsync/redisqueue.py | 2 +- tests/test_redisqueue.py | 34 +++++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py index 2bfbd637..8135074e 100644 --- a/pgsync/redisqueue.py +++ b/pgsync/redisqueue.py @@ -53,9 +53,9 @@ def push(self, items: t.List[dict], weight: float = 0.0) -> None: - Higher weight -> higher priority. - Among equal weight, FIFO order. """ - now_ms: int = int(time.time() * 1_000) mapping: dict = {} for item in items: + now_ms: int = int(time.time() * 1_000) # score = -weight*M + timestamp score = -weight * _MULTIPLIER + now_ms mapping[json.dumps(item)] = score diff --git a/tests/test_redisqueue.py b/tests/test_redisqueue.py index 22af0d2f..a4f97eeb 100644 --- a/tests/test_redisqueue.py +++ b/tests/test_redisqueue.py @@ -1,13 +1,15 @@ """RedisQueues tests.""" +import json import time +import typing as t import pytest from freezegun import freeze_time from mock import patch from redis.exceptions import ConnectionError -from pgsync.redisqueue import RedisQueue +from pgsync.redisqueue import _MULTIPLIER, RedisQueue class TestRedisQueue(object): @@ -97,10 +99,10 @@ def test_delete(self, mock_logger): @freeze_time("2025-06-25T12:00:00Z") def test_push_and_pop_respects_weight_and_fifo(self): - queue = RedisQueue("test") - a = {"id": "A"} - b = {"id": "B"} - c = {"id": "C"} + queue: RedisQueue = RedisQueue("test") + a: dict = {"id": "A"} + b: dict = {"id": "B"} + c: dict = {"id": "C"} # A has no explicit weight → default 0.0 queue.push([a]) # wait a millisecond for a different timestamp @@ -113,3 +115,25 @@ def test_push_and_pop_respects_weight_and_fifo(self): out = queue.pop(3) # B then C (both weight=5, FIFO), then A (weight=0) assert [x["id"] for x in out] == ["B", "C", "A"] + + @freeze_time("2024-06-25T12:00:00Z") + def test_push_adds_correct_scores(self): + queue: RedisQueue = RedisQueue("test") + items: t.List[t.Dict] = [{"id": 1}, {"id": 2}] + weight: float = 5.0 + with ( + patch.object(queue, "_RedisQueue__db") as mock_db, + patch( + "time.time", side_effect=[1_717_267_200.100, 1_717_267_200.200] + ), + ): + queue.push(items, weight=weight) + expected_mapping: dict = { + json.dumps({"id": 1}, sort_keys=True): -weight * _MULTIPLIER + + int(1_717_267_200.100 * 1_000), + json.dumps({"id": 2}, sort_keys=True): -weight * _MULTIPLIER + + int(1_717_267_200.200 * 1_000), + } + mock_db.zadd.assert_called_once_with( + "queue:test", expected_mapping + ) From c323da4fc8b216cf1e3694572e854f0a6977f44b Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 28 Jun 2025 17:18:14 +0100 Subject: [PATCH 08/10] Handle eventual consistency of the logical replication slot --- pgsync/sync.py | 29 ++++++++++++++++++++++++++--- tests/test_sync.py | 13 ++++++++----- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/pgsync/sync.py b/pgsync/sync.py index 854f4f54..4ca1186a 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -1379,9 +1379,32 @@ async def async_truncate_slots(self) -> None: await asyncio.sleep(settings.REPLICATION_SLOT_CLEANUP_INTERVAL) def _truncate_slots(self) -> None: - if self._truncate: - logger.debug(f"Truncating replication slot: {self.__name}") - self.logical_slot_get_changes(self.__name, upto_nchanges=None) + if not self._truncate: + return + + """ + Handle eventual consistency of the logical replication slot. + We retry logical_slot_changes a few times in case of replication slot in use error. + """ + retries: int = 3 + backoff: int = 1 + txmax: int = self.txid_current + upto_lsn: str = self.current_wal_lsn + + for attempt in range(1, retries + 1): + try: + logger.debug(f"Truncating replication slot: {self.__name}") + self.logical_slot_changes(txmax=txmax, upto_lsn=upto_lsn) + logger.debug("Truncation successful.") + break + except Exception as e: + logger.warning(f"Attempt {attempt} failed with {e}") + if attempt == retries: + logger.error("Max retries reached, raising exception.") + raise + sleep_time: int = backoff * (2 ** (attempt - 1)) + logger.debug(f"Retrying in {sleep_time} seconds...") + time.sleep(sleep_time) @threaded @exception diff --git a/tests/test_sync.py b/tests/test_sync.py index c4e8d82a..bd007561 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -454,16 +454,19 @@ def test_status(self, sync): @patch("pgsync.sync.logger") def test_truncate_slots(self, mock_logger, sync): with patch( - "pgsync.sync.Sync.logical_slot_get_changes" + "pgsync.sync.Sync.logical_slot_changes" ) as mock_logical_slot_changes: sync._truncate = True sync._truncate_slots() mock_logical_slot_changes.assert_called_once_with( - "testdb_testdb", upto_nchanges=None - ) - mock_logger.debug.assert_called_once_with( - "Truncating replication slot: testdb_testdb" + txmax=ANY, upto_lsn=ANY ) + mock_logger.debug.call_args_list == [ + call( + "Truncating replication slot: testdb_testdb", + ), + call("Truncation successful."), + ] @patch("pgsync.sync.SearchClient.bulk") @patch("pgsync.sync.logger") From 306f1a14a82de8122ecd80573261198f666810e8 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 28 Jun 2025 19:46:45 +0100 Subject: [PATCH 09/10] fic type hints --- pgsync/base.py | 6 +++--- tests/test_sync.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pgsync/base.py b/pgsync/base.py index b0ef9774..2bf8d223 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -73,7 +73,7 @@ class Payload(object): new (dict): The new values of the row that was affected by the event (for INSERT and UPDATE operations). xmin (int): The transaction ID of the event. indices (List[str]): The indices of the affected rows (for UPDATE and DELETE operations). - weight (int): The weight of the event. + weight (float): The weight of the event. """ __slots__ = ( @@ -96,7 +96,7 @@ def __init__( new: t.Optional[t.Dict[str, t.Any]] = None, xmin: t.Optional[int] = None, indices: t.Optional[t.List[str]] = None, - weight: t.Optional[int] = None, + weight: t.Optional[float] = None, ): self.tg_op: t.Optional[str] = tg_op self.table: t.Optional[str] = table @@ -105,7 +105,7 @@ def __init__( self.new: t.Dict[str, t.Any] = new or {} self.xmin: t.Optional[int] = xmin self.indices: t.List[str] = indices - self.weight: bool = weight + self.weight: float = weight @property def data(self) -> dict: diff --git a/tests/test_sync.py b/tests/test_sync.py index bd007561..59cafb18 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -461,7 +461,7 @@ def test_truncate_slots(self, mock_logger, sync): mock_logical_slot_changes.assert_called_once_with( txmax=ANY, upto_lsn=ANY ) - mock_logger.debug.call_args_list == [ + assert mock_logger.debug.call_args_list == [ call( "Truncating replication slot: testdb_testdb", ), From 0f6d8a6552f78ddd700978ad77b963b0f7e8b230 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Wed, 2 Jul 2025 20:31:09 +0100 Subject: [PATCH 10/10] reset the payloads after flush --- pgsync/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pgsync/sync.py b/pgsync/sync.py index 4ca1186a..c731cd4f 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -1184,6 +1184,7 @@ def poll_db(self) -> None: [], ): self._flush_payloads(payloads) + payloads = [] continue try: @@ -1195,6 +1196,7 @@ def poll_db(self) -> None: while conn.notifies: if len(payloads) >= settings.REDIS_WRITE_CHUNK_SIZE: self._flush_payloads(payloads) + payloads = [] notification: t.AnyStr = conn.notifies.pop(0) if notification.channel != self.database: @@ -1218,6 +1220,7 @@ def poll_db(self) -> None: # flush anything left after draining notifications self._flush_payloads(payloads) + payloads = [] @exception def async_poll_db(self) -> None: