Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions src/ocl_cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,121 @@ def remove_org_member(self, org: str, username: str) -> dict:
self._require_auth()
return self.delete(f"/orgs/{org}/members/{username}/")

# ── Index / reindex operations ──────────────────────────────────

def _post_multipart(self, endpoint: str, fields: Optional[dict] = None) -> Any:
"""POST as multipart/form-data for admin endpoints that use DRF MultiPartParser."""
self._require_auth()
self._log_request("POST", endpoint, body=fields)
mp: dict[str, Any] = {}
if fields:
for k, v in fields.items():
if v is not None:
mp[k] = (None, str(v))
if not mp:
mp = {"_": (None, "")}
response = self.client.post(endpoint, files=mp)
self._handle_error(response)
if not response.content:
return {}
try:
return response.json()
except Exception:
return {}

def index_source_concepts(
    self,
    owner: str,
    source: str,
    owner_type: str = "orgs",
    version: Optional[str] = None,
    single_batch: bool = False,
    parallel: bool = True,
) -> dict:
    """Request a concept reindex for a source.

    Only options that differ from their defaults are sent: ``single_batch``
    when it is truthy and ``parallel`` when it is falsy. When neither
    applies the POST is made with ``json=None``.

    Returns:
        Whatever ``self.post`` returns for the ``concepts/indexes/`` endpoint.
    """
    self._require_auth()
    url = _build_repo_endpoint(owner_type, owner, "source", source, version, "concepts/indexes/")
    # (key, value to send, whether to include it)
    candidates = (
        ("single_batch", True, single_batch),
        ("parallel", False, not parallel),
    )
    payload: dict[str, Any] = {key: value for key, value, wanted in candidates if wanted}
    return self.post(url, json=payload if payload else None)

def index_source_mappings(
    self,
    owner: str,
    source: str,
    owner_type: str = "orgs",
    version: Optional[str] = None,
    single_batch: bool = False,
) -> dict:
    """Request a mapping reindex for a source.

    ``single_batch`` is sent only when truthy; otherwise the POST is made
    with ``json=None``.

    Returns:
        Whatever ``self.post`` returns for the ``mappings/indexes/`` endpoint.
    """
    self._require_auth()
    url = _build_repo_endpoint(owner_type, owner, "source", source, version, "mappings/indexes/")
    payload: Optional[dict[str, Any]] = {"single_batch": True} if single_batch else None
    return self.post(url, json=payload)

def index_expansion_concepts(
    self,
    owner: str,
    collection: str,
    version: str,
    expansion: str,
    owner_type: str = "orgs",
) -> dict:
    """Request a concept reindex for one collection expansion (admin only).

    ``owner_type`` is checked with ``_validate_owner_type`` before the URL
    is built; the request itself is a body-less POST.
    """
    self._require_auth()
    _validate_owner_type(owner_type)
    # Assemble the path in two steps: the collection version, then the expansion.
    version_path = f"/{owner_type}/{owner}/collections/{collection}/{version}"
    return self.post(f"{version_path}/expansions/{expansion}/concepts/index/")

def index_expansion_mappings(
    self,
    owner: str,
    collection: str,
    version: str,
    expansion: str,
    owner_type: str = "orgs",
) -> dict:
    """Request a mapping reindex for one collection expansion (admin only).

    ``owner_type`` is checked with ``_validate_owner_type`` before the URL
    is built; the request itself is a body-less POST.
    """
    self._require_auth()
    _validate_owner_type(owner_type)
    # Assemble the path in two steps: the collection version, then the expansion.
    version_path = f"/{owner_type}/{owner}/collections/{collection}/{version}"
    return self.post(f"{version_path}/expansions/{expansion}/mappings/index/")

def index_rebuild(self, apps: Optional[str] = None) -> dict:
    """Ask the server to rebuild Elasticsearch indexes (admin only).

    Args:
        apps: Value for the ``apps`` form field. When falsy no fields are
            passed to ``_post_multipart``.
    """
    if not apps:
        return self._post_multipart("/indexes/apps/rebuild/", None)
    return self._post_multipart("/indexes/apps/rebuild/", {"apps": apps})

def index_populate(self, apps: Optional[str] = None) -> dict:
    """Ask the server to populate Elasticsearch indexes (admin only).

    Args:
        apps: Value for the ``apps`` form field. When falsy no fields are
            passed to ``_post_multipart``.
    """
    if not apps:
        return self._post_multipart("/indexes/apps/populate/", None)
    return self._post_multipart("/indexes/apps/populate/", {"apps": apps})

def index_resource(
    self,
    resource: str,
    ids: Optional[str] = None,
    uri: Optional[str] = None,
    filter_str: Optional[str] = None,
    update_indexed: bool = False,
) -> dict:
    """Batch reindex one resource type (admin only).

    At most one selector is sent. They are considered in the order
    ``ids``, ``uri``, ``filter_str`` and the first truthy one wins; the
    others are ignored. ``filter_str`` is sent under the ``filter`` key.
    ``update_indexed`` is sent as the string ``"true"`` when truthy.
    """
    form: dict[str, Any] = {}
    for key, value in (("ids", ids), ("uri", uri), ("filter", filter_str)):
        if value:
            form[key] = value
            break  # first truthy selector wins
    if update_indexed:
        form["update_indexed"] = "true"
    return self._post_multipart(f"/indexes/resources/{resource}/", form if form else None)

# ── Task operations ─────────────────────────────────────────────

def list_tasks(
Expand Down
164 changes: 164 additions & 0 deletions src/ocl_cli/commands/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""Index commands: trigger Elasticsearch reindexing for sources, expansions, and global indexes."""

import sys

import click

from ocl_cli.api_client import APIError
from ocl_cli.main import handle_api_error
from ocl_cli.output import output_result, format_index_task


# Top-level ``index`` command group; subgroups and commands attach to it below.
@click.group()
def index():
    """Trigger Elasticsearch reindexing operations."""


# ── Source subgroup ──────────────────────────────────────────────────

# ``index source`` subgroup.
@index.group()
def source():
    """Reindex concepts or mappings for a source."""


@source.command("concepts")
@click.argument("owner")
@click.argument("source_name", metavar="SOURCE")
@click.option("--version", help="Source version to reindex (omit for HEAD)")
@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
@click.option("--single-batch", is_flag=True, help="Process as a single batch instead of parallel chunks")
@click.option("--no-parallel", is_flag=True, help="Disable parallel processing")
@click.pass_context
def source_concepts(ctx, owner, source_name, version, owner_type, single_batch, no_parallel):
    """Reindex all concepts for a source."""
    api = ctx.obj["client"]
    # The CLI exposes an opt-out flag; the client method takes the positive form.
    run_parallel = not no_parallel
    try:
        accepted = api.index_source_concepts(
            owner,
            source_name,
            owner_type=owner_type,
            version=version,
            single_batch=single_batch,
            parallel=run_parallel,
        )
        output_result(ctx, accepted, format_index_task)
    except APIError as err:
        handle_api_error(err)


@source.command("mappings")
@click.argument("owner")
@click.argument("source_name", metavar="SOURCE")
@click.option("--version", help="Source version to reindex (omit for HEAD)")
@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
@click.option("--single-batch", is_flag=True, help="Process as a single batch instead of parallel chunks")
@click.pass_context
def source_mappings(ctx, owner, source_name, version, owner_type, single_batch):
    """Reindex all mappings for a source."""
    api = ctx.obj["client"]
    # Keyword options forwarded unchanged to the client method.
    options = {
        "owner_type": owner_type,
        "version": version,
        "single_batch": single_batch,
    }
    try:
        accepted = api.index_source_mappings(owner, source_name, **options)
        output_result(ctx, accepted, format_index_task)
    except APIError as err:
        handle_api_error(err)


# ── Expansion subgroup ───────────────────────────────────────────────

# ``index expansion`` subgroup.
@index.group()
def expansion():
    """Reindex concepts or mappings for a collection expansion (admin only)."""


@expansion.command("concepts")
@click.argument("owner")
@click.argument("collection")
@click.argument("version")
@click.argument("expansion")
@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
@click.pass_context
def expansion_concepts(ctx, owner, collection, version, expansion, owner_type):
    """Reindex concepts for a collection expansion."""
    api = ctx.obj["client"]
    # Positional arguments in the order the client method takes them.
    target = (owner, collection, version, expansion)
    try:
        accepted = api.index_expansion_concepts(*target, owner_type=owner_type)
        output_result(ctx, accepted, format_index_task)
    except APIError as err:
        handle_api_error(err)


@expansion.command("mappings")
@click.argument("owner")
@click.argument("collection")
@click.argument("version")
@click.argument("expansion")
@click.option("--owner-type", type=click.Choice(["users", "orgs"]), default="orgs")
@click.pass_context
def expansion_mappings(ctx, owner, collection, version, expansion, owner_type):
    """Reindex mappings for a collection expansion."""
    api = ctx.obj["client"]
    # Positional arguments in the order the client method takes them.
    target = (owner, collection, version, expansion)
    try:
        accepted = api.index_expansion_mappings(*target, owner_type=owner_type)
        output_result(ctx, accepted, format_index_task)
    except APIError as err:
        handle_api_error(err)


# ── Admin commands ───────────────────────────────────────────────────

@index.command()
@click.option("--apps", help="Comma-separated app names to rebuild (omit for all)")
@click.pass_context
def rebuild(ctx, apps):
    """Rebuild all Elasticsearch indexes from scratch (admin only)."""
    api = ctx.obj["client"]
    try:
        accepted = api.index_rebuild(apps=apps)
        # Printing stays inside the try, so an APIError raised here is handled too.
        output_result(ctx, accepted, format_index_task)
    except APIError as err:
        handle_api_error(err)


@index.command()
@click.option("--apps", help="Comma-separated app names to populate (omit for all)")
@click.pass_context
def populate(ctx, apps):
    """Populate Elasticsearch indexes without rebuilding (admin only)."""
    api = ctx.obj["client"]
    try:
        accepted = api.index_populate(apps=apps)
        # Printing stays inside the try, so an APIError raised here is handled too.
        output_result(ctx, accepted, format_index_task)
    except APIError as err:
        handle_api_error(err)


@index.command()
@click.argument("resource")
@click.option("--ids", help="Comma-separated resource IDs to reindex")
@click.option("--uri", help="URI pattern to match resources for reindexing")
@click.option("--filter", "filter_str", help="JSON filter string to select resources")
@click.option("--update-indexed", is_flag=True, help="Update the indexed flag on matched resources")
@click.pass_context
def resource(ctx, resource, ids, uri, filter_str, update_indexed):
    """Batch reindex a specific resource type (admin only).

    RESOURCE is the resource type, e.g. concepts, mappings, sources, collections.
    Exactly one of --ids, --uri, or --filter is required.
    """
    # Count the selectors actually supplied; empty strings count as absent.
    selector_count = sum(1 for value in (ids, uri, filter_str) if value)
    if selector_count == 0:
        click.echo("Error: one of --ids, --uri, or --filter is required.", err=True)
        sys.exit(1)
    if selector_count > 1:
        # client.index_resource sends only the first of ids/uri/filter, so
        # accepting several here would silently drop the rest.
        click.echo("Error: --ids, --uri, and --filter are mutually exclusive.", err=True)
        sys.exit(1)
    client = ctx.obj["client"]
    try:
        result = client.index_resource(
            resource, ids=ids, uri=uri, filter_str=filter_str, update_indexed=update_indexed,
        )
        output_result(ctx, result, format_index_task)
    except APIError as e:
        handle_api_error(e)
2 changes: 2 additions & 0 deletions src/ocl_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,7 @@ def handle_api_error(e: APIError) -> None:

# Utility commands
from ocl_cli.commands.reference import reference # noqa: E402
from ocl_cli.commands.index import index # noqa: E402

# Register the imported `reference` and `index` command objects on `cli`.
cli.add_command(reference)
cli.add_command(index)
20 changes: 20 additions & 0 deletions src/ocl_cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,26 @@ def format_import_status(data: dict | list) -> str:
return "\n".join(lines)


def format_index_task(data: dict) -> str:
    """Format reindex task submission response.

    Args:
        data: Decoded response body. Expected to be a dict that may carry
            ``id`` (or ``task``), ``state``, ``queue`` and ``username``.
            Empty values and non-dict payloads (e.g. a list or a bare
            string) are tolerated.

    Returns:
        ``"Reindex accepted."`` followed by one line per field that is
        present and truthy, in the order task id, state, queue, user.
    """
    lines = ["Reindex accepted."]
    # The body comes from an arbitrary JSON response, so it is not guaranteed
    # to be a dict; without this guard ``.get`` would raise AttributeError.
    if not data or not isinstance(data, dict):
        return lines[0]
    labelled = (
        # ``id`` takes precedence; ``task`` is the fallback key.
        ("Task ID", data.get("id") or data.get("task", "")),
        ("State", data.get("state", "")),
        ("Queue", data.get("queue", "")),
        ("User", data.get("username", "")),
    )
    lines.extend(f" {label}: {value}" for label, value in labelled if value)
    return "\n".join(lines)


def format_export_status(data: dict) -> str:
"""Format export status for human output."""
status = data.get("status", "unknown")
Expand Down