diff --git a/README.md b/README.md
index 06bbe485..3fbc62a7 100644
--- a/README.md
+++ b/README.md
@@ -311,6 +311,39 @@ e.g
 }
 ```
+#### Performance boost via watched_columns
+
+If your system runs under high load and performs many SQL updates — for example, on many-to-many or related tables — that often don’t actually change any data, or if you have large tables that are frequently updated but you only need certain fields reflected in OpenSearch/Elasticsearch,
+you can use the watched_columns parameter to specify which columns should trigger document updates.
+
+This prevents unnecessary re-indexing and significantly reduces load on both the database and the search index.
+
+Imagine your table `author` has many columns and it is often updated, but
+you need only `name` for searching, so this approach can help you.
+
+```json
+{
+    "table": "book",
+    "columns": [
+        "isbn",
+        "title",
+        "description"
+    ],
+    "children": [
+        {
+            "table": "author",
+            "columns": [
+                "name"
+            ],
+            "watched_columns": [
+                "name"
+            ]
+        }
+    ]
+}
+```
+
+
 PGSync addresses the following challenges:
 - What if we update the author's name in the database?
 - What if we wanted to add another author for an existing book?
diff --git a/README.rst b/README.rst
index ff276bfc..86c40a8d 100644
--- a/README.rst
+++ b/README.rst
@@ -76,7 +76,32 @@ Example spec
 }
 ]
-### Environment variables
+#### Watched columns in config
+
+If your system runs under high load and performs many SQL updates — for example, on many-to-many or related tables — that often don’t actually change any data, or if you have large tables that are frequently updated but you only need certain fields reflected in OpenSearch/Elasticsearch,
+you can use the watched_columns parameter to specify which columns should trigger document updates.
+
+This prevents unnecessary re-indexing and significantly reduces load on both the database and the search index.
+
+.. code-block::
+
+    {
+        "database": "[database name]",
+        "index": "[Elasticsearch or OpenSearch index]",
+        "nodes": {
+            "table": "[table A]",
+            "schema": "[table A schema]",
+            "columns": [
+                "column 1 from table A",
+                "column 2 from table A"
+            ],
+            "watched_columns": [
+                "column 1 from table A",
+                "column 2 from table A"
+            ]
+        }
+    }
+
+
+### Environment variables
 
 Setup environment variables required for the application
diff --git a/pgsync/base.py b/pgsync/base.py
index 2f74332e..4ba753f4 100644
--- a/pgsync/base.py
+++ b/pgsync/base.py
@@ -776,6 +776,7 @@ def create_view(
         schema: str,
         tables: t.Set,
         user_defined_fkey_tables: dict,
+        watched_columns_for_table: t.Dict[str, t.List[str]]
     ) -> None:
         create_view(
             self.engine,
@@ -786,6 +787,7 @@
             tables,
             user_defined_fkey_tables,
             self._materialized_views(schema),
+            watched_columns_for_table,
         )
 
     def drop_view(self, schema: str) -> None:
diff --git a/pgsync/constants.py b/pgsync/constants.py
index 22464c4b..82235ca2 100644
--- a/pgsync/constants.py
+++ b/pgsync/constants.py
@@ -42,6 +42,7 @@
     "schema",
     "table",
     "transform",
+    "watched_columns",
 ]
 
 # Relationship attributes
@@ -198,6 +199,7 @@
     "indices",
     "primary_keys",
     "table_name",
+    "watched_columns",
 ]
 
 # Primary key delimiter
diff --git a/pgsync/node.py b/pgsync/node.py
index ed6446b8..1ca3d5dd 100644
--- a/pgsync/node.py
+++ b/pgsync/node.py
@@ -134,6 +134,7 @@ class Node(object):
     parent: t.Optional[Node] = None
     base_tables: t.Optional[list] = None
     is_through: bool = False
+    watched_columns: t.Optional[list] = None
 
     def __post_init__(self):
         self.model: sa.sql.Alias = self.models(self.table, self.schema)
@@ -284,6 +285,7 @@ class Tree(threading.local):
 
     def __post_init__(self):
         self.tables: t.Set[str] = set()
+        self.watched_columns_tables: t.Set[str] = set()
         self.__nodes: t.Dict[Node] = {}
         self.__schemas: t.Set[str] = set()
         self.root: t.Optional[Node] = None
@@ -324,11 +326,14 @@
             columns=nodes.get("columns", []),
relationship=nodes.get("relationship", {}), base_tables=nodes.get("base_tables", []), + watched_columns=nodes.get("watched_columns", []), ) if self.root is None: self.root = node self.tables.add(node.table) + if node.watched_columns: + self.watched_columns_tables.add(node.table) for through_node in node.relationship.throughs: through_node.is_through = True self.tables.add(through_node.table) diff --git a/pgsync/sync.py b/pgsync/sync.py index 2c3ec84b..d9efb755 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -121,6 +121,8 @@ def __init__( self.query_builder: QueryBuilder = QueryBuilder(verbose=verbose) self.count: dict = dict(xlog=0, db=0, redis=0) self.tasks: t.List[asyncio.Task] = [] + self.skipped_xmins: t.List[int] = [] + self.lock_skipped_xmins: threading.Lock = threading.Lock() self.lock: threading.Lock = threading.Lock() @property @@ -307,6 +309,7 @@ def setup(self, no_create: bool = False) -> None: tables: t.Set = set() # tables with user defined foreign keys user_defined_fkey_tables: dict = {} + watched_columns_for_table: dict = {} for node in self.tree.traverse_breadth_first(): if node.schema != schema: @@ -333,6 +336,8 @@ def setup(self, no_create: bool = False) -> None: if columns: user_defined_fkey_tables.setdefault(node.table, set()) user_defined_fkey_tables[node.table] |= set(columns) + + watched_columns_for_table[node.table] = node.watched_columns if tables: if if_not_exists or not self.view_exists( MATERIALIZED_VIEW, schema @@ -343,6 +348,7 @@ def setup(self, no_create: bool = False) -> None: schema, tables, user_defined_fkey_tables, + watched_columns_for_table, ) self.create_triggers( @@ -1348,6 +1354,19 @@ async def async_poll_redis(self) -> None: while True: await self._async_poll_redis() + def _should_skip_update_due_to_watched_columns(self, payload: dict) -> bool: + """ + Returns True if this UPDATE payload should be skipped because none of the watched + columns changed; False otherwise. 
+ """ + if payload.get("tg_op") != UPDATE: + return False + + if payload["table"] not in self.tree.watched_columns_tables: + return False + + return payload["old"] == payload["new"] + @threaded @exception def poll_db(self) -> None: @@ -1404,7 +1423,12 @@ def poll_db(self) -> None: and self.index in payload["indices"] and payload["schema"] in self.tree.schemas ): - payloads.append(payload) + if self._should_skip_update_due_to_watched_columns(payload): + logger.info(f"Skipping payload due to no change: {payload['new']}") + with self.lock_skipped_xmins: + self.skipped_xmins.append(payload["xmin"]) + else: + payloads.append(payload) logger.debug(f"poll_db: {payload}") with self.lock: self.count["db"] += 1 @@ -1504,7 +1528,9 @@ def _on_publish(self, payloads: t.List[Payload]) -> None: ) _payloads: list = [] - txids: t.Set = set(map(lambda x: x.xmin, payloads)) + with self.lock_skipped_xmins: + txids: t.Set = set(map(lambda x: x.xmin, payloads)) | set(self.skipped_xmins) + self.skipped_xmins = [] # for truncate, tg_op txids is None so skip setting the checkpoint if txids != set([None]): self.checkpoint: int = min(min(txids), self.txid_current) - 1 diff --git a/pgsync/trigger.py b/pgsync/trigger.py index fc22ffdf..219a39f1 100644 --- a/pgsync/trigger.py +++ b/pgsync/trigger.py @@ -18,6 +18,7 @@ _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; + _watched_columns TEXT []; BEGIN -- database is also the channel name. 
@@ -40,8 +41,8 @@
         ELSE
             IF TG_OP <> 'TRUNCATE' THEN
-                SELECT primary_keys, foreign_keys, indices
-                INTO _primary_keys, _foreign_keys, _indices
+                SELECT primary_keys, foreign_keys, indices, watched_columns
+                INTO _primary_keys, _foreign_keys, _indices, _watched_columns
                 FROM {MATERIALIZED_VIEW}
                 WHERE table_name = TG_TABLE_NAME;
@@ -49,14 +50,14 @@
             new_row := (
                 SELECT JSONB_OBJECT_AGG(key, value)
                 FROM JSON_EACH(new_row)
-                WHERE key = ANY(_primary_keys || _foreign_keys)
+                WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns)
             );
             IF TG_OP = 'UPDATE' THEN
                 old_row = ROW_TO_JSON(OLD);
                 old_row := (
                     SELECT JSONB_OBJECT_AGG(key, value)
                     FROM JSON_EACH(old_row)
-                    WHERE key = ANY(_primary_keys || _foreign_keys)
+                    WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns)
                 );
             END IF;
             xmin := NEW.xmin;
diff --git a/pgsync/view.py b/pgsync/view.py
index efdf0e0b..05aebe46 100644
--- a/pgsync/view.py
+++ b/pgsync/view.py
@@ -340,6 +340,7 @@ def create_view(
     tables: t.Set,
     user_defined_fkey_tables: dict,
     views: t.List[str],
+    watched_columns_for_tables: t.Dict[str, t.List[str]],
 ) -> None:
     """
     This module defines a function `create_view` that creates a view describing primary_keys and foreign_keys for each table
@@ -371,18 +372,18 @@
     So if 'specie' was the only row before, and the next query returns
     'unit' and 'structure', we want to end up with the result below.
- table_name | primary_keys | foreign_keys | indices - ------------+--------------+------------------+------------ - specie | {id} | {id, user_id} | {foo, bar} - unit | {id} | {id, profile_id} | {foo, bar} - structure | {id} | {id} | {foo, bar} - unit | {id} | {id, profile_id} | {foo, bar} - structure | {id} | {id} | {foo, bar} + table_name | primary_keys | foreign_keys | indices | watched_columns + ------------+--------------+------------------+------------+---------------- + specie | {id} | {id, user_id} | {foo, bar} | {foo, bar} + unit | {id} | {id, profile_id} | {foo, bar} | {foo} + structure | {id} | {id} | {foo, bar} | {bar} + unit | {id} | {id, profile_id} | {foo, bar} | {foo} + structure | {id} | {id} | {foo, bar} | {bar} """ rows: dict = {} if MATERIALIZED_VIEW in views: - for table_name, primary_keys, foreign_keys, indices in fetchall( + for table_name, primary_keys, foreign_keys, indices, watched_columns in fetchall( sa.select("*").select_from( sa.text(f"{schema}.{MATERIALIZED_VIEW}") ) @@ -393,6 +394,7 @@ def create_view( "primary_keys": set(), "foreign_keys": set(), "indices": set(), + "watched_columns": set(), }, ) if primary_keys: @@ -401,6 +403,8 @@ def create_view( rows[table_name]["foreign_keys"] = set(foreign_keys) if indices: rows[table_name]["indices"] = set(indices) + if watched_columns: + rows[table_name]["watched_columns"] = set(watched_columns) with engine.connect().execution_options( isolation_level="AUTOCOMMIT" ) as conn: @@ -413,7 +417,7 @@ def create_view( for table_name, columns in fetchall(_primary_keys(models, schema, tables)): rows.setdefault( table_name, - {"primary_keys": set(), "foreign_keys": set(), "indices": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()}, ) if columns: rows[table_name]["primary_keys"] |= set(columns) @@ -422,7 +426,7 @@ def create_view( for table_name, columns in fetchall(_foreign_keys(models, schema, tables)): rows.setdefault( table_name, - 
{"primary_keys": set(), "foreign_keys": set(), "indices": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()}, ) if columns: rows[table_name]["foreign_keys"] |= set(columns) @@ -436,6 +440,7 @@ def create_view( "primary_keys": set(), "foreign_keys": set(), "indices": set(), + "watched_columns": set(), }, ) if columns: @@ -445,15 +450,19 @@ def create_view( if not rows: rows.setdefault( None, - {"primary_keys": set(), "foreign_keys": set(), "indices": set()}, + {"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()}, ) + for table_name, watched_columns in watched_columns_for_tables.items(): + rows[table_name]["watched_columns"] = set(watched_columns) + statement = sa.select( sa.sql.Values( sa.column("table_name"), sa.column("primary_keys"), sa.column("foreign_keys"), sa.column("indices"), + sa.column("watched_columns"), ) .data( [ @@ -474,6 +483,11 @@ def create_view( if fields.get("indices") else None ), + ( + array(fields.get("watched_columns")) + if fields.get("watched_columns") + else None + ) ) for table_name, fields in rows.items() ] diff --git a/tests/test_sync.py b/tests/test_sync.py index f4f0a63a..80956cc4 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -922,6 +922,7 @@ def test_setup(self, mock_teardown, sync): "public", {"publisher", "book"}, {"publisher": {"publisher_id", "id"}}, + {'book': [], 'publisher': []}, ) mock_create_function.assert_called_once_with("public") mock_teardown.assert_called_once_with(drop_view=False) diff --git a/tests/test_trigger.py b/tests/test_trigger.py index e3f8c90c..2443de93 100644 --- a/tests/test_trigger.py +++ b/tests/test_trigger.py @@ -23,6 +23,7 @@ def test_trigger_template(self): _indices TEXT []; _primary_keys TEXT []; _foreign_keys TEXT []; + _watched_columns TEXT []; BEGIN -- database is also the channel name. 
@@ -45,8 +46,8 @@ def test_trigger_template(self):
         ELSE
             IF TG_OP <> 'TRUNCATE' THEN
-                SELECT primary_keys, foreign_keys, indices
-                INTO _primary_keys, _foreign_keys, _indices
+                SELECT primary_keys, foreign_keys, indices, watched_columns
+                INTO _primary_keys, _foreign_keys, _indices, _watched_columns
                 FROM _view
                 WHERE table_name = TG_TABLE_NAME;
@@ -54,14 +55,14 @@ def test_trigger_template(self):
             new_row := (
                 SELECT JSONB_OBJECT_AGG(key, value)
                 FROM JSON_EACH(new_row)
-                WHERE key = ANY(_primary_keys || _foreign_keys)
+                WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns)
             );
             IF TG_OP = 'UPDATE' THEN
                 old_row = ROW_TO_JSON(OLD);
                 old_row := (
                     SELECT JSONB_OBJECT_AGG(key, value)
                     FROM JSON_EACH(old_row)
-                    WHERE key = ANY(_primary_keys || _foreign_keys)
+                    WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns)
                 );
             END IF;
             xmin := NEW.xmin;
diff --git a/tests/test_view.py b/tests/test_view.py
index 36cc7fe6..ad60f582 100644
--- a/tests/test_view.py
+++ b/tests/test_view.py
@@ -268,6 +268,7 @@ def fetchall(statement):
             ["book", "publisher"],
             user_defined_fkey_tables={},
             views=[],
+            watched_columns_for_tables={"publisher": ["name"]},
         )
         assert mock_logger.debug.call_count == 2
         assert mock_logger.debug.call_args_list == [
@@ -288,6 +289,7 @@ def fetchall(statement):
             set(["book", "publisher"]),
             user_defined_fkey_tables=user_defined_fkey_tables,
             views=[],
+            watched_columns_for_tables={"publisher": ["name"]},
         )
         assert mock_logger.debug.call_count == 2
         assert mock_logger.debug.call_args_list == [
diff --git a/tests/test_watched_columns.py b/tests/test_watched_columns.py
new file mode 100644
index 00000000..ce4ec58a
--- /dev/null
+++ b/tests/test_watched_columns.py
@@ -0,0 +1,141 @@
+"""Tests for watched_columns feature."""
+
+import pytest
+
+from pgsync.base import subtransactions
+from pgsync.constants import UPDATE
+from pgsync.node import Tree, Node
+
+
+@pytest.mark.usefixtures("table_creator")
+class TestWatchedColumns(object):
+    """Tests for 
watched_columns functionality.""" + + @pytest.fixture(scope="function") + def data(self, sync, book_cls, publisher_cls): + session = sync.session + + publishers = [ + publisher_cls( + id=1, + name="Test Publisher", + ), + ] + + books = [ + book_cls( + isbn="test-isbn", + title="Test Book", + description="Test Description", + publisher_id=1, + ), + ] + + with subtransactions(session): + session.add_all(publishers) + session.add_all(books) + + yield books, publishers + + with subtransactions(session): + session.query(book_cls).delete() + session.query(publisher_cls).delete() + + session.connection().engine.connect().close() + session.connection().engine.dispose() + sync.search_client.close() + + def _set_tree(self, sync, watched): + nodes = { + "table": "book", + "columns": ["isbn", "title", "description"], + "watched_columns": watched, + } + sync.tree = Tree(sync.models, nodes) + + def test_watched_columns_process_watched_update(self, sync, data): + self._set_tree(sync, ["title"]) + payload = { + "tg_op": UPDATE, + "table": "book", + "schema": "public", + "old": {"isbn": "test-isbn", "title": "Test Book"}, + "new": {"isbn": "test-isbn", "title": "Updated Title"}, + "xmin": 12346, "indices": ["testdb"] + } + assert sync._should_skip_update_due_to_watched_columns(payload) is False + + def test_watched_columns_no_change_in_watched(self, sync, data): + self._set_tree(sync, ["title", "description"]) + payload = { + "tg_op": UPDATE, + "table": "book", + "schema": "public", + "old": {"isbn": "test-isbn", "title": "Test Book", "description": "Test Description"}, + "new": {"isbn": "test-isbn", "title": "Test Book", "description": "Test Description"}, + "xmin": 12351, "indices": ["testdb"] + } + assert sync._should_skip_update_due_to_watched_columns(payload) is True + + def test_watched_columns_with_multiple_watched_columns(self, sync, data): + self._set_tree(sync, ["title", "description"]) + payload = { + "tg_op": UPDATE, + "table": "book", + "schema": "public", + "old": 
{"isbn": "test-isbn", "title": "Test Book", "description": "Test Description"}, + "new": {"isbn": "test-isbn", "title": "Test Book", "description": "Updated Description"}, + "xmin": 12350, "indices": ["testdb"] + } + assert sync._should_skip_update_due_to_watched_columns(payload) is False + + def test_node_watched_columns_attribute(self, sync): + """Test that Node correctly handles watched_columns attribute.""" + + # Use real models from sync fixture instead of Mock + # Test node without watched_columns + node_data = { + "table": "book", + "schema": "public", + "columns": ["isbn", "title"], + } + + node = Node(models=sync.models, **node_data) + assert node.watched_columns is None + + # Test node with watched_columns + node_data_with_watched = { + "table": "book", + "schema": "public", + "columns": ["isbn", "title"], + "watched_columns": ["title", "isbn"] + } + + node_watched = Node(models=sync.models, **node_data_with_watched) + assert node_watched.watched_columns == ["title", "isbn"] + + def test_tree_watched_columns_tables_tracking(self, sync): + """Test that Tree correctly tracks tables with watched_columns.""" + nodes = { + "table": "book", + "columns": ["isbn", "title"], + "watched_columns": ["title"], # This table has watched_columns + "children": [ + { + "table": "publisher", + "columns": ["id", "name"], + # This table has no watched_columns + "relationship": { + "variant": "object", + "type": "one_to_one" + } + } + ] + } + + # Use real models from sync fixture instead of Mock + tree = Tree(models=sync.models, nodes=nodes) + + # Only book table should be in watched_columns_tables + assert "book" in tree.watched_columns_tables + assert "publisher" not in tree.watched_columns_tables