From 682faaa5fdbb700d4f86b91477d6d4c91e5d89c7 Mon Sep 17 00:00:00 2001 From: "vivek.b" Date: Thu, 20 Jul 2023 21:27:48 +0530 Subject: [PATCH 1/6] adding force refresh to flush the data to disk, this might take more time now --- pgsync/search_client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pgsync/search_client.py b/pgsync/search_client.py index db298e39..98467e78 100644 --- a/pgsync/search_client.py +++ b/pgsync/search_client.py @@ -198,10 +198,11 @@ def _bulk( ignore_status=ignore_status, ): self.doc_count += 1 + self.refresh(index) - def refresh(self, indices: List[str]) -> None: + def refresh(self, index: str) -> None: """Refresh the Elasticsearch/OpenSearch index.""" - self.__client.indices.refresh(index=indices) + self.__client.indices.refresh(index=index) def _search(self, index: str, table: str, fields: Optional[dict] = None): """ From d3ac44e049de41c87b58852fda5b284abb38bcab Mon Sep 17 00:00:00 2001 From: "vivek.b" Date: Thu, 20 Jul 2023 21:32:43 +0530 Subject: [PATCH 2/6] added a property to set refresh --- .env.sample | 2 ++ pgsync/search_client.py | 5 +++-- pgsync/settings.py | 3 ++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.env.sample b/.env.sample index ebec62ee..513db352 100644 --- a/.env.sample +++ b/.env.sample @@ -55,6 +55,8 @@ # ELASTICSEARCH_AWS_REGION=eu-west-1 # ELASTICSEARCH_AWS_HOSTED=True # ELASTICSEARCH_STREAMING_BULK=False +# perform refresh after each bulk push, this might cause more times +# ELASTICSEARCH_REFRESH_INDEX=False # maximum number of times a document will be retried when ``429`` is received, # set to 0 (default) for no retries on ``429`` # ELASTICSEARCH_MAX_RETRIES=0 diff --git a/pgsync/search_client.py b/pgsync/search_client.py index 98467e78..12fc0cf5 100644 --- a/pgsync/search_client.py +++ b/pgsync/search_client.py @@ -128,7 +128,7 @@ def bulk( raise_on_error or settings.ELASTICSEARCH_RAISE_ON_ERROR ) ignore_status = ignore_status or settings.ELASTICSEARCH_IGNORE_STATUS - + refresh = refresh or settings.ELASTICSEARCH_REFRESH_INDEX try: self._bulk( index, @@ -198,7 +198,8 @@ def _bulk( ignore_status=ignore_status, ): self.doc_count += 1 - self.refresh(index) + if refresh: + self.refresh(index) def refresh(self, index: str) -> None: """Refresh the Elasticsearch/OpenSearch index.""" diff --git a/pgsync/settings.py b/pgsync/settings.py index 27acc20f..31527940 100644 --- a/pgsync/settings.py +++ b/pgsync/settings.py @@ -126,7 +126,8 @@ "ELASTICSEARCH_IGNORE_STATUS", default=[404] ) ELASTICSEARCH_IGNORE_STATUS = tuple(map(int, ELASTICSEARCH_IGNORE_STATUS)) - +# will it perform refresh after each bulk sync +ELASTICSEARCH_REFRESH_INDEX = env.bool("ELASTICSEARCH_REFRESH_INDEX", default=False) ELASTICSEARCH = env.bool("ELASTICSEARCH", default=True) OPENSEARCH = env.bool("OPENSEARCH", default=(not ELASTICSEARCH)) OPENSEARCH_AWS_HOSTED = env.bool("OPENSEARCH_AWS_HOSTED", default=False) From 0a918479323792b5dd4bab80e744124065d8b084 Mon Sep 17 00:00:00 2001 From: "vivek.b" Date: Thu, 20 Jul 2023 21:34:43 +0530 Subject: [PATCH 3/6] adding try catch --- pgsync/search_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pgsync/search_client.py b/pgsync/search_client.py index 12fc0cf5..b9f580d4 100644 --- a/pgsync/search_client.py +++ b/pgsync/search_client.py @@ -199,7 +199,10 @@ def _bulk( ): self.doc_count += 1 if refresh: - self.refresh(index) + try: + self.refresh(index) + except Exception as e: + logger.exception(f"Issue while refreshing {index} Index: {e}") def refresh(self, index: str) -> None: """Refresh the Elasticsearch/OpenSearch index.""" From c59e81dd8c443f905003d42741b2b9b235acd1ea Mon Sep 17 00:00:00 2001 From: "vivek.b" Date: Fri, 21 Jul 2023 14:53:59 +0530 Subject: [PATCH 4/6] Migrated elasticsearch8.x and elasticsearch-dsl-aq accelq version --- .env.sample | 2 ++ .vscode/settings.json | 7 ++++++ pgsync/search_client.py | 45 +++++++++++++++++++++++++++++++++---- pgsync/settings.py | 3 +++ requirements/base.in | 4 ++-- requirements/dev.txt | 10 ++++----- requirements/prod.txt | 10 ++++----- requirements/test.txt | 10 ++++----- tests/test_search_client.py | 9 ++++---- tests/test_view.py | 4 ---- 10 files changed, 74 insertions(+), 30 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.env.sample b/.env.sample index 513db352..2309dde9 100644 --- a/.env.sample +++ b/.env.sample @@ -57,6 +57,8 @@ # ELASTICSEARCH_STREAMING_BULK=False # perform refresh after each bulk push, this might cause more times # ELASTICSEARCH_REFRESH_INDEX=False +# elastic search new bulk API with refresh power 8.x +# ELASTICSEARCH_REFRESH_BULK=False # maximum number of times a document will be retried when ``429`` is received, # set to 0 (default) for no retries on ``429`` # ELASTICSEARCH_MAX_RETRIES=0 diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9b388533 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true +} \ No newline at end of file diff --git a/pgsync/search_client.py b/pgsync/search_client.py index b9f580d4..9fddfe19 100644 --- a/pgsync/search_client.py +++ b/pgsync/search_client.py @@ -40,7 +40,7 @@ def __init__(self): self.__client: elasticsearch.Elasticsearch = get_search_client( url, client=elasticsearch.Elasticsearch, - connection_class=elasticsearch.RequestsHttpConnection, + connection_class=elasticsearch, ) try: self.major_version: int = int( @@ -52,6 +52,7 @@ def __init__(self): elasticsearch.helpers.streaming_bulk ) self.parallel_bulk: Callable = elasticsearch.helpers.parallel_bulk + self.refresh_bulk: Callable = elasticsearch.helpers.bulk self.Search: Callable = elasticsearch_dsl.Search self.Bool: Callable = elasticsearch_dsl.query.Bool self.Q: Callable = elasticsearch_dsl.Q @@ -182,6 +183,21 @@ def _bulk( raise_on_error=raise_on_error, ): self.doc_count += 1 + elif settings.ELASTICSEARCH_REFRESH_BULK: + # this is only for elasticsearch new bulk api + for _ in self.refresh_bulk( + self.__client, + actions, + thread_count=thread_count, + chunk_size=chunk_size, + max_chunk_bytes=max_chunk_bytes, + queue_size=queue_size, + refresh=refresh, + raise_on_exception=raise_on_exception, + raise_on_error=raise_on_error, + ignore_status=ignore_status, + ): + self.doc_count += 1 else: # parallel bulk consumes more memory and is also more likely # to result in 429 errors. @@ -262,7 +278,7 @@ def _create_setting( """Create Elasticsearch/OpenSearch setting and mapping if required.""" body: dict = defaultdict(lambda: defaultdict(dict)) - if not self.__client.indices.exists(index): + if not self.__client.indices.exists(index=index): if setting: body.update(**{"settings": {"index": setting}}) @@ -350,7 +366,7 @@ def get_search_client( client: Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch], connection_class: Union[ opensearchpy.RequestsHttpConnection, - elasticsearch.RequestsHttpConnection, + elasticsearch.Elasticsearch, ], ) -> Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch]: if settings.OPENSEARCH_AWS_HOSTED or settings.ELASTICSEARCH_AWS_HOSTED: @@ -404,8 +420,29 @@ def get_search_client( ssl_context: Optional[Any] = settings.ELASTICSEARCH_SSL_CONTEXT ssl_show_warn: bool = settings.ELASTICSEARCH_SSL_SHOW_WARN # Transport - use_ssl: bool = settings.ELASTICSEARCH_USE_SSL + use_ssl: bool = settings.ELASTICSEARCH_USE_SSL # (Does not exists on elastic anymore 8.8.2) timeout: float = settings.ELASTICSEARCH_TIMEOUT + if settings.ELASTICSEARCH: + return client( + hosts=hosts, + http_auth=http_auth, + cloud_id=cloud_id, + api_key=api_key, + basic_auth=basic_auth, + bearer_auth=bearer_auth, + opaque_id=opaque_id, + http_compress=http_compress, + verify_certs=verify_certs, + ca_certs=ca_certs, + client_cert=client_cert, + client_key=client_key, + ssl_assert_hostname=ssl_assert_hostname, + ssl_assert_fingerprint=ssl_assert_fingerprint, + ssl_version=ssl_version, + ssl_context=ssl_context, + ssl_show_warn=ssl_show_warn, + timeout=timeout, + ) return client( hosts=hosts, http_auth=http_auth, diff --git a/pgsync/settings.py b/pgsync/settings.py index 31527940..9b60c5f8 100644 --- a/pgsync/settings.py +++ b/pgsync/settings.py @@ -109,6 +109,9 @@ ELASTICSEARCH_STREAMING_BULK = env.bool( "ELASTICSEARCH_STREAMING_BULK", default=False ) +ELASTICSEARCH_REFRESH_BULK = env.bool( + "ELASTICSEARCH_REFRESH_BULK", default=False +) # the size of the threadpool to use for the bulk requests ELASTICSEARCH_THREAD_COUNT = env.int("ELASTICSEARCH_THREAD_COUNT", default=4) # increase this if you are getting read request timeouts diff --git a/requirements/base.in b/requirements/base.in index 6c95c627..fbac22a4 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -2,8 +2,8 @@ black boto3 click -elasticsearch==7.13.4 # pin this to version 7.13.4 for compatibility with OpenSearch https://opensearch.org/docs/clients/index/ -elasticsearch-dsl>=6.0.0,<8.0.0 +elasticsearch==8.8.2 # pin this to version 7.13.4 for compatibility with OpenSearch https://opensearch.org/docs/clients/index/ +elasticsearch-dsl-aq>=6.0.0,<9.0.0 environs faker opensearch-dsl>=2.0.1 diff --git a/requirements/dev.txt b/requirements/dev.txt index 5b9f0bcd..44410f60 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -36,11 +36,11 @@ coverage[toml]==7.2.7 # pytest-cov distlib==0.3.7 # via virtualenv -elasticsearch==7.13.4 +elasticsearch==8.8.2 # via # -r requirements/base.in - # elasticsearch-dsl -elasticsearch-dsl==7.4.1 + # elasticsearch-dsl-aq +elasticsearch-dsl-aq==8.0.1 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in @@ -149,7 +149,7 @@ pytest-sugar==0.9.7 python-dateutil==2.8.2 # via # botocore - # elasticsearch-dsl + # elasticsearch-dsl-aq # faker # freezegun # opensearch-dsl @@ -172,7 +172,7 @@ s3transfer==0.6.1 # via boto3 six==1.16.0 # via - # elasticsearch-dsl + # elasticsearch-dsl-aq # opensearch-dsl # opensearch-py # python-dateutil diff --git a/requirements/prod.txt b/requirements/prod.txt index 61a0affd..05c6b19d 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -25,11 +25,11 @@ click==8.1.5 # via # -r requirements/base.in # black -elasticsearch==7.13.4 +elasticsearch==8.8.2 # via # -r requirements/base.in - # elasticsearch-dsl -elasticsearch-dsl==7.4.1 + # elasticsearch-dsl-aq +elasticsearch-dsl-aq==8.0.1 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in @@ -66,7 +66,7 @@ psycopg2-binary==2.9.6 python-dateutil==2.8.2 # via # botocore - # elasticsearch-dsl + # elasticsearch-dsl-aq # faker # opensearch-dsl # opensearch-py @@ -86,7 +86,7 @@ s3transfer==0.6.1 # via boto3 six==1.16.0 # via - # elasticsearch-dsl + # elasticsearch-dsl-aq # opensearch-dsl # opensearch-py # python-dateutil diff --git a/requirements/test.txt b/requirements/test.txt index cdc0b169..6510233f 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -27,11 +27,11 @@ click==8.1.5 # black coverage[toml]==7.2.7 # via pytest-cov -elasticsearch==7.13.4 +elasticsearch==8.8.2 # via # -r requirements/base.in - # elasticsearch-dsl -elasticsearch-dsl==7.4.1 + # elasticsearch-dsl-aq +elasticsearch-dsl-aq==8.0.1 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in @@ -125,7 +125,7 @@ pytest-sugar==0.9.7 python-dateutil==2.8.2 # via # botocore - # elasticsearch-dsl + # elasticsearch-dsl-aq # faker # freezegun # opensearch-dsl @@ -146,7 +146,7 @@ s3transfer==0.6.1 # via boto3 six==1.16.0 # via - # elasticsearch-dsl + # elasticsearch-dsl-aq # opensearch-dsl # opensearch-py # python-dateutil diff --git a/tests/test_search_client.py b/tests/test_search_client.py index 2bddd2bf..953d377b 100644 --- a/tests/test_search_client.py +++ b/tests/test_search_client.py @@ -30,7 +30,7 @@ def test_get_search_init(self, mocker): mock_search_client.assert_called_once_with( url, client=elasticsearch.Elasticsearch, - connection_class=elasticsearch.RequestsHttpConnection, + connection_class=elasticsearch, ) def test_get_search_client(self, mocker): @@ -49,7 +49,7 @@ def test_get_search_client(self, mocker): get_search_client( url, client=elasticsearch.Elasticsearch, - connection_class=elasticsearch.RequestsHttpConnection, + connection_class=elasticsearch, ) ssl_assert_hostname = ( settings.ELASTICSEARCH_SSL_ASSERT_HOSTNAME @@ -75,7 +75,6 @@ def test_get_search_client(self, mocker): ssl_version=settings.ELASTICSEARCH_SSL_VERSION, ssl_context=settings.ELASTICSEARCH_SSL_CONTEXT, ssl_show_warn=settings.ELASTICSEARCH_SSL_SHOW_WARN, - use_ssl=settings.ELASTICSEARCH_USE_SSL, timeout=settings.ELASTICSEARCH_TIMEOUT, ) @@ -101,7 +100,7 @@ def test_get_search_client(self, mocker): url, client=elasticsearch.Elasticsearch, connection_class=( - elasticsearch.RequestsHttpConnection + elasticsearch.Elasticsearch ), ) mock_search_client.assert_called_once_with( @@ -110,6 +109,6 @@ def test_get_search_client(self, mocker): use_ssl=True, verify_certs=True, connection_class=( - elasticsearch.RequestsHttpConnection + elasticsearch.Elasticsearch ), ) diff --git a/tests/test_view.py b/tests/test_view.py index ad7dd374..27123b7d 100644 --- a/tests/test_view.py +++ b/tests/test_view.py @@ -193,10 +193,6 @@ def test_index(self, connection, sync, book_cls, data): "name": "my_index", "unique": True, "column_names": ["isbn"], - "include_columns": [], - "dialect_options": { - "postgresql_include": [], - }, } connection.engine.execute(DropIndex("my_index")) indices = sa.inspect(connection.engine).get_indexes( From 31efd70b15486415d6fee2f30350a33ceac676ad Mon Sep 17 00:00:00 2001 From: "vivek.b" Date: Fri, 21 Jul 2023 15:53:50 +0530 Subject: [PATCH 5/6] reverting bulk api --- .env.sample | 2 -- pgsync/search_client.py | 15 --------------- pgsync/settings.py | 3 --- 3 files changed, 20 deletions(-) diff --git a/.env.sample b/.env.sample index 2309dde9..513db352 100644 --- a/.env.sample +++ b/.env.sample @@ -57,8 +57,6 @@ # ELASTICSEARCH_STREAMING_BULK=False # perform refresh after each bulk push, this might cause more times # ELASTICSEARCH_REFRESH_INDEX=False -# elastic search new bulk API with refresh power 8.x -# ELASTICSEARCH_REFRESH_BULK=False # maximum number of times a document will be retried when ``429`` is received, # set to 0 (default) for no retries on ``429`` # ELASTICSEARCH_MAX_RETRIES=0 diff --git a/pgsync/search_client.py b/pgsync/search_client.py index 9fddfe19..ab47a62c 100644 --- a/pgsync/search_client.py +++ b/pgsync/search_client.py @@ -183,21 +183,6 @@ def _bulk( raise_on_error=raise_on_error, ): self.doc_count += 1 - elif settings.ELASTICSEARCH_REFRESH_BULK: - # this is only for elasticsearch new bulk api - for _ in self.refresh_bulk( - self.__client, - actions, - thread_count=thread_count, - chunk_size=chunk_size, - max_chunk_bytes=max_chunk_bytes, - queue_size=queue_size, - refresh=refresh, - raise_on_exception=raise_on_exception, - raise_on_error=raise_on_error, - ignore_status=ignore_status, - ): - self.doc_count += 1 else: # parallel bulk consumes more memory and is also more likely # to result in 429 errors. diff --git a/pgsync/settings.py b/pgsync/settings.py index 9b60c5f8..31527940 100644 --- a/pgsync/settings.py +++ b/pgsync/settings.py @@ -109,9 +109,6 @@ ELASTICSEARCH_STREAMING_BULK = env.bool( "ELASTICSEARCH_STREAMING_BULK", default=False ) -ELASTICSEARCH_REFRESH_BULK = env.bool( - "ELASTICSEARCH_REFRESH_BULK", default=False -) # the size of the threadpool to use for the bulk requests ELASTICSEARCH_THREAD_COUNT = env.int("ELASTICSEARCH_THREAD_COUNT", default=4) # increase this if you are getting read request timeouts From be6f168a988e76a0ae97ed3e67f30b7d47391629 Mon Sep 17 00:00:00 2001 From: "vivek.b" Date: Fri, 21 Jul 2023 16:52:35 +0530 Subject: [PATCH 6/6] removed bulk --- pgsync/search_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pgsync/search_client.py b/pgsync/search_client.py index ab47a62c..91d1aca7 100644 --- a/pgsync/search_client.py +++ b/pgsync/search_client.py @@ -52,7 +52,6 @@ def __init__(self): elasticsearch.helpers.streaming_bulk ) self.parallel_bulk: Callable = elasticsearch.helpers.parallel_bulk - self.refresh_bulk: Callable = elasticsearch.helpers.bulk self.Search: Callable = elasticsearch_dsl.Search self.Bool: Callable = elasticsearch_dsl.query.Bool self.Q: Callable = elasticsearch_dsl.Q