diff --git a/.env.sample b/.env.sample index ebec62ee..513db352 100644 --- a/.env.sample +++ b/.env.sample @@ -55,6 +55,8 @@ # ELASTICSEARCH_AWS_REGION=eu-west-1 # ELASTICSEARCH_AWS_HOSTED=True # ELASTICSEARCH_STREAMING_BULK=False +# refresh the index after each bulk push; this might take more time +# ELASTICSEARCH_REFRESH_INDEX=False # maximum number of times a document will be retried when ``429`` is received, # set to 0 (default) for no retries on ``429`` # ELASTICSEARCH_MAX_RETRIES=0 diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..9b388533 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true +} \ No newline at end of file diff --git a/pgsync/search_client.py b/pgsync/search_client.py index db298e39..91d1aca7 100644 --- a/pgsync/search_client.py +++ b/pgsync/search_client.py @@ -40,7 +40,7 @@ def __init__(self): self.__client: elasticsearch.Elasticsearch = get_search_client( url, client=elasticsearch.Elasticsearch, - connection_class=elasticsearch.RequestsHttpConnection, + connection_class=elasticsearch, ) try: self.major_version: int = int( @@ -128,7 +128,7 @@ def bulk( raise_on_error or settings.ELASTICSEARCH_RAISE_ON_ERROR ) ignore_status = ignore_status or settings.ELASTICSEARCH_IGNORE_STATUS - + refresh = refresh or settings.ELASTICSEARCH_REFRESH_INDEX try: self._bulk( index, @@ -198,10 +198,15 @@ def _bulk( ignore_status=ignore_status, ): self.doc_count += 1 + if refresh: + try: + self.refresh(index) + except Exception as e: + logger.exception(f"Issue while refreshing {index} Index: {e}") - def refresh(self, indices: List[str]) -> None: + def refresh(self, index: str) -> None: """Refresh the Elasticsearch/OpenSearch index.""" - self.__client.indices.refresh(index=indices) + self.__client.indices.refresh(index=index) def _search(self, index: str, table: str, fields: 
Optional[dict] = None): """ @@ -257,7 +262,7 @@ def _create_setting( """Create Elasticsearch/OpenSearch setting and mapping if required.""" body: dict = defaultdict(lambda: defaultdict(dict)) - if not self.__client.indices.exists(index): + if not self.__client.indices.exists(index=index): if setting: body.update(**{"settings": {"index": setting}}) @@ -345,7 +350,7 @@ def get_search_client( client: Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch], connection_class: Union[ opensearchpy.RequestsHttpConnection, - elasticsearch.RequestsHttpConnection, + elasticsearch.Elasticsearch, ], ) -> Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch]: if settings.OPENSEARCH_AWS_HOSTED or settings.ELASTICSEARCH_AWS_HOSTED: @@ -399,8 +404,29 @@ def get_search_client( ssl_context: Optional[Any] = settings.ELASTICSEARCH_SSL_CONTEXT ssl_show_warn: bool = settings.ELASTICSEARCH_SSL_SHOW_WARN # Transport - use_ssl: bool = settings.ELASTICSEARCH_USE_SSL + use_ssl: bool = settings.ELASTICSEARCH_USE_SSL # (no longer exists in elasticsearch 8.8.2) timeout: float = settings.ELASTICSEARCH_TIMEOUT + if settings.ELASTICSEARCH: + return client( + hosts=hosts, + http_auth=http_auth, + cloud_id=cloud_id, + api_key=api_key, + basic_auth=basic_auth, + bearer_auth=bearer_auth, + opaque_id=opaque_id, + http_compress=http_compress, + verify_certs=verify_certs, + ca_certs=ca_certs, + client_cert=client_cert, + client_key=client_key, + ssl_assert_hostname=ssl_assert_hostname, + ssl_assert_fingerprint=ssl_assert_fingerprint, + ssl_version=ssl_version, + ssl_context=ssl_context, + ssl_show_warn=ssl_show_warn, + timeout=timeout, + ) return client( hosts=hosts, http_auth=http_auth, diff --git a/pgsync/settings.py b/pgsync/settings.py index 27acc20f..31527940 100644 --- a/pgsync/settings.py +++ b/pgsync/settings.py @@ -126,7 +126,8 @@ "ELASTICSEARCH_IGNORE_STATUS", default=[404] ) ELASTICSEARCH_IGNORE_STATUS = tuple(map(int, ELASTICSEARCH_IGNORE_STATUS)) - +# will it perform refresh after 
each bulk sync +ELASTICSEARCH_REFRESH_INDEX = env.bool("ELASTICSEARCH_REFRESH_INDEX", default=False) ELASTICSEARCH = env.bool("ELASTICSEARCH", default=True) OPENSEARCH = env.bool("OPENSEARCH", default=(not ELASTICSEARCH)) OPENSEARCH_AWS_HOSTED = env.bool("OPENSEARCH_AWS_HOSTED", default=False) diff --git a/requirements/base.in b/requirements/base.in index 6c95c627..fbac22a4 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -2,8 +2,8 @@ black boto3 click -elasticsearch==7.13.4 # pin this to version 7.13.4 for compatibility with OpenSearch https://opensearch.org/docs/clients/index/ -elasticsearch-dsl>=6.0.0,<8.0.0 +elasticsearch==8.8.2 # NOTE: previously pinned to 7.13.4 for compatibility with OpenSearch https://opensearch.org/docs/clients/index/ +elasticsearch-dsl-aq>=6.0.0,<9.0.0 environs faker opensearch-dsl>=2.0.1 diff --git a/requirements/dev.txt b/requirements/dev.txt index 5b9f0bcd..44410f60 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -36,11 +36,11 @@ coverage[toml]==7.2.7 # pytest-cov distlib==0.3.7 # via virtualenv -elasticsearch==7.13.4 +elasticsearch==8.8.2 # via # -r requirements/base.in - # elasticsearch-dsl -elasticsearch-dsl==7.4.1 + # elasticsearch-dsl-aq +elasticsearch-dsl-aq==8.0.1 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in @@ -149,7 +149,7 @@ pytest-sugar==0.9.7 python-dateutil==2.8.2 # via # botocore - # elasticsearch-dsl + # elasticsearch-dsl-aq # faker # freezegun # opensearch-dsl @@ -172,7 +172,7 @@ s3transfer==0.6.1 # via boto3 six==1.16.0 # via - # elasticsearch-dsl + # elasticsearch-dsl-aq # opensearch-dsl # opensearch-py # python-dateutil diff --git a/requirements/prod.txt b/requirements/prod.txt index 61a0affd..05c6b19d 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -25,11 +25,11 @@ click==8.1.5 # via # -r requirements/base.in # black -elasticsearch==7.13.4 +elasticsearch==8.8.2 # via # -r requirements/base.in - # elasticsearch-dsl -elasticsearch-dsl==7.4.1 + # 
elasticsearch-dsl-aq +elasticsearch-dsl-aq==8.0.1 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in @@ -66,7 +66,7 @@ psycopg2-binary==2.9.6 python-dateutil==2.8.2 # via # botocore - # elasticsearch-dsl + # elasticsearch-dsl-aq # faker # opensearch-dsl # opensearch-py @@ -86,7 +86,7 @@ s3transfer==0.6.1 # via boto3 six==1.16.0 # via - # elasticsearch-dsl + # elasticsearch-dsl-aq # opensearch-dsl # opensearch-py # python-dateutil diff --git a/requirements/test.txt b/requirements/test.txt index cdc0b169..6510233f 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -27,11 +27,11 @@ click==8.1.5 # black coverage[toml]==7.2.7 # via pytest-cov -elasticsearch==7.13.4 +elasticsearch==8.8.2 # via # -r requirements/base.in - # elasticsearch-dsl -elasticsearch-dsl==7.4.1 + # elasticsearch-dsl-aq +elasticsearch-dsl-aq==8.0.1 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in @@ -125,7 +125,7 @@ pytest-sugar==0.9.7 python-dateutil==2.8.2 # via # botocore - # elasticsearch-dsl + # elasticsearch-dsl-aq # faker # freezegun # opensearch-dsl @@ -146,7 +146,7 @@ s3transfer==0.6.1 # via boto3 six==1.16.0 # via - # elasticsearch-dsl + # elasticsearch-dsl-aq # opensearch-dsl # opensearch-py # python-dateutil diff --git a/tests/test_search_client.py b/tests/test_search_client.py index 2bddd2bf..953d377b 100644 --- a/tests/test_search_client.py +++ b/tests/test_search_client.py @@ -30,7 +30,7 @@ def test_get_search_init(self, mocker): mock_search_client.assert_called_once_with( url, client=elasticsearch.Elasticsearch, - connection_class=elasticsearch.RequestsHttpConnection, + connection_class=elasticsearch, ) def test_get_search_client(self, mocker): @@ -49,7 +49,7 @@ def test_get_search_client(self, mocker): get_search_client( url, client=elasticsearch.Elasticsearch, - connection_class=elasticsearch.RequestsHttpConnection, + connection_class=elasticsearch, ) ssl_assert_hostname = ( settings.ELASTICSEARCH_SSL_ASSERT_HOSTNAME 
@@ -75,7 +75,6 @@ def test_get_search_client(self, mocker): ssl_version=settings.ELASTICSEARCH_SSL_VERSION, ssl_context=settings.ELASTICSEARCH_SSL_CONTEXT, ssl_show_warn=settings.ELASTICSEARCH_SSL_SHOW_WARN, - use_ssl=settings.ELASTICSEARCH_USE_SSL, timeout=settings.ELASTICSEARCH_TIMEOUT, ) @@ -101,7 +100,7 @@ def test_get_search_client(self, mocker): url, client=elasticsearch.Elasticsearch, connection_class=( - elasticsearch.RequestsHttpConnection + elasticsearch.Elasticsearch ), ) mock_search_client.assert_called_once_with( @@ -110,6 +109,6 @@ def test_get_search_client(self, mocker): use_ssl=True, verify_certs=True, connection_class=( - elasticsearch.RequestsHttpConnection + elasticsearch.Elasticsearch ), ) diff --git a/tests/test_view.py b/tests/test_view.py index ad7dd374..27123b7d 100644 --- a/tests/test_view.py +++ b/tests/test_view.py @@ -193,10 +193,6 @@ def test_index(self, connection, sync, book_cls, data): "name": "my_index", "unique": True, "column_names": ["isbn"], - "include_columns": [], - "dialect_options": { - "postgresql_include": [], - }, } connection.engine.execute(DropIndex("my_index")) indices = sa.inspect(connection.engine).get_indexes(