From 458a780a5f78829d1bc6bd83af86bddfd05f2c85 Mon Sep 17 00:00:00 2001 From: Vladimir Goncharuk Date: Sat, 4 Oct 2025 18:16:45 +0500 Subject: [PATCH 1/7] add watched_columns feature --- pgsync/base.py | 2 ++ pgsync/constants.py | 2 ++ pgsync/node.py | 5 +++++ pgsync/sync.py | 18 ++++++++++++++++-- pgsync/trigger.py | 9 +++++---- pgsync/view.py | 36 +++++++++++++++++++++++++----------- 6 files changed, 55 insertions(+), 17 deletions(-) diff --git a/pgsync/base.py b/pgsync/base.py index 94bdf089..6618e49f 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -743,6 +743,7 @@ def create_view( schema: str, tables: t.Set, user_defined_fkey_tables: dict, + watched_columns_for_table: t.Dict[str, t.List[str]] ) -> None: create_view( self.engine, @@ -753,6 +754,7 @@ def create_view( tables, user_defined_fkey_tables, self._materialized_views(schema), + watched_columns_for_table, ) def drop_view(self, schema: str) -> None: diff --git a/pgsync/constants.py b/pgsync/constants.py index 22464c4b..82235ca2 100644 --- a/pgsync/constants.py +++ b/pgsync/constants.py @@ -42,6 +42,7 @@ "schema", "table", "transform", + "watched_columns", ] # Relationship attributes @@ -198,6 +199,7 @@ "indices", "primary_keys", "table_name", + "watched_columns", ] # Primary key delimiter diff --git a/pgsync/node.py b/pgsync/node.py index ed6446b8..1ca3d5dd 100644 --- a/pgsync/node.py +++ b/pgsync/node.py @@ -134,6 +134,7 @@ class Node(object): parent: t.Optional[Node] = None base_tables: t.Optional[list] = None is_through: bool = False + watched_columns: t.Optional[list] = None def __post_init__(self): self.model: sa.sql.Alias = self.models(self.table, self.schema) @@ -284,6 +285,7 @@ class Tree(threading.local): def __post_init__(self): self.tables: t.Set[str] = set() + self.watched_columns_tables: t.Set[str] = set() self.__nodes: t.Dict[Node] = {} self.__schemas: t.Set[str] = set() self.root: t.Optional[Node] = None @@ -324,11 +326,14 @@ def build(self, nodes: dict) -> Node: 
columns=nodes.get("columns", []), relationship=nodes.get("relationship", {}), base_tables=nodes.get("base_tables", []), + watched_columns=nodes.get("watched_columns", []), ) if self.root is None: self.root = node self.tables.add(node.table) + if node.watched_columns: + self.watched_columns_tables.add(node.table) for through_node in node.relationship.throughs: through_node.is_through = True self.tables.add(through_node.table) diff --git a/pgsync/sync.py b/pgsync/sync.py index ce76dc34..6bd4222f 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -125,6 +125,8 @@ def __init__( self.count: dict = dict(xlog=0, db=0, redis=0) self.tasks: t.List[asyncio.Task] = [] self.lock = threading.Lock() + self.skipped_xmins: t.List[int] = [] + self.lock_skipped_xmins = threading.Lock() def validate(self, repl_slots: bool = True, polling=False) -> None: """Perform all validation right away.""" @@ -301,6 +303,7 @@ def setup(self, no_create: bool = False) -> None: tables: t.Set = set() # tables with user defined foreign keys user_defined_fkey_tables: dict = {} + watched_columns_for_table: dict = {} for node in self.tree.traverse_breadth_first(): if node.schema != schema: @@ -327,6 +330,8 @@ def setup(self, no_create: bool = False) -> None: if columns: user_defined_fkey_tables.setdefault(node.table, set()) user_defined_fkey_tables[node.table] |= set(columns) + + watched_columns_for_table[node.table] = node.watched_columns if tables: if if_not_exists or not self.view_exists( MATERIALIZED_VIEW, schema @@ -337,6 +342,7 @@ def setup(self, no_create: bool = False) -> None: schema, tables, user_defined_fkey_tables, + watched_columns_for_table, ) self.create_triggers( @@ -1398,7 +1404,13 @@ def poll_db(self) -> None: and self.index in payload["indices"] and payload["schema"] in self.tree.schemas ): - payloads.append(payload) + if (payload["tg_op"] == UPDATE and payload["table"] in self.tree.watched_columns_tables + and payload["old"] == payload["new"]): + logger.info(f"Skipping payload due to 
no change: {payload['new']}") + with self.lock_skipped_xmins: + self.skipped_xmins.append(payload["xmin"]) + else: + payloads.append(payload) logger.debug(f"poll_db: {payload}") with self.lock: self.count["db"] += 1 @@ -1498,7 +1510,9 @@ def _on_publish(self, payloads: t.List[Payload]) -> None: ) _payloads: list = [] - txids: t.Set = set(map(lambda x: x.xmin, payloads)) + with self.lock_skipped_xmins: + txids: t.Set = set(map(lambda x: x.xmin, payloads)) | set(self.skipped_xmins) + self.skipped_xmins = [] # for truncate, tg_op txids is None so skip setting the checkpoint if txids != set([None]): self.checkpoint: int = min(min(txids), self.txid_current) - 1 diff --git a/pgsync/trigger.py b/pgsync/trigger.py index fc22ffdf..219a39f1 100644 --- a/pgsync/trigger.py +++ b/pgsync/trigger.py @@ -18,6 +18,7 @@ _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; + _watched_columns TEXT []; BEGIN -- database is also the channel name. @@ -40,8 +41,8 @@ ELSE IF TG_OP <> 'TRUNCATE' THEN - SELECT primary_keys, foreign_keys, indices - INTO _primary_keys, _foreign_keys, _indices + SELECT primary_keys, foreign_keys, indices, watched_columns + INTO _primary_keys, _foreign_keys, _indices, _watched_columns FROM {MATERIALIZED_VIEW} WHERE table_name = TG_TABLE_NAME; @@ -49,14 +50,14 @@ new_row := ( SELECT JSONB_OBJECT_AGG(key, value) FROM JSON_EACH(new_row) - WHERE key = ANY(_primary_keys || _foreign_keys) + WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns) ); IF TG_OP = 'UPDATE' THEN old_row = ROW_TO_JSON(OLD); old_row := ( SELECT JSONB_OBJECT_AGG(key, value) FROM JSON_EACH(old_row) - WHERE key = ANY(_primary_keys || _foreign_keys) + WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns) ); END IF; xmin := NEW.xmin; diff --git a/pgsync/view.py b/pgsync/view.py index efdf0e0b..05aebe46 100644 --- a/pgsync/view.py +++ b/pgsync/view.py @@ -340,6 +340,7 @@ def create_view( tables: t.Set, user_defined_fkey_tables: dict, views: t.List[str], + 
watched_columns_for_tables: t.Dict[str, t.List[str]], ) -> None: """ This module defines a function `create_view` that creates a view describing primary_keys and foreign_keys for each table @@ -371,18 +372,18 @@ def create_view( So if 'specie' was the only row before, and the next query returns 'unit' and 'structure', we want to end up with the result below. - table_name | primary_keys | foreign_keys | indices - ------------+--------------+------------------+------------ - specie | {id} | {id, user_id} | {foo, bar} - unit | {id} | {id, profile_id} | {foo, bar} - structure | {id} | {id} | {foo, bar} - unit | {id} | {id, profile_id} | {foo, bar} - structure | {id} | {id} | {foo, bar} + table_name | primary_keys | foreign_keys | indices | watched_columns + ------------+--------------+------------------+------------+---------------- + specie | {id} | {id, user_id} | {foo, bar} | {foo, bar} + unit | {id} | {id, profile_id} | {foo, bar} | {foo} + structure | {id} | {id} | {foo, bar} | {bar} + unit | {id} | {id, profile_id} | {foo, bar} | {foo} + structure | {id} | {id} | {foo, bar} | {bar} """ rows: dict = {} if MATERIALIZED_VIEW in views: - for table_name, primary_keys, foreign_keys, indices in fetchall( + for table_name, primary_keys, foreign_keys, indices, watched_columns in fetchall( sa.select("*").select_from( sa.text(f"{schema}.{MATERIALIZED_VIEW}") ) @@ -393,6 +394,7 @@ def create_view( "primary_keys": set(), "foreign_keys": set(), "indices": set(), + "watched_columns": set(), }, ) if primary_keys: @@ -401,6 +403,8 @@ def create_view( rows[table_name]["foreign_keys"] = set(foreign_keys) if indices: rows[table_name]["indices"] = set(indices) + if watched_columns: + rows[table_name]["watched_columns"] = set(watched_columns) with engine.connect().execution_options( isolation_level="AUTOCOMMIT" ) as conn: @@ -413,7 +417,7 @@ def create_view( for table_name, columns in fetchall(_primary_keys(models, schema, tables)): rows.setdefault( table_name, - {"primary_keys": 
set(), "foreign_keys": set(), "indices": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()}, ) if columns: rows[table_name]["primary_keys"] |= set(columns) @@ -422,7 +426,7 @@ def create_view( for table_name, columns in fetchall(_foreign_keys(models, schema, tables)): rows.setdefault( table_name, - {"primary_keys": set(), "foreign_keys": set(), "indices": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()}, ) if columns: rows[table_name]["foreign_keys"] |= set(columns) @@ -436,6 +440,7 @@ def create_view( "primary_keys": set(), "foreign_keys": set(), "indices": set(), + "watched_columns": set(), }, ) if columns: @@ -445,15 +450,19 @@ def create_view( if not rows: rows.setdefault( None, - {"primary_keys": set(), "foreign_keys": set(), "indices": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()}, ) + for table_name, watched_columns in watched_columns_for_tables.items(): + rows[table_name]["watched_columns"] = set(watched_columns) + statement = sa.select( sa.sql.Values( sa.column("table_name"), sa.column("primary_keys"), sa.column("foreign_keys"), sa.column("indices"), + sa.column("watched_columns"), ) .data( [ @@ -474,6 +483,11 @@ def create_view( if fields.get("indices") else None ), + ( + array(fields.get("watched_columns")) + if fields.get("watched_columns") + else None + ) ) for table_name, fields in rows.items() ] From d67185334e5d58b82a9d0c55d7ebc2f42a807d53 Mon Sep 17 00:00:00 2001 From: Vladimir Goncharuk Date: Sat, 4 Oct 2025 18:37:46 +0500 Subject: [PATCH 2/7] update existing tests --- tests/test_sync.py | 1 + tests/test_trigger.py | 9 +++++---- tests/test_view.py | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/test_sync.py b/tests/test_sync.py index 72d143ac..855734a9 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -922,6 +922,7 @@ def test_setup(self, mock_teardown, 
sync): "public", {"publisher", "book"}, {"publisher": {"publisher_id", "id"}}, + {'book': [], 'publisher': []}, ) mock_create_function.assert_called_once_with("public") mock_teardown.assert_called_once_with(drop_view=False) diff --git a/tests/test_trigger.py b/tests/test_trigger.py index e3f8c90c..2443de93 100644 --- a/tests/test_trigger.py +++ b/tests/test_trigger.py @@ -23,6 +23,7 @@ def test_trigger_template(self): _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; + _watched_columns TEXT []; BEGIN -- database is also the channel name. @@ -45,8 +46,8 @@ def test_trigger_template(self): ELSE IF TG_OP <> 'TRUNCATE' THEN - SELECT primary_keys, foreign_keys, indices - INTO _primary_keys, _foreign_keys, _indices + SELECT primary_keys, foreign_keys, indices, watched_columns + INTO _primary_keys, _foreign_keys, _indices, _watched_columns FROM _view WHERE table_name = TG_TABLE_NAME; @@ -54,14 +55,14 @@ def test_trigger_template(self): new_row := ( SELECT JSONB_OBJECT_AGG(key, value) FROM JSON_EACH(new_row) - WHERE key = ANY(_primary_keys || _foreign_keys) + WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns) ); IF TG_OP = 'UPDATE' THEN old_row = ROW_TO_JSON(OLD); old_row := ( SELECT JSONB_OBJECT_AGG(key, value) FROM JSON_EACH(old_row) - WHERE key = ANY(_primary_keys || _foreign_keys) + WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns) ); END IF; xmin := NEW.xmin; diff --git a/tests/test_view.py b/tests/test_view.py index 36cc7fe6..ad60f582 100644 --- a/tests/test_view.py +++ b/tests/test_view.py @@ -268,6 +268,7 @@ def fetchall(statement): ["book", "publisher"], user_defined_fkey_tables={}, views=[], + watched_columns_for_tables={"publisher": ["name"]}, ) assert mock_logger.debug.call_count == 2 assert mock_logger.debug.call_args_list == [ @@ -288,6 +289,7 @@ def fetchall(statement): set(["book", "publisher"]), user_defined_fkey_tables=user_defined_fkey_tables, views=[], + watched_columns_for_tables={"publisher": 
["name"]}, ) assert mock_logger.debug.call_count == 2 assert mock_logger.debug.call_args_list == [ From e9c806d7be42ca01d102b4cc05654745619da290 Mon Sep 17 00:00:00 2001 From: Vladimir Goncharuk Date: Sun, 5 Oct 2025 12:08:56 +0500 Subject: [PATCH 3/7] add new tests for watched_columns --- pgsync/sync.py | 16 +++- tests/test_watched_columns.py | 141 ++++++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+), 2 deletions(-) create mode 100644 tests/test_watched_columns.py diff --git a/pgsync/sync.py b/pgsync/sync.py index 6bd4222f..bdd815ae 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -1348,6 +1348,19 @@ async def async_poll_redis(self) -> None: while True: await self._async_poll_redis() + def _should_skip_update_due_to_watched_columns(self, payload: dict) -> bool: + """ + Returns True if this UPDATE payload should be skipped because none of the watched + columns changed; False otherwise. + """ + if payload.get("tg_op") != UPDATE: + return False + + if payload["table"] not in self.tree.watched_columns_tables: + return False + + return payload["old"] == payload["new"] + @threaded @exception def poll_db(self) -> None: @@ -1404,8 +1417,7 @@ def poll_db(self) -> None: and self.index in payload["indices"] and payload["schema"] in self.tree.schemas ): - if (payload["tg_op"] == UPDATE and payload["table"] in self.tree.watched_columns_tables - and payload["old"] == payload["new"]): + if self._should_skip_update_due_to_watched_columns(payload): logger.info(f"Skipping payload due to no change: {payload['new']}") with self.lock_skipped_xmins: self.skipped_xmins.append(payload["xmin"]) diff --git a/tests/test_watched_columns.py b/tests/test_watched_columns.py new file mode 100644 index 00000000..ce4ec58a --- /dev/null +++ b/tests/test_watched_columns.py @@ -0,0 +1,141 @@ +"""Tests for watched_columns feature.""" + +import pytest + +from pgsync.base import subtransactions +from pgsync.constants import UPDATE +from pgsync.node import Tree, Node + + 
+@pytest.mark.usefixtures("table_creator") +class TestWatchedColumns(object): + """Tests for watched_columns functionality.""" + + @pytest.fixture(scope="function") + def data(self, sync, book_cls, publisher_cls): + session = sync.session + + publishers = [ + publisher_cls( + id=1, + name="Test Publisher", + ), + ] + + books = [ + book_cls( + isbn="test-isbn", + title="Test Book", + description="Test Description", + publisher_id=1, + ), + ] + + with subtransactions(session): + session.add_all(publishers) + session.add_all(books) + + yield books, publishers + + with subtransactions(session): + session.query(book_cls).delete() + session.query(publisher_cls).delete() + + session.connection().engine.connect().close() + session.connection().engine.dispose() + sync.search_client.close() + + def _set_tree(self, sync, watched): + nodes = { + "table": "book", + "columns": ["isbn", "title", "description"], + "watched_columns": watched, + } + sync.tree = Tree(sync.models, nodes) + + def test_watched_columns_process_watched_update(self, sync, data): + self._set_tree(sync, ["title"]) + payload = { + "tg_op": UPDATE, + "table": "book", + "schema": "public", + "old": {"isbn": "test-isbn", "title": "Test Book"}, + "new": {"isbn": "test-isbn", "title": "Updated Title"}, + "xmin": 12346, "indices": ["testdb"] + } + assert sync._should_skip_update_due_to_watched_columns(payload) is False + + def test_watched_columns_no_change_in_watched(self, sync, data): + self._set_tree(sync, ["title", "description"]) + payload = { + "tg_op": UPDATE, + "table": "book", + "schema": "public", + "old": {"isbn": "test-isbn", "title": "Test Book", "description": "Test Description"}, + "new": {"isbn": "test-isbn", "title": "Test Book", "description": "Test Description"}, + "xmin": 12351, "indices": ["testdb"] + } + assert sync._should_skip_update_due_to_watched_columns(payload) is True + + def test_watched_columns_with_multiple_watched_columns(self, sync, data): + self._set_tree(sync, ["title", 
"description"]) + payload = { + "tg_op": UPDATE, + "table": "book", + "schema": "public", + "old": {"isbn": "test-isbn", "title": "Test Book", "description": "Test Description"}, + "new": {"isbn": "test-isbn", "title": "Test Book", "description": "Updated Description"}, + "xmin": 12350, "indices": ["testdb"] + } + assert sync._should_skip_update_due_to_watched_columns(payload) is False + + def test_node_watched_columns_attribute(self, sync): + """Test that Node correctly handles watched_columns attribute.""" + + # Use real models from sync fixture instead of Mock + # Test node without watched_columns + node_data = { + "table": "book", + "schema": "public", + "columns": ["isbn", "title"], + } + + node = Node(models=sync.models, **node_data) + assert node.watched_columns is None + + # Test node with watched_columns + node_data_with_watched = { + "table": "book", + "schema": "public", + "columns": ["isbn", "title"], + "watched_columns": ["title", "isbn"] + } + + node_watched = Node(models=sync.models, **node_data_with_watched) + assert node_watched.watched_columns == ["title", "isbn"] + + def test_tree_watched_columns_tables_tracking(self, sync): + """Test that Tree correctly tracks tables with watched_columns.""" + nodes = { + "table": "book", + "columns": ["isbn", "title"], + "watched_columns": ["title"], # This table has watched_columns + "children": [ + { + "table": "publisher", + "columns": ["id", "name"], + # This table has no watched_columns + "relationship": { + "variant": "object", + "type": "one_to_one" + } + } + ] + } + + # Use real models from sync fixture instead of Mock + tree = Tree(models=sync.models, nodes=nodes) + + # Only book table should be in watched_columns_tables + assert "book" in tree.watched_columns_tables + assert "publisher" not in tree.watched_columns_tables From f1220de9a1f08f49b7f33992e17a1b6e768561c3 Mon Sep 17 00:00:00 2001 From: Vladimir Goncharuk Date: Sun, 5 Oct 2025 12:32:48 +0500 Subject: [PATCH 4/7] add doc for watched_columns 
--- README.md | 77 ++++++++++++++++++++++++++++++++++++++---------------- README.rst | 31 +++++++++++++++++++--- 2 files changed, 83 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 06bbe485..0268afb1 100644 --- a/README.md +++ b/README.md @@ -14,25 +14,25 @@ It allows you to keep [Postgres](https://www.postgresql.org) as your source of t expose structured denormalized documents in [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/). Changes to nested entities are propagated to [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/). -PGSync's advanced query builder then generates optimized SQL queries +PGSync's advanced query builder then generates optimized SQL queries on the fly based on your schema. PGSync's advisory model allows you to quickly move and transform large volumes of data quickly whilst maintaining relational integrity. -Simply describe your document structure or schema in JSON and [PGSync](https://pgsync.com) will -continuously capture changes in your data and load it into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) +Simply describe your document structure or schema in JSON and [PGSync](https://pgsync.com) will +continuously capture changes in your data and load it into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) without writing any code. [PGSync](https://pgsync.com) transforms your relational data into a structured document format. -It allows you to take advantage of the expressive power and scalability of -[Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) directly from [Postgres](https://www.postgresql.org). 
+It allows you to take advantage of the expressive power and scalability of +[Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) directly from [Postgres](https://www.postgresql.org). You don't have to write complex queries and transformation pipelines. PGSync is lightweight, flexible and fast. [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) is more suited as as secondary denormalised search engine to accompany a more traditional normalized datastore. Moreover, you shouldn't store your primary data in [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/). -So how do you then get your data into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) in the first place? -Tools like [Logstash](https://www.elastic.co/products/logstash) and [Kafka](https://kafka.apache.org) can aid this task but they still require a bit +So how do you then get your data into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) in the first place? +Tools like [Logstash](https://www.elastic.co/products/logstash) and [Kafka](https://kafka.apache.org) can aid this task but they still require a bit of engineering and development. [Extract Transform Load](https://en.wikipedia.org/wiki/Extract,_transform,_load) and [Change data capture](https://en.wikipedia.org/wiki/Change_data_capture) tools can be complex and require expensive engineering effort. @@ -45,15 +45,15 @@ Other benefits of PGSync include: #### Why? -At a high level, you have data in a Postgres database and you want to mirror it in Elasticsearch/OpenSearch. -This means every change to your data (***Insert***, ***Update***, ***Delete*** and ***Truncate*** statements) needs to be replicated to Elasticsearch/OpenSearch. 
+At a high level, you have data in a Postgres database and you want to mirror it in Elasticsearch/OpenSearch. +This means every change to your data (***Insert***, ***Update***, ***Delete*** and ***Truncate*** statements) needs to be replicated to Elasticsearch/OpenSearch. At first, this seems easy and then it's not. Simply add some code to copy the data to Elasticsearch/OpenSearch after updating the database (or so called dual writes). Writing SQL queries spanning multiple tables and involving multiple relationships are hard to write. Detecting changes within a nested document can also be quite hard. Of course, if your data never changed, then you could just take a snapshot in time and load it into Elasticsearch/OpenSearch as a one-off operation. PGSync is appropriate for you if: -- [Postgres](https://www.postgresql.org) is your read/write source of truth whilst [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) is your +- [Postgres](https://www.postgresql.org) is your read/write source of truth whilst [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) is your read-only search layer. - You need to denormalize relational data into a NoSQL data source. - Your data is constantly changing. @@ -90,7 +90,7 @@ There are plans to support zero-downtime migrations to streamline this process. There are several ways of installing and trying PGSync - [Running in Docker](#running-in-docker) is the easiest way to get up and running. - - [Manual configuration](#manual-configuration) + - [Manual configuration](#manual-configuration) ##### Running in Docker (Using Github Repository) @@ -147,7 +147,7 @@ Environment variable placeholders - full list [here](https://pgsync.com/env-vars ##### Manual configuration - Setup - - Ensure the database user is a superuser + - Ensure the database user is a superuser - Enable logical decoding. 
You would also need to set up at least two parameters at postgresql.conf ```wal_level = logical``` @@ -161,11 +161,11 @@ Environment variable placeholders - full list [here](https://pgsync.com/env-vars - Installation - Install PGSync from pypi using pip - - ```$ pip install pgsync``` + - ```$ pip install pgsync``` - Create a [schema.json](https://github.com/toluaina/pgsync/blob/main/examples/airbnb/schema.json) for your document representation - Bootstrap the database (one time only) - ```bootstrap --config schema.json``` - - Run the program with + - Run the program with - ```pgsync --config schema.json``` - Or as a daemon - ```pgsync --config schema.json -d``` @@ -175,8 +175,8 @@ Environment variable placeholders - full list [here](https://pgsync.com/env-vars Key features of PGSync are: -- Easily denormalize relational data. -- Works with any PostgreSQL database (version 9.6 or later). +- Easily denormalize relational data. +- Works with any PostgreSQL database (version 9.6 or later). - Negligible impact on database performance. - Transactionally consistent output in Elasticsearch/OpenSearch. This means: writes appear only when they are committed to the database, insert, update and delete operations appear in the same order as they were committed (as opposed to eventual consistency). - Fault-tolerant: does not lose data, even if processes crash or a network interruption occurs, etc. The process can be recovered from the last checkpoint. @@ -277,22 +277,22 @@ To get this document structure in [Elasticsearch](https://www.elastic.co/product Behind the scenes, PGSync is generating advanced queries for you such as. 
```sql -SELECT +SELECT JSON_BUILD_OBJECT( - 'isbn', book_1.isbn, - 'title', book_1.title, + 'isbn', book_1.isbn, + 'title', book_1.title, 'description', book_1.description, 'authors', anon_1.authors ) AS "JSON_BUILD_OBJECT_1", book_1.id FROM book AS book_1 LEFT OUTER JOIN - (SELECT + (SELECT JSON_AGG(anon_2.anon) AS authors, book_author_1.book_isbn AS book_isbn FROM book_author AS book_author_1 LEFT OUTER JOIN - (SELECT + (SELECT author_1.name AS anon, author_1.id AS id FROM author AS author_1) AS anon_2 ON anon_2.id = book_author_1.author_id GROUP BY book_author_1.book_isbn) AS anon_1 ON anon_1.book_isbn = book_1.isbn @@ -311,6 +311,39 @@ e.g } ``` +#### Performance boost via watched_columns + +If your system runs under high load and performs many SQL updates — for example, on many-to-many or related tables — that often don’t actually change any data, or if you have large tables that are frequently updated but you only need certain fields reflected in OpenSearch/Elasticsearch, +you can use the watched_columns parameter to specify which columns should trigger document updates. + +This prevents unnecessary re-indexing and significantly reduces load on both the database and the search index. + +Imagine your table `author` has many columns and it's often updated, but +you need only `name` for searching, so this approach can help you. + +```json +{ + "table": "book", + "columns": [ + "isbn", + "title", + "description" + ], + "children": [ + { + "table": "author", + "columns": [ + "name" + ], + "watched_columns": [ + "name" + ] + } + ] +} +``` + + PGSync addresses the following challenges: - What if we update the author's name in the database? - What if we wanted to add another author for an existing book? @@ -346,5 +379,5 @@ Contributions are very welcome! Check out the [Contribution](CONTRIBUTING.rst) G This project is licensed under the terms of the [MIT](https://opensource.org/license/mit/) license. Please see [LICENSE](LICENSE) for more details. -You should have received a copy of the MIT License along with PGSync. 
+You should have received a copy of the MIT License along with PGSync. If not, see https://opensource.org/license/mit/. diff --git a/README.rst b/README.rst index ff276bfc..392a6a92 100644 --- a/README.rst +++ b/README.rst @@ -15,8 +15,8 @@ expose structured denormalized documents in [Elasticsearch](https://www.elastic. - [SQLAlchemy](https://www.sqlalchemy.org) 1.3.4+ ### Postgres setup - - Enable [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) in your + + Enable [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) in your Postgres setting. - You also need to set up two parameters in your Postgres config postgresql.conf @@ -76,7 +76,32 @@ Example spec } ] -### Environment variables +#### Watched columns in config. + +If your system runs under high load and performs many SQL updates — for example, on many-to-many or related tables — that often don’t actually change any data, or if you have large tables that are frequently updated but you only need certain fields reflected in OpenSearch/Elasticsearch, +you can use the watched_columns parameter to specify which columns should trigger document updates. + +This prevents unnecessary re-indexing and significantly reduces load on both the database and the search index. + +.. 
code-block:: + { + "database": "[database name]", + "index": "[Elasticsearch or OpenSearch index]", + "nodes": { + "table": "[table A]", + "schema": "[table A schema]", + "columns": [ + "column 1 from table A", + "column 2 from table A", + ], + "watched_columns": [ + "column 1 from table A", + "column 2 from table A", + ], + + + +### Environment variables Setup environment variables required for the application From e958a14f8542d931c4799189bb3629ccb3ad014b Mon Sep 17 00:00:00 2001 From: Vladimir Date: Thu, 9 Oct 2025 23:26:14 +0500 Subject: [PATCH 5/7] return spaces back README.md --- README.md | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 0268afb1..a72b7781 100644 --- a/README.md +++ b/README.md @@ -14,25 +14,25 @@ It allows you to keep [Postgres](https://www.postgresql.org) as your source of t expose structured denormalized documents in [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/). Changes to nested entities are propagated to [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/). -PGSync's advanced query builder then generates optimized SQL queries +PGSync's advanced query builder then generates optimized SQL queries on the fly based on your schema. PGSync's advisory model allows you to quickly move and transform large volumes of data quickly whilst maintaining relational integrity. 
-Simply describe your document structure or schema in JSON and [PGSync](https://pgsync.com) will -continuously capture changes in your data and load it into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) +Simply describe your document structure or schema in JSON and [PGSync](https://pgsync.com) will +continuously capture changes in your data and load it into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) without writing any code. [PGSync](https://pgsync.com) transforms your relational data into a structured document format. -It allows you to take advantage of the expressive power and scalability of -[Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) directly from [Postgres](https://www.postgresql.org). +It allows you to take advantage of the expressive power and scalability of +[Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) directly from [Postgres](https://www.postgresql.org). You don't have to write complex queries and transformation pipelines. PGSync is lightweight, flexible and fast. [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) is more suited as as secondary denormalised search engine to accompany a more traditional normalized datastore. Moreover, you shouldn't store your primary data in [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/). -So how do you then get your data into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) in the first place? 
-Tools like [Logstash](https://www.elastic.co/products/logstash) and [Kafka](https://kafka.apache.org) can aid this task but they still require a bit +So how do you then get your data into [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) in the first place? +Tools like [Logstash](https://www.elastic.co/products/logstash) and [Kafka](https://kafka.apache.org) can aid this task but they still require a bit of engineering and development. [Extract Transform Load](https://en.wikipedia.org/wiki/Extract,_transform,_load) and [Change data capture](https://en.wikipedia.org/wiki/Change_data_capture) tools can be complex and require expensive engineering effort. @@ -45,15 +45,15 @@ Other benefits of PGSync include: #### Why? -At a high level, you have data in a Postgres database and you want to mirror it in Elasticsearch/OpenSearch. -This means every change to your data (***Insert***, ***Update***, ***Delete*** and ***Truncate*** statements) needs to be replicated to Elasticsearch/OpenSearch. +At a high level, you have data in a Postgres database and you want to mirror it in Elasticsearch/OpenSearch. +This means every change to your data (***Insert***, ***Update***, ***Delete*** and ***Truncate*** statements) needs to be replicated to Elasticsearch/OpenSearch. At first, this seems easy and then it's not. Simply add some code to copy the data to Elasticsearch/OpenSearch after updating the database (or so called dual writes). Writing SQL queries spanning multiple tables and involving multiple relationships are hard to write. Detecting changes within a nested document can also be quite hard. Of course, if your data never changed, then you could just take a snapshot in time and load it into Elasticsearch/OpenSearch as a one-off operation. 
PGSync is appropriate for you if: -- [Postgres](https://www.postgresql.org) is your read/write source of truth whilst [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) is your +- [Postgres](https://www.postgresql.org) is your read/write source of truth whilst [Elasticsearch](https://www.elastic.co/products/elastic-stack)/[OpenSearch](https://opensearch.org/) is your read-only search layer. - You need to denormalize relational data into a NoSQL data source. - Your data is constantly changing. @@ -90,7 +90,7 @@ There are plans to support zero-downtime migrations to streamline this process. There are several ways of installing and trying PGSync - [Running in Docker](#running-in-docker) is the easiest way to get up and running. - - [Manual configuration](#manual-configuration) + - [Manual configuration](#manual-configuration) ##### Running in Docker (Using Github Repository) @@ -147,7 +147,7 @@ Environment variable placeholders - full list [here](https://pgsync.com/env-vars ##### Manual configuration - Setup - - Ensure the database user is a superuser + - Ensure the database user is a superuser - Enable logical decoding. 
You would also need to set up at least two parameters at postgresql.conf ```wal_level = logical``` @@ -161,11 +161,11 @@ Environment variable placeholders - full list [here](https://pgsync.com/env-vars - Installation - Install PGSync from pypi using pip - - ```$ pip install pgsync``` + - ```$ pip install pgsync``` - Create a [schema.json](https://github.com/toluaina/pgsync/blob/main/examples/airbnb/schema.json) for your document representation - Bootstrap the database (one time only) - ```bootstrap --config schema.json``` - - Run the program with + - Run the program with - ```pgsync --config schema.json``` - Or as a daemon - ```pgsync --config schema.json -d``` @@ -175,8 +175,8 @@ Environment variable placeholders - full list [here](https://pgsync.com/env-vars Key features of PGSync are: -- Easily denormalize relational data. -- Works with any PostgreSQL database (version 9.6 or later). +- Easily denormalize relational data. +- Works with any PostgreSQL database (version 9.6 or later). - Negligible impact on database performance. - Transactionally consistent output in Elasticsearch/OpenSearch. This means: writes appear only when they are committed to the database, insert, update and delete operations appear in the same order as they were committed (as opposed to eventual consistency). - Fault-tolerant: does not lose data, even if processes crash or a network interruption occurs, etc. The process can be recovered from the last checkpoint. @@ -277,17 +277,17 @@ To get this document structure in [Elasticsearch](https://www.elastic.co/product Behind the scenes, PGSync is generating advanced queries for you such as. 
```sql -SELECT +SELECT JSON_BUILD_OBJECT( - 'isbn', book_1.isbn, - 'title', book_1.title, + 'isbn', book_1.isbn, + 'title', book_1.title, 'description', book_1.description, 'authors', anon_1.authors ) AS "JSON_BUILD_OBJECT_1", book_1.id FROM book AS book_1 LEFT OUTER JOIN - (SELECT + (SELECT JSON_AGG(anon_2.anon) AS authors, book_author_1.book_isbn AS book_isbn FROM book_author AS book_author_1 @@ -379,5 +379,5 @@ Contributions are very welcome! Check out the [Contribution](CONTRIBUTING.rst) G This project is licensed under the terms of the [MIT](https://opensource.org/license/mit/) license. Please see [LICENSE](LICENSE) for more details. -You should have received a copy of the MIT License along with PGSync. +You should have received a copy of the MIT License along with PGSync. If not, see https://opensource.org/license/mit/. From 14518b7627a6dd3957a87795509d6c3ffa2a417a Mon Sep 17 00:00:00 2001 From: Vladimir Date: Thu, 9 Oct 2025 23:28:08 +0500 Subject: [PATCH 6/7] return spaces back README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a72b7781..3fbc62a7 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ Other benefits of PGSync include: #### Why? -At a high level, you have data in a Postgres database and you want to mirror it in Elasticsearch/OpenSearch. +At a high level, you have data in a Postgres database and you want to mirror it in Elasticsearch/OpenSearch. This means every change to your data (***Insert***, ***Update***, ***Delete*** and ***Truncate*** statements) needs to be replicated to Elasticsearch/OpenSearch. At first, this seems easy and then it's not. Simply add some code to copy the data to Elasticsearch/OpenSearch after updating the database (or so called dual writes). Writing SQL queries spanning multiple tables and involving multiple relationships are hard to write. 
@@ -292,7 +292,7 @@ LEFT OUTER JOIN book_author_1.book_isbn AS book_isbn FROM book_author AS book_author_1 LEFT OUTER JOIN - (SELECT + (SELECT author_1.name AS anon, author_1.id AS id FROM author AS author_1) AS anon_2 ON anon_2.id = book_author_1.author_id From 3fb54f677f1afb248e1a1347bec3d7bf0f917ee1 Mon Sep 17 00:00:00 2001 From: Vladimir Date: Thu, 9 Oct 2025 23:29:46 +0500 Subject: [PATCH 7/7] return spaces back README.rst --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 392a6a92..86c40a8d 100644 --- a/README.rst +++ b/README.rst @@ -15,8 +15,8 @@ expose structured denormalized documents in [Elasticsearch](https://www.elastic. - [SQLAlchemy](https://www.sqlalchemy.org) 1.3.4+ ### Postgres setup - - Enable [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) in your + + Enable [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) in your Postgres setting. - You also need to set up two parameters in your Postgres config postgresql.conf