From 3ca4efb11de34b3f5cc6771552cc20250c2b57d7 Mon Sep 17 00:00:00 2001
From: "Ivan K."
Date: Fri, 13 Mar 2026 10:43:34 +0000
Subject: [PATCH] fix partial redis updates implementation

Store the queue metadata in a Redis hash so that set_meta() merges
partial updates with HSET instead of overwriting one JSON string.

get_meta() still reads the legacy plain-string format. When set_meta()
finds a legacy string it replaces it with a hash and carries over the
fields of a legacy dict, so a partial update during migration does not
drop them. An empty dict is a no-op because redis-py rejects HSET
without any field/value pair.
---
 pgsync/redisqueue.py     |  66 ++++++++++++++++++++---
 tests/test_redisqueue.py | 113 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 172 insertions(+), 7 deletions(-)

diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py
index 0472efa3..c1664689 100644
--- a/pgsync/redisqueue.py
+++ b/pgsync/redisqueue.py
@@ -5,7 +5,7 @@
 import typing as t
 
 from redis import Redis
-from redis.exceptions import ConnectionError
+from redis.exceptions import ConnectionError, ResponseError
 
 from .settings import (
     REDIS_READ_CHUNK_SIZE,
@@ -16,6 +16,8 @@
 
 logger = logging.getLogger(__name__)
 
+_META_SCALAR_KEY = "__value"
+
 
 class RedisQueue(object):
     """Simple Queue with Redis/Valkey Backend."""
@@ -112,10 +114,62 @@ def delete(self) -> None:
         logger.info(f"Deleted redis key: {self.key}")
 
     def set_meta(self, value: t.Any) -> None:
-        """Store an arbitrary JSON-serialisable value in a dedicated key."""
-        self.__db.set(self._meta_key, json.dumps(value))
+        """Merge *value* into the metadata hash using HSET.
+
+        A dict is merged field by field; any other JSON-serialisable
+        value is stored under the reserved ``_META_SCALAR_KEY`` field.
+        If the key still holds a legacy plain JSON string it is replaced
+        by a hash, and the fields of a legacy dict are carried over so
+        that a partial update does not drop them.
+        """
+        if not isinstance(value, dict):
+            value = {_META_SCALAR_KEY: value}
+        mapping = {field: json.dumps(v) for field, v in value.items()}
+        if not mapping:
+            # redis-py rejects HSET without any field/value pair.
+            return
+        try:
+            self.__db.hset(self._meta_key, mapping=mapping)
+        except ResponseError as e:
+            if "WRONGTYPE" not in str(e):
+                raise
+
+            # non-atomic fallback approach, this should only happen during
+            # the initial migration
+            try:
+                legacy = json.loads(self.__db.get(self._meta_key) or "null")
+            except (ResponseError, ValueError):
+                # Not a readable legacy value: nothing to carry over.
+                legacy = None
+            if isinstance(legacy, dict):
+                carried = {k: json.dumps(v) for k, v in legacy.items()}
+                mapping = {**carried, **mapping}
+            self.__db.delete(self._meta_key)
+            self.__db.hset(self._meta_key, mapping=mapping)
 
     def get_meta(self, default: t.Any = None) -> t.Any:
-        """Retrieve the stored value (or *default* if nothing is set)."""
-        raw: t.Optional[str] = self.__db.get(self._meta_key)
-        return json.loads(raw) if raw is not None else default
+        """Return the metadata dict.
+
+        Reads from the Redis hash (new format). Falls back to reading a plain
+        JSON string (legacy format written by older pgsync versions) so that
+        existing deployments can upgrade without losing checkpoint data.
+        """
+        try:
+            raw: dict = self.__db.hgetall(self._meta_key)
+        except ResponseError as e:
+            if "WRONGTYPE" not in str(e):
+                raise
+            raw = {}
+
+        if raw:
+            result = {
+                (k.decode() if isinstance(k, bytes) else k): json.loads(v)
+                for k, v in raw.items()
+            }
+            if list(result.keys()) == [_META_SCALAR_KEY]:
+                return result[_META_SCALAR_KEY]
+            return result
+
+        # Empty hash or WRONGTYPE - fall back to legacy plain-string format.
+        legacy: t.Optional[bytes] = self.__db.get(self._meta_key)
+        return json.loads(legacy) if legacy is not None else default
diff --git a/tests/test_redisqueue.py b/tests/test_redisqueue.py
index cb3ab1bd..d14c41dc 100644
--- a/tests/test_redisqueue.py
+++ b/tests/test_redisqueue.py
@@ -4,7 +4,7 @@
 
 import pytest
 from mock import call, patch
-from redis.exceptions import ConnectionError
+from redis.exceptions import ConnectionError, ResponseError
 
 from pgsync.redisqueue import RedisQueue
 
@@ -125,6 +125,117 @@ def visibility(xmins):
         assert len(remaining) == 1
         assert json.loads(remaining[0])["xmin"] == 2002
 
+    def test_set_meta_and_get_meta(self):
+        """Test trivial get/set round trip"""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        queue.set_meta({"checkpoint": 42})
+        assert queue.get_meta() == {"checkpoint": 42}
+        queue.delete()
+
+    def test_set_meta_merges_fields(self):
+        """Test that set_meta merges fields correctly"""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        # run 2 partial key updates
+        queue.set_meta({"checkpoint": 100})
+        queue.set_meta({"txid_current": 200})
+
+        # check that both are successful and don't overwrite each other
+        meta = queue.get_meta(default={})
+        assert meta.get("checkpoint") == 100
+        assert meta.get("txid_current") == 200
+        queue.delete()
+
+    def test_get_meta_default_when_absent(self):
+        """Test that get_meta returns the default value when the key is absent"""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        assert queue.get_meta(default={}) == {}
+        assert queue.get_meta() is None
+        queue.delete()
+
+    def test_set_meta_scalar_roundtrip(self):
+        """Test that a non-dict scalar is transparently wrapped and unwrapped."""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        for value in (42, 3.14, "hello", True, [1, 2, 3]):
+            queue.set_meta(value)
+            assert queue.get_meta() == value
+
+        queue.delete()
+
+    def test_get_meta_legacy_string_format(self):
+        """Test that get_meta correctly handles the legacy string format"""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        # write the old format directly, bypassing set_meta.
+        queue._RedisQueue__db.set(
+            queue._meta_key,
+            json.dumps({"checkpoint": 7, "txid_current": 99}),
+        )
+        meta = queue.get_meta(default={})
+        assert meta == {"checkpoint": 7, "txid_current": 99}
+        queue.delete()
+
+    def test_set_meta_overwrites_legacy_string_key(self):
+        """set_meta must succeed even when the key holds a legacy plain string."""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        # write legacy plain-string format directly
+        queue._RedisQueue__db.set(
+            queue._meta_key, json.dumps({"checkpoint": 1})
+        )
+
+        # set_meta should transparently replace the string key with a hash
+        queue.set_meta({"checkpoint": 99})
+        assert queue.get_meta() == {"checkpoint": 99}
+        queue.delete()
+
+    def test_set_meta_preserves_legacy_fields(self):
+        """A partial update must keep the other legacy-string fields."""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        queue._RedisQueue__db.set(
+            queue._meta_key,
+            json.dumps({"checkpoint": 1, "txid_current": 5}),
+        )
+
+        queue.set_meta({"checkpoint": 99})
+        assert queue.get_meta() == {"checkpoint": 99, "txid_current": 5}
+        queue.delete()
+
+    def test_set_meta_empty_dict_is_noop(self):
+        """An empty dict must neither raise nor alter stored metadata."""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        queue.set_meta({"checkpoint": 1})
+        queue.set_meta({})
+        assert queue.get_meta() == {"checkpoint": 1}
+        queue.delete()
+
+    def test_get_meta_reraises_non_wrongtype_response_error(self, mocker):
+        """get_meta must re-raise ResponseErrors that are not WRONGTYPE."""
+        queue = RedisQueue("something")
+        queue.delete()
+
+        mocker.patch.object(
+            queue._RedisQueue__db,
+            "hgetall",
+            side_effect=ResponseError("ERR some other error"),
+        )
+        with pytest.raises(ResponseError, match="ERR some other error"):
+            queue.get_meta()
+        queue.delete()
+
     def test_pop_visible_in_snapshot_none_visible(self):
         queue: RedisQueue = RedisQueue("something")
         queue.delete()