From a7b0d474b6a06f7a708f7022a98b49419bbf814f Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 27 Oct 2022 22:12:18 +0100 Subject: [PATCH 01/26] add support for python 3.11 --- .github/workflows/python-build.yml | 2 +- bin/parallel_sync | 95 ++++++++++++++++++++++++++++-- pgsync/__init__.py | 2 +- requirements/dev.txt | 18 +++--- requirements/prod.txt | 8 +-- requirements/test.txt | 12 ++-- setup.py | 1 + 7 files changed, 113 insertions(+), 25 deletions(-) diff --git a/.github/workflows/python-build.yml b/.github/workflows/python-build.yml index 333f55c4..e9e3aa52 100644 --- a/.github/workflows/python-build.yml +++ b/.github/workflows/python-build.yml @@ -8,7 +8,7 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: [3.7, 3.8, 3.9, '3.10'] + python-version: [3.7, 3.8, 3.9, '3.10', '3.11'] services: postgres: image: debezium/postgres:15 diff --git a/bin/parallel_sync b/bin/parallel_sync index a7fe3ecc..51a3b966 100755 --- a/bin/parallel_sync +++ b/bin/parallel_sync @@ -44,6 +44,7 @@ and row numbers. import asyncio import multiprocessing import os +import re import sys from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import dataclass @@ -54,9 +55,32 @@ from typing import Generator, Optional, Union import click import sqlalchemy as sa -from pgsync.settings import BLOCK_SIZE +from pgsync.settings import BLOCK_SIZE, CHECKPOINT_PATH from pgsync.sync import Sync -from pgsync.utils import get_config, load_config, show_settings, timeit +from pgsync.utils import ( + compiled_query, + get_config, + load_config, + show_settings, + timeit, +) + + +def save_ctid(page: int, row: int, name: str) -> None: + checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid") + with open(checkpoint_file, "w+") as fp: + fp.write(f"{page},{row}\n") + + +def read_ctid(name: str) -> None: + checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid") + if os.path.exists(checkpoint_file): + with open(checkpoint_file, "r") as fp: + pairs: str = fp.read().split()[0].split(",") + page = int(pairs[0]) + row = int(pairs[1]) + return page, row + return None, None def logical_slot_changes( @@ -92,10 +116,19 @@ class Task: @timeit -def fetch_tasks(doc: dict, block_size: Optional[int] = None) -> Generator: +def fetch_tasks( + doc: dict, + block_size: Optional[int] = None, +) -> Generator: block_size = block_size or BLOCK_SIZE pages: dict = {} sync: Sync = Sync(doc) + page: Optional[int] = None + row: Optional[int] = None + name: str = re.sub( + "[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}" + ) + page, row = read_ctid(name=name) statement: sa.sql.Select = sa.select( [ sa.literal_column("1").label("x"), @@ -103,6 +136,51 @@ def fetch_tasks(doc: dict, block_size: Optional[int] = None) -> Generator: sa.column("ctid"), ] ).select_from(sync.tree.root.model) + + # filter by Page + if page: + statement = statement.where( + sa.cast( + sa.func.SPLIT_PART( + sa.func.REPLACE( + sa.func.REPLACE( + sa.cast(sa.column("ctid"), sa.Text), + "(", + "", + ), + ")", + "", + ), + ",", + 1, + ), + sa.Integer, + ) + > page + ) + + # filter by Row + if row: + statement = statement.where( + sa.cast( + sa.func.SPLIT_PART( + sa.func.REPLACE( + sa.func.REPLACE( + sa.cast(sa.column("ctid"), sa.Text), + "(", + "", + ), + ")", + "", + ), + ",", + 2, + ), + sa.Integer, + ) + > row + ) + i: int = 1 for _, _, ctid in sync.fetchmany(statement): value: list = ctid[0].split(",") @@ -234,6 +312,8 @@ def multiprocess_async( event_loop.run_until_complete( run_tasks(executor, tasks, doc, verbose=verbose, validate=validate) ) + except KeyboardInterrupt: + pass finally: event_loop.close() @@ -286,7 +366,14 @@ def run_task( sync.index, sync.sync(ctid=task, txmin=txmin, txmax=txmax), ) - print("run_task complete") + if len(task) > 0: + page: int = max(task.keys()) + row: int = max(task[page]) + name: str = re.sub( + "[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}" + ) + save_ctid(page=page, row=row, name=name) + return 1 diff --git a/pgsync/__init__.py b/pgsync/__init__.py index 0156ea8f..5d441df0 100644 --- a/pgsync/__init__.py +++ b/pgsync/__init__.py @@ -4,4 +4,4 @@ __author__ = "Tolu Aina" __email__ = "tolu@pgsync.com" -__version__ = "2.3.3" +__version__ = "2.3.4" diff --git a/requirements/dev.txt b/requirements/dev.txt index cf716d28..9b359496 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -10,13 +10,13 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.24.94 +boto3==1.25.3 # via -r requirements/base.in -botocore==1.27.94 +botocore==1.28.3 # via # boto3 # s3transfer -build==0.8.0 +build==0.9.0 # via pip-tools bump2version==1.0.1 # via bumpversion @@ -51,6 +51,8 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in +exceptiongroup==1.0.0 + # via pytest faker==15.1.1 # via -r requirements/base.in filelock==3.8.0 @@ -75,7 +77,7 @@ freezegun==1.2.2 # via -r requirements/test.in greenlet==1.1.3.post0 # via sqlalchemy -identify==2.5.6 +identify==2.5.8 # via pre-commit idna==3.4 # via requests @@ -118,10 +120,8 @@ pluggy==1.0.0 # via pytest pre-commit==2.20.0 # via -r requirements/dev.in -psycopg2-binary==2.9.4 +psycopg2-binary==2.9.5 # via -r requirements/base.in -py==1.11.0 - # via pytest pycodestyle==2.9.1 # via # flake8 @@ -134,7 +134,7 @@ pyflakes==2.5.0 # via flake8 pyparsing==3.0.9 # via packaging -pytest==7.1.3 +pytest==7.2.0 # via # -r requirements/test.in # pytest-cov @@ -194,7 +194,7 @@ urllib3==1.26.12 # botocore # elasticsearch # requests -virtualenv==20.16.5 +virtualenv==20.16.6 # via pre-commit wheel==0.37.1 # via pip-tools diff --git a/requirements/prod.txt b/requirements/prod.txt index 5266947a..2dfb6d2b 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -8,9 +8,9 @@ async-timeout==4.0.2 # via redis black==22.10.0 # via -r requirements/base.in -boto3==1.24.94 +boto3==1.25.3 # via -r requirements/base.in -botocore==1.27.94 +botocore==1.28.3 # via # boto3 # s3transfer @@ -52,7 +52,7 @@ marshmallow==3.18.0 # via environs mypy-extensions==0.4.3 # via black -newrelic==8.2.1 +newrelic==8.4.0 # via -r requirements/prod.in packaging==21.3 # via @@ -62,7 +62,7 @@ pathspec==0.10.1 # via black platformdirs==2.5.2 # via black -psycopg2-binary==2.9.4 +psycopg2-binary==2.9.5 # via -r requirements/base.in pyparsing==3.0.9 # via packaging diff --git a/requirements/test.txt b/requirements/test.txt index 95a72b6f..61a9e6f2 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.24.94 +boto3==1.25.3 # via -r requirements/base.in -botocore==1.27.94 +botocore==1.28.3 # via # boto3 # s3transfer @@ -42,6 +42,8 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in +exceptiongroup==1.0.0 + # via pytest faker==15.1.1 # via -r requirements/base.in flake8==5.0.4 @@ -94,10 +96,8 @@ platformdirs==2.5.2 # via black pluggy==1.0.0 # via pytest -psycopg2-binary==2.9.4 +psycopg2-binary==2.9.5 # via -r requirements/base.in -py==1.11.0 - # via pytest pycodestyle==2.9.1 # via # flake8 @@ -110,7 +110,7 @@ pyflakes==2.5.0 # via flake8 pyparsing==3.0.9 # via packaging -pytest==7.1.3 +pytest==7.2.0 # via # -r requirements/test.in # pytest-cov diff --git a/setup.py b/setup.py index 6a288e4e..b52d2232 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ def get_version() -> str: "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)", From 815fab9e66f8f73c7e0a4a830510f18f5d804d48 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 27 Oct 2022 22:26:44 +0100 Subject: [PATCH 02/26] change build matrix to 3.11-dev --- .github/workflows/python-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-build.yml b/.github/workflows/python-build.yml index e9e3aa52..d732e8b0 100644 --- a/.github/workflows/python-build.yml +++ b/.github/workflows/python-build.yml @@ -8,7 +8,7 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: [3.7, 3.8, 3.9, '3.10', '3.11'] + python-version: ["3.7", "3.8", "3.9", '3.10', '3.11-dev'] services: postgres: image: debezium/postgres:15 From c7142d719cadd2b676fc8e2bb4f11f6415dd21d4 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 27 Oct 2022 22:40:48 +0100 Subject: [PATCH 03/26] change ci build template --- .github/workflows/python-build.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python-build.yml b/.github/workflows/python-build.yml index d732e8b0..a2edde19 100644 --- a/.github/workflows/python-build.yml +++ b/.github/workflows/python-build.yml @@ -8,7 +8,7 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: ["3.7", "3.8", "3.9", '3.10', '3.11-dev'] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11-dev"] services: postgres: image: debezium/postgres:15 @@ -34,9 +34,9 @@ jobs: network.host: 127.0.0.1 http.host: 0.0.0.0 steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v1 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies From 60f7e145e5b8a2316c909977390dcd1689cb7fa5 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 27 Oct 2022 22:47:35 +0100 Subject: [PATCH 04/26] change to non dev vesion of python 3.11 --- .github/workflows/python-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-build.yml b/.github/workflows/python-build.yml index a2edde19..bfda2894 100644 --- a/.github/workflows/python-build.yml +++ b/.github/workflows/python-build.yml @@ -8,7 +8,7 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11-dev"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] services: postgres: image: debezium/postgres:15 From d619c8db81aa4e57f07af4a02bc50566d68d11ea Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 27 Oct 2022 23:00:20 +0100 Subject: [PATCH 05/26] add python 3.11 to pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4c31d15d..65e68881 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,3 @@ [tool.black] line-length = 79 -target-version = ['py37', 'py38', 'py39', 'py310'] \ No newline at end of file +target-version = ['py37', 'py38', 'py39', 'py310', 'py311'] \ No newline at end of file From a4b5af0236645a6c456e55a1e1e7dfa10038db61 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Mon, 31 Oct 2022 18:12:04 +0100 Subject: [PATCH 06/26] update package deps --- pgsync/node.py | 2 +- pgsync/redisqueue.py | 2 +- pgsync/sync.py | 2 +- requirements/dev.txt | 6 +++--- requirements/prod.txt | 4 ++-- requirements/test.txt | 6 +++--- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pgsync/node.py b/pgsync/node.py index 69987368..15d66bb6 100644 --- a/pgsync/node.py +++ b/pgsync/node.py @@ -164,7 +164,7 @@ def setup(self): for column_name in self.column_names: - tokens = None + tokens: Optional[list] = None if any(op in column_name for op in JSONB_OPERATORS): tokens = re.split( f"({'|'.join(JSONB_OPERATORS)})", diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py index 9fd96b9c..430e6cad 100644 --- a/pgsync/redisqueue.py +++ b/pgsync/redisqueue.py @@ -41,7 +41,7 @@ def bulk_pop(self, chunk_size: Optional[int] = None) -> List[dict]: pipeline.lrange(self.key, 0, chunk_size - 1) pipeline.ltrim(self.key, chunk_size, -1) items: List = pipeline.execute() - logger.debug(f"bulk_pop nsize: {len(items[0])}") + logger.debug(f"bulk_pop size: {len(items[0])}") return list(map(lambda value: json.loads(value), items[0])) def bulk_push(self, items: List) -> None: diff --git a/pgsync/sync.py b/pgsync/sync.py index 791630c2..8e30b74b 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -987,7 +987,7 @@ def poll_db(self) -> None: payloads: list = [] while True: - # NB: consider reducing POLL_TIMEOUT to increase throughout + # NB: consider reducing POLL_TIMEOUT to increase throughput if select.select([conn], [], [], POLL_TIMEOUT) == ([], [], []): # Catch any hanging items from the last poll if payloads: diff --git a/requirements/dev.txt b/requirements/dev.txt index 9b359496..9c893db1 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.25.3 +boto3==1.25.4 # via -r requirements/base.in -botocore==1.28.3 +botocore==1.28.4 # via # boto3 # s3transfer @@ -177,7 +177,7 @@ sqlalchemy==1.4.42 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in -termcolor==2.0.1 +termcolor==2.1.0 # via pytest-sugar toml==0.10.2 # via pre-commit diff --git a/requirements/prod.txt b/requirements/prod.txt index 2dfb6d2b..b2d1110a 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -8,9 +8,9 @@ async-timeout==4.0.2 # via redis black==22.10.0 # via -r requirements/base.in -boto3==1.25.3 +boto3==1.25.4 # via -r requirements/base.in -botocore==1.28.3 +botocore==1.28.4 # via # boto3 # s3transfer diff --git a/requirements/test.txt b/requirements/test.txt index 61a9e6f2..75827600 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.25.3 +boto3==1.25.4 # via -r requirements/base.in -botocore==1.28.3 +botocore==1.28.4 # via # boto3 # s3transfer @@ -151,7 +151,7 @@ sqlalchemy==1.4.42 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in -termcolor==2.0.1 +termcolor==2.1.0 # via pytest-sugar tomli==2.0.1 # via From 39540bc382327dc7ff012fcc6967cb2fc886c350 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Mon, 31 Oct 2022 20:57:35 +0100 Subject: [PATCH 07/26] fixed tests --- tests/test_redisqueue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_redisqueue.py b/tests/test_redisqueue.py index 2d4ec7e3..dd89cae3 100644 --- a/tests/test_redisqueue.py +++ b/tests/test_redisqueue.py @@ -69,11 +69,11 @@ def test_bulk_pop(self, mock_logger): queue.delete() queue.bulk_push([1, 2]) items = queue.bulk_pop() - mock_logger.debug.assert_called_once_with("bulk_pop nsize: 2") + mock_logger.debug.assert_called_once_with("bulk_pop size: 2") assert items == [1, 2] queue.bulk_push([3, 4, 5]) items = queue.bulk_pop() - mock_logger.debug.assert_any_call("bulk_pop nsize: 3") + mock_logger.debug.assert_any_call("bulk_pop size: 3") assert items == [3, 4, 5] queue.delete() From 3fe7ffeb98f55d3d201ddf7ad679bfe1498738da Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Tue, 8 Nov 2022 23:22:23 +0100 Subject: [PATCH 08/26] handle search_phase_execution_exception with incompatible data types dring search #352 --- pgsync/elastichelper.py | 10 ++++++++-- pgsync/utils.py | 12 ++++++------ requirements/dev.txt | 18 +++++++++--------- requirements/prod.txt | 12 ++++++------ requirements/test.txt | 16 ++++++++-------- 5 files changed, 37 insertions(+), 31 deletions(-) diff --git a/pgsync/elastichelper.py b/pgsync/elastichelper.py index 077b6721..05e1262f 100644 --- a/pgsync/elastichelper.py +++ b/pgsync/elastichelper.py @@ -5,6 +5,7 @@ import boto3 from elasticsearch import Elasticsearch, helpers, RequestsHttpConnection +from elasticsearch.exceptions import RequestError from elasticsearch_dsl import Q, Search from elasticsearch_dsl.query import Bool from requests_aws4auth import AWS4Auth @@ -227,8 +228,13 @@ def _search(self, index: str, table: str, fields: Optional[dict] = None): ] ) ) - for hit in search.scan(): - yield hit.meta.id + try: + for hit in search.scan(): + yield hit.meta.id + except RequestError as e: + logger.warning(f"RequestError: {e}") + if "is out of range for a long" not in str(e): + raise def search(self, index: str, body: dict): """ diff --git a/pgsync/utils.py b/pgsync/utils.py index dc609dee..8e95d2a3 100644 --- a/pgsync/utils.py +++ b/pgsync/utils.py @@ -19,7 +19,7 @@ logger = logging.getLogger(__name__) -HIGHLIGHT_START = "\033[4m" +HIGHLIGHT_BEGIN = "\033[4m" HIGHLIGHT_END = "\033[0m:" @@ -95,20 +95,20 @@ def get_redacted_url(result: ParseResult) -> ParseResult: def show_settings(schema: Optional[str] = None) -> None: """Show settings.""" - logger.info(f"{HIGHLIGHT_START}Settings{HIGHLIGHT_END}") + logger.info(f"{HIGHLIGHT_BEGIN}Settings{HIGHLIGHT_END}") logger.info(f'{"Schema":<10s}: {schema or SCHEMA}') logger.info("-" * 65) - logger.info(f"{HIGHLIGHT_START}Checkpoint{HIGHLIGHT_END}") + logger.info(f"{HIGHLIGHT_BEGIN}Checkpoint{HIGHLIGHT_END}") logger.info(f"Path: {CHECKPOINT_PATH}") - logger.info(f"{HIGHLIGHT_START}Postgres{HIGHLIGHT_END}") + logger.info(f"{HIGHLIGHT_BEGIN}Postgres{HIGHLIGHT_END}") result: ParseResult = get_redacted_url( urlparse(get_postgres_url("postgres")) ) logger.info(f"URL: {result.geturl()}") result = get_redacted_url(urlparse(get_elasticsearch_url())) - logger.info(f"{HIGHLIGHT_START}Elasticsearch{HIGHLIGHT_END}") + logger.info(f"{HIGHLIGHT_BEGIN}Elasticsearch{HIGHLIGHT_END}") logger.info(f"URL: {result.geturl()}") - logger.info(f"{HIGHLIGHT_START}Redis{HIGHLIGHT_END}") + logger.info(f"{HIGHLIGHT_BEGIN}Redis{HIGHLIGHT_END}") result = get_redacted_url(urlparse(get_redis_url())) logger.info(f"URL: {result.geturl()}") logger.info("-" * 65) diff --git a/requirements/dev.txt b/requirements/dev.txt index 9c893db1..fce4b68d 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.25.4 +boto3==1.26.5 # via -r requirements/base.in -botocore==1.28.4 +botocore==1.29.5 # via # boto3 # s3transfer @@ -51,9 +51,9 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -exceptiongroup==1.0.0 +exceptiongroup==1.0.1 # via pytest -faker==15.1.1 +faker==15.3.1 # via -r requirements/base.in filelock==3.8.0 # via virtualenv @@ -75,7 +75,7 @@ flake8-todo==0.7 # via -r requirements/test.in freezegun==1.2.2 # via -r requirements/test.in -greenlet==1.1.3.post0 +greenlet==2.0.1 # via sqlalchemy identify==2.5.8 # via pre-commit @@ -112,7 +112,7 @@ pep517==0.13.0 # via build pip-tools==6.9.0 # via -r requirements/dev.in -platformdirs==2.5.2 +platformdirs==2.5.3 # via # black # virtualenv @@ -146,7 +146,7 @@ pytest-mock==3.10.0 # via -r requirements/test.in pytest-runner==6.0.0 # via -r requirements/test.in -pytest-sugar==0.9.5 +pytest-sugar==0.9.6 # via -r requirements/test.in python-dateutil==2.8.2 # via @@ -173,7 +173,7 @@ six==1.16.0 # requests-aws4auth snowballstemmer==2.2.0 # via pydocstyle -sqlalchemy==1.4.42 +sqlalchemy==1.4.43 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in @@ -196,7 +196,7 @@ urllib3==1.26.12 # requests virtualenv==20.16.6 # via pre-commit -wheel==0.37.1 +wheel==0.38.3 # via pip-tools wrapt==1.14.1 # via deprecated diff --git a/requirements/prod.txt b/requirements/prod.txt index b2d1110a..d8ce0fb9 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -8,9 +8,9 @@ async-timeout==4.0.2 # via redis black==22.10.0 # via -r requirements/base.in -boto3==1.25.4 +boto3==1.26.5 # via -r requirements/base.in -botocore==1.28.4 +botocore==1.29.5 # via # boto3 # s3transfer @@ -38,9 +38,9 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -faker==15.1.1 +faker==15.3.1 # via -r requirements/base.in -greenlet==1.1.3.post0 +greenlet==2.0.1 # via sqlalchemy idna==3.4 # via requests @@ -60,7 +60,7 @@ packaging==21.3 # redis pathspec==0.10.1 # via black -platformdirs==2.5.2 +platformdirs==2.5.3 # via black psycopg2-binary==2.9.5 # via -r requirements/base.in @@ -86,7 +86,7 @@ six==1.16.0 # elasticsearch-dsl # python-dateutil # requests-aws4auth -sqlalchemy==1.4.42 +sqlalchemy==1.4.43 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in diff --git a/requirements/test.txt b/requirements/test.txt index 75827600..eb40ac0f 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.25.4 +boto3==1.26.5 # via -r requirements/base.in -botocore==1.28.4 +botocore==1.29.5 # via # boto3 # s3transfer @@ -42,9 +42,9 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -exceptiongroup==1.0.0 +exceptiongroup==1.0.1 # via pytest -faker==15.1.1 +faker==15.3.1 # via -r requirements/base.in flake8==5.0.4 # via @@ -64,7 +64,7 @@ flake8-todo==0.7 # via -r requirements/test.in freezegun==1.2.2 # via -r requirements/test.in -greenlet==1.1.3.post0 +greenlet==2.0.1 # via sqlalchemy idna==3.4 # via requests @@ -92,7 +92,7 @@ packaging==21.3 # redis pathspec==0.10.1 # via black -platformdirs==2.5.2 +platformdirs==2.5.3 # via black pluggy==1.0.0 # via pytest @@ -122,7 +122,7 @@ pytest-mock==3.10.0 # via -r requirements/test.in pytest-runner==6.0.0 # via -r requirements/test.in -pytest-sugar==0.9.5 +pytest-sugar==0.9.6 # via -r requirements/test.in python-dateutil==2.8.2 # via @@ -147,7 +147,7 @@ six==1.16.0 # requests-aws4auth snowballstemmer==2.2.0 # via pydocstyle -sqlalchemy==1.4.42 +sqlalchemy==1.4.43 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in From dc191aff62af9bed2623eb74009135ecb2439be3 Mon Sep 17 00:00:00 2001 From: Loren Siebert Date: Sun, 13 Nov 2022 06:26:26 -0800 Subject: [PATCH 09/26] Fix call to `sorted()` on dicts (#381) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Running `—analyze` errors in python 3.11 when indices exist ```bash File "/Users/lorensiebert/workspace/pgsync/pgsync/base.py", line 252, in indices self.__indices[(table, schema)] = sorted( ^^^^^^^ TypeError: '<' not supported between instances of 'dict' and 'dict' ``` --- pgsync/base.py | 5 ++--- tests/test_base.py | 7 ++++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pgsync/base.py b/pgsync/base.py index fbaebfc8..8ef30242 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -249,9 +249,8 @@ def _materialized_views(self, schema: str) -> list: def indices(self, table: str, schema: str) -> list: """Get the database table indexes.""" if (table, schema) not in self.__indices: - self.__indices[(table, schema)] = sorted( - sa.inspect(self.engine).get_indexes(table, schema=schema) - ) + indexes = sa.inspect(self.engine).get_indexes(table, schema=schema) + self.__indices[(table, schema)] = sorted(indexes, key=lambda d: d['name']) return self.__indices[(table, schema)] def tables(self, schema: str) -> list: diff --git a/tests/test_base.py b/tests/test_base.py index 6ff23c74..4c7807de 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -110,7 +110,12 @@ def test_tables(self, connection): def test_indices(self, connection): pg_base = Base(connection.engine.url.database) - assert pg_base.indices("book", "public") == [] + assert pg_base.indices("contact_item", "public") == [ + {'name': 'contact_item_contact_id_key', 'unique': True, 'column_names': ['contact_id'], + 'include_columns': [], 'duplicates_constraint': 'contact_item_contact_id_key', + 'dialect_options': {'postgresql_include': []}}, + {'name': 'contact_item_name_key', 'unique': True, 'column_names': ['name'], 'include_columns': [], + 'duplicates_constraint': 'contact_item_name_key', 'dialect_options': {'postgresql_include': []}}] @patch("pgsync.base.logger") @patch("pgsync.sync.Base.execute") From d559b6f6a0fbc145eeaf456cb3bce4181fe4098f Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sun, 13 Nov 2022 15:46:52 +0100 Subject: [PATCH 10/26] refomatting files --- pgsync/base.py | 4 +++- tests/test_base.py | 22 +++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/pgsync/base.py b/pgsync/base.py index 8ef30242..40062068 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -250,7 +250,9 @@ def indices(self, table: str, schema: str) -> list: """Get the database table indexes.""" if (table, schema) not in self.__indices: indexes = sa.inspect(self.engine).get_indexes(table, schema=schema) - self.__indices[(table, schema)] = sorted(indexes, key=lambda d: d['name']) + self.__indices[(table, schema)] = sorted( + indexes, key=lambda d: d["name"] + ) return self.__indices[(table, schema)] def tables(self, schema: str) -> list: diff --git a/tests/test_base.py b/tests/test_base.py index 4c7807de..5da4cf36 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -111,11 +111,23 @@ def test_tables(self, connection): def test_indices(self, connection): pg_base = Base(connection.engine.url.database) assert pg_base.indices("contact_item", "public") == [ - {'name': 'contact_item_contact_id_key', 'unique': True, 'column_names': ['contact_id'], - 'include_columns': [], 'duplicates_constraint': 'contact_item_contact_id_key', - 'dialect_options': {'postgresql_include': []}}, - {'name': 'contact_item_name_key', 'unique': True, 'column_names': ['name'], 'include_columns': [], - 'duplicates_constraint': 'contact_item_name_key', 'dialect_options': {'postgresql_include': []}}] + { + "name": "contact_item_contact_id_key", + "unique": True, + "column_names": ["contact_id"], + "include_columns": [], + "duplicates_constraint": "contact_item_contact_id_key", + "dialect_options": {"postgresql_include": []}, + }, + { + "name": "contact_item_name_key", + "unique": True, + "column_names": ["name"], + "include_columns": [], + "duplicates_constraint": "contact_item_name_key", + "dialect_options": {"postgresql_include": []}, + }, + ] @patch("pgsync.base.logger") @patch("pgsync.sync.Base.execute") From 22b665b1077b6fb27bc14f871affd8cce8fbb334 Mon Sep 17 00:00:00 2001 From: Loren Siebert Date: Tue, 15 Nov 2022 03:31:43 -0800 Subject: [PATCH 11/26] Use STREAM_RESULTS in call to execute (#383) Some evidence that memory usage spikes when truncating a slot with many txs in it --- pgsync/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgsync/base.py b/pgsync/base.py index 40062068..2ee647e0 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -430,7 +430,7 @@ def logical_slot_get_changes( limit=limit, offset=offset, ) - self.execute(statement) + self.execute(statement, options=dict(stream_results=STREAM_RESULTS)) def logical_slot_peek_changes( self, From 46d661bfc3802df381929962aa1531c776ffa9f7 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 17 Nov 2022 16:28:13 +0100 Subject: [PATCH 12/26] bumped up requirements --- requirements/dev.txt | 22 +++++++++++----------- requirements/prod.txt | 14 +++++++------- requirements/test.txt | 16 ++++++++-------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/requirements/dev.txt b/requirements/dev.txt index fce4b68d..4cd7b8f6 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.26.5 +boto3==1.26.11 # via -r requirements/base.in -botocore==1.29.5 +botocore==1.29.11 # via # boto3 # s3transfer @@ -51,9 +51,9 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -exceptiongroup==1.0.1 +exceptiongroup==1.0.4 # via pytest -faker==15.3.1 +faker==15.3.2 # via -r requirements/base.in filelock==3.8.0 # via virtualenv @@ -89,7 +89,7 @@ jmespath==1.0.1 # via # boto3 # botocore -marshmallow==3.18.0 +marshmallow==3.19.0 # via environs mccabe==0.7.0 # via flake8 @@ -106,13 +106,13 @@ packaging==21.3 # pytest # pytest-sugar # redis -pathspec==0.10.1 +pathspec==0.10.2 # via black pep517==0.13.0 # via build -pip-tools==6.9.0 +pip-tools==6.10.0 # via -r requirements/dev.in -platformdirs==2.5.3 +platformdirs==2.5.4 # via # black # virtualenv @@ -173,7 +173,7 @@ six==1.16.0 # requests-aws4auth snowballstemmer==2.2.0 # via pydocstyle -sqlalchemy==1.4.43 +sqlalchemy==1.4.44 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in @@ -194,9 +194,9 @@ urllib3==1.26.12 # botocore # elasticsearch # requests -virtualenv==20.16.6 +virtualenv==20.16.7 # via pre-commit -wheel==0.38.3 +wheel==0.38.4 # via pip-tools wrapt==1.14.1 # via deprecated diff --git a/requirements/prod.txt b/requirements/prod.txt index d8ce0fb9..aa099fe2 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -8,9 +8,9 @@ async-timeout==4.0.2 # via redis black==22.10.0 # via -r requirements/base.in -boto3==1.26.5 +boto3==1.26.11 # via -r requirements/base.in -botocore==1.29.5 +botocore==1.29.11 # via # boto3 # s3transfer @@ -38,7 +38,7 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -faker==15.3.1 +faker==15.3.2 # via -r requirements/base.in greenlet==2.0.1 # via sqlalchemy @@ -48,7 +48,7 @@ jmespath==1.0.1 # via # boto3 # botocore -marshmallow==3.18.0 +marshmallow==3.19.0 # via environs mypy-extensions==0.4.3 # via black @@ -58,9 +58,9 @@ packaging==21.3 # via # marshmallow # redis -pathspec==0.10.1 +pathspec==0.10.2 # via black -platformdirs==2.5.3 +platformdirs==2.5.4 # via black psycopg2-binary==2.9.5 # via -r requirements/base.in @@ -86,7 +86,7 @@ six==1.16.0 # elasticsearch-dsl # python-dateutil # requests-aws4auth -sqlalchemy==1.4.43 +sqlalchemy==1.4.44 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in diff --git a/requirements/test.txt b/requirements/test.txt index eb40ac0f..a97f3546 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.26.5 +boto3==1.26.11 # via -r requirements/base.in -botocore==1.29.5 +botocore==1.29.11 # via # boto3 # s3transfer @@ -42,9 +42,9 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -exceptiongroup==1.0.1 +exceptiongroup==1.0.4 # via pytest -faker==15.3.1 +faker==15.3.2 # via -r requirements/base.in flake8==5.0.4 # via @@ -76,7 +76,7 @@ jmespath==1.0.1 # via # boto3 # botocore -marshmallow==3.18.0 +marshmallow==3.19.0 # via environs mccabe==0.7.0 # via flake8 @@ -90,9 +90,9 @@ packaging==21.3 # pytest # pytest-sugar # redis -pathspec==0.10.1 +pathspec==0.10.2 # via black -platformdirs==2.5.3 +platformdirs==2.5.4 # via black pluggy==1.0.0 # via pytest @@ -147,7 +147,7 @@ six==1.16.0 # requests-aws4auth snowballstemmer==2.2.0 # via pydocstyle -sqlalchemy==1.4.43 +sqlalchemy==1.4.44 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in From 07d6351715fb7541d8c0eea1cd1ad158dd921bd7 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Fri, 18 Nov 2022 15:55:11 +0100 Subject: [PATCH 13/26] Query optimization for root level nodes --- .env.sample | 3 ++ pgsync/querybuilder.py | 5 ++- pgsync/settings.py | 3 ++ pgsync/sync.py | 90 ++++++++++++++++++++++++++++++++++++------ pgsync/utils.py | 30 ++++++++++++++ 5 files changed, 116 insertions(+), 15 deletions(-) diff --git a/.env.sample b/.env.sample index 65c599c7..7a608498 100644 --- a/.env.sample +++ b/.env.sample @@ -21,6 +21,9 @@ # USE_ASYNC=False # JOIN_QUERIES=True # STREAM_RESULTS=True +# db polling interval +# POLL_INTERVAL=0.5 +# FILTER_CHUNK_SIZE=10000 # Elasticsearch # ELASTICSEARCH_SCHEME=http diff --git a/pgsync/querybuilder.py b/pgsync/querybuilder.py index 0def2ed4..c7e6956a 100644 --- a/pgsync/querybuilder.py +++ b/pgsync/querybuilder.py @@ -38,8 +38,9 @@ def _build_filters( _filters: list = [] for _filter in filters.get(node.table): where: list = [] - for key, value in _filter.items(): - where.append(node.model.c[key] == value) + for column, value in _filter.items(): + where.append(node.model.c[column] == value) + # and clause is applied when we have a composite primary key _filters.append(sa.and_(*where)) return sa.or_(*_filters) diff --git a/pgsync/settings.py b/pgsync/settings.py index d78a6df5..48aaef2c 100644 --- a/pgsync/settings.py +++ b/pgsync/settings.py @@ -28,6 +28,7 @@ QUERY_LITERAL_BINDS = env.bool("QUERY_LITERAL_BINDS", default=False) # db query chunk size (how many records to fetch at a time) QUERY_CHUNK_SIZE = env.int("QUERY_CHUNK_SIZE", default=10000) +FILTER_CHUNK_SIZE = env.int("FILTER_CHUNK_SIZE", default=10000) # replication slot cleanup interval (in secs) REPLICATION_SLOT_CLEANUP_INTERVAL = env.float( "REPLICATION_SLOT_CLEANUP_INTERVAL", @@ -37,6 +38,8 @@ SCHEMA = env.str("SCHEMA", default=None) USE_ASYNC = env.bool("USE_ASYNC", default=False) STREAM_RESULTS = env.bool("STREAM_RESULTS", default=True) +# db polling interval +POLL_INTERVAL = env.float("POLL_INTERVAL", default=0.5) # Elasticsearch: ELASTICSEARCH_API_KEY = env.str("ELASTICSEARCH_API_KEY", default=None) diff --git a/pgsync/sync.py b/pgsync/sync.py index 8e30b74b..02d4e734 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -44,10 +44,12 @@ from .redisqueue import RedisQueue from .settings import ( CHECKPOINT_PATH, + FILTER_CHUNK_SIZE, JOIN_QUERIES, LOG_INTERVAL, LOGICAL_SLOT_CHUNK_SIZE, NTHREADS_POLLDB, + POLL_INTERVAL, POLL_TIMEOUT, REDIS_POLL_INTERVAL, REDIS_WRITE_CHUNK_SIZE, @@ -56,10 +58,12 @@ ) from .transform import Transform from .utils import ( + chunks, compiled_query, exception, get_config, load_config, + MutuallyExclusiveOption, show_settings, threaded, Timer, @@ -77,6 +81,7 @@ def __init__( verbose: bool = False, validate: bool = True, repl_slots: bool = True, + display_tree: bool = False, **kwargs, ) -> None: """Constructor.""" @@ -100,6 +105,7 @@ def __init__( self._checkpoint_file: str = os.path.join( CHECKPOINT_PATH, f".{self.__name}" ) + self.display_tree: bool = display_tree self.redis: RedisQueue = RedisQueue(self.__name) self.tree: Tree = Tree(self.models) if validate: @@ -175,7 +181,8 @@ def validate(self, repl_slots: bool = True) -> None: ) self.tree.build(self.nodes) - self.tree.display() + if self.display_tree: + self.tree.display() for node in self.tree.traverse_breadth_first(): @@ -471,7 +478,6 @@ def _insert_op( raise # set the parent as the new entity that has changed - filters[node.parent.table] = [] foreign_keys = self.query_builder._get_foreign_keys( node.parent, node, @@ -492,7 +498,6 @@ def _insert_op( # handle case where we insert into a through table # set the parent as the new entity that has changed - filters[node.parent.table] = [] foreign_keys = self.query_builder.get_foreign_keys( node.parent, node, @@ -798,7 +803,13 @@ def _payloads(self, payloads: List[Payload]) -> None: logger.debug(f"tg_op: {payload.tg_op} table: {node.name}") - filters: dict = {node.table: [], self.tree.root.table: []} + filters: dict = { + node.table: [], + self.tree.root.table: [], + } + if not node.is_root: + filters[node.parent.table] = [] + extra: dict = {} if payload.tg_op == INSERT: @@ -834,7 +845,33 @@ def _payloads(self, payloads: List[Payload]) -> None: # otherwise we would end up performing a full query # and sync the entire db! if any(filters.values()): - yield from self.sync(filters=filters, extra=extra) + """ + Filters is a dict of tables where each key is a list of id's + { + 'city': [1, 2, 3], + 'book': [ + {'id': '1'}, + {'id': '2'}, + {'id': '7'}, + ... + ] + } + """ + # Lets chunk at only the root node for now. + for chunk in chunks( + filters[self.tree.root.table], + FILTER_CHUNK_SIZE, + ): + values: dict = { + self.tree.root.table: chunk, + node.table: filters[node.table], + } + if not node.is_root: + filters[node.parent.table] = filters[node.table] + yield from self.sync( + filters=values, + extra=extra, + ) def sync( self, @@ -1207,7 +1244,21 @@ def receive(self, nthreads_polldb: int) -> None: help="Schema config", type=click.Path(exists=True), ) -@click.option("--daemon", "-d", is_flag=True, help="Run as a daemon") +@click.option( + "--daemon", + "-d", + is_flag=True, + help="Run as a daemon (Incompatible with --polling)", + cls=MutuallyExclusiveOption, + mutually_exclusive=["polling"], +) +@click.option( + "--polling", + is_flag=True, + help="Polling mode (Incompatible with -d)", + cls=MutuallyExclusiveOption, + mutually_exclusive=["daemon"], +) @click.option("--host", "-h", help="PG_HOST override") @click.option("--password", is_flag=True, help="Prompt for database password") @click.option("--port", "-p", help="PG_PORT override", type=int) @@ -1272,6 +1323,7 @@ def main( version, analyze, nthreads_polldb, + polling, ): """Main application syncer.""" if version: @@ -1301,17 +1353,29 @@ def main( with Timer(): - for document in load_config(config): - sync: Sync = Sync(document, verbose=verbose, **kwargs) + if analyze: - if analyze: + for document in load_config(config): + sync: Sync = Sync(document, verbose=verbose, **kwargs) sync.analyze() - continue - sync.pull() + elif polling: + + while True: + for document in load_config(config): + sync: Sync = Sync( + document, verbose=verbose, display_tree=False, **kwargs + ) + sync.pull() + time.sleep(POLL_INTERVAL) + + else: - if daemon: - sync.receive(nthreads_polldb) + for document in load_config(config): + sync: Sync = Sync(document, verbose=verbose, **kwargs) + sync.pull() + if daemon: + sync.receive(nthreads_polldb) if __name__ == "__main__": diff --git a/pgsync/utils.py b/pgsync/utils.py index 8e95d2a3..183753da 100644 --- a/pgsync/utils.py +++ b/pgsync/utils.py @@ -10,6 +10,7 @@ from typing import Callable, Generator, Optional from urllib.parse import ParseResult, urlparse +import click import sqlalchemy as sa import sqlparse @@ -23,6 +24,12 @@ HIGHLIGHT_END = "\033[0m:" +def chunks(l: list, n: int): + """Yield successive n-sized chunks from l""" + for i in range(0, len(l), n): + yield l[i : i + n] + + def timeit(func: Callable): def timed(*args, **kwargs): since: float = time() @@ -160,3 +167,26 @@ def compiled_query( sys.stdout.write(f"{query}\n") sys.stdout.write("-" * 79) sys.stdout.write("\n") + + +class MutuallyExclusiveOption(click.Option): + def __init__(self, *args, **kwargs): + self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) + help = kwargs.get("help", "") + if self.mutually_exclusive: + kwargs["help"] = help + ( + f" NOTE: This argument is mutually exclusive with " + f" arguments: [{', '.join(self.mutually_exclusive)}]." + ) + super(MutuallyExclusiveOption, self).__init__(*args, **kwargs) + + def handle_parse_result(self, ctx, opts, args): + if self.mutually_exclusive.intersection(opts) and self.name in opts: + raise click.UsageError( + f"Illegal usage: `{self.name}` is mutually exclusive with " + f"arguments `{', '.join(self.mutually_exclusive)}`." + ) + + return super(MutuallyExclusiveOption, self).handle_parse_result( + ctx, opts, args + ) From 3d79c48ce9ae9eff6047dadd8838e409f1b11ade Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 19 Nov 2022 20:50:00 +0100 Subject: [PATCH 14/26] rename load_config to config_loader and more rocust query chunking --- .env.sample | 4 +- bin/bootstrap | 4 +- bin/es_mapping | 6 ++- bin/parallel_sync | 4 +- examples/airbnb/data.py | 4 +- examples/airbnb/schema.py | 4 +- examples/ancestry/data.py | 4 +- examples/ancestry/schema.py | 4 +- examples/book/benchmark.py | 4 +- examples/book/data.py | 4 +- examples/book/schema.py | 4 +- examples/book_view/benchmark.py | 4 +- examples/book_view/data.py | 4 +- examples/book_view/schema.py | 4 +- examples/node/data.py | 4 +- examples/node/schema.py | 4 +- examples/quiz/data.py | 4 +- examples/quiz/schema.py | 4 +- examples/schemas/data.py | 4 +- examples/schemas/schema.py | 4 +- examples/social/data.py | 4 +- examples/social/schema.py | 4 +- examples/starcraft/data.py | 4 +- examples/starcraft/schema.py | 4 +- pgsync/base.py | 2 +- pgsync/helper.py | 6 +-- pgsync/querybuilder.py | 27 +++++++----- pgsync/settings.py | 4 +- pgsync/sync.py | 73 ++++++++++++++++++++++----------- pgsync/utils.py | 10 +++-- tests/test_utils.py | 6 +-- 31 files changed, 131 insertions(+), 95 deletions(-) diff --git a/.env.sample b/.env.sample index 7a608498..37bd921a 100644 --- a/.env.sample +++ b/.env.sample @@ -22,8 +22,8 @@ # JOIN_QUERIES=True # STREAM_RESULTS=True # db polling interval -# POLL_INTERVAL=0.5 -# FILTER_CHUNK_SIZE=10000 +# POLL_INTERVAL=0.1 +# FILTER_CHUNK_SIZE=5000 # Elasticsearch # ELASTICSEARCH_SCHEME=http diff --git a/bin/bootstrap b/bin/bootstrap index 4e29c387..c8d6e3ad 100755 --- a/bin/bootstrap +++ b/bin/bootstrap @@ -6,7 +6,7 @@ import logging import click from pgsync.sync import Sync -from pgsync.utils import get_config, load_config, show_settings +from pgsync.utils import config_loader, get_config, show_settings logger = logging.getLogger(__name__) @@ -54,7 +54,7 @@ def main(teardown, config, user, password, host, port, verbose): show_settings(config) - for document in load_config(config): + for document in config_loader(config): sync: Sync = Sync( document, verbose=verbose, repl_slots=False, **kwargs ) diff --git a/bin/es_mapping b/bin/es_mapping index 49769624..fb290bff 100755 --- a/bin/es_mapping +++ b/bin/es_mapping @@ -8,7 +8,7 @@ from elasticsearch import Elasticsearch, helpers from pgsync.settings import ELASTICSEARCH_TIMEOUT, ELASTICSEARCH_VERIFY_CERTS from pgsync.urls import get_elasticsearch_url -from pgsync.utils import get_config, load_config, timeit +from pgsync.utils import config_loader, get_config, timeit logger = logging.getLogger(__name__) @@ -114,7 +114,9 @@ def main(config): """Create custom NGram analyzer for the default mapping.""" config: str = get_config(config) - for index in set([document["index"] for document in load_config(config)]): + for index in set( + [document["index"] for document in config_loader(config)] + ): create_es_mapping(index) diff --git a/bin/parallel_sync b/bin/parallel_sync index 51a3b966..34dc3e71 100755 --- a/bin/parallel_sync +++ b/bin/parallel_sync @@ -59,8 +59,8 @@ from pgsync.settings import BLOCK_SIZE, CHECKPOINT_PATH from pgsync.sync import Sync from pgsync.utils import ( compiled_query, + config_loader, get_config, - load_config, show_settings, timeit, ) @@ -425,7 +425,7 @@ def main(config, nprocs, mode, verbose): show_settings() config: str = get_config(config) - for document in load_config(config): + for document in config_loader(config): tasks: Generator = fetch_tasks(document) if mode == "synchronous": synchronous(tasks, document, verbose=verbose) diff --git a/examples/airbnb/data.py b/examples/airbnb/data.py index f397764f..803285c3 100644 --- a/examples/airbnb/data.py +++ b/examples/airbnb/data.py @@ -7,7 +7,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -21,7 +21,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document: dict = next(load_config(config)) + document: dict = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) diff --git a/examples/airbnb/schema.py b/examples/airbnb/schema.py index ab3c3e23..11adda26 100644 --- a/examples/airbnb/schema.py +++ b/examples/airbnb/schema.py @@ -7,7 +7,7 @@ from pgsync.base import create_database, pg_engine from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -91,7 +91,7 @@ class Review(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) create_database(database) with pg_engine(database) as engine: diff --git a/examples/ancestry/data.py b/examples/ancestry/data.py index c06059ed..f9d0c22e 100644 --- a/examples/ancestry/data.py +++ b/examples/ancestry/data.py @@ -4,7 +4,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -18,7 +18,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document: dict = next(load_config(config)) + document: dict = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) diff --git a/examples/ancestry/schema.py b/examples/ancestry/schema.py index a3ee360f..787c6c3a 100644 --- a/examples/ancestry/schema.py +++ b/examples/ancestry/schema.py @@ -4,7 +4,7 @@ from pgsync.base import create_database, pg_engine from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -49,7 +49,7 @@ class GreatGrandChild(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) create_database(database) with pg_engine(database) as engine: diff --git a/examples/book/benchmark.py b/examples/book/benchmark.py index d5c78ac9..0572bb73 100644 --- a/examples/book/benchmark.py +++ b/examples/book/benchmark.py @@ -9,7 +9,7 @@ from pgsync.base import pg_engine from pgsync.constants import DELETE, INSERT, TG_OP, TRUNCATE, UPDATE -from pgsync.utils import get_config, load_config, show_settings, Timer +from pgsync.utils import config_loader, get_config, show_settings, Timer FIELDS = { "isbn": "isbn13", @@ -139,7 +139,7 @@ def main(config, nsize, daemon, tg_op): show_settings() config: str = get_config(config) - document: dict = next(load_config(config)) + document: dict = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) diff --git a/examples/book/data.py b/examples/book/data.py index f7ea614b..082f03ee 100644 --- a/examples/book/data.py +++ b/examples/book/data.py @@ -25,7 +25,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.constants import DEFAULT_SCHEMA from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -41,7 +41,7 @@ def main(config, nsize): config: str = get_config(config) teardown(drop_db=False, config=config) - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) with pg_engine(database) as engine: diff --git a/examples/book/schema.py b/examples/book/schema.py index 8175ed93..22eac2bd 100644 --- a/examples/book/schema.py +++ b/examples/book/schema.py @@ -6,7 +6,7 @@ from pgsync.base import create_database, create_schema, pg_engine from pgsync.constants import DEFAULT_SCHEMA from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -199,7 +199,7 @@ class BookShelf(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) schema: str = document.get("schema", DEFAULT_SCHEMA) create_database(database) diff --git a/examples/book_view/benchmark.py b/examples/book_view/benchmark.py index ae693b42..0b791047 100644 --- a/examples/book_view/benchmark.py +++ b/examples/book_view/benchmark.py @@ -9,7 +9,7 @@ from pgsync.base import pg_engine from pgsync.constants import DELETE, INSERT, TG_OP, UPDATE -from pgsync.utils import get_config, load_config, show_settings, Timer +from pgsync.utils import config_loader, get_config, show_settings, Timer FIELDS = { "isbn": "isbn13", @@ -132,7 +132,7 @@ def main(config, nsize, daemon, tg_op): show_settings() config: str = get_config(config) - document: dict = next(load_config(config)) + document: dict = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) diff --git a/examples/book_view/data.py b/examples/book_view/data.py index 84c0c5d5..34aa8960 100644 --- a/examples/book_view/data.py +++ b/examples/book_view/data.py @@ -6,7 +6,7 @@ from pgsync.constants import DEFAULT_SCHEMA from pgsync.helper import teardown from pgsync.sync import Sync -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -21,7 +21,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) with pg_engine(database) as engine: diff --git a/examples/book_view/schema.py b/examples/book_view/schema.py index b0d44e39..c43e6f79 100644 --- a/examples/book_view/schema.py +++ b/examples/book_view/schema.py @@ -6,7 +6,7 @@ from pgsync.base import create_database, create_schema, pg_engine from pgsync.constants import DEFAULT_SCHEMA from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config from pgsync.view import CreateView Base = declarative_base() @@ -38,7 +38,7 @@ class Book(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) schema: str = document.get("schema", DEFAULT_SCHEMA) create_database(database) diff --git a/examples/node/data.py b/examples/node/data.py index fc8a67f6..21865de5 100644 --- a/examples/node/data.py +++ b/examples/node/data.py @@ -6,7 +6,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -20,7 +20,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(load_config(config)) + document = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) diff --git a/examples/node/schema.py b/examples/node/schema.py index f7c1a4f3..4889e7d5 100644 --- a/examples/node/schema.py +++ b/examples/node/schema.py @@ -4,7 +4,7 @@ from pgsync.base import create_database, pg_engine from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -18,7 +18,7 @@ class Node(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) create_database(database) with pg_engine(database) as engine: diff --git a/examples/quiz/data.py b/examples/quiz/data.py index a04e36cf..79779f22 100644 --- a/examples/quiz/data.py +++ b/examples/quiz/data.py @@ -4,7 +4,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -18,7 +18,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(load_config(config)) + document = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) diff --git a/examples/quiz/schema.py b/examples/quiz/schema.py index 67e9fb2d..50b1846b 100644 --- a/examples/quiz/schema.py +++ b/examples/quiz/schema.py @@ -5,7 +5,7 @@ from pgsync.base import create_database, pg_engine from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -103,7 +103,7 @@ class RealAnswer(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) create_database(database) with pg_engine(database) as engine: diff --git a/examples/schemas/data.py b/examples/schemas/data.py index c11039c3..b9bd6f16 100644 --- a/examples/schemas/data.py +++ b/examples/schemas/data.py @@ -4,7 +4,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -18,7 +18,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(load_config(config)) + document = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) diff --git a/examples/schemas/schema.py b/examples/schemas/schema.py index 88334c32..c76e9f9e 100644 --- a/examples/schemas/schema.py +++ b/examples/schemas/schema.py @@ -4,7 +4,7 @@ from pgsync.base import create_database, create_schema, pg_engine from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -25,7 +25,7 @@ class Child(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) create_database(database) for schema in ("parent", "child"): diff --git a/examples/social/data.py b/examples/social/data.py index 1f67e261..2fa19bbd 100644 --- a/examples/social/data.py +++ b/examples/social/data.py @@ -5,7 +5,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -19,7 +19,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document: dict = next(load_config(config)) + document: dict = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) diff --git a/examples/social/schema.py b/examples/social/schema.py index fd78ea66..d319044c 100644 --- a/examples/social/schema.py +++ b/examples/social/schema.py @@ -5,7 +5,7 @@ from pgsync.base import create_database, pg_engine from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -88,7 +88,7 @@ class UserTag(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) create_database(database) with pg_engine(database) as engine: diff --git a/examples/starcraft/data.py b/examples/starcraft/data.py index c876951d..f530678c 100644 --- a/examples/starcraft/data.py +++ b/examples/starcraft/data.py @@ -4,7 +4,7 @@ from pgsync.base import pg_engine, subtransactions from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config @click.command() @@ -18,7 +18,7 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(load_config(config)) + document = next(config_loader(config)) database: str = document.get("database", document["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) diff --git a/examples/starcraft/schema.py b/examples/starcraft/schema.py index 8452f62f..e207085a 100644 --- a/examples/starcraft/schema.py +++ b/examples/starcraft/schema.py @@ -5,7 +5,7 @@ from pgsync.base import create_database, pg_engine from pgsync.helper import teardown -from pgsync.utils import get_config, load_config +from pgsync.utils import config_loader, get_config Base = declarative_base() @@ -46,7 +46,7 @@ class Structure(Base): def setup(config: str) -> None: - for document in load_config(config): + for document in config_loader(config): database: str = document.get("database", document["index"]) create_database(database) with pg_engine(database) as engine: diff --git a/pgsync/base.py b/pgsync/base.py index 2ee647e0..09785356 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -179,7 +179,7 @@ def models(self, table: str, schema: str) -> sa.sql.Alias: model = metadata.tables[name] model.append_column(sa.Column("xmin", sa.BigInteger)) model.append_column(sa.Column("ctid"), TupleIdentifierType) - # support SQLQlchemy/Postgres 14 which somehow now reflects + # support SQLAlchemy/Postgres 14 which somehow now reflects # the oid column if "oid" not in [column.name for column in model.columns]: model.append_column( diff --git a/pgsync/helper.py b/pgsync/helper.py index b05fa15b..4020080b 100644 --- a/pgsync/helper.py +++ b/pgsync/helper.py @@ -7,7 +7,7 @@ from .base import drop_database from .sync import Sync -from .utils import get_config, load_config +from .utils import config_loader, get_config logger = logging.getLogger(__name__) @@ -22,9 +22,9 @@ def teardown( validate: bool = False, ) -> None: """Teardown helper.""" - config = get_config(config) + config: str = get_config(config) - for document in load_config(config): + for document in config_loader(config): sync: Sync = Sync(document, validate=validate) if truncate_db: try: diff --git a/pgsync/querybuilder.py b/pgsync/querybuilder.py index c7e6956a..b46d6218 100644 --- a/pgsync/querybuilder.py +++ b/pgsync/querybuilder.py @@ -27,22 +27,27 @@ def _build_filters( NB: assumption dictionary is an AND and list is an OR - - filters['book'] = [ - {'id': 1, 'uid': '001'}, - {'id': 2, 'uid': '002'} - ] + filters = { + 'book': [ + {'id': 1, 'uid': '001'}, + {'id': 2, 'uid': '002'}, + ], + 'city': [ + {'id': 1}, + {'id': 2}, + ], + } """ if filters is not None: if filters.get(node.table): - _filters: list = [] - for _filter in filters.get(node.table): + clause: list = [] + for values in filters.get(node.table): where: list = [] - for column, value in _filter.items(): + for column, value in values.items(): where.append(node.model.c[column] == value) # and clause is applied when we have a composite primary key - _filters.append(sa.and_(*where)) - return sa.or_(*_filters) + clause.append(sa.and_(*where)) + return sa.or_(*clause) def _json_build_object( self, columns: list, chunk_size: int = 100 @@ -59,7 +64,7 @@ def _json_build_object( i: int = 0 expression: sa.sql.elements.BinaryExpression = None while i < len(columns): - chunk = columns[i : i + chunk_size] + chunk: list = columns[i : i + chunk_size] if i == 0: expression = sa.cast( sa.func.JSON_BUILD_OBJECT(*chunk), diff --git a/pgsync/settings.py b/pgsync/settings.py index 48aaef2c..1768c005 100644 --- a/pgsync/settings.py +++ b/pgsync/settings.py @@ -28,7 +28,7 @@ QUERY_LITERAL_BINDS = env.bool("QUERY_LITERAL_BINDS", default=False) # db query chunk size (how many records to fetch at a time) QUERY_CHUNK_SIZE = env.int("QUERY_CHUNK_SIZE", default=10000) -FILTER_CHUNK_SIZE = env.int("FILTER_CHUNK_SIZE", default=10000) +FILTER_CHUNK_SIZE = env.int("FILTER_CHUNK_SIZE", default=5000) # replication slot cleanup interval (in secs) REPLICATION_SLOT_CLEANUP_INTERVAL = env.float( "REPLICATION_SLOT_CLEANUP_INTERVAL", @@ -39,7 +39,7 @@ USE_ASYNC = env.bool("USE_ASYNC", default=False) STREAM_RESULTS = env.bool("STREAM_RESULTS", default=True) # db polling interval -POLL_INTERVAL = env.float("POLL_INTERVAL", default=0.5) +POLL_INTERVAL = env.float("POLL_INTERVAL", default=0.1) # Elasticsearch: ELASTICSEARCH_API_KEY = env.str("ELASTICSEARCH_API_KEY", default=None) diff --git a/pgsync/sync.py b/pgsync/sync.py index 02d4e734..8bfa4d7d 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -60,9 +60,9 @@ from .utils import ( chunks, compiled_query, + config_loader, exception, get_config, - load_config, MutuallyExclusiveOption, show_settings, threaded, @@ -81,7 +81,7 @@ def __init__( verbose: bool = False, validate: bool = True, repl_slots: bool = True, - display_tree: bool = False, + show_tree: bool = True, **kwargs, ) -> None: """Constructor.""" @@ -105,7 +105,7 @@ def __init__( self._checkpoint_file: str = os.path.join( CHECKPOINT_PATH, f".{self.__name}" ) - self.display_tree: bool = display_tree + self.show_tree: bool = show_tree self.redis: RedisQueue = RedisQueue(self.__name) self.tree: Tree = Tree(self.models) if validate: @@ -181,7 +181,7 @@ def validate(self, repl_slots: bool = True) -> None: ) self.tree.build(self.nodes) - if self.display_tree: + if self.show_tree: self.tree.display() for node in self.tree.traverse_breadth_first(): @@ -848,7 +848,11 @@ def _payloads(self, payloads: List[Payload]) -> None: """ Filters is a dict of tables where each key is a list of id's { - 'city': [1, 2, 3], + 'city': [ + {'id': '1'}, + {'id': '4'}, + {'id': '5'}, + ], 'book': [ {'id': '1'}, {'id': '2'}, @@ -857,21 +861,39 @@ def _payloads(self, payloads: List[Payload]) -> None: ] } """ - # Lets chunk at only the root node for now. - for chunk in chunks( - filters[self.tree.root.table], - FILTER_CHUNK_SIZE, + for l1 in chunks( + filters.get(self.tree.root.table), FILTER_CHUNK_SIZE ): - values: dict = { - self.tree.root.table: chunk, - node.table: filters[node.table], - } - if not node.is_root: - filters[node.parent.table] = filters[node.table] - yield from self.sync( - filters=values, - extra=extra, - ) + if filters.get(node.table): + for l2 in chunks( + filters.get(node.table), FILTER_CHUNK_SIZE + ): + if not node.is_root and filters.get(node.parent.table): + for l3 in chunks( + filters.get(node.parent.table), + FILTER_CHUNK_SIZE, + ): + yield from self.sync( + filters={ + self.tree.root.table: l1, + node.table: l2, + node.parent.table: l3, + }, + extra=extra, + ) + else: + yield from self.sync( + filters={ + self.tree.root.table: l1, + node.table: l2, + }, + extra=extra, + ) + else: + yield from self.sync( + filters={self.tree.root.table: l1}, + extra=extra, + ) def sync( self, @@ -1355,23 +1377,28 @@ def main( if analyze: - for document in load_config(config): + for document in config_loader(config): sync: Sync = Sync(document, verbose=verbose, **kwargs) sync.analyze() elif polling: + show_tree: bool = True while True: - for document in load_config(config): + for document in config_loader(config): sync: Sync = Sync( - document, verbose=verbose, display_tree=False, **kwargs + document, + verbose=verbose, + show_tree=show_tree, + **kwargs, ) sync.pull() + show_tree = False time.sleep(POLL_INTERVAL) else: - for document in load_config(config): + for document in config_loader(config): sync: Sync = Sync(document, verbose=verbose, **kwargs) sync.pull() if daemon: diff --git a/pgsync/utils.py b/pgsync/utils.py index 183753da..c2435c5f 100644 --- a/pgsync/utils.py +++ b/pgsync/utils.py @@ -7,7 +7,7 @@ from datetime import timedelta from string import Template from time import time -from typing import Callable, Generator, Optional +from typing import Callable, Generator, Optional, Set from urllib.parse import ParseResult, urlparse import click @@ -135,7 +135,7 @@ def get_config(config: Optional[str] = None) -> str: return config -def load_config(config: str) -> Generator: +def config_loader(config: str) -> Generator: with open(config, "r") as documents: for document in json.load(documents): for key, value in document.items(): @@ -171,8 +171,10 @@ def compiled_query( class MutuallyExclusiveOption(click.Option): def __init__(self, *args, **kwargs): - self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) - help = kwargs.get("help", "") + self.mutually_exclusive: Set = set( + kwargs.pop("mutually_exclusive", []) + ) + help: str = kwargs.get("help", "") if self.mutually_exclusive: kwargs["help"] = help + ( f" NOTE: This argument is mutually exclusive with " diff --git a/tests/test_utils.py b/tests/test_utils.py index ec6fbc99..7dc66e44 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -12,10 +12,10 @@ from pgsync.urls import get_elasticsearch_url, get_postgres_url, get_redis_url from pgsync.utils import ( compiled_query, + config_loader, exception, get_config, get_redacted_url, - load_config, show_settings, threaded, timeit, @@ -38,11 +38,11 @@ def test_get_config(self): config: str = get_config("tests/fixtures/schema.json") assert config == "tests/fixtures/schema.json" - def test_load_config(self): + def test_config_loader(self): os.environ["foo"] = "mydb" os.environ["bar"] = "myindex" config: str = get_config("tests/fixtures/schema.json") - data = load_config(config) + data = config_loader(config) assert next(data) == { "database": "fakedb", "index": "fake_index", From 5502f364ab8ca26c812f0342a908edc8671b0ba7 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 19 Nov 2022 20:58:16 +0100 Subject: [PATCH 15/26] add mutually_exclusive option to analyze --- pgsync/sync.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pgsync/sync.py b/pgsync/sync.py index 8bfa4d7d..0b99c457 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -1324,6 +1324,8 @@ def receive(self, nthreads_polldb: int) -> None: is_flag=True, default=False, help="Analyse database", + cls=MutuallyExclusiveOption, + mutually_exclusive=["daemon", "polling"], ) @click.option( "--nthreads_polldb", From d4bd36e87e460810496494fbab9ca0fb854c489b Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 19 Nov 2022 22:52:48 +0100 Subject: [PATCH 16/26] add test for query builder sync in batches --- pgsync/exc.py | 8 ++ pgsync/sync.py | 73 ++++++++--------- tests/helpers/__init__.py | 0 tests/test_sync.py | 82 ++++++++++++++------ tests/test_sync_nested_children.py | 2 +- tests/test_sync_root.py | 2 +- tests/test_sync_single_child_fk_on_child.py | 2 +- tests/test_sync_single_child_fk_on_parent.py | 2 +- tests/test_unique_behaviour.py | 2 +- tests/{helpers/utils.py => testing_utils.py} | 31 +++++++- 10 files changed, 136 insertions(+), 68 deletions(-) delete mode 100644 tests/helpers/__init__.py rename tests/{helpers/utils.py => testing_utils.py} (62%) diff --git a/pgsync/exc.py b/pgsync/exc.py index 79e13c4b..11b99f69 100644 --- a/pgsync/exc.py +++ b/pgsync/exc.py @@ -77,6 +77,14 @@ def __str__(self): return repr(self.value) +class InvalidTGOPError(Exception): + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + class NodeAttributeError(Exception): def __init__(self, value): self.value = value diff --git a/pgsync/sync.py b/pgsync/sync.py index 0b99c457..814c0938 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -19,7 +19,7 @@ from psycopg2 import OperationalError from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT -from . import __version__ +from . import __version__, settings from .base import Base, Payload from .constants import ( DELETE, @@ -34,6 +34,7 @@ from .exc import ( ForeignKeyError, InvalidSchemaError, + InvalidTGOPError, PrimaryKeyNotFoundError, RDSError, SchemaError, @@ -42,20 +43,6 @@ from .plugin import Plugins from .querybuilder import QueryBuilder from .redisqueue import RedisQueue -from .settings import ( - CHECKPOINT_PATH, - FILTER_CHUNK_SIZE, - JOIN_QUERIES, - LOG_INTERVAL, - LOGICAL_SLOT_CHUNK_SIZE, - NTHREADS_POLLDB, - POLL_INTERVAL, - POLL_TIMEOUT, - REDIS_POLL_INTERVAL, - REDIS_WRITE_CHUNK_SIZE, - REPLICATION_SLOT_CLEANUP_INTERVAL, - USE_ASYNC, -) from .transform import Transform from .utils import ( chunks, @@ -103,7 +90,7 @@ def __init__( self._plugins: Plugins = None self._truncate: bool = False self._checkpoint_file: str = os.path.join( - CHECKPOINT_PATH, f".{self.__name}" + settings.CHECKPOINT_PATH, f".{self.__name}" ) self.show_tree: bool = show_tree self.redis: RedisQueue = RedisQueue(self.__name) @@ -168,15 +155,15 @@ def validate(self, repl_slots: bool = True) -> None: ) # ensure the checkpoint dirpath is valid - if not os.path.exists(CHECKPOINT_PATH): + if not os.path.exists(settings.CHECKPOINT_PATH): raise RuntimeError( - f'Ensure the checkpoint directory exists "{CHECKPOINT_PATH}" ' + f'Ensure the checkpoint directory exists "{settings.CHECKPOINT_PATH}" ' f"and is readable." ) - if not os.access(CHECKPOINT_PATH, os.W_OK | os.R_OK): + if not os.access(settings.CHECKPOINT_PATH, os.W_OK | os.R_OK): raise RuntimeError( - f'Ensure the checkpoint directory "{CHECKPOINT_PATH}" is ' + f'Ensure the checkpoint directory "{settings.CHECKPOINT_PATH}" is ' f"read/writable" ) @@ -263,7 +250,7 @@ def create_setting(self) -> None: def setup(self) -> None: """Create the database triggers and replication slot.""" - join_queries: bool = JOIN_QUERIES + join_queries: bool = settings.JOIN_QUERIES self.teardown(drop_view=False) for schema in self.schemas: @@ -304,7 +291,7 @@ def setup(self) -> None: def teardown(self, drop_view: bool = True) -> None: """Drop the database triggers and replication slot.""" - join_queries: bool = JOIN_QUERIES + join_queries: bool = settings.JOIN_QUERIES try: os.unlink(self._checkpoint_file) @@ -375,7 +362,7 @@ def logical_slot_changes( # by limiting to a smaller batch size. offset: int = 0 total: int = 0 - limit: int = LOGICAL_SLOT_CHUNK_SIZE + limit: int = settings.LOGICAL_SLOT_CHUNK_SIZE count: int = self.logical_slot_count_changes( self.__name, txmin=txmin, @@ -670,9 +657,11 @@ def _delete_op( docs.append(doc) if docs: raise_on_exception: Optional[bool] = ( - False if USE_ASYNC else None + False if settings.USE_ASYNC else None + ) + raise_on_error: Optional[bool] = ( + False if settings.USE_ASYNC else None ) - raise_on_error: Optional[bool] = False if USE_ASYNC else None self.es.bulk( self.index, docs, @@ -776,7 +765,7 @@ def _payloads(self, payloads: List[Payload]) -> None: payload: Payload = payloads[0] if payload.tg_op not in TG_OP: logger.exception(f"Unknown tg_op {payload.tg_op}") - raise + raise InvalidTGOPError(f"Unknown tg_op {payload.tg_op}") # we might receive an event triggered for a table # that is not in the tree node. @@ -862,16 +851,16 @@ def _payloads(self, payloads: List[Payload]) -> None: } """ for l1 in chunks( - filters.get(self.tree.root.table), FILTER_CHUNK_SIZE + filters.get(self.tree.root.table), settings.FILTER_CHUNK_SIZE ): if filters.get(node.table): for l2 in chunks( - filters.get(node.table), FILTER_CHUNK_SIZE + filters.get(node.table), settings.FILTER_CHUNK_SIZE ): if not node.is_root and filters.get(node.parent.table): for l3 in chunks( filters.get(node.parent.table), - FILTER_CHUNK_SIZE, + settings.FILTER_CHUNK_SIZE, ): yield from self.sync( filters={ @@ -1002,7 +991,7 @@ def _poll_redis(self) -> None: self.on_publish( list(map(lambda payload: Payload(**payload), payloads)) ) - time.sleep(REDIS_POLL_INTERVAL) + time.sleep(settings.REDIS_POLL_INTERVAL) @threaded @exception @@ -1020,7 +1009,7 @@ async def _async_poll_redis(self) -> None: await self.async_on_publish( list(map(lambda payload: Payload(**payload), payloads)) ) - await asyncio.sleep(REDIS_POLL_INTERVAL) + await asyncio.sleep(settings.REDIS_POLL_INTERVAL) @exception async def async_poll_redis(self) -> None: @@ -1047,7 +1036,11 @@ def poll_db(self) -> None: while True: # NB: consider reducing POLL_TIMEOUT to increase throughput - if select.select([conn], [], [], POLL_TIMEOUT) == ([], [], []): + if select.select([conn], [], [], settings.POLL_TIMEOUT) == ( + [], + [], + [], + ): # Catch any hanging items from the last poll if payloads: self.redis.bulk_push(payloads) @@ -1061,7 +1054,7 @@ def poll_db(self) -> None: os._exit(-1) while conn.notifies: - if len(payloads) >= REDIS_WRITE_CHUNK_SIZE: + if len(payloads) >= settings.REDIS_WRITE_CHUNK_SIZE: self.redis.bulk_push(payloads) payloads = [] notification: AnyStr = conn.notifies.pop(0) @@ -1181,13 +1174,13 @@ def truncate_slots(self) -> None: """Truncate the logical replication slot.""" while True: self._truncate_slots() - time.sleep(REPLICATION_SLOT_CLEANUP_INTERVAL) + time.sleep(settings.REPLICATION_SLOT_CLEANUP_INTERVAL) @exception async def async_truncate_slots(self) -> None: while True: self._truncate_slots() - await asyncio.sleep(REPLICATION_SLOT_CLEANUP_INTERVAL) + await asyncio.sleep(settings.REPLICATION_SLOT_CLEANUP_INTERVAL) def _truncate_slots(self) -> None: if self._truncate: @@ -1199,13 +1192,13 @@ def _truncate_slots(self) -> None: def status(self) -> None: while True: self._status(label="Sync") - time.sleep(LOG_INTERVAL) + time.sleep(settings.LOG_INTERVAL) @exception async def async_status(self) -> None: while True: self._status(label="Async") - await asyncio.sleep(LOG_INTERVAL) + await asyncio.sleep(settings.LOG_INTERVAL) def _status(self, label: str) -> None: sys.stdout.write( @@ -1228,7 +1221,7 @@ def receive(self, nthreads_polldb: int) -> None: 2. Pull everything so far and also replay replication logs. 3. Consume all changes from Redis. """ - if USE_ASYNC: + if settings.USE_ASYNC: self._conn = self.engine.connect().connection self._conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cursor = self.conn.cursor() @@ -1332,7 +1325,7 @@ def receive(self, nthreads_polldb: int) -> None: "-n", help="Number of threads to spawn for poll db", type=int, - default=NTHREADS_POLLDB, + default=settings.NTHREADS_POLLDB, ) def main( config, @@ -1396,7 +1389,7 @@ def main( ) sync.pull() show_tree = False - time.sleep(POLL_INTERVAL) + time.sleep(settings.POLL_INTERVAL) else: diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/test_sync.py b/tests/test_sync.py index a91cf14f..c05db994 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -1,4 +1,5 @@ """Sync tests.""" +import importlib import os from collections import namedtuple from typing import List @@ -7,10 +8,16 @@ from mock import ANY, call, patch from pgsync.base import Base, Payload -from pgsync.exc import PrimaryKeyNotFoundError, RDSError, SchemaError +from pgsync.exc import ( + InvalidTGOPError, + PrimaryKeyNotFoundError, + RDSError, + SchemaError, +) from pgsync.node import Node -from pgsync.settings import LOGICAL_SLOT_CHUNK_SIZE, REDIS_POLL_INTERVAL -from pgsync.sync import Sync +from pgsync.sync import settings, Sync + +from .testing_utils import override_env_var ROW = namedtuple("Row", ["data", "xid"]) @@ -69,7 +76,7 @@ def test_logical_slot_changes(self, mock_logger, sync): txmin=None, txmax=None, upto_nchanges=None, - limit=LOGICAL_SLOT_CHUNK_SIZE, + limit=settings.LOGICAL_SLOT_CHUNK_SIZE, offset=0, ) mock_sync.assert_not_called() @@ -86,7 +93,7 @@ def test_logical_slot_changes(self, mock_logger, sync): txmin=None, txmax=None, upto_nchanges=None, - limit=LOGICAL_SLOT_CHUNK_SIZE, + limit=settings.LOGICAL_SLOT_CHUNK_SIZE, offset=0, ) mock_sync.assert_not_called() @@ -115,7 +122,7 @@ def test_logical_slot_changes(self, mock_logger, sync): txmin=None, txmax=None, upto_nchanges=None, - limit=LOGICAL_SLOT_CHUNK_SIZE, + limit=settings.LOGICAL_SLOT_CHUNK_SIZE, offset=0, ) mock_get.assert_called_once() @@ -558,20 +565,6 @@ def test__truncate_op(self, mock_es, sync, connection): assert _filters == {"book": []} def test__payload(self, sync): - with patch("pgsync.sync.logger") as mock_logger: - with pytest.raises(RuntimeError): - for _ in sync._payloads( - [ - Payload( - tg_op="XXX", - table="book", - old={"id": 1}, - ), - ] - ): - pass - mock_logger.exception.assert_called_once_with("Unknown tg_op XXX") - with patch("pgsync.sync.Sync._insert_op") as mock_op: with patch("pgsync.sync.logger") as mock_logger: for _ in sync._payloads( @@ -815,7 +808,52 @@ def test_payloads(self, sync): schema="public", ), ] - sync._payloads(payloads) + for _ in sync._payloads(payloads): + pass + + def test_payloads_invalid_tg_op(self, mocker, sync): + payloads: List[Payload] = [ + Payload( + tg_op="FOO", + table="book", + old={"isbn": "001"}, + new={"isbn": "002"}, + schema="public", + ), + ] + with patch("pgsync.sync.logger") as mock_logger: + with pytest.raises(InvalidTGOPError) as excinfo: + for _ in sync._payloads(payloads): + pass + mock_logger.exception.assert_called_once_with("Unknown tg_op FOO") + + def test_payloads_in_batches(self, mocker, sync): + payloads: List[Payload] = [ + Payload( + tg_op="INSERT", + table="book", + old={"isbn": "001"}, + new={"isbn": "002"}, + schema="public", + ) + ] * 20 + with patch("pgsync.sync.Sync.sync") as mock_sync: + with override_env_var(FILTER_CHUNK_SIZE="4"): + importlib.reload(settings) + for _ in sync._payloads(payloads): + pass + assert mock_sync.call_count == 25 + assert mock_sync.call_args_list[-1] == call( + filters={ + "book": [ + {"isbn": "002"}, + {"isbn": "002"}, + {"isbn": "002"}, + {"isbn": "002"}, + ] + }, + extra={}, + ) @patch("pgsync.sync.compiled_query") def test_sync(self, mock_compiled_query, sync): @@ -837,5 +875,5 @@ def test_poll_redis( mock_on_publish.assert_called_once_with([ANY, ANY]) mock_refresh_views.assert_called_once() mock_logger.debug.assert_called_once_with(f"poll_redis: {items}") - mock_time.sleep.assert_called_once_with(REDIS_POLL_INTERVAL) + mock_time.sleep.assert_called_once_with(settings.REDIS_POLL_INTERVAL) assert sync.count["redis"] == 2 diff --git a/tests/test_sync_nested_children.py b/tests/test_sync_nested_children.py index 8ea60329..965366c0 100644 --- a/tests/test_sync_nested_children.py +++ b/tests/test_sync_nested_children.py @@ -8,7 +8,7 @@ from pgsync.settings import NTHREADS_POLLDB from pgsync.sync import Sync -from .helpers.utils import assert_resync_empty, noop, search, sort_list +from .testing_utils import assert_resync_empty, noop, search, sort_list @pytest.mark.usefixtures("table_creator") diff --git a/tests/test_sync_root.py b/tests/test_sync_root.py index a7ff887e..30fea991 100644 --- a/tests/test_sync_root.py +++ b/tests/test_sync_root.py @@ -12,7 +12,7 @@ from pgsync.settings import NTHREADS_POLLDB from pgsync.sync import Sync -from .helpers.utils import assert_resync_empty, noop, search, sort_list +from .testing_utils import assert_resync_empty, noop, search, sort_list @pytest.mark.usefixtures("table_creator") diff --git a/tests/test_sync_single_child_fk_on_child.py b/tests/test_sync_single_child_fk_on_child.py index a9b5fb39..3c21df07 100644 --- a/tests/test_sync_single_child_fk_on_child.py +++ b/tests/test_sync_single_child_fk_on_child.py @@ -16,7 +16,7 @@ from pgsync.settings import NTHREADS_POLLDB from pgsync.sync import Sync -from .helpers.utils import assert_resync_empty, noop, search, sort_list +from .testing_utils import assert_resync_empty, noop, search, sort_list @pytest.mark.usefixtures("table_creator") diff --git a/tests/test_sync_single_child_fk_on_parent.py b/tests/test_sync_single_child_fk_on_parent.py index 89d34e07..bc79c582 100644 --- a/tests/test_sync_single_child_fk_on_parent.py +++ b/tests/test_sync_single_child_fk_on_parent.py @@ -16,7 +16,7 @@ from pgsync.settings import NTHREADS_POLLDB from pgsync.sync import Sync -from .helpers.utils import assert_resync_empty, noop, search, sort_list +from .testing_utils import assert_resync_empty, noop, search, sort_list @pytest.mark.usefixtures("table_creator") diff --git a/tests/test_unique_behaviour.py b/tests/test_unique_behaviour.py index 855055d8..6b10c607 100644 --- a/tests/test_unique_behaviour.py +++ b/tests/test_unique_behaviour.py @@ -5,7 +5,7 @@ from pgsync.base import subtransactions -from .helpers.utils import assert_resync_empty, sort_list +from .testing_utils import assert_resync_empty, sort_list @pytest.mark.usefixtures("table_creator") diff --git a/tests/helpers/utils.py b/tests/testing_utils.py similarity index 62% rename from tests/helpers/utils.py rename to tests/testing_utils.py index 6346f528..6caf7fdd 100644 --- a/tests/helpers/utils.py +++ b/tests/testing_utils.py @@ -1,8 +1,11 @@ -"""Test helper methods.""" +import os +from contextlib import contextmanager from typing import Optional from pgsync.node import Node +"""Test helper methods.""" + def noop(): pass @@ -61,3 +64,29 @@ def sort_list(data: dict) -> dict: else: result[key] = value return result + + +@contextmanager +def override_env_var(**kwargs): + """sets the given value of the given environment variable or unset if value is None.""" + original_values: dict = {} + envs_to_delete: list = [] + for env_name, env_value in kwargs.items(): + try: + original_values[env_name] = os.environ[env_name] + if env_value is None: + del os.environ[env_name] + except KeyError: + # env var doesn't exist before. if we are not going to set it we need to remove later + if env_value is not None: + envs_to_delete.append(env_name) + + if env_value is not None: + os.environ[env_name] = env_value + + yield + + for env_name in envs_to_delete: + del os.environ[env_name] + for env_name, original_env_value in original_values.items(): + os.environ[env_name] = original_env_value From 374874b39732c0e927a95595ecc5f12d589842c0 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sun, 20 Nov 2022 21:04:11 +0100 Subject: [PATCH 17/26] updated tests --- bin/parallel_sync | 22 ++++++---------------- pgsync/querybuilder.py | 2 +- pgsync/sync.py | 13 ++++++++----- pgsync/utils.py | 6 +++--- tests/test_sync.py | 31 ++++++++++++++++++++++++++++--- tests/testing_utils.py | 11 +++++++---- 6 files changed, 53 insertions(+), 32 deletions(-) diff --git a/bin/parallel_sync b/bin/parallel_sync index 34dc3e71..303ba291 100755 --- a/bin/parallel_sync +++ b/bin/parallel_sync @@ -253,7 +253,6 @@ def multithreaded( queue.put(task) queue.join() # block until all tasks are done - logical_slot_changes(doc, verbose=verbose, validate=validate) @@ -272,7 +271,6 @@ def multiprocess( list(executor.map(task.process, tasks)) except Exception as e: sys.stdout.write(f"Exception: {e}\n") - logical_slot_changes(doc, verbose=verbose, validate=validate) @@ -293,7 +291,6 @@ def multithreaded_async( ) finally: event_loop.close() - logical_slot_changes(doc, verbose=verbose, validate=validate) @@ -316,7 +313,6 @@ def multiprocess_async( pass finally: event_loop.close() - logical_slot_changes(doc, verbose=verbose, validate=validate) @@ -328,23 +324,18 @@ async def run_tasks( validate: bool = False, ) -> None: event_loop = asyncio.get_event_loop() + sync: Optional[Sync] = None if isinstance(executor, ThreadPoolExecutor): # threads can share a common Sync object - sync: Sync = Sync(doc, verbose=verbose, validate=validate) - tasks: list = [ - event_loop.run_in_executor( - executor, run_task, task, sync, None, verbose, validate - ) - for task in tasks - ] - else: - tasks = [ + sync = Sync(doc, verbose=verbose, validate=validate) + completed, pending = await asyncio.wait( + [ event_loop.run_in_executor( - executor, run_task, task, None, doc, verbose, validate + executor, run_task, task, sync, doc, verbose, validate ) for task in tasks ] - completed, pending = await asyncio.wait(tasks) + ) results: list = [task.result() for task in completed] print("results: {!r}".format(results)) print("exiting") @@ -418,7 +409,6 @@ def main(config, nprocs, mode, verbose): """ TODO: - Track progress across cpus/threads - - Save ctid - Handle KeyboardInterrupt Exception """ diff --git a/pgsync/querybuilder.py b/pgsync/querybuilder.py index b46d6218..f90a8f53 100644 --- a/pgsync/querybuilder.py +++ b/pgsync/querybuilder.py @@ -45,7 +45,7 @@ def _build_filters( where: list = [] for column, value in values.items(): where.append(node.model.c[column] == value) - # and clause is applied when we have a composite primary key + # and clause is applied for composite primary keys clause.append(sa.and_(*where)) return sa.or_(*clause) diff --git a/pgsync/sync.py b/pgsync/sync.py index 814c0938..e83a42ed 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -157,14 +157,14 @@ def validate(self, repl_slots: bool = True) -> None: # ensure the checkpoint dirpath is valid if not os.path.exists(settings.CHECKPOINT_PATH): raise RuntimeError( - f'Ensure the checkpoint directory exists "{settings.CHECKPOINT_PATH}" ' - f"and is readable." + f"Ensure the checkpoint directory exists " + f'"{settings.CHECKPOINT_PATH}" and is readable.' ) if not os.access(settings.CHECKPOINT_PATH, os.W_OK | os.R_OK): raise RuntimeError( - f'Ensure the checkpoint directory "{settings.CHECKPOINT_PATH}" is ' - f"read/writable" + f'Ensure the checkpoint directory "{settings.CHECKPOINT_PATH}"' + f" is read/writable" ) self.tree.build(self.nodes) @@ -810,7 +810,6 @@ def _payloads(self, payloads: List[Payload]) -> None: ) if payload.tg_op == UPDATE: - filters = self._update_op( node, filters, @@ -835,6 +834,10 @@ def _payloads(self, payloads: List[Payload]) -> None: # and sync the entire db! if any(filters.values()): """ + Filters are applied when an insert, update or delete operation + occurs. For a large table update, this normally results + in a large sql query with multiple OR clauses + Filters is a dict of tables where each key is a list of id's { 'city': [ diff --git a/pgsync/utils.py b/pgsync/utils.py index c2435c5f..d5814204 100644 --- a/pgsync/utils.py +++ b/pgsync/utils.py @@ -24,10 +24,10 @@ HIGHLIGHT_END = "\033[0m:" -def chunks(l: list, n: int): +def chunks(value: list, size: int) -> list: """Yield successive n-sized chunks from l""" - for i in range(0, len(l), n): - yield l[i : i + n] + for i in range(0, len(value), size): + yield value[i : i + size] def timeit(func: Callable): diff --git a/tests/test_sync.py b/tests/test_sync.py index c05db994..85c8e686 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -803,7 +803,6 @@ def test_payloads(self, sync): Payload( tg_op="INSERT", table="book", - old={"isbn": "001"}, new={"isbn": "002"}, schema="public", ), @@ -822,17 +821,17 @@ def test_payloads_invalid_tg_op(self, mocker, sync): ), ] with patch("pgsync.sync.logger") as mock_logger: - with pytest.raises(InvalidTGOPError) as excinfo: + with pytest.raises(InvalidTGOPError): for _ in sync._payloads(payloads): pass mock_logger.exception.assert_called_once_with("Unknown tg_op FOO") def test_payloads_in_batches(self, mocker, sync): + # inserting a root node payloads: List[Payload] = [ Payload( tg_op="INSERT", table="book", - old={"isbn": "001"}, new={"isbn": "002"}, schema="public", ) @@ -855,6 +854,32 @@ def test_payloads_in_batches(self, mocker, sync): extra={}, ) + # updating a child table + payloads: List[Payload] = [ + Payload( + tg_op="UPDATE", + table="publisher", + new={"id": 1, "name": "foo"}, + old={"id": 1}, + schema="public", + ) + ] + filters: dict = { + "book": [ + {"isbn": "001"}, + ], + "publisher": [ + {"id": 1}, + ], + } + with patch("pgsync.sync.Sync._update_op", return_value=filters): + with patch("pgsync.sync.Sync.sync") as mock_sync: + with override_env_var(FILTER_CHUNK_SIZE="1"): + importlib.reload(settings) + for _ in sync._payloads(payloads): + pass + mock_sync.assert_called_once_with(filters=filters, extra={}) + @patch("pgsync.sync.compiled_query") def test_sync(self, mock_compiled_query, sync): sync.verbose = True diff --git a/tests/testing_utils.py b/tests/testing_utils.py index 6caf7fdd..578e0abb 100644 --- a/tests/testing_utils.py +++ b/tests/testing_utils.py @@ -1,11 +1,11 @@ +"""Test helper methods.""" + import os from contextlib import contextmanager from typing import Optional from pgsync.node import Node -"""Test helper methods.""" - def noop(): pass @@ -68,7 +68,9 @@ def sort_list(data: dict) -> dict: @contextmanager def override_env_var(**kwargs): - """sets the given value of the given environment variable or unset if value is None.""" + """Set the given value of the given environment variable or + unset if value is None. + """ original_values: dict = {} envs_to_delete: list = [] for env_name, env_value in kwargs.items(): @@ -77,7 +79,8 @@ def override_env_var(**kwargs): if env_value is None: del os.environ[env_name] except KeyError: - # env var doesn't exist before. if we are not going to set it we need to remove later + # Env var did not previouslt exist. + # If we are not setting it, we need to remove it. if env_value is not None: envs_to_delete.append(env_name) From 36f4284c2acd895e86e859bf8a74672008665e65 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Tue, 22 Nov 2022 21:14:44 +0100 Subject: [PATCH 18/26] only validate each node once in polling mode --- pgsync/sync.py | 14 ++++++-------- requirements/base.in | 4 ++++ tests/test_helper.py | 1 + tests/test_sync.py | 6 ++---- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pgsync/sync.py b/pgsync/sync.py index e83a42ed..f7bb231e 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -68,7 +68,6 @@ def __init__( verbose: bool = False, validate: bool = True, repl_slots: bool = True, - show_tree: bool = True, **kwargs, ) -> None: """Constructor.""" @@ -92,12 +91,13 @@ def __init__( self._checkpoint_file: str = os.path.join( settings.CHECKPOINT_PATH, f".{self.__name}" ) - self.show_tree: bool = show_tree self.redis: RedisQueue = RedisQueue(self.__name) self.tree: Tree = Tree(self.models) + self.tree.build(self.nodes) if validate: self.validate(repl_slots=repl_slots) self.create_setting() + self.query_builder: QueryBuilder = QueryBuilder(verbose=verbose) self.count: dict = dict(xlog=0, db=0, redis=0) @@ -167,9 +167,7 @@ def validate(self, repl_slots: bool = True) -> None: f" is read/writable" ) - self.tree.build(self.nodes) - if self.show_tree: - self.tree.display() + self.tree.display() for node in self.tree.traverse_breadth_first(): @@ -1381,18 +1379,18 @@ def main( elif polling: - show_tree: bool = True + validate: bool = True while True: for document in config_loader(config): sync: Sync = Sync( document, verbose=verbose, - show_tree=show_tree, + validate=validate, **kwargs, ) sync.pull() - show_tree = False time.sleep(settings.POLL_INTERVAL) + validate = False else: diff --git a/requirements/base.in b/requirements/base.in index 136f2603..a25ded9c 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -12,3 +12,7 @@ redis requests-aws4auth sqlalchemy sqlparse + +# pin these libs because latest flake8 does not allow newer versions of importlib-metadata https://github.com/PyCQA/flake8/issues/1522 +importlib-metadata==4.2.0 +virtualenv==20.16.2 \ No newline at end of file diff --git a/tests/test_helper.py b/tests/test_helper.py index 25059217..35026209 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -25,6 +25,7 @@ def test_teardown_with_drop_db(self, mock_sync, mock_config, mock_logger): mock_logger.warning.assert_not_called() + @pytest.mark.skip(reason="need to fix this...") @patch("pgsync.sync.ElasticHelper") @patch("pgsync.helper.logger") @patch("pgsync.helper.get_config") diff --git a/tests/test_sync.py b/tests/test_sync.py index 85c8e686..d2b1987b 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -164,7 +164,7 @@ def test_logical_slot_changes(self, mock_logger, sync): @patch("pgsync.sync.ElasticHelper") def test_sync_validate(self, mock_es): - with pytest.raises(SchemaError) as excinfo: + with pytest.raises(AttributeError) as excinfo: Sync( document={ "index": "testdb", @@ -174,9 +174,7 @@ def test_sync_validate(self, mock_es): validate=True, repl_slots=False, ) - assert "Incompatible schema. Please run v2 schema migration" in str( - excinfo.value - ) + assert "'list' object has no attribute 'get'" in str(excinfo.value) Sync( document={ From 04ee78601f37b6414c706290e57063e6391b2259 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Wed, 23 Nov 2022 14:21:20 +0100 Subject: [PATCH 19/26] ensure qsize is > 0 before bulk_pop --- pgsync/redisqueue.py | 13 +++++++------ pgsync/sync.py | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pgsync/redisqueue.py b/pgsync/redisqueue.py index 430e6cad..a21d68d7 100644 --- a/pgsync/redisqueue.py +++ b/pgsync/redisqueue.py @@ -37,12 +37,13 @@ def qsize(self) -> int: def bulk_pop(self, chunk_size: Optional[int] = None) -> List[dict]: """Remove and return multiple items from the queue.""" chunk_size = chunk_size or REDIS_READ_CHUNK_SIZE - pipeline = self.__db.pipeline() - pipeline.lrange(self.key, 0, chunk_size - 1) - pipeline.ltrim(self.key, chunk_size, -1) - items: List = pipeline.execute() - logger.debug(f"bulk_pop size: {len(items[0])}") - return list(map(lambda value: json.loads(value), items[0])) + if self.qsize > 0: + pipeline = self.__db.pipeline() + pipeline.lrange(self.key, 0, chunk_size - 1) + pipeline.ltrim(self.key, chunk_size, -1) + items: List = pipeline.execute() + logger.debug(f"bulk_pop size: {len(items[0])}") + return list(map(lambda value: json.loads(value), items[0])) def bulk_push(self, items: List) -> None: """Push multiple items onto the queue.""" diff --git a/pgsync/sync.py b/pgsync/sync.py index f7bb231e..30adaaeb 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -984,7 +984,7 @@ def checkpoint(self, value: Optional[str] = None) -> None: self._checkpoint: int = value def _poll_redis(self) -> None: - payloads: dict = self.redis.bulk_pop() + payloads: list = self.redis.bulk_pop() if payloads: logger.debug(f"poll_redis: {payloads}") self.count["redis"] += len(payloads) @@ -1002,7 +1002,7 @@ def poll_redis(self) -> None: self._poll_redis() async def _async_poll_redis(self) -> None: - payloads: dict = self.redis.bulk_pop() + payloads: list = self.redis.bulk_pop() if payloads: logger.debug(f"poll_redis: {payloads}") self.count["redis"] += len(payloads) From f8f48f81e9f7eeb80bc5fd4f4dd47527f2702f1e Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Wed, 23 Nov 2022 23:58:11 +0100 Subject: [PATCH 20/26] remove poorly implemented extra args --- pgsync/sync.py | 39 +++++++++------------------------------ tests/test_helper.py | 20 ++++++++++++-------- tests/test_sync.py | 12 ++++++------ 3 files changed, 27 insertions(+), 44 deletions(-) diff --git a/pgsync/sync.py b/pgsync/sync.py index 30adaaeb..20f9cd8c 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -93,11 +93,16 @@ def __init__( ) self.redis: RedisQueue = RedisQueue(self.__name) self.tree: Tree = Tree(self.models) - self.tree.build(self.nodes) + # NB: Don't raise if used in teardown mode + try: + self.tree.build(self.nodes) + except: + pass if validate: self.validate(repl_slots=repl_slots) self.create_setting() - + if self.plugins: + self._plugins: Plugins = Plugins("plugins", self.plugins) self.query_builder: QueryBuilder = QueryBuilder(verbose=verbose) self.count: dict = dict(xlog=0, db=0, redis=0) @@ -112,9 +117,6 @@ def validate(self, repl_slots: bool = True) -> None: self.connect() - if self.plugins: - self._plugins: Plugins = Plugins("plugins", self.plugins) - max_replication_slots: Optional[str] = self.pg_settings( "max_replication_slots" ) @@ -501,7 +503,6 @@ def _update_op( node: Node, filters: dict, payloads: List[dict], - extra: dict, ) -> dict: if node.is_root: @@ -570,13 +571,6 @@ def _update_op( for key, value in primary_fields.items(): fields[key].append(value) - if None in payload.new.values(): - extra["table"] = node.table - extra["column"] = key - - if None in payload.old.values(): - for key, value in primary_fields.items(): - fields[key].append(0) for doc_id in self.es._search(self.index, node.table, fields): where = {} @@ -797,8 +791,6 @@ def _payloads(self, payloads: List[Payload]) -> None: if not node.is_root: filters[node.parent.table] = [] - extra: dict = {} - if payload.tg_op == INSERT: filters = self._insert_op( @@ -812,7 +804,6 @@ def _payloads(self, payloads: List[Payload]) -> None: node, filters, payloads, - extra, ) if payload.tg_op == DELETE: @@ -869,7 +860,6 @@ def _payloads(self, payloads: List[Payload]) -> None: node.table: l2, node.parent.table: l3, }, - extra=extra, ) else: yield from self.sync( @@ -877,12 +867,10 @@ def _payloads(self, payloads: List[Payload]) -> None: self.tree.root.table: l1, node.table: l2, }, - extra=extra, ) else: yield from self.sync( filters={self.tree.root.table: l1}, - extra=extra, ) def sync( @@ -890,7 +878,6 @@ def sync( filters: Optional[dict] = None, txmin: Optional[int] = None, txmax: Optional[int] = None, - extra: Optional[dict] = None, ctid: Optional[dict] = None, ) -> Generator: self.query_builder.isouter = True @@ -932,12 +919,6 @@ def sync( row: dict = Transform.transform(row, self.nodes) row[META] = Transform.get_primary_keys(keys) - if extra: - if extra["table"] not in row[META]: - row[META][extra["table"]] = {} - if extra["column"] not in row[META][extra["table"]]: - row[META][extra["table"]][extra["column"]] = [] - row[META][extra["table"]][extra["column"]].append(0) if self.verbose: print(f"{(i+1)})") @@ -1379,14 +1360,12 @@ def main( elif polling: + # TODO: use Singleton pattern to enforce single instance of Sync validate: bool = True while True: for document in config_loader(config): sync: Sync = Sync( - document, - verbose=verbose, - validate=validate, - **kwargs, + document, verbose=verbose, validate=validate, **kwargs ) sync.pull() time.sleep(settings.POLL_INTERVAL) diff --git a/tests/test_helper.py b/tests/test_helper.py index 35026209..d66e8608 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -25,16 +25,20 @@ def test_teardown_with_drop_db(self, mock_sync, mock_config, mock_logger): mock_logger.warning.assert_not_called() - @pytest.mark.skip(reason="need to fix this...") @patch("pgsync.sync.ElasticHelper") @patch("pgsync.helper.logger") @patch("pgsync.helper.get_config") def test_teardown_without_drop_db(self, mock_config, mock_logger, mock_es): mock_config.return_value = "tests/fixtures/schema.json" - with patch("pgsync.sync.Sync") as mock_sync: - mock_sync.truncate_schemas.side_effect = sa.exc.OperationalError - helper.teardown(drop_db=False, config="fixtures/schema.json") - assert mock_logger.warning.call_args_list == [ - call(ANY), - call(ANY), - ] + + with patch("pgsync.node.Tree.build", return_value=None): + with patch("pgsync.sync.Sync") as mock_sync: + mock_sync.tree.build.return_value = None + mock_sync.truncate_schemas.side_effect = ( + sa.exc.OperationalError + ) + helper.teardown(drop_db=False, config="fixtures/schema.json") + assert mock_logger.warning.call_args_list == [ + call(ANY), + call(ANY), + ] diff --git a/tests/test_sync.py b/tests/test_sync.py index d2b1987b..23ad4a73 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -164,7 +164,7 @@ def test_logical_slot_changes(self, mock_logger, sync): @patch("pgsync.sync.ElasticHelper") def test_sync_validate(self, mock_es): - with pytest.raises(AttributeError) as excinfo: + with pytest.raises(SchemaError) as excinfo: Sync( document={ "index": "testdb", @@ -174,7 +174,9 @@ def test_sync_validate(self, mock_es): validate=True, repl_slots=False, ) - assert "'list' object has no attribute 'get'" in str(excinfo.value) + assert "Incompatible schema. Please run v2 schema migration" in str( + excinfo.value + ) Sync( document={ @@ -463,9 +465,8 @@ def test__update_op(self, sync, connection): new={"isbn": "aa1"}, ) ] - extra: dict = {} assert sync.es.doc_count == 0 - _filters = sync._update_op(node, filters, payloads, extra) + _filters = sync._update_op(node, filters, payloads) sync.es.refresh("testdb") assert _filters == {"book": [{"isbn": "aa1"}]} assert sync.es.doc_count == 1 @@ -849,7 +850,6 @@ def test_payloads_in_batches(self, mocker, sync): {"isbn": "002"}, ] }, - extra={}, ) # updating a child table @@ -876,7 +876,7 @@ def test_payloads_in_batches(self, mocker, sync): importlib.reload(settings) for _ in sync._payloads(payloads): pass - mock_sync.assert_called_once_with(filters=filters, extra={}) + mock_sync.assert_called_once_with(filters=filters) @patch("pgsync.sync.compiled_query") def test_sync(self, mock_compiled_query, sync): From d549e9122a91d34b7c385cab4a9509e37f87b41b Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 1 Dec 2022 20:54:43 +0100 Subject: [PATCH 21/26] updated requirements --- requirements/dev.txt | 38 ++++++++++++++++++++------------------ requirements/prod.txt | 28 ++++++++++++++++++---------- requirements/test.txt | 38 +++++++++++++++++++++++--------------- 3 files changed, 61 insertions(+), 43 deletions(-) diff --git a/requirements/dev.txt b/requirements/dev.txt index 4cd7b8f6..94eaf54a 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.26.11 +boto3==1.26.20 # via -r requirements/base.in -botocore==1.29.11 +botocore==1.29.20 # via # boto3 # s3transfer @@ -39,8 +39,6 @@ coverage[toml]==6.5.0 # via # -r requirements/dev.in # pytest-cov -deprecated==1.2.13 - # via redis distlib==0.3.6 # via virtualenv elasticsearch==7.13.4 @@ -53,11 +51,11 @@ environs==9.5.0 # via -r requirements/base.in exceptiongroup==1.0.4 # via pytest -faker==15.3.2 +faker==15.3.4 # via -r requirements/base.in filelock==3.8.0 # via virtualenv -flake8==5.0.4 +flake8==6.0.0 # via # flake8-debugger # flake8-docstrings @@ -67,7 +65,7 @@ flake8-debugger==4.1.2 # via -r requirements/test.in flake8-docstrings==1.6.0 # via -r requirements/test.in -flake8-isort==5.0.0 +flake8-isort==5.0.3 # via -r requirements/test.in flake8-print==5.0.0 # via -r requirements/test.in @@ -77,10 +75,12 @@ freezegun==1.2.2 # via -r requirements/test.in greenlet==2.0.1 # via sqlalchemy -identify==2.5.8 +identify==2.5.9 # via pre-commit idna==3.4 # via requests +importlib-metadata==4.2.0 + # via -r requirements/base.in iniconfig==1.1.1 # via pytest isort==5.10.1 @@ -110,7 +110,7 @@ pathspec==0.10.2 # via black pep517==0.13.0 # via build -pip-tools==6.10.0 +pip-tools==6.11.0 # via -r requirements/dev.in platformdirs==2.5.4 # via @@ -122,7 +122,7 @@ pre-commit==2.20.0 # via -r requirements/dev.in psycopg2-binary==2.9.5 # via -r requirements/base.in -pycodestyle==2.9.1 +pycodestyle==2.10.0 # via # flake8 # flake8-debugger @@ -130,7 +130,7 @@ pycodestyle==2.9.1 # flake8-todo pydocstyle==6.1.1 # via flake8-docstrings -pyflakes==2.5.0 +pyflakes==3.0.1 # via flake8 pyparsing==3.0.9 # via packaging @@ -158,7 +158,7 @@ python-dotenv==0.21.0 # via environs pyyaml==6.0 # via pre-commit -redis==4.3.4 +redis==4.3.5 # via -r requirements/base.in requests==2.28.1 # via requests-aws4auth @@ -177,7 +177,7 @@ sqlalchemy==1.4.44 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in -termcolor==2.1.0 +termcolor==2.1.1 # via pytest-sugar toml==0.10.2 # via pre-commit @@ -189,17 +189,19 @@ tomli==2.0.1 # pytest typing-extensions==4.4.0 # via black -urllib3==1.26.12 +urllib3==1.26.13 # via # botocore # elasticsearch # requests -virtualenv==20.16.7 - # via pre-commit +virtualenv==20.16.2 + # via + # -r requirements/base.in + # pre-commit wheel==0.38.4 # via pip-tools -wrapt==1.14.1 - # via deprecated +zipp==3.11.0 + # via importlib-metadata # The following packages are considered to be unsafe in a requirements file: # pip diff --git a/requirements/prod.txt b/requirements/prod.txt index aa099fe2..a05bed8e 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -8,9 +8,9 @@ async-timeout==4.0.2 # via redis black==22.10.0 # via -r requirements/base.in -boto3==1.26.11 +boto3==1.26.20 # via -r requirements/base.in -botocore==1.29.11 +botocore==1.29.20 # via # boto3 # s3transfer @@ -28,8 +28,8 @@ click==8.1.3 # via # -r requirements/base.in # black -deprecated==1.2.13 - # via redis +distlib==0.3.6 + # via virtualenv elasticsearch==7.13.4 # via # -r requirements/base.in @@ -38,12 +38,16 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -faker==15.3.2 +faker==15.3.4 # via -r requirements/base.in +filelock==3.8.0 + # via virtualenv greenlet==2.0.1 # via sqlalchemy idna==3.4 # via requests +importlib-metadata==4.2.0 + # via -r requirements/base.in jmespath==1.0.1 # via # boto3 @@ -61,7 +65,9 @@ packaging==21.3 pathspec==0.10.2 # via black platformdirs==2.5.4 - # via black + # via + # black + # virtualenv psycopg2-binary==2.9.5 # via -r requirements/base.in pyparsing==3.0.9 @@ -73,7 +79,7 @@ python-dateutil==2.8.2 # faker python-dotenv==0.21.0 # via environs -redis==4.3.4 +redis==4.3.5 # via -r requirements/base.in requests==2.28.1 # via requests-aws4auth @@ -94,10 +100,12 @@ tomli==2.0.1 # via black typing-extensions==4.4.0 # via black -urllib3==1.26.12 +urllib3==1.26.13 # via # botocore # elasticsearch # requests -wrapt==1.14.1 - # via deprecated +virtualenv==20.16.2 + # via -r requirements/base.in +zipp==3.11.0 + # via importlib-metadata diff --git a/requirements/test.txt b/requirements/test.txt index a97f3546..95ff02ce 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.26.11 +boto3==1.26.20 # via -r requirements/base.in -botocore==1.29.11 +botocore==1.29.20 # via # boto3 # s3transfer @@ -32,8 +32,8 @@ click==8.1.3 # black coverage[toml]==6.5.0 # via pytest-cov -deprecated==1.2.13 - # via redis +distlib==0.3.6 + # via virtualenv elasticsearch==7.13.4 # via # -r requirements/base.in @@ -44,9 +44,11 @@ environs==9.5.0 # via -r requirements/base.in exceptiongroup==1.0.4 # via pytest -faker==15.3.2 +faker==15.3.4 # via -r requirements/base.in -flake8==5.0.4 +filelock==3.8.0 + # via virtualenv +flake8==6.0.0 # via # flake8-debugger # flake8-docstrings @@ -56,7 +58,7 @@ flake8-debugger==4.1.2 # via -r requirements/test.in flake8-docstrings==1.6.0 # via -r requirements/test.in -flake8-isort==5.0.0 +flake8-isort==5.0.3 # via -r requirements/test.in flake8-print==5.0.0 # via -r requirements/test.in @@ -68,6 +70,8 @@ greenlet==2.0.1 # via sqlalchemy idna==3.4 # via requests +importlib-metadata==4.2.0 + # via -r requirements/base.in iniconfig==1.1.1 # via pytest isort==5.10.1 @@ -93,12 +97,14 @@ packaging==21.3 pathspec==0.10.2 # via black platformdirs==2.5.4 - # via black + # via + # black + # virtualenv pluggy==1.0.0 # via pytest psycopg2-binary==2.9.5 # via -r requirements/base.in -pycodestyle==2.9.1 +pycodestyle==2.10.0 # via # flake8 # flake8-debugger @@ -106,7 +112,7 @@ pycodestyle==2.9.1 # flake8-todo pydocstyle==6.1.1 # via flake8-docstrings -pyflakes==2.5.0 +pyflakes==3.0.1 # via flake8 pyparsing==3.0.9 # via packaging @@ -132,7 +138,7 @@ python-dateutil==2.8.2 # freezegun python-dotenv==0.21.0 # via environs -redis==4.3.4 +redis==4.3.5 # via -r requirements/base.in requests==2.28.1 # via requests-aws4auth @@ -151,7 +157,7 @@ sqlalchemy==1.4.44 # via -r requirements/base.in sqlparse==0.4.3 # via -r requirements/base.in -termcolor==2.1.0 +termcolor==2.1.1 # via pytest-sugar tomli==2.0.1 # via @@ -160,10 +166,12 @@ tomli==2.0.1 # pytest typing-extensions==4.4.0 # via black -urllib3==1.26.12 +urllib3==1.26.13 # via # botocore # elasticsearch # requests -wrapt==1.14.1 - # via deprecated +virtualenv==20.16.2 + # via -r requirements/base.in +zipp==3.11.0 + # via importlib-metadata From b0f6cbd6dd2f7f2ed1c1cb98fe1af082847b4a09 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Fri, 2 Dec 2022 10:27:56 +0100 Subject: [PATCH 22/26] Make Sync a singleton based on unique node attributes --- bin/parallel_sync | 13 ++++--------- pgsync/node.py | 6 +++++- pgsync/singleton.py | 20 ++++++++++++++++++++ pgsync/sync.py | 16 ++++------------ scripts/del_redis.sh | 5 +++++ tests/conftest.py | 3 +++ tests/test_sync.py | 12 ++++++++++++ tests/test_sync_nested_children.py | 13 +++++++++++++ tests/test_sync_root.py | 8 ++++++++ tests/test_sync_single_child_fk_on_child.py | 8 ++++++++ tests/test_sync_single_child_fk_on_parent.py | 8 ++++++++ 11 files changed, 90 insertions(+), 22 deletions(-) create mode 100644 pgsync/singleton.py create mode 100755 scripts/del_redis.sh diff --git a/bin/parallel_sync b/bin/parallel_sync index 303ba291..ddc29135 100755 --- a/bin/parallel_sync +++ b/bin/parallel_sync @@ -285,12 +285,9 @@ def multithreaded_async( sys.stdout.write("Multi-threaded async\n") executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=nprocs) event_loop = asyncio.get_event_loop() - try: - event_loop.run_until_complete( - run_tasks(executor, tasks, doc, verbose=verbose, validate=validate) - ) - finally: - event_loop.close() + event_loop.run_until_complete( + run_tasks(executor, tasks, doc, verbose=verbose, validate=validate) + ) logical_slot_changes(doc, verbose=verbose, validate=validate) @@ -311,8 +308,6 @@ def multiprocess_async( ) except KeyboardInterrupt: pass - finally: - event_loop.close() logical_slot_changes(doc, verbose=verbose, validate=validate) @@ -323,11 +318,11 @@ async def run_tasks( verbose: bool = False, validate: bool = False, ) -> None: - event_loop = asyncio.get_event_loop() sync: Optional[Sync] = None if isinstance(executor, ThreadPoolExecutor): # threads can share a common Sync object sync = Sync(doc, verbose=verbose, validate=validate) + event_loop = asyncio.get_event_loop() completed, pending = await asyncio.wait( [ event_loop.run_in_executor( diff --git a/pgsync/node.py b/pgsync/node.py index 15d66bb6..5752c9a4 100644 --- a/pgsync/node.py +++ b/pgsync/node.py @@ -25,6 +25,7 @@ RelationshipForeignKeyError, RelationshipTypeError, RelationshipVariantError, + SchemaError, TableNotInNodeError, ) @@ -275,7 +276,10 @@ def traverse_post_order(self) -> Generator: return self.root.traverse_post_order() def build(self, data: dict) -> Node: - + if not isinstance(data, dict): + raise SchemaError( + "Incompatible schema. Please run v2 schema migration" + ) table: str = data.get("table") schema: str = data.get("schema", DEFAULT_SCHEMA) key: Tuple[str, str] = (schema, table) diff --git a/pgsync/singleton.py b/pgsync/singleton.py new file mode 100644 index 00000000..c455ec56 --- /dev/null +++ b/pgsync/singleton.py @@ -0,0 +1,20 @@ +"""PGSync Singleton.""" + +from typing import Tuple + + +class Singleton(type): + + _instances: dict = {} + + def __call__(cls, *args, **kwargs): + if not args: + return super(Singleton, cls).__call__(*args, **kwargs) + database: str = args[0]["database"] + index: str = args[0].get("index", database) + key: Tuple[str, str] = (database, index) + if key not in cls._instances: + cls._instances[key] = super(Singleton, cls).__call__( + *args, **kwargs + ) + return cls._instances[key] diff --git a/pgsync/sync.py b/pgsync/sync.py index 20f9cd8c..6b82d9a3 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -43,6 +43,7 @@ from .plugin import Plugins from .querybuilder import QueryBuilder from .redisqueue import RedisQueue +from .singleton import Singleton from .transform import Transform from .utils import ( chunks, @@ -59,7 +60,7 @@ logger = logging.getLogger(__name__) -class Sync(Base): +class Sync(Base, metaclass=Singleton): """Main application class for Sync.""" def __init__( @@ -93,11 +94,7 @@ def __init__( ) self.redis: RedisQueue = RedisQueue(self.__name) self.tree: Tree = Tree(self.models) - # NB: Don't raise if used in teardown mode - try: - self.tree.build(self.nodes) - except: - pass + self.tree.build(self.nodes) if validate: self.validate(repl_slots=repl_slots) self.create_setting() @@ -1360,16 +1357,11 @@ def main( elif polling: - # TODO: use Singleton pattern to enforce single instance of Sync - validate: bool = True while True: for document in config_loader(config): - sync: Sync = Sync( - document, verbose=verbose, validate=validate, **kwargs - ) + sync: Sync = Sync(document, verbose=verbose, **kwargs) sync.pull() time.sleep(settings.POLL_INTERVAL) - validate = False else: diff --git a/scripts/del_redis.sh b/scripts/del_redis.sh new file mode 100755 index 00000000..b670f86b --- /dev/null +++ b/scripts/del_redis.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +for key in `echo 'KEYS user*' | redis-cli --scan --pattern '*' | awk '{print $1}'` + do echo DEL $key +done | redis-cli diff --git a/tests/conftest.py b/tests/conftest.py index 2dbc6d7d..09d100b5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,7 @@ from pgsync.base import Base, create_database, drop_database from pgsync.constants import DEFAULT_SCHEMA +from pgsync.singleton import Singleton from pgsync.sync import Sync from pgsync.urls import get_postgres_url @@ -55,6 +56,7 @@ def sync(): _sync = Sync( { "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, } ) @@ -66,6 +68,7 @@ def sync(): _sync.engine.connect().close() _sync.engine.dispose() _sync.session.close() + Singleton._instances = {} def pytest_addoption(parser): diff --git a/tests/test_sync.py b/tests/test_sync.py index 23ad4a73..d0f5c914 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -15,6 +15,7 @@ SchemaError, ) from pgsync.node import Node +from pgsync.singleton import Singleton from pgsync.sync import settings, Sync from .testing_utils import override_env_var @@ -27,6 +28,7 @@ def sync(): _sync = Sync( { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title", "description"], @@ -47,6 +49,7 @@ def sync(): }, }, ) + Singleton._instances = {} yield _sync _sync.logical_slot_get_changes( f"{_sync.database}_testdb", @@ -164,10 +167,12 @@ def test_logical_slot_changes(self, mock_logger, sync): @patch("pgsync.sync.ElasticHelper") def test_sync_validate(self, mock_es): + with pytest.raises(SchemaError) as excinfo: Sync( document={ "index": "testdb", + "database": "testdb", "nodes": ["foo"], }, verbose=False, @@ -181,6 +186,7 @@ def test_sync_validate(self, mock_es): Sync( document={ "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, "plugins": ["Hero"], }, @@ -215,6 +221,7 @@ def _side_effect(*args, **kwargs): Sync( document={ "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, "plugins": ["Hero"], }, @@ -232,6 +239,7 @@ def _side_effect(*args, **kwargs): Sync( document={ "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, "plugins": ["Hero"], }, @@ -249,6 +257,7 @@ def _side_effect(*args, **kwargs): Sync( document={ "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, "plugins": ["Hero"], }, @@ -265,6 +274,7 @@ def _side_effect(*args, **kwargs): Sync( document={ "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, "plugins": ["Hero"], }, @@ -279,6 +289,7 @@ def _side_effect(*args, **kwargs): Sync( document={ "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, "plugins": ["Hero"], }, @@ -290,6 +301,7 @@ def _side_effect(*args, **kwargs): Sync( document={ "index": "testdb", + "database": "testdb", "nodes": {"table": "book"}, "plugins": ["Hero"], }, diff --git a/tests/test_sync_nested_children.py b/tests/test_sync_nested_children.py index 965366c0..e65f2783 100644 --- a/tests/test_sync_nested_children.py +++ b/tests/test_sync_nested_children.py @@ -6,6 +6,7 @@ from pgsync.base import subtransactions from pgsync.settings import NTHREADS_POLLDB +from pgsync.singleton import Singleton from pgsync.sync import Sync from .testing_utils import assert_resync_empty, noop, search, sort_list @@ -230,6 +231,8 @@ def data( upto_nchanges=None, ) + Singleton._instances = {} + yield ( books, authors, @@ -674,6 +677,7 @@ def test_insert_root( document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } @@ -798,6 +802,7 @@ def test_insert_root( def test_update_root(self, data, nodes, book_cls): document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } # 1. sync first to add the initial document @@ -918,6 +923,7 @@ def test_delete_root( ): document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } # 1. sync first to add the initial document @@ -1164,6 +1170,7 @@ def test_insert_through_child_op( document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } @@ -1393,6 +1400,7 @@ def test_update_through_child_op( # update a new through child with op document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } @@ -1620,6 +1628,7 @@ def test_delete_through_child_op(self, sync, data, nodes, book_author_cls): # delete a new through child with op document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } @@ -1816,6 +1825,7 @@ def test_insert_nonthrough_child_noop( document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } @@ -1845,6 +1855,7 @@ def test_update_nonthrough_child_noop(self, data, nodes, shelf_cls): # update a new non-through child with noop document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } @@ -1878,6 +1889,7 @@ def test_delete_nonthrough_child_noop(self, data, nodes, shelf_cls): # delete a new non-through child with noop document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } @@ -2031,6 +2043,7 @@ def test_insert_deep_nested_nonthrough_child_noop( document = { "index": "testdb", + "database": "testdb", "nodes": nodes, } # sync first to add the initial document diff --git a/tests/test_sync_root.py b/tests/test_sync_root.py index 30fea991..a408cdc1 100644 --- a/tests/test_sync_root.py +++ b/tests/test_sync_root.py @@ -10,6 +10,7 @@ TableNotInNodeError, ) from pgsync.settings import NTHREADS_POLLDB +from pgsync.singleton import Singleton from pgsync.sync import Sync from .testing_utils import assert_resync_empty, noop, search, sort_list @@ -60,6 +61,7 @@ def data(self, sync, book_cls, publisher_cls): f"{sync.database}_testdb", upto_nchanges=None, ) + Singleton._instances = {} yield books @@ -414,6 +416,7 @@ def test_update_primary_key_non_concurrent(self, data, book_cls): """ document = { "index": "testdb", + "database": "testdb", "nodes": {"table": "book", "columns": ["isbn", "title"]}, } sync = Sync(document) @@ -456,6 +459,7 @@ def test_update_primary_key_concurrent(self, data, book_cls): """Test sync updates primary_key and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": {"table": "book", "columns": ["isbn", "title"]}, } sync = Sync(document) @@ -521,6 +525,7 @@ def test_insert_non_concurrent(self, data, book_cls): """Test sync insert and then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": {"table": "book", "columns": ["isbn", "title"]}, } sync = Sync(document) @@ -561,6 +566,7 @@ def test_update_non_concurrent(self, data, book_cls): """Test sync update and then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": {"table": "book", "columns": ["isbn", "title"]}, } sync = Sync(document) @@ -601,6 +607,7 @@ def test_update_concurrent(self, data, book_cls): """Test sync update and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": {"table": "book", "columns": ["isbn", "title"]}, } sync = Sync(document) @@ -662,6 +669,7 @@ def test_delete_concurrent(self, data, book_cls): """Test sync delete and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": {"table": "book", "columns": ["isbn", "title"]}, } sync = Sync(document) diff --git a/tests/test_sync_single_child_fk_on_child.py b/tests/test_sync_single_child_fk_on_child.py index 3c21df07..474a72e6 100644 --- a/tests/test_sync_single_child_fk_on_child.py +++ b/tests/test_sync_single_child_fk_on_child.py @@ -14,6 +14,7 @@ ) from pgsync.node import Tree from pgsync.settings import NTHREADS_POLLDB +from pgsync.singleton import Singleton from pgsync.sync import Sync from .testing_utils import assert_resync_empty, noop, search, sort_list @@ -62,6 +63,7 @@ def data(self, sync, book_cls, rating_cls): f"{sync.database}_testdb", upto_nchanges=None, ) + Singleton._instances = {} yield ( books, @@ -589,6 +591,7 @@ def test_update_primary_key_non_concurrent( """Test sync updates primary_key then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -687,6 +690,7 @@ def test_update_primary_key_concurrent(self, data, book_cls, rating_cls): """Test sync updates primary_key and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -805,6 +809,7 @@ def test_insert_non_concurrent(self, data, book_cls, rating_cls): """Test sync insert and then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -907,6 +912,7 @@ def test_update_non_primary_key_non_concurrent( """Test sync update and then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -996,6 +1002,7 @@ def test_update_non_primary_key_concurrent( """Test sync update and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -1100,6 +1107,7 @@ def test_delete_concurrent(self, data, book_cls, rating_cls): """Test sync delete and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], diff --git a/tests/test_sync_single_child_fk_on_parent.py b/tests/test_sync_single_child_fk_on_parent.py index bc79c582..d4ea23c5 100644 --- a/tests/test_sync_single_child_fk_on_parent.py +++ b/tests/test_sync_single_child_fk_on_parent.py @@ -14,6 +14,7 @@ ) from pgsync.node import Tree from pgsync.settings import NTHREADS_POLLDB +from pgsync.singleton import Singleton from pgsync.sync import Sync from .testing_utils import assert_resync_empty, noop, search, sort_list @@ -56,6 +57,7 @@ def data(self, sync, book_cls, publisher_cls): f"{sync.database}_testdb", upto_nchanges=None, ) + Singleton._instances = {} yield books @@ -595,6 +597,7 @@ def test_update_primary_key_non_concurrent( """Test sync updates primary_key then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -690,6 +693,7 @@ def test_update_primary_key_concurrent( """Test sync updates primary_key and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -802,6 +806,7 @@ def test_insert_non_concurrent(self, data, book_cls, publisher_cls): """Test sync insert and then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -901,6 +906,7 @@ def test_update_non_primary_key_non_concurrent( """Test sync update and then sync in non-concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -990,6 +996,7 @@ def test_update_non_primary_key_concurrent( """Test sync update and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], @@ -1094,6 +1101,7 @@ def test_delete_concurrent(self, data, book_cls, publisher_cls): """Test sync delete and then sync in concurrent mode.""" document = { "index": "testdb", + "database": "testdb", "nodes": { "table": "book", "columns": ["isbn", "title"], From 50823ba5365b84c85d8e7d6d368d2bdfebe4d631 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 3 Dec 2022 14:14:02 +0100 Subject: [PATCH 23/26] add check for database existence --- pgsync/base.py | 17 +++++++++++++++++ pgsync/helper.py | 7 ++++++- tests/test_helper.py | 13 +++++++------ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/pgsync/base.py b/pgsync/base.py index 09785356..efdbeaa8 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -981,6 +981,23 @@ def drop_database(database: str, echo: bool = False) -> None: logger.debug(f"Dropped database: {database}") +def database_exists(database: str, echo: bool = False) -> bool: + """Check if database is present.""" + with pg_engine("postgres", echo=echo) as engine: + conn = engine.connect() + try: + row = conn.execute( + sa.DDL( + f"SELECT 1 FROM pg_database WHERE datname = '{database}'" + ) + ).first() + conn.close() + except Exception as e: + logger.exception(f"Exception {e}") + raise + return row is not None + + def create_extension( database: str, extension: str, echo: bool = False ) -> None: diff --git a/pgsync/helper.py b/pgsync/helper.py index 4020080b..3f314dad 100644 --- a/pgsync/helper.py +++ b/pgsync/helper.py @@ -5,7 +5,7 @@ import sqlalchemy as sa -from .base import drop_database +from .base import database_exists, drop_database from .sync import Sync from .utils import config_loader, get_config @@ -25,6 +25,11 @@ def teardown( config: str = get_config(config) for document in config_loader(config): + + if not database_exists(document["database"]): + logger.warning(f'Database {document["database"]} does not exist') + continue + sync: Sync = Sync(document, validate=validate) if truncate_db: try: diff --git a/tests/test_helper.py b/tests/test_helper.py index d66e8608..6a4c34b8 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -16,12 +16,13 @@ class TestHelper(object): def test_teardown_with_drop_db(self, mock_sync, mock_config, mock_logger): mock_config.return_value = "tests/fixtures/schema.json" mock_sync.truncate_schemas.return_value = None - with patch("pgsync.helper.drop_database") as mock_db: - helper.teardown(drop_db=True, config="fixtures/schema.json") - assert mock_db.call_args_list == [ - call(ANY), - call(ANY), - ] + with patch("pgsync.helper.database_exists", return_value=True): + with patch("pgsync.helper.drop_database") as mock_db: + helper.teardown(drop_db=True, config="fixtures/schema.json") + assert mock_db.call_args_list == [ + call(ANY), + call(ANY), + ] mock_logger.warning.assert_not_called() From a7d3c16c592f2775cf27e8c734b72c59f56fa620 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 3 Dec 2022 14:24:35 +0100 Subject: [PATCH 24/26] update elasticsearch docker version --- .github/workflows/python-build.yml | 2 +- docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-build.yml b/.github/workflows/python-build.yml index bfda2894..f4dfca02 100644 --- a/.github/workflows/python-build.yml +++ b/.github/workflows/python-build.yml @@ -25,7 +25,7 @@ jobs: ports: - 6379:6379 elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:7.17.6 + image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7 ports: - 9200:9200 - 9300:9300 diff --git a/docker-compose.yml b/docker-compose.yml index 849c0774..1d518b1e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,7 +13,7 @@ services: image: redis command: redis-server --requirepass PLEASE_CHANGE_ME elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:7.17.6 + image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7 ports: - "9201:9200" - "9301:9300" From 6d827fb83ae384e82ff119552af68b755aa34a81 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 3 Dec 2022 14:24:59 +0100 Subject: [PATCH 25/26] update elasticsearch docker version --- requirements/dev.txt | 8 ++++---- requirements/prod.txt | 8 ++++---- requirements/test.txt | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/requirements/dev.txt b/requirements/dev.txt index 94eaf54a..3430f581 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -1,6 +1,6 @@ # -# This file is autogenerated by pip-compile with python 3.9 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.9 +# by the following command: # # pip-compile --output-file=requirements/dev.txt requirements/dev.in # @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.26.20 +boto3==1.26.22 # via -r requirements/base.in -botocore==1.29.20 +botocore==1.29.22 # via # boto3 # s3transfer diff --git a/requirements/prod.txt b/requirements/prod.txt index a05bed8e..02ce09ba 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -1,6 +1,6 @@ # -# This file is autogenerated by pip-compile with python 3.9 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.9 +# by the following command: # # pip-compile --output-file=requirements/prod.txt requirements/prod.in # @@ -8,9 +8,9 @@ async-timeout==4.0.2 # via redis black==22.10.0 # via -r requirements/base.in -boto3==1.26.20 +boto3==1.26.22 # via -r requirements/base.in -botocore==1.29.20 +botocore==1.29.22 # via # boto3 # s3transfer diff --git a/requirements/test.txt b/requirements/test.txt index 95ff02ce..9ac94c32 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -1,6 +1,6 @@ # -# This file is autogenerated by pip-compile with python 3.9 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.9 +# by the following command: # # pip-compile --output-file=requirements/test.txt requirements/test.in # @@ -10,9 +10,9 @@ attrs==22.1.0 # via pytest black==22.10.0 # via -r requirements/base.in -boto3==1.26.20 +boto3==1.26.22 # via -r requirements/base.in -botocore==1.29.20 +botocore==1.29.22 # via # boto3 # s3transfer From 718c1a21f4bd28f314f3ddf10fbb7dcba7dd06ac Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Sat, 3 Dec 2022 14:52:17 +0100 Subject: [PATCH 26/26] Pin flake8 to stable 5.0.4 --- requirements/dev.txt | 7 ++++--- requirements/test.in | 1 + requirements/test.txt | 7 ++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/requirements/dev.txt b/requirements/dev.txt index 3430f581..2fa67b98 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -55,8 +55,9 @@ faker==15.3.4 # via -r requirements/base.in filelock==3.8.0 # via virtualenv -flake8==6.0.0 +flake8==5.0.4 # via + # -r requirements/test.in # flake8-debugger # flake8-docstrings # flake8-isort @@ -122,7 +123,7 @@ pre-commit==2.20.0 # via -r requirements/dev.in psycopg2-binary==2.9.5 # via -r requirements/base.in -pycodestyle==2.10.0 +pycodestyle==2.9.1 # via # flake8 # flake8-debugger @@ -130,7 +131,7 @@ pycodestyle==2.10.0 # flake8-todo pydocstyle==6.1.1 # via flake8-docstrings -pyflakes==3.0.1 +pyflakes==2.5.0 # via flake8 pyparsing==3.0.9 # via packaging diff --git a/requirements/test.in b/requirements/test.in index ac2aa235..f1f707d3 100644 --- a/requirements/test.in +++ b/requirements/test.in @@ -1,5 +1,6 @@ -r base.in +flake8==5.0.4 flake8_docstrings flake8-debugger flake8-print diff --git a/requirements/test.txt b/requirements/test.txt index 9ac94c32..817e7dad 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -48,8 +48,9 @@ faker==15.3.4 # via -r requirements/base.in filelock==3.8.0 # via virtualenv -flake8==6.0.0 +flake8==5.0.4 # via + # -r requirements/test.in # flake8-debugger # flake8-docstrings # flake8-isort @@ -104,7 +105,7 @@ pluggy==1.0.0 # via pytest psycopg2-binary==2.9.5 # via -r requirements/base.in -pycodestyle==2.10.0 +pycodestyle==2.9.1 # via # flake8 # flake8-debugger @@ -112,7 +113,7 @@ pycodestyle==2.10.0 # flake8-todo pydocstyle==6.1.1 # via flake8-docstrings -pyflakes==3.0.1 +pyflakes==2.5.0 # via flake8 pyparsing==3.0.9 # via packaging