From 1625804e4d4464573c39f0c2c106a8e44ba2665d Mon Sep 17 00:00:00 2001
From: Tony Chow
Date: Tue, 2 Jun 2026 15:50:12 +0100
Subject: [PATCH 1/2] Added scripts to cleardown dm store on aat

---
# NOTE(review): this copy of the series was re-flowed from a whitespace-collapsed
# original. Line breaks follow the hunk line counts (94 + 6 added lines here);
# indentation is reconstructed and should be checked against the real commit
# before applying.
# NOTE(review): main() calls `audit_results.show()` on the value returned by
# read_delta(), which is `dt.to_pandas()`. pandas DataFrames have no `show`
# method, so this presumably raises AttributeError. PATCH 2/2 removes the call.
# NOTE(review): the header comment "read permissions on the ingest meta002 vault
# and also read permissions." looks like a leftover duplicate - confirm which
# additional permission was meant.
 scripts/dm_store/dm_store_meta_cleardown.py | 94 +++++++++++++++++++++
 scripts/dm_store/requirements.txt           |  6 ++
 2 files changed, 100 insertions(+)
 create mode 100644 scripts/dm_store/dm_store_meta_cleardown.py
 create mode 100644 scripts/dm_store/requirements.txt

diff --git a/scripts/dm_store/dm_store_meta_cleardown.py b/scripts/dm_store/dm_store_meta_cleardown.py
new file mode 100644
index 000000000..9b0788d32
--- /dev/null
+++ b/scripts/dm_store/dm_store_meta_cleardown.py
@@ -0,0 +1,94 @@
+# Pre-requisites. You need to login to az cli locally with your user having:
+# - read permissions on the ingest meta002 vault and also read permissions.
+# - read permissions on the ingest curated storage account.
+# - be on F5 VPN to access the curated storage account.
+
+import argparse
+import logging
+import pandas as pd
+import psycopg2
+from azure.identity import AzureCliCredential
+from azure.keyvault.secrets import SecretClient
+from deltalake import DeltaTable
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--lz-key", default="00")
+    parser.add_argument("--env", default="sbox")
+    return parser.parse_args()
+
+SECRET_KEYS = [
+    "DM-STORE-HOST",
+    "DM-STORE-PORT",
+    "DM-STORE-DB",
+    "DM-STORE-USER",
+    "DM-STORE-PASS",
+]
+
+
+def get_secrets(vault_url: str, credential: AzureCliCredential) -> dict[str, str]:
+    client = SecretClient(vault_url=vault_url, credential=credential)
+    secrets = {}
+    for key in SECRET_KEYS:
+        logger.info(f"Retrieving secret: {key}")
+        secrets[key] = client.get_secret(key).value
+        logger.info(f"Successfully retrieved: {key}")
+    return secrets
+
+
+def connect(secrets: dict[str, str]) -> psycopg2.extensions.connection:
+    conn = psycopg2.connect(
+        host=secrets["DM-STORE-HOST"],
+        port=int(secrets["DM-STORE-PORT"]),
+        dbname=secrets["DM-STORE-DB"],
+        user=secrets["DM-STORE-USER"],
+        password=secrets["DM-STORE-PASS"],
+        connect_timeout=10,
+    )
+    logger.info("Database connection established")
+    return conn
+
+
+def read_delta(storage_account, delta_path, credential: AzureCliCredential) -> pd.DataFrame:
+    token = credential.get_token("https://storage.azure.com/.default").token
+    storage_options = {
+        "azure_storage_account_name": storage_account,
+        "azure_storage_token": token,
+    }
+    logger.info(f"Reading Delta table from: {delta_path}")
+    dt = DeltaTable(delta_path, storage_options=storage_options)
+    df = dt.to_pandas()
+    logger.info(f"Read {len(df):,} rows from ack_audit")
+    return df
+
+
+def main():
+    args = parse_args()
+    credential = AzureCliCredential()
+
+    ack_audit_path = "az://silver/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/CDAM/ack_audit"
+    audit_results = read_delta(f"ingest{args.lz_key}curated{args.env}", ack_audit_path, credential)
+
+    audit_results.show()
+
+    keyvault_url = f"https://ingest{args.lz_key}-meta002-{args.env}.vault.azure.net/"
+    secrets = get_secrets(keyvault_url, credential)
+    conn = connect(secrets)
+    try:
+        with conn.cursor() as cur:
+            cur.execute("SELECT version();")
+            version = cur.fetchone()
+            logger.info(f"Connected to: {version[0]}")
+    finally:
+        conn.close()
+        logger.info("Connection closed")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/dm_store/requirements.txt b/scripts/dm_store/requirements.txt
new file mode 100644
index 000000000..98c7b6bf9
--- /dev/null
+++ b/scripts/dm_store/requirements.txt
@@ -0,0 +1,6 @@
+azure-identity>=1.19.0
+azure-keyvault-secrets>=4.9.0
+deltalake>=0.22.0
+pandas>=2.0.0
+pyarrow>=15.0.0
+psycopg2-binary>=2.9.0

From f354e54bd4e945a376aca20dcbc2389f115c0ffe Mon Sep 17 00:00:00 2001
From: Tony Chow
Date: Wed, 3 Jun 2026 16:44:53 +0100
Subject: [PATCH 2/2] Added cleardown script for CCD metadata in DM store which is causing the hash verify issue

---
# NOTE(review): this copy of the series was re-flowed from a whitespace-collapsed
# original. Line breaks follow the hunk line counts (46 insertions, 11
# deletions); indentation, including whitespace inside the triple-quoted SQL
# strings, is reconstructed and should be checked against the real commit.
# NOTE(review): `cur.execute(query, (tuple(uuids),))` - when ack_audit yields no
# rows `uuids` is empty. psycopg2 adapts an empty tuple to `()`, and `IN ()` is
# rejected by the server, so add `if not uuids: return` (with a log line) before
# the key vault lookup and connect.
# NOTE(review): `logger.info(uuids)` writes the whole UUID list as a single log
# record; logging the count (already done on the line above it) or a short
# sample is probably enough.
# NOTE(review): the nesting of `conn.commit()` was lost in this copy. It is
# placed at the `with` body level here, which means it also runs after the
# count-only SELECT - harmless, but confirm it is not meant to sit under
# `if args.delete_run:`.
# NOTE(review): UUIDs are taken as the last "/"-separated segment of
# `document_url`; rows with a null `document_url` would presumably contribute
# NaN to `uuids` - verify the column is never null or drop nulls first.
 scripts/dm_store/dm_store_meta_cleardown.py | 54 +++++++++++++++++----
 scripts/dm_store/requirements.txt           |  3 +-
 2 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/scripts/dm_store/dm_store_meta_cleardown.py b/scripts/dm_store/dm_store_meta_cleardown.py
index 9b0788d32..cd23ec2db 100644
--- a/scripts/dm_store/dm_store_meta_cleardown.py
+++ b/scripts/dm_store/dm_store_meta_cleardown.py
@@ -2,11 +2,17 @@
 # - read permissions on the ingest meta002 vault and also read permissions.
 # - read permissions on the ingest curated storage account.
 # - be on F5 VPN to access the curated storage account.
+# To get the count of records to be deleted, run:
+# python3 dm_store_meta_cleardown.py --lz-key 01 --env stg
+# To perform the actual delete, run:
+# python3 dm_store_meta_cleardown.py --lz-key 01 --env stg --delete-run
 
 import argparse
 import logging
+import adlfs
 import pandas as pd
 import psycopg2
+import pyarrow.dataset as ds
 from azure.identity import AzureCliCredential
 from azure.keyvault.secrets import SecretClient
 from deltalake import DeltaTable
@@ -21,6 +27,7 @@ def parse_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser()
     parser.add_argument("--lz-key", default="00")
     parser.add_argument("--env", default="sbox")
+    parser.add_argument("--delete-run", action="store_true")
     return parser.parse_args()
 
 SECRET_KEYS = [
@@ -55,15 +62,20 @@ def connect(secrets: dict[str, str]) -> psycopg2.extensions.connection:
     return conn
 
 
-def read_delta(storage_account, delta_path, credential: AzureCliCredential) -> pd.DataFrame:
-    token = credential.get_token("https://storage.azure.com/.default").token
+def read_delta(storage_account: str, delta_path: str, credential: AzureCliCredential) -> pd.DataFrame:
     storage_options = {
-        "azure_storage_account_name": storage_account,
-        "azure_storage_token": token,
+        "account_name": storage_account,
+        "use_azure_cli": "true",
     }
-    logger.info(f"Reading Delta table from: {delta_path}")
+    logger.info(f"Reading delta table from: {delta_path}")
     dt = DeltaTable(delta_path, storage_options=storage_options)
-    df = dt.to_pandas()
+
+    active_files = [f.removeprefix("az://") for f in dt.file_uris()]
+    logger.info(f"Found {len(active_files)} active parquet files in transaction log")
+
+    fs = adlfs.AzureBlobFileSystem(account_name=storage_account, credential=credential)
+    dataset = ds.dataset(active_files, filesystem=fs, format="parquet")
+    df = dataset.to_table().to_pandas()
     logger.info(f"Read {len(df):,} rows from ack_audit")
     return df
 
@@ -75,16 +87,38 @@ def main():
     ack_audit_path = "az://silver/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/CDAM/ack_audit"
     audit_results = read_delta(f"ingest{args.lz_key}curated{args.env}", ack_audit_path, credential)
 
-    audit_results.show()
+    uuids = audit_results["document_url"].str.split("/").str[-1].tolist()
+    logger.info(f"Extracted {len(uuids):,} UUIDs from document_url")
+    logger.info(uuids)
+
+    if args.delete_run:
+        query = """
+            DELETE FROM documentmetadata
+            WHERE name = 'case_id'
+            AND documentmetadata_id IN %s
+        """
+    else:
+        query = """
+            SELECT COUNT(*) FROM documentmetadata
+            WHERE name = 'case_id'
+            AND documentmetadata_id IN %s
+        """
+
+    logger.info("About to run query:")
+    logger.info(query)
 
     keyvault_url = f"https://ingest{args.lz_key}-meta002-{args.env}.vault.azure.net/"
     secrets = get_secrets(keyvault_url, credential)
     conn = connect(secrets)
     try:
         with conn.cursor() as cur:
-            cur.execute("SELECT version();")
-            version = cur.fetchone()
-            logger.info(f"Connected to: {version[0]}")
+            cur.execute(query, (tuple(uuids),))
+            if args.delete_run:
+                logger.info(f"Deleted {cur.rowcount:,} rows from documentmetadata")
+            else:
+                count = cur.fetchone()[0]
+                logger.info(f"Count of records for deletion = {count:,}")
+            conn.commit()
     finally:
         conn.close()
         logger.info("Connection closed")
diff --git a/scripts/dm_store/requirements.txt b/scripts/dm_store/requirements.txt
index 98c7b6bf9..ea20bde14 100644
--- a/scripts/dm_store/requirements.txt
+++ b/scripts/dm_store/requirements.txt
@@ -1,6 +1,7 @@
+adlfs>=2024.2.0
 azure-identity>=1.19.0
 azure-keyvault-secrets>=4.9.0
-deltalake>=0.22.0
+deltalake>=1.6.0
 pandas>=2.0.0
 pyarrow>=15.0.0
 psycopg2-binary>=2.9.0