Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions pgsync/redisqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import typing as t

from redis import Redis
from redis.exceptions import ConnectionError
from redis.exceptions import ConnectionError, ResponseError

from .settings import (
REDIS_READ_CHUNK_SIZE,
Expand All @@ -16,6 +16,8 @@

logger = logging.getLogger(__name__)

_META_SCALAR_KEY = "__value"


class RedisQueue(object):
"""Simple Queue with Redis/Valkey Backend."""
Expand Down Expand Up @@ -112,10 +114,43 @@ def delete(self) -> None:
logger.info(f"Deleted redis key: {self.key}")

def set_meta(self, value: t.Any) -> None:
"""Store an arbitrary JSON-serialisable value in a dedicated key."""
self.__db.set(self._meta_key, json.dumps(value))
"""Merge *value* into the metadata hash using HSET."""
if not isinstance(value, dict):
value = {_META_SCALAR_KEY: value}
mapping = {field: json.dumps(v) for field, v in value.items()}
try:
self.__db.hset(self._meta_key, mapping=mapping)
except ResponseError as e:
if "WRONGTYPE" not in str(e):
raise

# non-atomic fallback approach, this should only happen during the initial migration
self.__db.delete(self._meta_key)
self.__db.hset(self._meta_key, mapping=mapping)

def get_meta(self, default: t.Any = None) -> t.Any:
"""Retrieve the stored value (or *default* if nothing is set)."""
raw: t.Optional[str] = self.__db.get(self._meta_key)
return json.loads(raw) if raw is not None else default
"""Return the metadata dict.

Reads from the Redis hash (new format). Falls back to reading a plain
JSON string (legacy format written by older pgsync versions) so that
existing deployments can upgrade without losing checkpoint data.
"""
try:
raw: dict = self.__db.hgetall(self._meta_key)
except ResponseError as e:
if "WRONGTYPE" not in str(e):
raise
raw = {}

if raw:
result = {
(k.decode() if isinstance(k, bytes) else k): json.loads(v)
for k, v in raw.items()
}
if list(result.keys()) == [_META_SCALAR_KEY]:
return result[_META_SCALAR_KEY]
return result

# Empty hash or WRONGTYPE - fall back to legacy plain-string format.
legacy: t.Optional[bytes] = self.__db.get(self._meta_key)
return json.loads(legacy) if legacy is not None else default
89 changes: 88 additions & 1 deletion tests/test_redisqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest
from mock import call, patch
from redis.exceptions import ConnectionError
from redis.exceptions import ConnectionError, ResponseError

from pgsync.redisqueue import RedisQueue

Expand Down Expand Up @@ -125,6 +125,93 @@ def visibility(xmins):
assert len(remaining) == 1
assert json.loads(remaining[0])["xmin"] == 2002

def test_set_meta_and_get_meta(self):
"""Test trivial get/set round trip"""
queue = RedisQueue("something")
queue.delete()

queue.set_meta({"checkpoint": 42})
assert queue.get_meta() == {"checkpoint": 42}
queue.delete()

def test_set_meta_merges_fields(self):
"""Test that set_meta merges fields correctly"""
queue = RedisQueue("something")
queue.delete()

# run 2 partial key updates
queue.set_meta({"checkpoint": 100})
queue.set_meta({"txid_current": 200})

# check that both are successful and don't overwrite each other
meta = queue.get_meta(default={})
assert meta.get("checkpoint") == 100
assert meta.get("txid_current") == 200
queue.delete()

def test_get_meta_default_when_absent(self):
"""Test that get_meta returns the default value when the key is absent"""
queue = RedisQueue("something")
queue.delete()

assert queue.get_meta(default={}) == {}
assert queue.get_meta() is None
queue.delete()

def test_set_meta_scalar_roundtrip(self):
"""Test that a non-dict scalar is transparently wrapped and unwrapped."""
queue = RedisQueue("something")
queue.delete()

for value in (42, 3.14, "hello", True, [1, 2, 3]):
queue.set_meta(value)
assert queue.get_meta() == value

queue.delete()

def test_get_meta_legacy_string_format(self):
"""Test that get_meta correctly handles the legacy string format"""
queue = RedisQueue("something")
queue.delete()

# write the old format directly, bypassing set_meta.
queue._RedisQueue__db.set(
queue._meta_key,
json.dumps({"checkpoint": 7, "txid_current": 99}),
)
meta = queue.get_meta(default={})
assert meta == {"checkpoint": 7, "txid_current": 99}
queue.delete()

def test_set_meta_overwrites_legacy_string_key(self):
"""set_meta must succeed even when the key holds a legacy plain string."""
queue = RedisQueue("something")
queue.delete()

# write legacy plain-string format directly
queue._RedisQueue__db.set(
queue._meta_key, json.dumps({"checkpoint": 1})
)

# set_meta should transparently replace the string key with a hash
queue.set_meta({"checkpoint": 99})
assert queue.get_meta() == {"checkpoint": 99}
queue.delete()

def test_get_meta_reraises_non_wrongtype_response_error(self, mocker):
"""get_meta must re-raise ResponseErrors that are not WRONGTYPE."""
queue = RedisQueue("something")
queue.delete()

mocker.patch.object(
queue._RedisQueue__db,
"hgetall",
side_effect=ResponseError("ERR some other error"),
)
with pytest.raises(ResponseError, match="ERR some other error"):
queue.get_meta()
queue.delete()

def test_pop_visible_in_snapshot_none_visible(self):
queue: RedisQueue = RedisQueue("something")
queue.delete()
Expand Down