Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
# ELASTICSEARCH_AWS_REGION=eu-west-1
# ELASTICSEARCH_AWS_HOSTED=True
# ELASTICSEARCH_STREAMING_BULK=False
# perform a refresh after each bulk push; this may increase sync time
# ELASTICSEARCH_REFRESH_INDEX=False
# maximum number of times a document will be retried when ``429`` is received,
# set to 0 (default) for no retries on ``429``
# ELASTICSEARCH_MAX_RETRIES=0
Expand Down
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
40 changes: 33 additions & 7 deletions pgsync/search_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self):
self.__client: elasticsearch.Elasticsearch = get_search_client(
url,
client=elasticsearch.Elasticsearch,
connection_class=elasticsearch.RequestsHttpConnection,
connection_class=elasticsearch,
)
try:
self.major_version: int = int(
Expand Down Expand Up @@ -128,7 +128,7 @@ def bulk(
raise_on_error or settings.ELASTICSEARCH_RAISE_ON_ERROR
)
ignore_status = ignore_status or settings.ELASTICSEARCH_IGNORE_STATUS

refresh = refresh or settings.ELASTICSEARCH_REFRESH_INDEX
try:
self._bulk(
index,
Expand Down Expand Up @@ -198,10 +198,15 @@ def _bulk(
ignore_status=ignore_status,
):
self.doc_count += 1
if refresh:
try:
self.refresh(index)
except Exception as e:
logger.exception(f"Issue while refreshing {index} Index: {e}")

def refresh(self, indices: List[str]) -> None:
def refresh(self, index: str) -> None:
"""Refresh the Elasticsearch/OpenSearch index."""
self.__client.indices.refresh(index=indices)
self.__client.indices.refresh(index=index)

def _search(self, index: str, table: str, fields: Optional[dict] = None):
"""
Expand Down Expand Up @@ -257,7 +262,7 @@ def _create_setting(
"""Create Elasticsearch/OpenSearch setting and mapping if required."""
body: dict = defaultdict(lambda: defaultdict(dict))

if not self.__client.indices.exists(index):
if not self.__client.indices.exists(index=index):
if setting:
body.update(**{"settings": {"index": setting}})

Expand Down Expand Up @@ -345,7 +350,7 @@ def get_search_client(
client: Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch],
connection_class: Union[
opensearchpy.RequestsHttpConnection,
elasticsearch.RequestsHttpConnection,
elasticsearch.Elasticsearch,
],
) -> Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch]:
if settings.OPENSEARCH_AWS_HOSTED or settings.ELASTICSEARCH_AWS_HOSTED:
Expand Down Expand Up @@ -399,8 +404,29 @@ def get_search_client(
ssl_context: Optional[Any] = settings.ELASTICSEARCH_SSL_CONTEXT
ssl_show_warn: bool = settings.ELASTICSEARCH_SSL_SHOW_WARN
# Transport
use_ssl: bool = settings.ELASTICSEARCH_USE_SSL
use_ssl: bool = settings.ELASTICSEARCH_USE_SSL  # NOTE: the `use_ssl` kwarg no longer exists in elasticsearch-py 8.x (as of 8.8.2)
timeout: float = settings.ELASTICSEARCH_TIMEOUT
if settings.ELASTICSEARCH:
return client(
hosts=hosts,
http_auth=http_auth,
cloud_id=cloud_id,
api_key=api_key,
basic_auth=basic_auth,
bearer_auth=bearer_auth,
opaque_id=opaque_id,
http_compress=http_compress,
verify_certs=verify_certs,
ca_certs=ca_certs,
client_cert=client_cert,
client_key=client_key,
ssl_assert_hostname=ssl_assert_hostname,
ssl_assert_fingerprint=ssl_assert_fingerprint,
ssl_version=ssl_version,
ssl_context=ssl_context,
ssl_show_warn=ssl_show_warn,
timeout=timeout,
)
return client(
hosts=hosts,
http_auth=http_auth,
Expand Down
3 changes: 2 additions & 1 deletion pgsync/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@
"ELASTICSEARCH_IGNORE_STATUS", default=[404]
)
ELASTICSEARCH_IGNORE_STATUS = tuple(map(int, ELASTICSEARCH_IGNORE_STATUS))

# whether to perform a refresh after each bulk sync
ELASTICSEARCH_REFRESH_INDEX = env.bool("ELASTICSEARCH_REFRESH_INDEX", default=False)
ELASTICSEARCH = env.bool("ELASTICSEARCH", default=True)
OPENSEARCH = env.bool("OPENSEARCH", default=(not ELASTICSEARCH))
OPENSEARCH_AWS_HOSTED = env.bool("OPENSEARCH_AWS_HOSTED", default=False)
Expand Down
4 changes: 2 additions & 2 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
black
boto3
click
elasticsearch==7.13.4 # pin this to version 7.13.4 for compatibility with OpenSearch https://opensearch.org/docs/clients/index/
elasticsearch-dsl>=6.0.0,<8.0.0
elasticsearch==8.8.2 # pinned to 8.8.2; the old 7.13.4 pin for OpenSearch compatibility no longer applies — use opensearch-py for OpenSearch clusters (https://opensearch.org/docs/clients/index/)
elasticsearch-dsl-aq>=6.0.0,<9.0.0
environs
faker
opensearch-dsl>=2.0.1
Expand Down
10 changes: 5 additions & 5 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ coverage[toml]==7.2.7
# pytest-cov
distlib==0.3.7
# via virtualenv
elasticsearch==7.13.4
elasticsearch==8.8.2
# via
# -r requirements/base.in
# elasticsearch-dsl
elasticsearch-dsl==7.4.1
# elasticsearch-dsl-aq
elasticsearch-dsl-aq==8.0.1
# via -r requirements/base.in
environs==9.5.0
# via -r requirements/base.in
Expand Down Expand Up @@ -149,7 +149,7 @@ pytest-sugar==0.9.7
python-dateutil==2.8.2
# via
# botocore
# elasticsearch-dsl
# elasticsearch-dsl-aq
# faker
# freezegun
# opensearch-dsl
Expand All @@ -172,7 +172,7 @@ s3transfer==0.6.1
# via boto3
six==1.16.0
# via
# elasticsearch-dsl
# elasticsearch-dsl-aq
# opensearch-dsl
# opensearch-py
# python-dateutil
Expand Down
10 changes: 5 additions & 5 deletions requirements/prod.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ click==8.1.5
# via
# -r requirements/base.in
# black
elasticsearch==7.13.4
elasticsearch==8.8.2
# via
# -r requirements/base.in
# elasticsearch-dsl
elasticsearch-dsl==7.4.1
# elasticsearch-dsl-aq
elasticsearch-dsl-aq==8.0.1
# via -r requirements/base.in
environs==9.5.0
# via -r requirements/base.in
Expand Down Expand Up @@ -66,7 +66,7 @@ psycopg2-binary==2.9.6
python-dateutil==2.8.2
# via
# botocore
# elasticsearch-dsl
# elasticsearch-dsl-aq
# faker
# opensearch-dsl
# opensearch-py
Expand All @@ -86,7 +86,7 @@ s3transfer==0.6.1
# via boto3
six==1.16.0
# via
# elasticsearch-dsl
# elasticsearch-dsl-aq
# opensearch-dsl
# opensearch-py
# python-dateutil
Expand Down
10 changes: 5 additions & 5 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ click==8.1.5
# black
coverage[toml]==7.2.7
# via pytest-cov
elasticsearch==7.13.4
elasticsearch==8.8.2
# via
# -r requirements/base.in
# elasticsearch-dsl
elasticsearch-dsl==7.4.1
# elasticsearch-dsl-aq
elasticsearch-dsl-aq==8.0.1
# via -r requirements/base.in
environs==9.5.0
# via -r requirements/base.in
Expand Down Expand Up @@ -125,7 +125,7 @@ pytest-sugar==0.9.7
python-dateutil==2.8.2
# via
# botocore
# elasticsearch-dsl
# elasticsearch-dsl-aq
# faker
# freezegun
# opensearch-dsl
Expand All @@ -146,7 +146,7 @@ s3transfer==0.6.1
# via boto3
six==1.16.0
# via
# elasticsearch-dsl
# elasticsearch-dsl-aq
# opensearch-dsl
# opensearch-py
# python-dateutil
Expand Down
9 changes: 4 additions & 5 deletions tests/test_search_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_get_search_init(self, mocker):
mock_search_client.assert_called_once_with(
url,
client=elasticsearch.Elasticsearch,
connection_class=elasticsearch.RequestsHttpConnection,
connection_class=elasticsearch,
)

def test_get_search_client(self, mocker):
Expand All @@ -49,7 +49,7 @@ def test_get_search_client(self, mocker):
get_search_client(
url,
client=elasticsearch.Elasticsearch,
connection_class=elasticsearch.RequestsHttpConnection,
connection_class=elasticsearch,
)
ssl_assert_hostname = (
settings.ELASTICSEARCH_SSL_ASSERT_HOSTNAME
Expand All @@ -75,7 +75,6 @@ def test_get_search_client(self, mocker):
ssl_version=settings.ELASTICSEARCH_SSL_VERSION,
ssl_context=settings.ELASTICSEARCH_SSL_CONTEXT,
ssl_show_warn=settings.ELASTICSEARCH_SSL_SHOW_WARN,
use_ssl=settings.ELASTICSEARCH_USE_SSL,
timeout=settings.ELASTICSEARCH_TIMEOUT,
)

Expand All @@ -101,7 +100,7 @@ def test_get_search_client(self, mocker):
url,
client=elasticsearch.Elasticsearch,
connection_class=(
elasticsearch.RequestsHttpConnection
elasticsearch.Elasticsearch
),
)
mock_search_client.assert_called_once_with(
Expand All @@ -110,6 +109,6 @@ def test_get_search_client(self, mocker):
use_ssl=True,
verify_certs=True,
connection_class=(
elasticsearch.RequestsHttpConnection
elasticsearch.Elasticsearch
),
)
4 changes: 0 additions & 4 deletions tests/test_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,6 @@ def test_index(self, connection, sync, book_cls, data):
"name": "my_index",
"unique": True,
"column_names": ["isbn"],
"include_columns": [],
"dialect_options": {
"postgresql_include": [],
},
}
connection.engine.execute(DropIndex("my_index"))
indices = sa.inspect(connection.engine).get_indexes(
Expand Down