diff --git a/README.md b/README.md index 48f488e..db23b56 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,8 @@ You need to set the variables in the `main.py` file for the ES python client: ```python # Variables to configure the ES client: "elasticsearch_host": "https://localhost:9200", -"elasticsearch_ca_path": "/home/c/.elastic-package/profiles/default/certs/elasticsearch/ca-cert.pem", +# Path to the Elasticsearch certificate. If not set, the program will use the default system/certifi CA bundle. +"elasticsearch_ca_path": "", "elasticsearch_user": "elastic", "elasticsearch_pwd": "changeme", @@ -50,10 +51,22 @@ You need to set the variables in the `main.py` file for the ES python client: "cloud_id": "", ``` -You also need to set the name of the data stream you want to test: +Alternatively, you can set ES credentials via environment variables. When set, +they override the hardcoded defaults: + +```bash +export ELASTIC_PACKAGE_ELASTICSEARCH_HOST=https://localhost:9200 +export ELASTIC_PACKAGE_ELASTICSEARCH_USERNAME=elastic +export ELASTIC_PACKAGE_ELASTICSEARCH_PASSWORD=changeme +``` + +You also need to set the list of data streams you want to test. The program +will run for each data stream in the list: ```python -# Name of the data stream to test -"data_stream": "metrics-aws.s3_storage_lens-default", +# List of data streams to test (the program will run for each one) +"data_streams": [ + "metrics-aws.s3_storage_lens-default", +], ``` Additionally, the `main.py` has defaults for: @@ -94,13 +107,14 @@ and the index number for the index you want to use for the settings and mappings - Do you want to get in a local directory some of the files that are being overwritten? Set these variables: ```python - # Name of the directory to place files. - "directory_overlapping_files": "overwritten-docs" + "-" + program_defaults["data_stream"], - # Do you want to get in your @directory_overlapping_files the files that are overlapping? 
# Set this to True and delete the directory named directory_overlapping_files if it already exists! "get_overlapping_files": True, ``` + > **Note**: The `directory_overlapping_files`, `failed_docs_file`, and `duplicate_docs_file` paths +are automatically generated per data stream (e.g. `overwritten-docs-metrics-aws.usage-default`, +`failed-docs-metrics-aws.usage-default.ndjson`, and +`duplicate-docs-metrics-aws.usage-default.ndjson`). You can override them via CLI flags if needed. > **Note**: The directory should not exist! Otherwise, the files will not be placed, since we are not deleting the directory. A warning will be shown indicating that the files were not placed: @@ -146,18 +160,24 @@ python main.py --help To see the options. The default values are also displayed. -Example: +Examples: -```python +```console python main.py --get_overlapping_files False --max_docs 40000 ``` +You can also pass multiple data streams as a comma-separated list: + +```console +python main.py --data_streams "metrics-aws.usage-default,metrics-aws.s3_storage_lens-default" +``` + ## Algorithm ![img.png](images/algorithm.png) -The algorithm for the program is as follows: +The algorithm for the program is as follows (repeated for each data stream): 1. Given the data stream name, we get all its indices. 2. Given the documents index number provided by the user (or the default, 0), we obtain the index name from the list we got on step 1. @@ -168,12 +188,12 @@ we obtain the index name from the list we got on step 1. 5. We update those same settings so TSDB is enabled. 6. We create a new index given the settings and mappings. This index has TSDB enabled. -7. We place the documents in index obtained on step 2 on our -TSDB enabled new index. -8. We compare if the number of files placed in the TSDB index is the same -as the number of files we retrieved from the documents index. -9. If it is the same, the program ends. -10. Otherwise, we will place all updated documents in a new index. +7. 
We copy documents from the index obtained on step 2 to our +TSDB enabled new index using the bulk API (mimicking how Elastic Agent sends data). +8. We check the bulk response for each document: created, duplicate (409 version conflict), +or other errors. +9. Documents that failed with errors other than duplicates are saved to an NDJSON file for inspection. Duplicate documents (409 version conflicts) are saved to a separate NDJSON file. +10. If there are duplicates, we place all updated documents in a new index. 11. The dimensions and timestamp of the documents in this new index will be displayed in the output. @@ -190,8 +210,19 @@ In case TSDB migration was successful, ie, no loss of data occurred. ```console +Values being used: + elasticsearch_host = https://localhost:9200 + elasticsearch_ca_path = + elasticsearch_user = elastic + elasticsearch_pwd = ******** + data_streams = metrics-aws.usage-default + ... + You're testing with version 8.8.0-SNAPSHOT. +============================================================ +[1/1] Processing data stream: metrics-aws.usage-default +============================================================ Testing data stream metrics-aws.usage-default. Index being used for the documents is .ds-metrics-aws.usage-default-2023.06.29-000001. Index being used for the settings and mappings is .ds-metrics-aws.usage-default-2023.06.29-000001. @@ -217,21 +248,32 @@ The time series fields for the TSDB index are: - cloud.account.id - cloud.region -Index tsdb-index-enabled successfully created. +Index tsdb-metrics-aws.usage-default successfully created. + +Copying documents from .ds-metrics-aws.usage-default-2023.06.29-000001 to tsdb-metrics-aws.usage-default... -Copying documents from .ds-metrics-aws.usage-default-2023.06.29-000001 to tsdb-index-enabled... -All 5000 documents taken from index .ds-metrics-aws.usage-default-2023.06.29-000001 were successfully placed to index tsdb-index-enabled. 
+Bulk indexing summary for .ds-metrics-aws.usage-default-2023.06.29-000001 -> tsdb-metrics-aws.usage-default: + Total documents sent: 5000 + Created: 5000 + Duplicates (409): 0 + Failed: 0 ```
-In case TSDB migration was not successful. +In case TSDB migration was not successful (duplicates detected). ```console +Values being used: + ... + You're testing with version 8.8.0-SNAPSHOT. +============================================================ +[1/1] Processing data stream: metrics-aws.usage-default +============================================================ Testing data stream metrics-aws.usage-default. Index being used for the documents is .ds-metrics-aws.usage-default-2023.06.29-000001. Index being used for the settings and mappings is .ds-metrics-aws.usage-default-2023.06.29-000001. @@ -257,13 +299,21 @@ The time series fields for the TSDB index are: - cloud.account.id - cloud.region -Index tsdb-index-enabled successfully created. +Index tsdb-metrics-aws.usage-default successfully created. + +Copying documents from .ds-metrics-aws.usage-default-2023.06.29-000001 to tsdb-metrics-aws.usage-default... + +Bulk indexing summary for .ds-metrics-aws.usage-default-2023.06.29-000001 -> tsdb-metrics-aws.usage-default: + Total documents sent: 10000 + Created: 9848 + Duplicates (409): 152 + Failed: 0 -Copying documents from .ds-metrics-aws.usage-default-2023.06.29-000001 to tsdb-index-enabled... -WARNING: Out of 10000 documents from the index .ds-metrics-aws.usage-default-2023.06.29-000001, 152 of them were discarded. +WARNING: 152 out of 10000 documents were duplicates (409 version conflict). +Saved 152 failed documents to duplicate-docs-metrics-aws.usage-default.ndjson Overwritten documents will be placed in new index. -Index tsdb-overwritten-docs successfully created. +Index tsdb-overwritten-metrics-aws.usage-default successfully created. The timestamp and dimensions of the first 10 overwritten documents are: - Timestamp 2023-06-29T13:24:00.000Z: @@ -379,13 +429,13 @@ change the data view to the one you just created: The index you use for documents is obtained in this line: ```python -all_placed = copy_from_data_stream(...) 
+total, created, duplicates, duplicate_docs, error_count, failed_docs = copy_from_data_stream(...) ``` In this, it would be the default, which is 0. If you set your own `docs_index`, then that one will be used. It does not matter if TSDB is enabled or not. The program will only -use this index to retrieve documents, so as long as there is data, +use this index to retrieve documents (via the scroll/scan API), so as long as there is data, nothing should go wrong. However, does it make sense to use an index with TSDB enabled to retrieve @@ -403,15 +453,18 @@ the routing path. **What is the name of the index where we are placing the documents with TSDB enabled?** -The index is named `tsdb-index-enabled`. You should be able to see this information -in the output messages. +The index is named `tsdb-<data_stream>`, for example `tsdb-metrics-aws.usage-default`. +Each data stream gets its own TSDB index so results are preserved when testing +multiple data streams at once. You should be able to see this information in the +output messages. **What is the name of the index where we are placing the overwritten documents?** -The index is named `tsdb-overwritten-docs`. You should be able to see this information -in the output messages. +The index is named `tsdb-overwritten-<data_stream>`, for example +`tsdb-overwritten-metrics-aws.usage-default`. You should be able to see this +information in the output messages. **Where are the defaults for every index created and everything else diff --git a/main.py b/main.py index ebd21ab..aa4c044 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,13 @@ from utils.es import * import argparse +import os +import sys program_defaults = { # Variables to configure the ES client: "elasticsearch_host": "https://localhost:9200", - "elasticsearch_ca_path": "/home/c/.elastic-package/profiles/default/certs/elasticsearch/ca-cert.pem", + # Path to the Elasticsearch certificate. e.g.: /home/USER/.elastic-package/profiles/default/certs/elasticsearch/ca-cert.pem. 
If not set, the program will use the default system/certifi CA bundle. + "elasticsearch_ca_path": "", "elasticsearch_user": "elastic", "elasticsearch_pwd": "changeme", @@ -13,8 +16,8 @@ "cloud_pwd": "", "cloud_id": "", - # Name of the data stream to test - "data_stream": "metrics-aws.usage-default", + # List of data streams to test (the program will run for each one) + "data_streams": ["metrics-aws.usage-default"], # docs_index: number of the index to use to retrieve the documents. -1 indicates the default will be used, # which would be 0 - indicating the first index of the data stream @@ -30,14 +33,10 @@ # Maximum documents to be reindexed to the new TSDB index. -1 indicates that we should reindex all documents. # Tip: Is reindexing too slow or encountering a timeout? Set this value. - "max_docs": -1 - + "max_docs": 10000 } program_defaults |= { - # Name of the directory to place files. - "directory_overlapping_files": "overwritten-docs" + "-" + program_defaults["data_stream"], - # Do you want to get in your @directory_overlapping_files the files that are overlapping? # Set this to True and delete the directory named directory_overlapping_files if it already exists! "get_overlapping_files": True, @@ -50,13 +49,29 @@ } +def get_per_stream_defaults(data_stream): + """Compute defaults that depend on the data stream name.""" + return { + # Name of the directory to place files. + "directory_overlapping_files": "overwritten-docs" + "-" + data_stream, + # File path to save documents that failed to index (e.g. missing routing fields) as NDJSON. + "failed_docs_file": "failed-docs" + "-" + data_stream + ".ndjson", + # File path to save duplicate documents (409 version conflicts) as NDJSON. + "duplicate_docs_file": "duplicate-docs" + "-" + data_stream + ".ndjson", + # Per-stream TSDB index names so multiple data streams don't overwrite each other. 
+ "tsdb_index": make_tsdb_index_name(data_stream), + "overwritten_index": make_overwritten_index_name(data_stream), + } + + def get_cmd_arguments(): parser = argparse.ArgumentParser(description='Process command line arguments.', formatter_class=argparse.RawTextHelpFormatter) - # ES variables + # ES variables — defaults are None so we can distinguish "user passed a flag" from + # "argparse filled in the default" when layering env var overrides. parser.add_argument('--elasticsearch_host', action="store", dest='elasticsearch_host', - default=program_defaults["elasticsearch_host"], + default=None, help="Elasticsearch host.\nDefault: " + program_defaults["elasticsearch_host"]) parser.add_argument('--elasticsearch_ca_path', action="store", dest='elasticsearch_ca_path', @@ -64,10 +79,10 @@ def get_cmd_arguments(): help="Location of the Elasticsearch certificate.\nDefault: " + program_defaults["elasticsearch_ca_path"]) parser.add_argument('--elasticsearch_user', action="store", dest='elasticsearch_user', - default=program_defaults["elasticsearch_user"], + default=None, help="Name of the Elasticsearch user.\nDefault: " + program_defaults["elasticsearch_user"]) parser.add_argument('--elasticsearch_pwd', action="store", dest='elasticsearch_pwd', - default=program_defaults["elasticsearch_pwd"], + default=None, help="Elasticsearch password.\nDefault: " + program_defaults["elasticsearch_pwd"]) # Cloud variables @@ -78,10 +93,11 @@ def get_cmd_arguments(): help="The password for Elastic Cloud. If set, it will overwrite every elasticsearch_* argument." 
"\nDefault: " + program_defaults["cloud_pwd"]) - # Data stream name - parser.add_argument('--data_stream', action="store", dest='data_stream', default=program_defaults["data_stream"], - help="The name of the data stream to migrate to TSDB.\nDefault: " - + program_defaults["data_stream"]) + # Data stream names (comma-separated or repeated flag) + parser.add_argument('--data_streams', action="store", dest='data_streams', + default=",".join(program_defaults["data_streams"]), + help="Comma-separated list of data stream names to migrate to TSDB.\nDefault: " + + ",".join(program_defaults["data_streams"])) # Reindex variables if program_defaults["docs_index"] == -1: @@ -112,14 +128,32 @@ def get_cmd_arguments(): "\nDefault: " + default) # Overlapping files configuration + def parse_bool(value): + if isinstance(value, bool): + return value + if value.lower() in ("true", "1", "yes"): + return True + if value.lower() in ("false", "0", "no"): + return False + raise argparse.ArgumentTypeError("Boolean value expected, got '{}'".format(value)) + parser.add_argument('--get_overlapping_files', action="store", dest='get_overlapping_files', + type=parse_bool, default=program_defaults["get_overlapping_files"], help="Flag to place the overwritten documents: documents will be placed if True, otherwise" " if False.\nDefault: " + str(program_defaults["get_overlapping_files"])) parser.add_argument('--directory_overlapping_files', action="store", dest='directory_overlapping_files', - default=program_defaults["directory_overlapping_files"], + default=None, help="The directory path to place the overwritten documents.\nDefault: " - + program_defaults["directory_overlapping_files"]) + "overwritten-docs- (auto-generated per data stream)") + parser.add_argument('--failed_docs_file', action="store", dest='failed_docs_file', + default=None, + help="File path to save documents that failed to index as NDJSON (e.g. missing routing fields)." 
+ "\nDefault: failed-docs-.ndjson (auto-generated per data stream)") + parser.add_argument('--duplicate_docs_file', action="store", dest='duplicate_docs_file', + default=None, + help="File path to save duplicate documents (409 version conflicts) as NDJSON." + "\nDefault: duplicate-docs-.ndjson (auto-generated per data stream)") parser.add_argument('--display_docs', action="store", dest='display_docs', default=program_defaults["display_docs"], help="Number of documents overlapping used to display the dimensions." @@ -134,12 +168,42 @@ def get_cmd_arguments(): parser.print_help() print("\nUser provided unknown flags:", unknown) print("Program will end.") - exit(0) + sys.exit(1) return args +def apply_env_overrides(args): + """ + Resolve ES credentials using the precedence: CLI flag > env var > hardcoded default. + Argparse defaults for these fields are None so we can detect whether the user + explicitly passed a flag. + Supported env vars: + ELASTIC_PACKAGE_ELASTICSEARCH_HOST -> elasticsearch_host + ELASTIC_PACKAGE_ELASTICSEARCH_USERNAME -> elasticsearch_user + ELASTIC_PACKAGE_ELASTICSEARCH_PASSWORD -> elasticsearch_pwd + """ + env_mapping = { + "ELASTIC_PACKAGE_ELASTICSEARCH_HOST": "elasticsearch_host", + "ELASTIC_PACKAGE_ELASTICSEARCH_USERNAME": "elasticsearch_user", + "ELASTIC_PACKAGE_ELASTICSEARCH_PASSWORD": "elasticsearch_pwd", + } + for env_var, arg_name in env_mapping.items(): + cli_value = getattr(args, arg_name) + if cli_value is not None: + continue + env_value = os.environ.get(env_var) + if env_value: + setattr(args, arg_name, env_value) + else: + setattr(args, arg_name, program_defaults[arg_name]) + + if __name__ == '__main__': args = get_cmd_arguments() + apply_env_overrides(args) + + # Parse comma-separated data streams + data_streams = [ds.strip() for ds in args.data_streams.split(",") if ds.strip()] print("Values being used:") for arg in vars(args): @@ -159,6 +223,9 @@ def get_cmd_arguments(): if arg == "max_docs" and getattr(args, arg) == -1: 
print("\t{} = {}".format(arg, "All documents")) continue + if arg in ("elasticsearch_pwd", "cloud_pwd"): + print("\t{} = {}".format(arg, "********")) + continue print("\t{} = {}".format(arg, getattr(args, arg))) print() @@ -168,14 +235,37 @@ def get_cmd_arguments(): args.elasticsearch_pwd, args.cloud_id, args.cloud_pwd) print("You're testing with version {}.\n".format(client.info()["version"]["number"])) - # Create TSDB index and place documents - all_placed = copy_from_data_stream(client, args.data_stream, int(args.docs_index), int(args.settings_mappings_index), - int(args.max_docs)) + for i, data_stream in enumerate(data_streams): + stream_defaults = get_per_stream_defaults(data_stream) + directory_overlapping_files = args.directory_overlapping_files or stream_defaults["directory_overlapping_files"] + failed_docs_file = args.failed_docs_file or stream_defaults["failed_docs_file"] + duplicate_docs_file = args.duplicate_docs_file or stream_defaults["duplicate_docs_file"] + tsdb_index_name = stream_defaults["tsdb_index"] + overwritten_index_name = stream_defaults["overwritten_index"] + + print("=" * 60) + print("[{}/{}] Processing data stream: {}".format(i + 1, len(data_streams), data_stream)) + print("=" * 60) + + # Create TSDB index and place documents using bulk API + total, created, duplicates, duplicate_docs, error_count, failed_docs = copy_from_data_stream( + client, data_stream, int(args.docs_index), int(args.settings_mappings_index), int(args.max_docs), + tsdb_index_name) + + # Save failed documents (errors other than duplicates, e.g. missing routing fields) + if error_count > 0: + print("{} documents failed to index. 
Saving failed documents for inspection.".format(error_count)) + save_failed_docs(failed_docs, failed_docs_file) + + # Save and inspect duplicate documents + if duplicates > 0: + print("WARNING: {} out of {} documents were duplicates (409 version conflict).".format(duplicates, total)) + save_failed_docs(duplicate_docs, duplicate_docs_file) + print("Overwritten documents will be placed in new index.") + create_index_missing_for_docs(client, tsdb_index_name, overwritten_index_name) + get_missing_docs_info(client, data_stream, int(args.display_docs), directory_overlapping_files, + args.get_overlapping_files, int(args.copy_docs_per_dimension), + overwritten_index_name) - # Get overwritten documents information - if not all_placed: - print("Overwritten documents will be placed in new index.") - create_index_missing_for_docs(client) - get_missing_docs_info(client, args.data_stream, int(args.display_docs), args.directory_overlapping_files, - bool(args.get_overlapping_files), int(args.copy_docs_per_dimension)) + print() diff --git a/utils/es.py b/utils/es.py index 4778a95..5cca5be 100644 --- a/utils/es.py +++ b/utils/es.py @@ -4,9 +4,11 @@ from elasticsearch import Elasticsearch from elasticsearch.client import IngestClient +from elasticsearch.helpers import scan import json import os.path +import sys from utils.tsdb import * @@ -29,11 +31,17 @@ def get_client(elasticsearch_host, elasticsearch_ca_path, elasticsearch_user, el cloud_id=cloud_id, basic_auth=("elastic", cloud_pwd) ) - return Elasticsearch( - hosts=elasticsearch_host, - ca_certs=elasticsearch_ca_path, - basic_auth=(elasticsearch_user, elasticsearch_pwd) - ) + kwargs = { + "hosts": elasticsearch_host, + "basic_auth": (elasticsearch_user, elasticsearch_pwd), + } + if elasticsearch_ca_path: + kwargs["ca_certs"] = elasticsearch_ca_path + else: + # Use default system/certifi CA bundle when no explicit path is provided + import ssl + kwargs["ssl_context"] = ssl.create_default_context() + return Elasticsearch(**kwargs) 
def add_doc_from_file(client: Elasticsearch, index_name: str, doc_path: str): @@ -61,11 +69,11 @@ def place_documents(client: Elasticsearch, index_name: str, folder_docs: str): print("Placing documents on the index {name}...".format(name=index_name)) if not client.indices.exists(index=index_name): print("Index {name} does not exist. Program will end.".format(name=index_name)) - exit(0) + sys.exit(1) if not os.path.isdir(folder_docs): print("Folder {} does not exist. Documents cannot be placed. Program will end.".format(folder_docs)) - exit(0) + sys.exit(1) for doc in os.listdir(folder_docs): doc_path = os.path.join(folder_docs, doc) @@ -95,12 +103,14 @@ def create_index(client: Elasticsearch, index_name: str, mappings: {} = {}, sett print("Index {name} successfully created.\n".format(name=index_name)) -def create_index_missing_for_docs(client: Elasticsearch): +def create_index_missing_for_docs(client: Elasticsearch, tsdb_index_name: str, overwritten_index_name: str): """ Create an index to place all the documents that were updated at least one time. :param client: ES client. + :param tsdb_index_name: name of the TSDB index that holds the indexed documents. + :param overwritten_index_name: name of the index to store overwritten documents. 
""" - create_index(client, overwritten_docs_index) + create_index(client, overwritten_index_name) pipelines = IngestClient(client) pipeline_name = 'get-missing-docs' pipelines.put_pipeline(id=pipeline_name, body={ @@ -114,11 +124,11 @@ def create_index_missing_for_docs(client: Elasticsearch): ] }) dest = { - "index": overwritten_docs_index, + "index": overwritten_index_name, "version_type": "external", "pipeline": pipeline_name } - client.reindex(source={"index": tsdb_index}, dest=dest, refresh=True) + client.reindex(source={"index": tsdb_index_name}, dest=dest, refresh=True) def build_query(dimensions_exist: {}, dimensions_missing: []): @@ -180,7 +190,8 @@ def get_and_place_documents(client: Elasticsearch, data_stream: str, dir_name: s def get_missing_docs_info(client: Elasticsearch, data_stream: str, display_docs: int, dir, - get_overlapping_files: bool, copy_docs_per_dimension: int): + get_overlapping_files: bool, copy_docs_per_dimension: int, + overwritten_index_name: str): """ Display the dimensions of the first @display_docs documents. If @get_overlapping_files is set to True, then @copy_docs_per_dimension documents will be placed in a directory @@ -190,7 +201,7 @@ def get_missing_docs_info(client: Elasticsearch, data_stream: str, display_docs: :param dir: name of the directory. :param get_overlapping_files: true if you want to place fields in the directory, false otherwise. :param copy_docs_per_dimension: number of documents to get for a set of dimensions. - :param docs_index: name of the index with the documents. + :param overwritten_index_name: name of the index holding overwritten documents. 
""" if get_overlapping_files: if os.path.exists(dir): @@ -201,7 +212,7 @@ def get_missing_docs_info(client: Elasticsearch, data_stream: str, display_docs: n = 1 body = {'size': display_docs, 'query': {'match_all': {}}} - res = client.search(index=overwritten_docs_index, body=body) + res = client.search(index=overwritten_index_name, body=body) dimensions = time_series_fields["dimension"] print("The timestamp and dimensions of the first {} overwritten documents are:".format(display_docs)) @@ -231,37 +242,137 @@ def get_missing_docs_info(client: Elasticsearch, data_stream: str, display_docs: n += 1 -def copy_docs_from_to(client: Elasticsearch, source_index: str, dest_index: str, max_docs: int): +def _process_bulk_batch(client: Elasticsearch, operations: [], batch_sources: []): """ - Copy documents from one index to the other. + Send a single bulk request and categorize the per-document results. + :param client: ES client. + :param operations: list of alternating action / source dicts for client.bulk(). + :param batch_sources: list of original _source dicts (one per document, aligned with operations). + :return: (created, duplicates, duplicate_docs, error_count, failed_docs) for this batch. 
+ """ + resp = client.bulk(operations=operations) + created = 0 + duplicates = 0 + duplicate_docs = [] + error_count = 0 + failed = [] + + for i, item in enumerate(resp["items"]): + action_result = item["create"] + if "error" in action_result: + if action_result.get("status") == 409: + duplicates += 1 + duplicate_docs.append({ + "status": 409, + "document": batch_sources[i] + }) + else: + error_count += 1 + failed.append({ + "error": action_result["error"], + "status": action_result.get("status"), + "document": batch_sources[i] + }) + elif action_result.get("result") == "created": + created += 1 + else: + error_count += 1 + failed.append({ + "error": {"type": "unexpected_result", "reason": "result was: {}".format(action_result.get("result"))}, + "status": action_result.get("status"), + "document": batch_sources[i] + }) + + return created, duplicates, duplicate_docs, error_count, failed + + +def save_failed_docs(failed_docs: [], filepath: str): + """ + Save documents that failed during bulk indexing to a single NDJSON file for inspection. + Each line contains a JSON object with the error details and the original document source. + :param failed_docs: list of dicts with 'error', 'status', and 'document' keys. + :param filepath: path to the NDJSON file where failed documents will be saved. + """ + if not failed_docs: + return + + if os.path.exists(filepath): + print("WARNING: The file {} already exists. It will be replaced.\n".format(filepath)) + + with open(filepath, 'w') as f: + for failed in failed_docs: + f.write(json.dumps(failed) + "\n") + + print("Saved {} failed documents to {}\n".format(len(failed_docs), filepath)) + + +def copy_docs_from_to(client: Elasticsearch, source_index: str, dest_index: str, max_docs: int, + batch_size: int = 1000): + """ + Copy documents from one index to the other using scan + bulk API. + This mimics how Elastic Agent integrations send data via the _bulk API, + rather than using the server-side reindex API. :param client: ES client. 
:param source_index: source index with the documents to be copied to a new index. :param dest_index: destination index for the documents. - :param max_docs: max number of documents to copy. - :return: True if the number of documents is the same in the new index as it was in the old index. + :param max_docs: max number of documents to copy. -1 means all. + :param batch_size: number of documents per bulk request. + :return: (total, created, duplicates, duplicate_docs, error_count, failed_docs). """ print("Copying documents from {} to {}...".format(source_index, dest_index)) if not client.indices.exists(index=source_index): print("Source index {name} does not exist. Program will end.".format(name=source_index)) - exit(0) + sys.exit(1) + + total = 0 + created = 0 + duplicates = 0 + duplicate_docs = [] + error_count = 0 + failed_docs = [] + + operations = [] + batch_sources = [] + + for doc in scan(client, index=source_index): + if max_docs != -1 and total >= max_docs: + break + + operations.append({"create": {"_index": dest_index}}) + operations.append(doc["_source"]) + batch_sources.append(doc["_source"]) + total += 1 + + if len(batch_sources) >= batch_size: + c, d, dd, e, f = _process_bulk_batch(client, operations, batch_sources) + created += c + duplicates += d + duplicate_docs.extend(dd) + error_count += e + failed_docs.extend(f) + operations = [] + batch_sources = [] + + # Send remaining documents + if operations: + c, d, dd, e, f = _process_bulk_batch(client, operations, batch_sources) + created += c + duplicates += d + duplicate_docs.extend(dd) + error_count += e + failed_docs.extend(f) + + client.indices.refresh(index=dest_index) + + # Print summary + print("\nBulk indexing summary for {} -> {}:".format(source_index, dest_index)) + print("\tTotal documents sent: {}".format(total)) + print("\tCreated: {}".format(created)) + print("\tDuplicates (409): {}".format(duplicates)) + print("\tFailed: {}".format(error_count)) + print() - if max_docs != -1: - resp = 
client.reindex(source={"index": source_index}, dest={"index": dest_index}, refresh=True, - max_docs=max_docs) - else: - resp = client.reindex(source={"index": source_index}, dest={"index": dest_index}, refresh=True) - if resp["updated"] > 0: - print("WARNING: Out of {} documents from the index {}, {} of them were discarded.\n".format(resp["total"], - source_index, - resp[ - "updated"])) - return False - else: - print( - "All {} documents taken from index {} were successfully placed to index {}.\n".format(resp["total"], - source_index, - dest_index)) - return True + return total, created, duplicates, duplicate_docs, error_count, failed_docs def get_tsdb_config(client: Elasticsearch, data_stream_name: str, docs_index: int, settings_mappings_index: int): @@ -283,7 +394,7 @@ def get_tsdb_config(client: Elasticsearch, data_stream_name: str, docs_index: in print("ERROR: Data stream {} has {} indexes. Documents index number {} is not valid.".format(data_stream_name, n_indexes, docs_index)) - exit(0) + sys.exit(1) # Get index to use for settings/mappings if settings_mappings_index == -1: @@ -291,7 +402,7 @@ def get_tsdb_config(client: Elasticsearch, data_stream_name: str, docs_index: in elif settings_mappings_index >= n_indexes: print("ERROR: Data stream {} has {} indexes. 
Settings/mappings index number {} is not valid.".format( data_stream_name, n_indexes, settings_mappings_index)) - exit(0) + sys.exit(1) docs_index_name = data_stream["data_streams"][0]["indices"][docs_index]["index_name"] settings_mappings_index_name = data_stream["data_streams"][0]["indices"][settings_mappings_index]["index_name"] @@ -308,26 +419,27 @@ def get_tsdb_config(client: Elasticsearch, data_stream_name: str, docs_index: in return docs_index_name, mappings, settings -def copy_from_data_stream(client: Elasticsearch, data_stream_name: str, docs_index: int,settings_mappings_index: int, - max_docs: int): +def copy_from_data_stream(client: Elasticsearch, data_stream_name: str, docs_index: int, settings_mappings_index: int, + max_docs: int, tsdb_index_name: str): """ Given a data stream, it copies the documents retrieved from the given index and places them in a new - index with TSDB enabled. + index with TSDB enabled, using the bulk API to simulate how Elastic Agent sends data. :param client: ES client. :param data_stream_name: name of the data stream. :param docs_index: number of the index to use to retrieve the documents. :param settings_mappings_index: number of the index to use to get the mappings and settings for the TSDB index. - :param max_docs: maximum documents to be reindexed. - :return: True if the number of documents placed to the TSDB index remained the same. False otherwise. + :param max_docs: maximum documents to be indexed. + :param tsdb_index_name: name of the destination TSDB index. + :return: (total, created, duplicates, duplicate_docs, error_count, failed_docs). """ print("Testing data stream {}.".format(data_stream_name)) if not client.indices.exists(index=data_stream_name): print("\tData stream {} does not exist. 
Program will end.".format(data_stream_name)) - exit(0) + sys.exit(1) source_index, mappings, settings = get_tsdb_config(client, data_stream_name, docs_index, settings_mappings_index) - create_index(client, tsdb_index, mappings, settings) + create_index(client, tsdb_index_name, mappings, settings) - return copy_docs_from_to(client, source_index, tsdb_index, max_docs) + return copy_docs_from_to(client, source_index, tsdb_index_name, max_docs) diff --git a/utils/tsdb.py b/utils/tsdb.py index 05adb24..3085295 100644 --- a/utils/tsdb.py +++ b/utils/tsdb.py @@ -5,6 +5,8 @@ in the ES Python client. If the situation changes, the function will no longer be accurate. """ +import sys + # This is a dictionary for all the time series fields accepted as of today (29.June.2023). # routing_path is also part of the dictionary since it is mandatory to have it for a time series index. time_series_fields = { @@ -18,11 +20,15 @@ # As of today (29.June.2023), only keyword fields are accepted. accepted_fields_for_routing = ["keyword"] -# The name of the index with TSDB enabled. -tsdb_index = "tsdb-index-enabled" -# This is the index in which we will store the documents that were overwritten - ie, the ones that caused us -# to lose data. -overwritten_docs_index = "tsdb-overwritten-docs" + +def make_tsdb_index_name(data_stream): + """Return the TSDB-enabled index name for a given data stream.""" + return "tsdb-" + data_stream + + +def make_overwritten_index_name(data_stream): + """Return the overwritten-docs index name for a given data stream.""" + return "tsdb-overwritten-" + data_stream # Some settings cause an error as they are not known to ElasticSearch Python client. @@ -41,6 +47,9 @@ def get_time_series_fields(mappings: {}): Place all fields in the time time_series_fields dictionary. :param mappings: Mappings dictionary. 
""" + for key in time_series_fields: + time_series_fields[key].clear() + fields = mappings["properties"] # A function to flatten the name of the fields @@ -75,7 +84,7 @@ def cluster_fields_by_type(fields: {}): if len(time_series_fields["routing_path"]) == 0: print("Routing path is empty. Program will end.") - exit(0) + sys.exit(1) print("The time series fields for the TSDB index are: ") for key in time_series_fields: