From 780b1fab7ff4eacd1c41611f25aab266e7a6f2a9 Mon Sep 17 00:00:00 2001
From: Joseph Amlung
Date: Wed, 17 Jun 2026 13:10:35 -0400
Subject: [PATCH] Add reindex commands for sources, expansions, and admin ES indexes

Introduces `ocl index` command group covering all reindex endpoints
available in the OCL API: source-scoped concept/mapping reindex
(requires source ownership), collection expansion concept/mapping
reindex (admin only), and global ES admin operations (rebuild,
populate, batch resource reindex).

Co-Authored-By: Claude Sonnet 4.6
---
 src/ocl_cli/api_client.py     | 127 ++++++++++++++++++++++++++
 src/ocl_cli/commands/index.py | 167 ++++++++++++++++++++++++++++++++++
 src/ocl_cli/main.py           |   2 +
 src/ocl_cli/output.py         |  21 +++++
 4 files changed, 317 insertions(+)
 create mode 100644 src/ocl_cli/commands/index.py

diff --git a/src/ocl_cli/api_client.py b/src/ocl_cli/api_client.py
index 83b3d1d..ef8c4a9 100644
--- a/src/ocl_cli/api_client.py
+++ b/src/ocl_cli/api_client.py
@@ -1330,6 +1330,133 @@ def remove_org_member(self, org: str, username: str) -> dict:
         self._require_auth()
         return self.delete(f"/orgs/{org}/members/{username}/")
 
+    # ── Index / reindex operations ──────────────────────────────────
+
+    def _post_multipart(self, endpoint: str, fields: Optional[dict] = None) -> Any:
+        """POST as multipart/form-data for admin endpoints that use DRF MultiPartParser.
+
+        ``None``-valued fields are dropped; the rest are sent as strings.
+        Returns the decoded JSON body, or ``{}`` when the response body is
+        empty or cannot be decoded as JSON.
+        """
+        self._require_auth()
+        self._log_request("POST", endpoint, body=fields)
+        mp: dict[str, Any] = {}
+        if fields:
+            for k, v in fields.items():
+                if v is not None:
+                    mp[k] = (None, str(v))
+        if not mp:
+            # Placeholder part; presumably keeps the request multipart-encoded
+            # when there are no real fields to send.
+            mp = {"_": (None, "")}
+        response = self.client.post(endpoint, files=mp)
+        self._handle_error(response)
+        if not response.content:
+            return {}
+        try:
+            return response.json()
+        except ValueError:
+            # JSON decode errors subclass ValueError; anything else should surface.
+            return {}
+
+    def index_source_concepts(
+        self,
+        owner: str,
+        source: str,
+        owner_type: str = "orgs",
+        version: Optional[str] = None,
+        single_batch: bool = False,
+        parallel: bool = True,
+    ) -> dict:
+        """Trigger reindexing of concepts for a source."""
+        self._require_auth()
+        endpoint = _build_repo_endpoint(owner_type, owner, "source", source, version, "concepts/indexes/")
+        body: dict[str, Any] = {}
+        if single_batch:
+            body["single_batch"] = True
+        if not parallel:
+            body["parallel"] = False
+        return self.post(endpoint, json=body or None)
+
+    def index_source_mappings(
+        self,
+        owner: str,
+        source: str,
+        owner_type: str = "orgs",
+        version: Optional[str] = None,
+        single_batch: bool = False,
+    ) -> dict:
+        """Trigger reindexing of mappings for a source."""
+        self._require_auth()
+        endpoint = _build_repo_endpoint(owner_type, owner, "source", source, version, "mappings/indexes/")
+        body: dict[str, Any] = {}
+        if single_batch:
+            body["single_batch"] = True
+        return self.post(endpoint, json=body or None)
+
+    def index_expansion_concepts(
+        self,
+        owner: str,
+        collection: str,
+        version: str,
+        expansion: str,
+        owner_type: str = "orgs",
+    ) -> dict:
+        """Trigger reindexing of concepts in a collection expansion (admin only)."""
+        self._require_auth()
+        _validate_owner_type(owner_type)
+        endpoint = f"/{owner_type}/{owner}/collections/{collection}/{version}/expansions/{expansion}/concepts/index/"
+        return self.post(endpoint)
+
+    def index_expansion_mappings(
+        self,
+        owner: str,
+        collection: str,
+        version: str,
+        expansion: str,
+        owner_type: str = "orgs",
+    ) -> dict:
+        """Trigger reindexing of mappings in a collection expansion (admin only)."""
+        self._require_auth()
+        _validate_owner_type(owner_type)
+        endpoint = f"/{owner_type}/{owner}/collections/{collection}/{version}/expansions/{expansion}/mappings/index/"
+        return self.post(endpoint)
+
+    def index_rebuild(self, apps: Optional[str] = None) -> dict:
+        """Rebuild all Elasticsearch indexes (admin only)."""
+        fields = {"apps": apps} if apps else None
+        return self._post_multipart("/indexes/apps/rebuild/", fields)
+
+    def index_populate(self, apps: Optional[str] = None) -> dict:
+        """Populate Elasticsearch indexes (admin only)."""
+        fields = {"apps": apps} if apps else None
+        return self._post_multipart("/indexes/apps/populate/", fields)
+
+    def index_resource(
+        self,
+        resource: str,
+        ids: Optional[str] = None,
+        uri: Optional[str] = None,
+        filter_str: Optional[str] = None,
+        update_indexed: bool = False,
+    ) -> dict:
+        """Batch reindex a specific resource type (admin only).
+
+        Only one selector is sent: ``ids`` takes precedence over ``uri``,
+        which takes precedence over ``filter_str``.
+        """
+        fields: dict[str, Any] = {}
+        if ids:
+            fields["ids"] = ids
+        elif uri:
+            fields["uri"] = uri
+        elif filter_str:
+            fields["filter"] = filter_str
+        if update_indexed:
+            fields["update_indexed"] = "true"
+        return self._post_multipart(f"/indexes/resources/{resource}/", fields or None)
+
     # ── Task operations ─────────────────────────────────────────────
 
     def list_tasks(
diff --git a/src/ocl_cli/commands/index.py b/src/ocl_cli/commands/index.py
new file mode 100644
index 0000000..313caa4
--- /dev/null
+++ b/src/ocl_cli/commands/index.py
@@ -0,0 +1,167 @@
+"""Index commands: trigger Elasticsearch reindexing for sources, expansions, and global indexes."""
+
+import sys
+
+import click
+
+from ocl_cli.api_client import APIError
+from ocl_cli.main import handle_api_error
+from ocl_cli.output import output_result, format_index_task
+
+
+@click.group()
+def index():
+    """Trigger Elasticsearch reindexing operations."""
+    pass
+
+
+# ── Source subgroup ──────────────────────────────────────────────────
+
+@index.group()
+def source():
+    """Reindex concepts or mappings for a source."""
+    pass
+
+
+@source.command("concepts")
+@click.argument("owner")
+@click.argument("source_name", metavar="SOURCE")
+@click.option("--version", help="Source version to reindex (omit for HEAD)")
+@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
+@click.option("--single-batch", is_flag=True, help="Process as a single batch instead of parallel chunks")
+@click.option("--no-parallel", is_flag=True, help="Disable parallel processing")
+@click.pass_context
+def source_concepts(ctx, owner, source_name, version, owner_type, single_batch, no_parallel):
+    """Reindex all concepts for a source."""
+    client = ctx.obj["client"]
+    try:
+        result = client.index_source_concepts(
+            owner, source_name, owner_type=owner_type, version=version,
+            single_batch=single_batch, parallel=not no_parallel,
+        )
+        output_result(ctx, result, format_index_task)
+    except APIError as e:
+        handle_api_error(e)
+
+
+@source.command("mappings")
+@click.argument("owner")
+@click.argument("source_name", metavar="SOURCE")
+@click.option("--version", help="Source version to reindex (omit for HEAD)")
+@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
+@click.option("--single-batch", is_flag=True, help="Process as a single batch instead of parallel chunks")
+@click.pass_context
+def source_mappings(ctx, owner, source_name, version, owner_type, single_batch):
+    """Reindex all mappings for a source."""
+    client = ctx.obj["client"]
+    try:
+        result = client.index_source_mappings(
+            owner, source_name, owner_type=owner_type, version=version,
+            single_batch=single_batch,
+        )
+        output_result(ctx, result, format_index_task)
+    except APIError as e:
+        handle_api_error(e)
+
+
+# ── Expansion subgroup ───────────────────────────────────────────────
+
+@index.group()
+def expansion():
+    """Reindex concepts or mappings for a collection expansion (admin only)."""
+    pass
+
+
+@expansion.command("concepts")
+@click.argument("owner")
+@click.argument("collection")
+@click.argument("version")
+@click.argument("expansion")
+@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
+@click.pass_context
+def expansion_concepts(ctx, owner, collection, version, expansion, owner_type):
+    """Reindex concepts for a collection expansion."""
+    client = ctx.obj["client"]
+    try:
+        result = client.index_expansion_concepts(
+            owner, collection, version, expansion, owner_type=owner_type,
+        )
+        output_result(ctx, result, format_index_task)
+    except APIError as e:
+        handle_api_error(e)
+
+
+@expansion.command("mappings")
+@click.argument("owner")
+@click.argument("collection")
+@click.argument("version")
+@click.argument("expansion")
+@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
+@click.pass_context
+def expansion_mappings(ctx, owner, collection, version, expansion, owner_type):
+    """Reindex mappings for a collection expansion."""
+    client = ctx.obj["client"]
+    try:
+        result = client.index_expansion_mappings(
+            owner, collection, version, expansion, owner_type=owner_type,
+        )
+        output_result(ctx, result, format_index_task)
+    except APIError as e:
+        handle_api_error(e)
+
+
+# ── Admin commands ───────────────────────────────────────────────────
+
+@index.command()
+@click.option("--apps", help="Comma-separated app names to rebuild (omit for all)")
+@click.pass_context
+def rebuild(ctx, apps):
+    """Rebuild all Elasticsearch indexes from scratch (admin only)."""
+    client = ctx.obj["client"]
+    try:
+        result = client.index_rebuild(apps=apps)
+        output_result(ctx, result, format_index_task)
+    except APIError as e:
+        handle_api_error(e)
+
+
+@index.command()
+@click.option("--apps", help="Comma-separated app names to populate (omit for all)")
+@click.pass_context
+def populate(ctx, apps):
+    """Populate Elasticsearch indexes without rebuilding (admin only)."""
+    client = ctx.obj["client"]
+    try:
+        result = client.index_populate(apps=apps)
+        output_result(ctx, result, format_index_task)
+    except APIError as e:
+        handle_api_error(e)
+
+
+@index.command()
+@click.argument("resource")
+@click.option("--ids", help="Comma-separated resource IDs to reindex")
+@click.option("--uri", help="URI pattern to match resources for reindexing")
+@click.option("--filter", "filter_str", help="JSON filter string to select resources")
+@click.option("--update-indexed", is_flag=True, help="Update the indexed flag on matched resources")
+@click.pass_context
+def resource(ctx, resource, ids, uri, filter_str, update_indexed):
+    """Batch reindex a specific resource type (admin only).
+
+    RESOURCE is the resource type, e.g. concepts, mappings, sources, collections.
+    Exactly one of --ids, --uri, or --filter is required.
+    """
+    # The API client sends only one selector, so reject ambiguous combinations
+    # instead of silently ignoring the extra options.
+    selectors = [opt for opt in (ids, uri, filter_str) if opt]
+    if len(selectors) != 1:
+        click.echo("Error: exactly one of --ids, --uri, or --filter is required.", err=True)
+        sys.exit(1)
+    client = ctx.obj["client"]
+    try:
+        result = client.index_resource(
+            resource, ids=ids, uri=uri, filter_str=filter_str, update_indexed=update_indexed,
+        )
+        output_result(ctx, result, format_index_task)
+    except APIError as e:
+        handle_api_error(e)
diff --git a/src/ocl_cli/main.py b/src/ocl_cli/main.py
index f0766dd..b4df859 100644
--- a/src/ocl_cli/main.py
+++ b/src/ocl_cli/main.py
@@ -96,5 +96,7 @@ def handle_api_error(e: APIError) -> None:
 
 # Utility commands
 from ocl_cli.commands.reference import reference  # noqa: E402
+from ocl_cli.commands.index import index  # noqa: E402
 
 cli.add_command(reference)
+cli.add_command(index)
diff --git a/src/ocl_cli/output.py b/src/ocl_cli/output.py
index ed425b7..708d156 100644
--- a/src/ocl_cli/output.py
+++ b/src/ocl_cli/output.py
@@ -992,6 +992,27 @@ def format_import_status(data: dict | list) -> str:
     return "\n".join(lines)
 
 
+def format_index_task(data: dict) -> str:
+    """Format reindex task submission response."""
+    # Empty or non-dict payloads carry no task details to show.
+    if not data or not isinstance(data, dict):
+        return "Reindex accepted."
+    lines = ["Reindex accepted."]
+    task_id = data.get("id") or data.get("task", "")
+    if task_id:
+        lines.append(f" Task ID: {task_id}")
+    state = data.get("state", "")
+    if state:
+        lines.append(f" State: {state}")
+    queue = data.get("queue", "")
+    if queue:
+        lines.append(f" Queue: {queue}")
+    username = data.get("username", "")
+    if username:
+        lines.append(f" User: {username}")
+    return "\n".join(lines)
+
+
 def format_export_status(data: dict) -> str:
     """Format export status for human output."""
     status = data.get("status", "unknown")