From 26c1f6a771678d128ac39d3c994c8345ec79f25e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 23:23:01 +0000 Subject: [PATCH 01/11] feat(cli): add agent-first Airbyte CLI for Cloud operations Implements a new CLI invokable as 'uvx airbyte ...' or 'airbyte' when installed. Commands: - airbyte workspaces list/get - airbyte sources list/get/create/delete - airbyte destinations list/get/create/delete - airbyte connections list/get/create/delete/sync - airbyte jobs list/get Features: - Structured JSON output for agent consumption - --describe flag for schema discovery - Credential resolution: env vars -> ~/.airbyte/credentials file - Thin wrappers over existing api_util core module --- airbyte/cli/_cli_auth.py | 173 +++++++++ airbyte/cli/cloud_cli.py | 756 +++++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 3 files changed, 930 insertions(+) create mode 100644 airbyte/cli/_cli_auth.py create mode 100644 airbyte/cli/cloud_cli.py diff --git a/airbyte/cli/_cli_auth.py b/airbyte/cli/_cli_auth.py new file mode 100644 index 000000000..1f1daf7fd --- /dev/null +++ b/airbyte/cli/_cli_auth.py @@ -0,0 +1,173 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Credential resolution for the Airbyte CLI. + +Resolution order: +1. Explicit CLI flags (`--client-id`, `--client-secret`) +2. Short env vars: `AIRBYTE_CLIENT_ID` / `AIRBYTE_CLIENT_SECRET` +3. Long env vars: `AIRBYTE_CLOUD_CLIENT_ID` / `AIRBYTE_CLOUD_CLIENT_SECRET` +4. Credentials file: `~/.airbyte/credentials` (YAML with `client_id` / `client_secret`) +5. Error if none found +""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any + +import yaml + +from airbyte.constants import ( + CLOUD_API_ROOT, + CLOUD_CLIENT_ID_ENV_VAR, + CLOUD_CLIENT_SECRET_ENV_VAR, + CLOUD_WORKSPACE_ID_ENV_VAR, +) +from airbyte.exceptions import PyAirbyteInputError + + +# Short-form env var names (preferred for CLI usage) +CLI_CLIENT_ID_ENV_VAR = "AIRBYTE_CLIENT_ID" +CLI_CLIENT_SECRET_ENV_VAR = "AIRBYTE_CLIENT_SECRET" +CLI_WORKSPACE_ID_ENV_VAR = "AIRBYTE_WORKSPACE_ID" +CLI_API_URL_ENV_VAR = "AIRBYTE_API_URL" + +CREDENTIALS_FILE_PATH = Path("~/.airbyte/credentials").expanduser() + + +def _read_credentials_file() -> dict[str, Any]: + """Read credentials from `~/.airbyte/credentials` if the file exists. + + The file is expected to be YAML with `client_id` and `client_secret` keys. + + Returns an empty dict if the file does not exist or cannot be parsed. + """ + if not CREDENTIALS_FILE_PATH.exists(): + return {} + + content = CREDENTIALS_FILE_PATH.read_text(encoding="utf-8").strip() + if not content: + return {} + + parsed = yaml.safe_load(content) + if not isinstance(parsed, dict): + return {} + + return parsed + + +def resolve_client_id(explicit: str | None = None) -> str: + """Resolve the Airbyte client ID. + + Resolution order: explicit arg, short env var, long env var, credentials file. + """ + if explicit: + return explicit + + from_short_env = os.environ.get(CLI_CLIENT_ID_ENV_VAR) + if from_short_env: + return from_short_env + + from_long_env = os.environ.get(CLOUD_CLIENT_ID_ENV_VAR) + if from_long_env: + return from_long_env + + creds = _read_credentials_file() + from_file = creds.get("client_id") + if from_file: + return str(from_file) + + raise PyAirbyteInputError( + message="No Airbyte client ID found.", + guidance=( + f"Set the `{CLI_CLIENT_ID_ENV_VAR}` environment variable, " + f"or create a credentials file at {CREDENTIALS_FILE_PATH} " + "with a `client_id` key." + ), + ) + + +def resolve_client_secret(explicit: str | None = None) -> str: + """Resolve the Airbyte client secret. + + Resolution order: explicit arg, short env var, long env var, credentials file. + """ + if explicit: + return explicit + + from_short_env = os.environ.get(CLI_CLIENT_SECRET_ENV_VAR) + if from_short_env: + return from_short_env + + from_long_env = os.environ.get(CLOUD_CLIENT_SECRET_ENV_VAR) + if from_long_env: + return from_long_env + + creds = _read_credentials_file() + from_file = creds.get("client_secret") + if from_file: + return str(from_file) + + raise PyAirbyteInputError( + message="No Airbyte client secret found.", + guidance=( + f"Set the `{CLI_CLIENT_SECRET_ENV_VAR}` environment variable, " + f"or create a credentials file at {CREDENTIALS_FILE_PATH} " + "with a `client_secret` key." + ), + ) + + +def resolve_workspace_id(explicit: str | None = None) -> str: + """Resolve the Airbyte workspace ID. + + Resolution order: explicit arg, short env var, long env var, credentials file. + """ + if explicit: + return explicit + + from_short_env = os.environ.get(CLI_WORKSPACE_ID_ENV_VAR) + if from_short_env: + return from_short_env + + from_long_env = os.environ.get(CLOUD_WORKSPACE_ID_ENV_VAR) + if from_long_env: + return from_long_env + + creds = _read_credentials_file() + from_file = creds.get("workspace_id") + if from_file: + return str(from_file) + + raise PyAirbyteInputError( + message="No Airbyte workspace ID found.", + guidance=( + f"Set the `{CLI_WORKSPACE_ID_ENV_VAR}` environment variable, " + f"or create a credentials file at {CREDENTIALS_FILE_PATH} " + "with a `workspace_id` key." + ), + ) + + +def resolve_api_url(explicit: str | None = None) -> str: + """Resolve the Airbyte API URL. + + Resolution order: explicit arg, short env var, long env var, default. + """ + if explicit: + return explicit + + from_short_env = os.environ.get(CLI_API_URL_ENV_VAR) + if from_short_env: + return from_short_env + + from_long_env = os.environ.get("AIRBYTE_CLOUD_API_URL") + if from_long_env: + return from_long_env + + creds = _read_credentials_file() + from_file = creds.get("api_url") + if from_file: + return str(from_file) + + return CLOUD_API_ROOT diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py new file mode 100644 index 000000000..e765e103f --- /dev/null +++ b/airbyte/cli/cloud_cli.py @@ -0,0 +1,756 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Airbyte CLI — agent-first command-line interface for Airbyte Cloud operations. + +Invokable as: + +```bash +uvx airbyte --help +airbyte workspaces list +airbyte sources list --workspace-id +airbyte connections sync --workspace-id --json '{"connection_id": "..."}' +``` + +All commands output structured JSON by default for agent consumption. +""" + +from __future__ import annotations + +import json +import sys +from typing import TYPE_CHECKING, Any + +import click + +from airbyte._util import api_util + + +if TYPE_CHECKING: + from airbyte_api import models + +from airbyte.cli._cli_auth import ( + resolve_api_url, + resolve_client_id, + resolve_client_secret, + resolve_workspace_id, +) +from airbyte.secrets.base import SecretString + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _json_output(data: list[dict[str, object]] | dict[str, object]) -> None: + """Print a JSON-serializable object to stdout.""" + click.echo(json.dumps(data, indent=2, default=str)) + + +def _error_json(message: str, **extra: object) -> None: + """Print an error object to stderr and exit.""" + payload: dict[str, object] = {"error": message, **extra} + click.echo(json.dumps(payload, indent=2, default=str), err=True) + sys.exit(1) + + +def _parse_json_option(raw: str | None) -> dict[str, Any]: + """Parse a `--json` option value into a dict.""" + if not raw: + return {} + parsed = json.loads(raw) + if not isinstance(parsed, dict): + _error_json("--json value must be a JSON object (dict).") + return parsed + + +def _describe_output( + description: str, + required_params: dict[str, str] | None = None, + optional_params: dict[str, str] | None = None, +) -> None: + """Print a `--describe` schema and exit.""" + schema: dict[str, Any] = { + "description": description, + } + if required_params: + schema["required_params"] = required_params + if optional_params: + schema["optional_params"] = optional_params + _json_output(schema) + sys.exit(0) + + +def _get_auth_context(ctx: click.Context) -> tuple[str, SecretString, SecretString, str]: + """Extract resolved (api_url, client_id, client_secret, workspace_id) from Click context. + + The workspace_id may raise if not set; callers that don't need it + should catch `PyAirbyteInputError`. + """ + api_url: str = ctx.obj["api_url"] + client_id = SecretString(ctx.obj["client_id"]) + client_secret = SecretString(ctx.obj["client_secret"]) + workspace_id: str = ctx.obj["workspace_id"] + return api_url, client_id, client_secret, workspace_id + + +def _get_auth_no_workspace(ctx: click.Context) -> tuple[str, SecretString, SecretString]: + """Extract resolved auth without requiring workspace_id.""" + api_url: str = ctx.obj["api_url"] + client_id = SecretString(ctx.obj["client_id"]) + client_secret = SecretString(ctx.obj["client_secret"]) + return api_url, client_id, client_secret + + +# --------------------------------------------------------------------------- +# Serializers — convert SDK response objects to plain dicts +# --------------------------------------------------------------------------- + + +def _workspace_to_dict(ws: models.WorkspaceResponse) -> dict[str, object]: + return { + "workspace_id": ws.workspace_id, + "name": ws.name, + } + + +def _source_to_dict(src: models.SourceResponse) -> dict[str, object]: + return { + "source_id": src.source_id, + "name": src.name, + "source_type": src.source_type, + } + + +def _destination_to_dict(dst: models.DestinationResponse) -> dict[str, object]: + return { + "destination_id": dst.destination_id, + "name": dst.name, + "destination_type": dst.destination_type, + } + + +def _connection_to_dict(conn: models.ConnectionResponse) -> dict[str, object]: + return { + "connection_id": conn.connection_id, + "name": conn.name, + "source_id": conn.source_id, + "destination_id": conn.destination_id, + "status": str(conn.status) if conn.status else None, + } + + +def _job_to_dict(job: models.JobResponse) -> dict[str, object]: + return { + "job_id": job.job_id, + "status": str(job.status) if job.status else None, + "job_type": str(job.job_type) if job.job_type else None, + "start_time": str(job.start_time) if job.start_time else None, + "bytes_synced": job.bytes_synced, + "rows_synced": job.rows_synced, + } + + +# --------------------------------------------------------------------------- +# Root group +# --------------------------------------------------------------------------- + + +@click.group() +@click.option("--client-id", envvar="AIRBYTE_CLIENT_ID", default=None, help="Airbyte client ID.") +@click.option( + "--client-secret", envvar="AIRBYTE_CLIENT_SECRET", default=None, help="Airbyte client secret." +) +@click.option("--workspace-id", default=None, help="Airbyte workspace ID.") +@click.option("--api-url", default=None, help="Airbyte API URL override.") +@click.pass_context +def cli( + ctx: click.Context, + client_id: str | None, + client_secret: str | None, + workspace_id: str | None, + api_url: str | None, +) -> None: + """Airbyte CLI — agent-first interface for Airbyte Cloud. + + Manage workspaces, sources, destinations, connections, and jobs + via structured JSON commands. + + Authentication is resolved from (in order): CLI flags, env vars + (`AIRBYTE_CLIENT_ID` / `AIRBYTE_CLIENT_SECRET`), or + `~/.airbyte/credentials` file. + """ + ctx.ensure_object(dict) + # Resolve auth eagerly so sub-commands get clear errors up front. + ctx.obj["client_id"] = resolve_client_id(client_id) + ctx.obj["client_secret"] = resolve_client_secret(client_secret) + ctx.obj["api_url"] = resolve_api_url(api_url) + # workspace_id is resolved lazily — some commands don't need it. + # Store the raw value; sub-commands call resolve_workspace_id when needed. + ctx.obj["_raw_workspace_id"] = workspace_id + + +# --------------------------------------------------------------------------- +# Workspaces +# --------------------------------------------------------------------------- + + +@cli.group() +@click.pass_context +def workspaces(ctx: click.Context) -> None: + """Manage Airbyte workspaces.""" + pass + + +@workspaces.command("list") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def workspaces_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 + """List workspaces accessible with the current credentials.""" + if describe: + _describe_output( + description="List all workspaces accessible with the current credentials.", + optional_params={"workspace_id": "Filter to a specific workspace ID."}, + ) + api_url, client_id, client_secret = _get_auth_no_workspace(ctx) + # The SDK list_workspaces requires a workspace_id param for filtering; + # pass empty list behavior via the raw value. + raw_ws = ctx.obj["_raw_workspace_id"] + if raw_ws: + workspace_id = resolve_workspace_id(raw_ws) + results = api_util.list_workspaces( + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + else: + # Without a workspace_id filter we still need to pass one for the SDK. + # Try resolving; if unavailable, error clearly. + workspace_id = resolve_workspace_id(raw_ws) + results = api_util.list_workspaces( + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output([_workspace_to_dict(w) for w in results]) + + +@workspaces.command("get") +@click.option("--workspace-id", "cmd_workspace_id", default=None, help="Workspace ID to retrieve.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def workspaces_get(ctx: click.Context, cmd_workspace_id: str | None, describe: bool) -> None: # noqa: FBT001 + """Get details of a specific workspace.""" + if describe: + _describe_output( + description="Get details of a specific workspace.", + required_params={"workspace_id": "The workspace ID to retrieve."}, + ) + api_url, client_id, client_secret = _get_auth_no_workspace(ctx) + workspace_id = resolve_workspace_id(cmd_workspace_id or ctx.obj["_raw_workspace_id"]) + result = api_util.get_workspace( + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_workspace_to_dict(result)) + + +# --------------------------------------------------------------------------- +# Sources +# --------------------------------------------------------------------------- + + +@cli.group() +@click.pass_context +def sources(ctx: click.Context) -> None: + """Manage Airbyte sources.""" + # Resolve workspace_id now — all source commands need it. + ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + + +@sources.command("list") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def sources_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 + """List sources in the workspace.""" + if describe: + _describe_output( + description="List all sources in the workspace.", + required_params={"workspace_id": "The workspace ID."}, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + results = api_util.list_sources( + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output([_source_to_dict(s) for s in results]) + + +@sources.command("get") +@click.option("--source-id", required=True, help="The source ID to retrieve.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def sources_get(ctx: click.Context, source_id: str, describe: bool) -> None: # noqa: FBT001 + """Get details of a specific source.""" + if describe: + _describe_output( + description="Get details of a specific source.", + required_params={ + "workspace_id": "The workspace ID.", + "source_id": "The source ID to retrieve.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + _ = workspace_id # used for context only + result = api_util.get_source( + source_id=source_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_source_to_dict(result)) + + +@sources.command("create") +@click.option( + "--json", + "json_str", + required=True, + help='JSON config: {"name": "...", "sourceType": "...", ...}', +) +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def sources_create(ctx: click.Context, json_str: str, describe: bool) -> None: # noqa: FBT001 + """Create a new source in the workspace.""" + if describe: + _describe_output( + description="Create a new source in the workspace.", + required_params={ + "workspace_id": "The workspace ID.", + "name": "Display name for the source.", + "sourceType": "The source connector type (e.g. 'postgres').", + }, + optional_params={ + "...": "Additional connector-specific configuration fields.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + config = _parse_json_option(json_str) + name = config.pop("name", None) + if not name: + _error_json("'name' is required in --json config.") + result = api_util.create_source( + name=name, + workspace_id=workspace_id, + config=config, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_source_to_dict(result)) + + +@sources.command("delete") +@click.option("--source-id", required=True, help="The source ID to delete.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def sources_delete(ctx: click.Context, source_id: str, describe: bool) -> None: # noqa: FBT001 + """Delete a source.""" + if describe: + _describe_output( + description="Delete a source by ID.", + required_params={ + "workspace_id": "The workspace ID.", + "source_id": "The source ID to delete.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + api_util.delete_source( + source_id=source_id, + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + safe_mode=False, + ) + _json_output({"deleted": True, "source_id": source_id}) + + +# --------------------------------------------------------------------------- +# Destinations +# --------------------------------------------------------------------------- + + +@cli.group() +@click.pass_context +def destinations(ctx: click.Context) -> None: + """Manage Airbyte destinations.""" + ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + + +@destinations.command("list") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def destinations_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 + """List destinations in the workspace.""" + if describe: + _describe_output( + description="List all destinations in the workspace.", + required_params={"workspace_id": "The workspace ID."}, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + results = api_util.list_destinations( + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output([_destination_to_dict(d) for d in results]) + + +@destinations.command("get") +@click.option("--destination-id", required=True, help="The destination ID to retrieve.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def destinations_get(ctx: click.Context, destination_id: str, describe: bool) -> None: # noqa: FBT001 + """Get details of a specific destination.""" + if describe: + _describe_output( + description="Get details of a specific destination.", + required_params={ + "workspace_id": "The workspace ID.", + "destination_id": "The destination ID to retrieve.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + _ = workspace_id + result = api_util.get_destination( + destination_id=destination_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_destination_to_dict(result)) + + +@destinations.command("create") +@click.option( + "--json", + "json_str", + required=True, + help='JSON config: {"name": "...", "destinationType": "...", ...}', +) +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def destinations_create(ctx: click.Context, json_str: str, describe: bool) -> None: # noqa: FBT001 + """Create a new destination in the workspace.""" + if describe: + _describe_output( + description="Create a new destination in the workspace.", + required_params={ + "workspace_id": "The workspace ID.", + "name": "Display name for the destination.", + "destinationType": "The destination connector type (e.g. 'bigquery').", + }, + optional_params={ + "...": "Additional connector-specific configuration fields.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + config = _parse_json_option(json_str) + name = config.pop("name", None) + if not name: + _error_json("'name' is required in --json config.") + result = api_util.create_destination( + name=name, + workspace_id=workspace_id, + config=config, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_destination_to_dict(result)) + + +@destinations.command("delete") +@click.option("--destination-id", required=True, help="The destination ID to delete.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def destinations_delete(ctx: click.Context, destination_id: str, describe: bool) -> None: # noqa: FBT001 + """Delete a destination.""" + if describe: + _describe_output( + description="Delete a destination by ID.", + required_params={ + "workspace_id": "The workspace ID.", + "destination_id": "The destination ID to delete.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + api_util.delete_destination( + destination_id=destination_id, + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + safe_mode=False, + ) + _json_output({"deleted": True, "destination_id": destination_id}) + + +# --------------------------------------------------------------------------- +# Connections +# --------------------------------------------------------------------------- + + +@cli.group() +@click.pass_context +def connections(ctx: click.Context) -> None: + """Manage Airbyte connections.""" + ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + + +@connections.command("list") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def connections_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 + """List connections in the workspace.""" + if describe: + _describe_output( + description="List all connections in the workspace.", + required_params={"workspace_id": "The workspace ID."}, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + results = api_util.list_connections( + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output([_connection_to_dict(c) for c in results]) + + +@connections.command("get") +@click.option("--connection-id", required=True, help="The connection ID to retrieve.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def connections_get(ctx: click.Context, connection_id: str, describe: bool) -> None: # noqa: FBT001 + """Get details of a specific connection.""" + if describe: + _describe_output( + description="Get details of a specific connection.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID to retrieve.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + result = api_util.get_connection( + workspace_id=workspace_id, + connection_id=connection_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_connection_to_dict(result)) + + +@connections.command("create") +@click.option( + "--json", + "json_str", + required=True, + help='JSON config: {"name": "...", "source_id": "...", "destination_id": "...", ...}', +) +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def connections_create(ctx: click.Context, json_str: str, describe: bool) -> None: # noqa: FBT001 + """Create a new connection.""" + if describe: + _describe_output( + description="Create a new connection between a source and destination.", + required_params={ + "workspace_id": "The workspace ID.", + "name": "Display name for the connection.", + "source_id": "The source ID.", + "destination_id": "The destination ID.", + }, + optional_params={ + "stream_configurations": "List of stream configuration objects.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + config = _parse_json_option(json_str) + name = config.get("name") + source_id = config.get("source_id") + destination_id = config.get("destination_id") + if not name or not source_id or not destination_id: + _error_json("'name', 'source_id', and 'destination_id' are required in --json config.") + selected_streams: list[str] = config.get("selected_stream_names", []) + prefix: str = config.get("prefix", "") + result = api_util.create_connection( + name=name, + source_id=source_id, + destination_id=destination_id, + workspace_id=workspace_id, + prefix=prefix, + selected_stream_names=selected_streams, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_connection_to_dict(result)) + + +@connections.command("delete") +@click.option("--connection-id", required=True, help="The connection ID to delete.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def connections_delete(ctx: click.Context, connection_id: str, describe: bool) -> None: # noqa: FBT001 + """Delete a connection.""" + if describe: + _describe_output( + description="Delete a connection by ID.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID to delete.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + api_util.delete_connection( + connection_id, + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + safe_mode=False, + ) + _json_output({"deleted": True, "connection_id": connection_id}) + + +@connections.command("sync") +@click.option("--connection-id", required=True, help="The connection ID to sync.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def connections_sync(ctx: click.Context, connection_id: str, describe: bool) -> None: # noqa: FBT001 + """Trigger a sync for a connection.""" + if describe: + _describe_output( + description="Trigger a sync job for a connection.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID to sync.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + result = api_util.run_connection( + workspace_id, + connection_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_job_to_dict(result)) + + +# --------------------------------------------------------------------------- +# Jobs +# --------------------------------------------------------------------------- + + +@cli.group() +@click.pass_context +def jobs(ctx: click.Context) -> None: + """View Airbyte sync jobs.""" + ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + + +@jobs.command("list") +@click.option("--connection-id", required=True, help="The connection ID to list jobs for.") +@click.option("--limit", default=20, help="Maximum number of jobs to return.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def jobs_list(ctx: click.Context, connection_id: str, limit: int, describe: bool) -> None: # noqa: FBT001 + """List recent jobs for a connection.""" + if describe: + _describe_output( + description="List recent sync jobs for a connection.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID.", + }, + optional_params={ + "limit": "Maximum number of jobs to return (default: 20).", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + results = api_util.get_job_logs( + workspace_id, + connection_id, + limit, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output([_job_to_dict(j) for j in results]) + + +@jobs.command("get") +@click.option("--job-id", required=True, type=int, help="The job ID to retrieve.") +@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +@click.pass_context +def jobs_get(ctx: click.Context, job_id: int, describe: bool) -> None: # noqa: FBT001 + """Get details of a specific job.""" + if describe: + _describe_output( + description="Get details of a specific job by ID.", + required_params={ + "job_id": "The job ID to retrieve.", + }, + ) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) + _ = workspace_id + result = api_util.get_job_info( + job_id=job_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) + _json_output(_job_to_dict(result)) + + +# --------------------------------------------------------------------------- +# Main entry point +# --------------------------------------------------------------------------- + + +def main() -> None: + """Entry point for `uvx airbyte` / `airbyte` command.""" + cli() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 0ab95fe8c..9f740472a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,6 +77,7 @@ dev = [ ] [project.scripts] +airbyte = "airbyte.cli.cloud_cli:main" pyairbyte = "airbyte.cli.pyab:cli" pyab = "airbyte.cli.pyab:cli" airbyte-mcp = "airbyte.mcp.server:main" From 3b130325122a6dbc2436dcf1ab91db58973188bf Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 23:26:34 +0000 Subject: [PATCH 02/11] fix(cli): resolve pyrefly type errors in create_connection args --- airbyte/cli/cloud_cli.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index e765e103f..330739d83 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -607,9 +607,9 @@ def connections_create(ctx: click.Context, json_str: str, describe: bool) -> Non selected_streams: list[str] = config.get("selected_stream_names", []) prefix: str = config.get("prefix", "") result = api_util.create_connection( - name=name, - source_id=source_id, - destination_id=destination_id, + name=str(name), + source_id=str(source_id), + destination_id=str(destination_id), workspace_id=workspace_id, prefix=prefix, selected_stream_names=selected_streams, From 30b016ec6ae7e54402298a5607aa22a1f1930f0d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 23:44:21 +0000 Subject: [PATCH 03/11] fix(cli): lazy auth resolution, YAML error handling, --force flag, structured errors - Store raw auth values in ctx.obj; resolve lazily in subcommands so --describe works without credentials configured - Catch OSError and yaml.YAMLError separately in _read_credentials_file using guard statements instead of broad try/except - Add --force flag to delete commands (sources, destinations, connections) with safe_mode=True by default - Wrap main() entry point to produce structured JSON errors on stderr - Add PyAirbyteInputError import for structured error handling --- airbyte/cli/_cli_auth.py | 12 +++- airbyte/cli/cloud_cli.py | 122 ++++++++++++++++++++++----------------- 2 files changed, 79 insertions(+), 55 deletions(-) diff --git a/airbyte/cli/_cli_auth.py b/airbyte/cli/_cli_auth.py index 1f1daf7fd..7be1d02e9 100644 --- a/airbyte/cli/_cli_auth.py +++ b/airbyte/cli/_cli_auth.py @@ -45,11 +45,19 @@ def _read_credentials_file() -> dict[str, Any]: if not CREDENTIALS_FILE_PATH.exists(): return {} - content = CREDENTIALS_FILE_PATH.read_text(encoding="utf-8").strip() + try: + content = CREDENTIALS_FILE_PATH.read_text(encoding="utf-8").strip() + except OSError: + return {} + if not content: return {} - parsed = yaml.safe_load(content) + try: + parsed = yaml.safe_load(content) + except yaml.YAMLError: + return {} + if not isinstance(parsed, dict): return {} diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index 330739d83..68dbc04a5 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -33,6 +33,7 @@ resolve_client_secret, resolve_workspace_id, ) +from airbyte.exceptions import PyAirbyteInputError from airbyte.secrets.base import SecretString @@ -81,23 +82,22 @@ def _describe_output( def _get_auth_context(ctx: click.Context) -> tuple[str, SecretString, SecretString, str]: - """Extract resolved (api_url, client_id, client_secret, workspace_id) from Click context. + """Resolve and return (api_url, client_id, client_secret, workspace_id). - The workspace_id may raise if not set; callers that don't need it - should catch `PyAirbyteInputError`. + Credentials are resolved lazily from raw values stored in `ctx.obj`. """ - api_url: str = ctx.obj["api_url"] - client_id = SecretString(ctx.obj["client_id"]) - client_secret = SecretString(ctx.obj["client_secret"]) - workspace_id: str = ctx.obj["workspace_id"] + api_url = resolve_api_url(ctx.obj["_raw_api_url"]) + client_id = SecretString(resolve_client_id(ctx.obj["_raw_client_id"])) + client_secret = SecretString(resolve_client_secret(ctx.obj["_raw_client_secret"])) + workspace_id = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) return api_url, client_id, client_secret, workspace_id def _get_auth_no_workspace(ctx: click.Context) -> tuple[str, SecretString, SecretString]: - """Extract resolved auth without requiring workspace_id.""" - api_url: str = ctx.obj["api_url"] - client_id = SecretString(ctx.obj["client_id"]) - client_secret = SecretString(ctx.obj["client_secret"]) + """Resolve and return auth credentials without requiring workspace_id.""" + api_url = resolve_api_url(ctx.obj["_raw_api_url"]) + client_id = SecretString(resolve_client_id(ctx.obj["_raw_client_id"])) + client_secret = SecretString(resolve_client_secret(ctx.obj["_raw_client_secret"])) return api_url, client_id, client_secret @@ -180,12 +180,11 @@ def cli( `~/.airbyte/credentials` file. """ ctx.ensure_object(dict) - # Resolve auth eagerly so sub-commands get clear errors up front. - ctx.obj["client_id"] = resolve_client_id(client_id) - ctx.obj["client_secret"] = resolve_client_secret(client_secret) - ctx.obj["api_url"] = resolve_api_url(api_url) - # workspace_id is resolved lazily — some commands don't need it. - # Store the raw value; sub-commands call resolve_workspace_id when needed. + # Store raw values — credentials are resolved lazily when subcommands need them. + # This allows `--describe` to work without any auth configured. + ctx.obj["_raw_client_id"] = client_id + ctx.obj["_raw_client_secret"] = client_secret + ctx.obj["_raw_api_url"] = api_url ctx.obj["_raw_workspace_id"] = workspace_id @@ -212,29 +211,15 @@ def workspaces_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 optional_params={"workspace_id": "Filter to a specific workspace ID."}, ) api_url, client_id, client_secret = _get_auth_no_workspace(ctx) - # The SDK list_workspaces requires a workspace_id param for filtering; - # pass empty list behavior via the raw value. - raw_ws = ctx.obj["_raw_workspace_id"] - if raw_ws: - workspace_id = resolve_workspace_id(raw_ws) - results = api_util.list_workspaces( - workspace_id=workspace_id, - api_root=api_url, - client_id=client_id, - client_secret=client_secret, - bearer_token=None, - ) - else: - # Without a workspace_id filter we still need to pass one for the SDK. - # Try resolving; if unavailable, error clearly. - workspace_id = resolve_workspace_id(raw_ws) - results = api_util.list_workspaces( - workspace_id=workspace_id, - api_root=api_url, - client_id=client_id, - client_secret=client_secret, - bearer_token=None, - ) + # The SDK list_workspaces requires a workspace_id; resolve from raw value. + workspace_id = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + results = api_util.list_workspaces( + workspace_id=workspace_id, + api_root=api_url, + client_id=client_id, + client_secret=client_secret, + bearer_token=None, + ) _json_output([_workspace_to_dict(w) for w in results]) @@ -270,8 +255,7 @@ def workspaces_get(ctx: click.Context, cmd_workspace_id: str | None, describe: b @click.pass_context def sources(ctx: click.Context) -> None: """Manage Airbyte sources.""" - # Resolve workspace_id now — all source commands need it. - ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + pass @sources.command("list") @@ -363,9 +347,10 @@ def sources_create(ctx: click.Context, json_str: str, describe: bool) -> None: @sources.command("delete") @click.option("--source-id", required=True, help="The source ID to delete.") +@click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def sources_delete(ctx: click.Context, source_id: str, describe: bool) -> None: # noqa: FBT001 +def sources_delete(ctx: click.Context, source_id: str, force: bool, describe: bool) -> None: # noqa: FBT001 """Delete a source.""" if describe: _describe_output( @@ -374,6 +359,9 @@ def sources_delete(ctx: click.Context, source_id: str, describe: bool) -> None: "workspace_id": "The workspace ID.", "source_id": "The source ID to delete.", }, + optional_params={ + "force": "Skip delete safety checks (default: false).", + }, ) api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_source( @@ -383,7 +371,7 @@ def sources_delete(ctx: click.Context, source_id: str, describe: bool) -> None: client_id=client_id, client_secret=client_secret, bearer_token=None, - safe_mode=False, + safe_mode=not force, ) _json_output({"deleted": True, "source_id": source_id}) @@ -397,7 +385,7 @@ def sources_delete(ctx: click.Context, source_id: str, describe: bool) -> None: @click.pass_context def destinations(ctx: click.Context) -> None: """Manage Airbyte destinations.""" - ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + pass @destinations.command("list") @@ -489,9 +477,15 @@ def destinations_create(ctx: click.Context, json_str: str, describe: bool) -> No @destinations.command("delete") @click.option("--destination-id", required=True, help="The destination ID to delete.") +@click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def destinations_delete(ctx: click.Context, destination_id: str, describe: bool) -> None: # noqa: FBT001 +def destinations_delete( + ctx: click.Context, + destination_id: str, + force: bool, # noqa: FBT001 + describe: bool, # noqa: FBT001 +) -> None: """Delete a destination.""" if describe: _describe_output( @@ -500,6 +494,9 @@ def destinations_delete(ctx: click.Context, destination_id: str, describe: bool) "workspace_id": "The workspace ID.", "destination_id": "The destination ID to delete.", }, + optional_params={ + "force": "Skip delete safety checks (default: false).", + }, ) api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_destination( @@ -509,7 +506,7 @@ def destinations_delete(ctx: click.Context, destination_id: str, describe: bool) client_id=client_id, client_secret=client_secret, bearer_token=None, - safe_mode=False, + safe_mode=not force, ) _json_output({"deleted": True, "destination_id": destination_id}) @@ -523,7 +520,7 @@ def destinations_delete(ctx: click.Context, destination_id: str, describe: bool) @click.pass_context def connections(ctx: click.Context) -> None: """Manage Airbyte connections.""" - ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + pass @connections.command("list") @@ -623,9 +620,10 @@ def connections_create(ctx: click.Context, json_str: str, describe: bool) -> Non @connections.command("delete") @click.option("--connection-id", required=True, help="The connection ID to delete.") +@click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_delete(ctx: click.Context, connection_id: str, describe: bool) -> None: # noqa: FBT001 +def connections_delete(ctx: click.Context, connection_id: str, force: bool, describe: bool) -> None: # noqa: FBT001 """Delete a connection.""" if describe: _describe_output( @@ -634,6 +632,9 @@ def connections_delete(ctx: click.Context, connection_id: str, describe: bool) - "workspace_id": "The workspace ID.", "connection_id": "The connection ID to delete.", }, + optional_params={ + "force": "Skip delete safety checks (default: false).", + }, ) api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_connection( @@ -643,7 +644,7 @@ def connections_delete(ctx: click.Context, connection_id: str, describe: bool) - client_id=client_id, client_secret=client_secret, bearer_token=None, - safe_mode=False, + safe_mode=not force, ) _json_output({"deleted": True, "connection_id": connection_id}) @@ -683,7 +684,7 @@ def connections_sync(ctx: click.Context, connection_id: str, describe: bool) -> @click.pass_context def jobs(ctx: click.Context) -> None: """View Airbyte sync jobs.""" - ctx.obj["workspace_id"] = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + pass @jobs.command("list") @@ -748,8 +749,23 @@ def jobs_get(ctx: click.Context, job_id: int, describe: bool) -> None: # noqa: def main() -> None: - """Entry point for `uvx airbyte` / `airbyte` command.""" - cli() + """Entry point for `uvx airbyte` / `airbyte` command. + + Wraps the CLI invocation to ensure all errors produce structured JSON + output on stderr, maintaining the agent-first error contract. + """ + try: + cli() + except SystemExit: + raise + except KeyboardInterrupt: + _error_json("Operation cancelled.") + except json.JSONDecodeError as exc: + _error_json(str(exc), type="JSONDecodeError") + except PyAirbyteInputError as exc: + _error_json(str(exc), type="PyAirbyteInputError") + except Exception as exc: + _error_json(str(exc), type=exc.__class__.__name__) if __name__ == "__main__": From d155f0a6b079f7c96c7cbb33c908243fde63350a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 23:59:40 +0000 Subject: [PATCH 04/11] fix(cli): optional workspace_id, ID-only reads, Click standalone_mode, describe accuracy --- airbyte/cli/cloud_cli.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index 68dbc04a5..76ad811ad 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -211,8 +211,14 @@ def workspaces_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 optional_params={"workspace_id": "Filter to a specific workspace ID."}, ) api_url, client_id, client_secret = _get_auth_no_workspace(ctx) - # The SDK list_workspaces requires a workspace_id; resolve from raw value. - workspace_id = resolve_workspace_id(ctx.obj["_raw_workspace_id"]) + raw_ws = ctx.obj["_raw_workspace_id"] + workspace_id = resolve_workspace_id(raw_ws) if raw_ws else None + if workspace_id is None: + _error_json( + "workspace_id is required. Provide --workspace-id, set AIRBYTE_WORKSPACE_ID, " + "or add workspace_id to ~/.airbyte/credentials.", + type="MissingWorkspaceId", + ) results = api_util.list_workspaces( workspace_id=workspace_id, api_root=api_url, @@ -289,12 +295,10 @@ def sources_get(ctx: click.Context, source_id: str, describe: bool) -> None: # _describe_output( description="Get details of a specific source.", required_params={ - "workspace_id": "The workspace ID.", "source_id": "The source ID to retrieve.", }, ) - api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) - _ = workspace_id # used for context only + api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_source( source_id=source_id, api_root=api_url, @@ -419,12 +423,10 @@ def destinations_get(ctx: click.Context, destination_id: str, describe: bool) -> _describe_output( description="Get details of a specific destination.", required_params={ - "workspace_id": "The workspace ID.", "destination_id": "The destination ID to retrieve.", }, ) - api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) - _ = workspace_id + api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_destination( destination_id=destination_id, api_root=api_url, @@ -591,7 +593,8 @@ def connections_create(ctx: click.Context, json_str: str, describe: bool) -> Non "destination_id": "The destination ID.", }, optional_params={ - "stream_configurations": "List of stream configuration objects.", + "selected_stream_names": "List of stream names to sync.", + "prefix": "Optional table prefix for destination.", }, ) api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -731,8 +734,7 @@ def jobs_get(ctx: click.Context, job_id: int, describe: bool) -> None: # noqa: "job_id": "The job ID to retrieve.", }, ) - api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) - _ = workspace_id + api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_job_info( job_id=job_id, api_root=api_url, @@ -755,11 +757,13 @@ def main() -> None: output on stderr, maintaining the agent-first error contract. """ try: - cli() + cli(standalone_mode=False) except SystemExit: raise - except KeyboardInterrupt: + except (KeyboardInterrupt, click.Abort): _error_json("Operation cancelled.") + except click.ClickException as exc: + _error_json(exc.format_message(), type=exc.__class__.__name__) except json.JSONDecodeError as exc: _error_json(str(exc), type="JSONDecodeError") except PyAirbyteInputError as exc: From 52b758a8d98e45fbe586b823d5787f535caa81ad Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:01:07 +0000 Subject: [PATCH 05/11] fix(cli): add return after _error_json for pyrefly type narrowing --- airbyte/cli/cloud_cli.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index 76ad811ad..5fcf04b89 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -211,14 +211,15 @@ def workspaces_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 optional_params={"workspace_id": "Filter to a specific workspace ID."}, ) api_url, client_id, client_secret = _get_auth_no_workspace(ctx) - raw_ws = ctx.obj["_raw_workspace_id"] - workspace_id = resolve_workspace_id(raw_ws) if raw_ws else None + raw_ws: str | None = ctx.obj["_raw_workspace_id"] + workspace_id: str | None = resolve_workspace_id(raw_ws) if raw_ws else None if workspace_id is None: _error_json( "workspace_id is required. Provide --workspace-id, set AIRBYTE_WORKSPACE_ID, " "or add workspace_id to ~/.airbyte/credentials.", type="MissingWorkspaceId", ) + return # unreachable; _error_json calls sys.exit(1) results = api_util.list_workspaces( workspace_id=workspace_id, api_root=api_url, From 64a950e46aedaf225ba44e51f9099a983587d9e1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:07:57 +0000 Subject: [PATCH 06/11] fix(cli): remove broad except Exception, use guard statements per coding standards --- airbyte/cli/_cli_auth.py | 12 ++---------- airbyte/cli/cloud_cli.py | 7 +++---- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/airbyte/cli/_cli_auth.py b/airbyte/cli/_cli_auth.py index 7be1d02e9..1f1daf7fd 100644 --- a/airbyte/cli/_cli_auth.py +++ b/airbyte/cli/_cli_auth.py @@ -45,19 +45,11 @@ def _read_credentials_file() -> dict[str, Any]: if not CREDENTIALS_FILE_PATH.exists(): return {} - try: - content = CREDENTIALS_FILE_PATH.read_text(encoding="utf-8").strip() - except OSError: - return {} - + content = CREDENTIALS_FILE_PATH.read_text(encoding="utf-8").strip() if not content: return {} - try: - parsed = yaml.safe_load(content) - except yaml.YAMLError: - return {} - + parsed = yaml.safe_load(content) if not isinstance(parsed, dict): return {} diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index 5fcf04b89..924d01475 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -754,8 +754,9 @@ def jobs_get(ctx: click.Context, job_id: int, describe: bool) -> None: # noqa: def main() -> None: """Entry point for `uvx airbyte` / `airbyte` command. - Wraps the CLI invocation to ensure all errors produce structured JSON - output on stderr, maintaining the agent-first error contract. + Wraps the CLI invocation to catch known failure modes and emit + structured JSON on stderr. Unknown exceptions propagate naturally + so they surface with a full traceback for debugging. """ try: cli(standalone_mode=False) @@ -769,8 +770,6 @@ def main() -> None: _error_json(str(exc), type="JSONDecodeError") except PyAirbyteInputError as exc: _error_json(str(exc), type="PyAirbyteInputError") - except Exception as exc: - _error_json(str(exc), type=exc.__class__.__name__) if __name__ == "__main__": From 50a05cc13b3367b658de6af99a238384ecd0ecd6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:17:27 +0000 Subject: [PATCH 07/11] fix(cli): allow --describe to work without required options All commands now defer required-option validation until after the --describe check. This lets agents discover schemas without providing auth or resource IDs. --- airbyte/cli/cloud_cli.py | 77 +++++++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index 924d01475..3a59183ab 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -287,10 +287,10 @@ def sources_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 @sources.command("get") -@click.option("--source-id", required=True, help="The source ID to retrieve.") +@click.option("--source-id", default=None, help="The source ID to retrieve.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def sources_get(ctx: click.Context, source_id: str, describe: bool) -> None: # noqa: FBT001 +def sources_get(ctx: click.Context, source_id: str | None, describe: bool) -> None: # noqa: FBT001 """Get details of a specific source.""" if describe: _describe_output( @@ -299,6 +299,8 @@ def sources_get(ctx: click.Context, source_id: str, describe: bool) -> None: # "source_id": "The source ID to retrieve.", }, ) + if not source_id: + _error_json("--source-id is required.", type="MissingParameter") api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_source( source_id=source_id, @@ -314,12 +316,12 @@ def sources_get(ctx: click.Context, source_id: str, describe: bool) -> None: # @click.option( "--json", "json_str", - required=True, + default=None, help='JSON config: {"name": "...", "sourceType": "...", ...}', ) @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def sources_create(ctx: click.Context, json_str: str, describe: bool) -> None: # noqa: FBT001 +def sources_create(ctx: click.Context, json_str: str | None, describe: bool) -> None: # noqa: FBT001 """Create a new source in the workspace.""" if describe: _describe_output( @@ -333,6 +335,8 @@ def sources_create(ctx: click.Context, json_str: str, describe: bool) -> None: "...": "Additional connector-specific configuration fields.", }, ) + if not json_str: + _error_json("--json is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) config = _parse_json_option(json_str) name = config.pop("name", None) @@ -351,11 +355,11 @@ def sources_create(ctx: click.Context, json_str: str, describe: bool) -> None: @sources.command("delete") -@click.option("--source-id", required=True, help="The source ID to delete.") +@click.option("--source-id", default=None, help="The source ID to delete.") @click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def sources_delete(ctx: click.Context, source_id: str, force: bool, describe: bool) -> None: # noqa: FBT001 +def sources_delete(ctx: click.Context, source_id: str | None, force: bool, describe: bool) -> None: # noqa: FBT001 """Delete a source.""" if describe: _describe_output( @@ -368,6 +372,8 @@ def sources_delete(ctx: click.Context, source_id: str, force: bool, describe: bo "force": "Skip delete safety checks (default: false).", }, ) + if not source_id: + _error_json("--source-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_source( source_id=source_id, @@ -415,10 +421,10 @@ def destinations_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT0 @destinations.command("get") -@click.option("--destination-id", required=True, help="The destination ID to retrieve.") +@click.option("--destination-id", default=None, help="The destination ID to retrieve.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def destinations_get(ctx: click.Context, destination_id: str, describe: bool) -> None: # noqa: FBT001 +def destinations_get(ctx: click.Context, destination_id: str | None, describe: bool) -> None: # noqa: FBT001 """Get details of a specific destination.""" if describe: _describe_output( @@ -427,6 +433,8 @@ def destinations_get(ctx: click.Context, destination_id: str, describe: bool) -> "destination_id": "The destination ID to retrieve.", }, ) + if not destination_id: + _error_json("--destination-id is required.", type="MissingParameter") api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_destination( destination_id=destination_id, @@ -442,12 +450,12 @@ def destinations_get(ctx: click.Context, destination_id: str, describe: bool) -> @click.option( "--json", "json_str", - required=True, + default=None, help='JSON config: {"name": "...", "destinationType": "...", ...}', ) @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def destinations_create(ctx: click.Context, json_str: str, describe: bool) -> None: # noqa: FBT001 +def destinations_create(ctx: click.Context, json_str: str | None, describe: bool) -> None: # noqa: FBT001 """Create a new destination in the workspace.""" if describe: _describe_output( @@ -461,6 +469,8 @@ def destinations_create(ctx: click.Context, json_str: str, describe: bool) -> No "...": "Additional connector-specific configuration fields.", }, ) + if not json_str: + _error_json("--json is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) config = _parse_json_option(json_str) name = config.pop("name", None) @@ -479,13 +489,13 @@ def destinations_create(ctx: click.Context, json_str: str, describe: bool) -> No @destinations.command("delete") -@click.option("--destination-id", required=True, help="The destination ID to delete.") +@click.option("--destination-id", default=None, help="The destination ID to delete.") @click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context def destinations_delete( ctx: click.Context, - destination_id: str, + destination_id: str | None, force: bool, # noqa: FBT001 describe: bool, # noqa: FBT001 ) -> None: @@ -501,6 +511,8 @@ def destinations_delete( "force": "Skip delete safety checks (default: false).", }, ) + if not destination_id: + _error_json("--destination-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_destination( destination_id=destination_id, @@ -548,10 +560,10 @@ def connections_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT00 @connections.command("get") -@click.option("--connection-id", required=True, help="The connection ID to retrieve.") +@click.option("--connection-id", default=None, help="The connection ID to retrieve.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_get(ctx: click.Context, connection_id: str, describe: bool) -> None: # noqa: FBT001 +def connections_get(ctx: click.Context, connection_id: str | None, describe: bool) -> None: # noqa: FBT001 """Get details of a specific connection.""" if describe: _describe_output( @@ -561,6 +573,8 @@ def connections_get(ctx: click.Context, connection_id: str, describe: bool) -> N "connection_id": "The connection ID to retrieve.", }, ) + if not connection_id: + _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) result = api_util.get_connection( workspace_id=workspace_id, @@ -577,12 +591,12 @@ def connections_get(ctx: click.Context, connection_id: str, describe: bool) -> N @click.option( "--json", "json_str", - required=True, + default=None, help='JSON config: {"name": "...", "source_id": "...", "destination_id": "...", ...}', ) @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_create(ctx: click.Context, json_str: str, describe: bool) -> None: # noqa: FBT001 +def connections_create(ctx: click.Context, json_str: str | None, describe: bool) -> None: # noqa: FBT001 """Create a new connection.""" if describe: _describe_output( @@ -598,6 +612,8 @@ def connections_create(ctx: click.Context, json_str: str, describe: bool) -> Non "prefix": "Optional table prefix for destination.", }, ) + if not json_str: + _error_json("--json is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) config = _parse_json_option(json_str) name = config.get("name") @@ -623,11 +639,16 @@ def connections_create(ctx: click.Context, json_str: str, describe: bool) -> Non @connections.command("delete") -@click.option("--connection-id", required=True, help="The connection ID to delete.") +@click.option("--connection-id", default=None, help="The connection ID to delete.") @click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_delete(ctx: click.Context, connection_id: str, force: bool, describe: bool) -> None: # noqa: FBT001 +def connections_delete( + ctx: click.Context, + connection_id: str | None, + force: bool, # noqa: FBT001 + describe: bool, # noqa: FBT001 +) -> None: """Delete a connection.""" if describe: _describe_output( @@ -640,6 +661,8 @@ def connections_delete(ctx: click.Context, connection_id: str, force: bool, desc "force": "Skip delete safety checks (default: false).", }, ) + if not connection_id: + _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_connection( connection_id, @@ -654,10 +677,10 @@ def connections_delete(ctx: click.Context, connection_id: str, force: bool, desc @connections.command("sync") -@click.option("--connection-id", required=True, help="The connection ID to sync.") +@click.option("--connection-id", default=None, help="The connection ID to sync.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_sync(ctx: click.Context, connection_id: str, describe: bool) -> None: # noqa: FBT001 +def connections_sync(ctx: click.Context, connection_id: str | None, describe: bool) -> None: # noqa: FBT001 """Trigger a sync for a connection.""" if describe: _describe_output( @@ -667,6 +690,8 @@ def connections_sync(ctx: click.Context, connection_id: str, describe: bool) -> "connection_id": "The connection ID to sync.", }, ) + if not connection_id: + _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) result = api_util.run_connection( workspace_id, @@ -692,11 +717,11 @@ def jobs(ctx: click.Context) -> None: @jobs.command("list") -@click.option("--connection-id", required=True, help="The connection ID to list jobs for.") +@click.option("--connection-id", default=None, help="The connection ID to list jobs for.") @click.option("--limit", default=20, help="Maximum number of jobs to return.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def jobs_list(ctx: click.Context, connection_id: str, limit: int, describe: bool) -> None: # noqa: FBT001 +def jobs_list(ctx: click.Context, connection_id: str | None, limit: int, describe: bool) -> None: # noqa: FBT001 """List recent jobs for a connection.""" if describe: _describe_output( @@ -709,6 +734,8 @@ def jobs_list(ctx: click.Context, connection_id: str, limit: int, describe: bool "limit": "Maximum number of jobs to return (default: 20).", }, ) + if not connection_id: + _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) results = api_util.get_job_logs( workspace_id, @@ -723,10 +750,10 @@ def jobs_list(ctx: click.Context, connection_id: str, limit: int, describe: bool @jobs.command("get") -@click.option("--job-id", required=True, type=int, help="The job ID to retrieve.") +@click.option("--job-id", default=None, type=int, help="The job ID to retrieve.") @click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def jobs_get(ctx: click.Context, job_id: int, describe: bool) -> None: # noqa: FBT001 +def jobs_get(ctx: click.Context, job_id: int | None, describe: bool) -> None: # noqa: FBT001 """Get details of a specific job.""" if describe: _describe_output( @@ -735,6 +762,8 @@ def jobs_get(ctx: click.Context, job_id: int, describe: bool) -> None: # noqa: "job_id": "The job ID to retrieve.", }, ) + if job_id is None: + _error_json("--job-id is required.", type="MissingParameter") api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_job_info( job_id=job_id, From afd3645e643c9433c0468a5bf9ea76a37a76fb82 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:37:58 +0000 Subject: [PATCH 08/11] refactor(cli): replace --describe with --help --format=json - Add _register_schema() and _emit_json_help() helpers - Create _JsonHelpGroup and _JsonHelpCommand Click classes - Add --format option to root cli group (text|json) - Remove all --describe flags from all commands - All commands now support --format json --help for JSON schemas - Root-level --format json --help works via sys.argv fallback --- airbyte/cli/cloud_cli.py | 498 +++++++++++++++++++++++---------------- 1 file changed, 289 insertions(+), 209 deletions(-) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index 3a59183ab..da03d17a8 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -64,21 +64,51 @@ def _parse_json_option(raw: str | None) -> dict[str, Any]: return parsed -def _describe_output( +# --------------------------------------------------------------------------- +# JSON help infrastructure +# --------------------------------------------------------------------------- + +# Registry mapping command function names to their JSON help metadata. +_COMMAND_SCHEMAS: dict[str, dict[str, Any]] = {} + + +def _register_schema( + func_name: str, description: str, required_params: dict[str, str] | None = None, optional_params: dict[str, str] | None = None, ) -> None: - """Print a `--describe` schema and exit.""" - schema: dict[str, Any] = { - "description": description, - } + """Register JSON-help metadata for a command.""" + schema: dict[str, Any] = {"description": description} if required_params: schema["required_params"] = required_params if optional_params: schema["optional_params"] = optional_params - _json_output(schema) - sys.exit(0) + _COMMAND_SCHEMAS[func_name] = schema + + +def _emit_json_help(ctx: click.Context) -> None: + """If `--format json` is active, print JSON help and exit (used by --help).""" + if not _is_json_format(ctx): + return # fall through to normal Click help + + # Walk context chain to build the full command name. + cmd_name = ctx.info_name or "" + parent = ctx.parent + while parent and parent.info_name: + cmd_name = f"{parent.info_name}_{cmd_name}" + parent = parent.parent + + # Try the leaf command, then the subcommand function name. + func = ctx.command.callback + func_name = func.__name__ if func else cmd_name + schema = _COMMAND_SCHEMAS.get(func_name) + if schema: + _json_output(schema) + else: + # Fallback: emit a minimal JSON help with the docstring. + _json_output({"description": ctx.command.help or cmd_name}) + ctx.exit(0) def _get_auth_context(ctx: click.Context) -> tuple[str, SecretString, SecretString, str]: @@ -155,13 +185,63 @@ def _job_to_dict(job: models.JobResponse) -> dict[str, object]: # --------------------------------------------------------------------------- -@click.group() +def _is_json_format(ctx: click.Context) -> bool: + """Check if ``--format json`` was requested. + + Click's ``--help`` is an eager option that fires before the group + callback, so ``ctx.params`` may be empty at the root level. We + fall back to inspecting ``sys.argv`` when the param is missing. + """ + fmt = ctx.find_root().params.get("output_format") + if fmt: + return fmt == "json" + # Fallback: scan raw argv for ``--format json`` or ``--format=json``. + for i, arg in enumerate(sys.argv): + if arg == "--format" and i + 1 < len(sys.argv) and sys.argv[i + 1].lower() == "json": + return True + if arg.lower() == "--format=json": + return True + return False + + +class _JsonHelpGroup(click.Group): + """Click group that emits JSON help when ``--format json --help`` is used.""" + + def get_help(self, ctx: click.Context) -> str: + if _is_json_format(ctx): + # List subcommands as JSON. + commands: dict[str, str] = {} + for name in self.list_commands(ctx): + cmd = self.get_command(ctx, name) + if cmd: + commands[name] = cmd.get_short_help_str(limit=300) + _json_output({"description": self.help or "", "commands": commands}) + ctx.exit(0) + return super().get_help(ctx) + + +class _JsonHelpCommand(click.Command): + """Click command that emits JSON help when ``--format json --help`` is used.""" + + def get_help(self, ctx: click.Context) -> str: + _emit_json_help(ctx) + return super().get_help(ctx) + + +@click.group(cls=_JsonHelpGroup) @click.option("--client-id", envvar="AIRBYTE_CLIENT_ID", default=None, help="Airbyte client ID.") @click.option( "--client-secret", envvar="AIRBYTE_CLIENT_SECRET", default=None, help="Airbyte client secret." ) @click.option("--workspace-id", default=None, help="Airbyte workspace ID.") @click.option("--api-url", default=None, help="Airbyte API URL override.") +@click.option( + "--format", + "output_format", + type=click.Choice(["text", "json"], case_sensitive=False), + default="text", + help="Output format for --help (default: text).", +) @click.pass_context def cli( ctx: click.Context, @@ -169,6 +249,7 @@ def cli( client_secret: str | None, workspace_id: str | None, api_url: str | None, + output_format: str, # noqa: ARG001 ) -> None: """Airbyte CLI — agent-first interface for Airbyte Cloud. @@ -178,10 +259,12 @@ def cli( Authentication is resolved from (in order): CLI flags, env vars (`AIRBYTE_CLIENT_ID` / `AIRBYTE_CLIENT_SECRET`), or `~/.airbyte/credentials` file. + + Use `--format json --help` on any command for machine-readable parameter + schemas (no auth required). """ ctx.ensure_object(dict) # Store raw values — credentials are resolved lazily when subcommands need them. - # This allows `--describe` to work without any auth configured. ctx.obj["_raw_client_id"] = client_id ctx.obj["_raw_client_secret"] = client_secret ctx.obj["_raw_api_url"] = api_url @@ -193,23 +276,24 @@ def cli( # --------------------------------------------------------------------------- -@cli.group() +@cli.group(cls=_JsonHelpGroup) @click.pass_context def workspaces(ctx: click.Context) -> None: """Manage Airbyte workspaces.""" pass -@workspaces.command("list") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +_register_schema( + "workspaces_list", + description="List all workspaces accessible with the current credentials.", + optional_params={"workspace_id": "Filter to a specific workspace ID."}, +) + + +@workspaces.command("list", cls=_JsonHelpCommand) @click.pass_context -def workspaces_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 +def workspaces_list(ctx: click.Context) -> None: """List workspaces accessible with the current credentials.""" - if describe: - _describe_output( - description="List all workspaces accessible with the current credentials.", - optional_params={"workspace_id": "Filter to a specific workspace ID."}, - ) api_url, client_id, client_secret = _get_auth_no_workspace(ctx) raw_ws: str | None = ctx.obj["_raw_workspace_id"] workspace_id: str | None = resolve_workspace_id(raw_ws) if raw_ws else None @@ -230,17 +314,18 @@ def workspaces_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 _json_output([_workspace_to_dict(w) for w in results]) -@workspaces.command("get") +_register_schema( + "workspaces_get", + description="Get details of a specific workspace.", + required_params={"workspace_id": "The workspace ID to retrieve."}, +) + + +@workspaces.command("get", cls=_JsonHelpCommand) @click.option("--workspace-id", "cmd_workspace_id", default=None, help="Workspace ID to retrieve.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def workspaces_get(ctx: click.Context, cmd_workspace_id: str | None, describe: bool) -> None: # noqa: FBT001 +def workspaces_get(ctx: click.Context, cmd_workspace_id: str | None) -> None: """Get details of a specific workspace.""" - if describe: - _describe_output( - description="Get details of a specific workspace.", - required_params={"workspace_id": "The workspace ID to retrieve."}, - ) api_url, client_id, client_secret = _get_auth_no_workspace(ctx) workspace_id = resolve_workspace_id(cmd_workspace_id or ctx.obj["_raw_workspace_id"]) result = api_util.get_workspace( @@ -258,23 +343,24 @@ def workspaces_get(ctx: click.Context, cmd_workspace_id: str | None, describe: b # --------------------------------------------------------------------------- -@cli.group() +@cli.group(cls=_JsonHelpGroup) @click.pass_context def sources(ctx: click.Context) -> None: """Manage Airbyte sources.""" pass -@sources.command("list") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +_register_schema( + "sources_list", + description="List all sources in the workspace.", + required_params={"workspace_id": "The workspace ID."}, +) + + +@sources.command("list", cls=_JsonHelpCommand) @click.pass_context -def sources_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 +def sources_list(ctx: click.Context) -> None: """List sources in the workspace.""" - if describe: - _describe_output( - description="List all sources in the workspace.", - required_params={"workspace_id": "The workspace ID."}, - ) api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) results = api_util.list_sources( workspace_id=workspace_id, @@ -286,19 +372,18 @@ def sources_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 _json_output([_source_to_dict(s) for s in results]) -@sources.command("get") +_register_schema( + "sources_get", + description="Get details of a specific source.", + required_params={"source_id": "The source ID to retrieve."}, +) + + +@sources.command("get", cls=_JsonHelpCommand) @click.option("--source-id", default=None, help="The source ID to retrieve.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def sources_get(ctx: click.Context, source_id: str | None, describe: bool) -> None: # noqa: FBT001 +def sources_get(ctx: click.Context, source_id: str | None) -> None: """Get details of a specific source.""" - if describe: - _describe_output( - description="Get details of a specific source.", - required_params={ - "source_id": "The source ID to retrieve.", - }, - ) if not source_id: _error_json("--source-id is required.", type="MissingParameter") api_url, client_id, client_secret = _get_auth_no_workspace(ctx) @@ -312,29 +397,28 @@ def sources_get(ctx: click.Context, source_id: str | None, describe: bool) -> No _json_output(_source_to_dict(result)) -@sources.command("create") +_register_schema( + "sources_create", + description="Create a new source in the workspace.", + required_params={ + "workspace_id": "The workspace ID.", + "name": "Display name for the source.", + "sourceType": "The source connector type (e.g. 'postgres').", + }, + optional_params={"...": "Additional connector-specific configuration fields."}, +) + + +@sources.command("create", cls=_JsonHelpCommand) @click.option( "--json", "json_str", default=None, help='JSON config: {"name": "...", "sourceType": "...", ...}', ) -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def sources_create(ctx: click.Context, json_str: str | None, describe: bool) -> None: # noqa: FBT001 +def sources_create(ctx: click.Context, json_str: str | None) -> None: """Create a new source in the workspace.""" - if describe: - _describe_output( - description="Create a new source in the workspace.", - required_params={ - "workspace_id": "The workspace ID.", - "name": "Display name for the source.", - "sourceType": "The source connector type (e.g. 'postgres').", - }, - optional_params={ - "...": "Additional connector-specific configuration fields.", - }, - ) if not json_str: _error_json("--json is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -354,24 +438,23 @@ def sources_create(ctx: click.Context, json_str: str | None, describe: bool) -> _json_output(_source_to_dict(result)) -@sources.command("delete") +_register_schema( + "sources_delete", + description="Delete a source by ID.", + required_params={ + "workspace_id": "The workspace ID.", + "source_id": "The source ID to delete.", + }, + optional_params={"force": "Skip delete safety checks (default: false)."}, +) + + +@sources.command("delete", cls=_JsonHelpCommand) @click.option("--source-id", default=None, help="The source ID to delete.") @click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def sources_delete(ctx: click.Context, source_id: str | None, force: bool, describe: bool) -> None: # noqa: FBT001 +def sources_delete(ctx: click.Context, source_id: str | None, force: bool) -> None: # noqa: FBT001 """Delete a source.""" - if describe: - _describe_output( - description="Delete a source by ID.", - required_params={ - "workspace_id": "The workspace ID.", - "source_id": "The source ID to delete.", - }, - optional_params={ - "force": "Skip delete safety checks (default: false).", - }, - ) if not source_id: _error_json("--source-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -392,23 +475,24 @@ def sources_delete(ctx: click.Context, source_id: str | None, force: bool, descr # --------------------------------------------------------------------------- -@cli.group() +@cli.group(cls=_JsonHelpGroup) @click.pass_context def destinations(ctx: click.Context) -> None: """Manage Airbyte destinations.""" pass -@destinations.command("list") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +_register_schema( + "destinations_list", + description="List all destinations in the workspace.", + required_params={"workspace_id": "The workspace ID."}, +) + + +@destinations.command("list", cls=_JsonHelpCommand) @click.pass_context -def destinations_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 +def destinations_list(ctx: click.Context) -> None: """List destinations in the workspace.""" - if describe: - _describe_output( - description="List all destinations in the workspace.", - required_params={"workspace_id": "The workspace ID."}, - ) api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) results = api_util.list_destinations( workspace_id=workspace_id, @@ -420,19 +504,18 @@ def destinations_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT0 _json_output([_destination_to_dict(d) for d in results]) -@destinations.command("get") +_register_schema( + "destinations_get", + description="Get details of a specific destination.", + required_params={"destination_id": "The destination ID to retrieve."}, +) + + +@destinations.command("get", cls=_JsonHelpCommand) @click.option("--destination-id", default=None, help="The destination ID to retrieve.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def destinations_get(ctx: click.Context, destination_id: str | None, describe: bool) -> None: # noqa: FBT001 +def destinations_get(ctx: click.Context, destination_id: str | None) -> None: """Get details of a specific destination.""" - if describe: - _describe_output( - description="Get details of a specific destination.", - required_params={ - "destination_id": "The destination ID to retrieve.", - }, - ) if not destination_id: _error_json("--destination-id is required.", type="MissingParameter") api_url, client_id, client_secret = _get_auth_no_workspace(ctx) @@ -446,29 +529,28 @@ def destinations_get(ctx: click.Context, destination_id: str | None, describe: b _json_output(_destination_to_dict(result)) -@destinations.command("create") +_register_schema( + "destinations_create", + description="Create a new destination in the workspace.", + required_params={ + "workspace_id": "The workspace ID.", + "name": "Display name for the destination.", + "destinationType": "The destination connector type (e.g. 'bigquery').", + }, + optional_params={"...": "Additional connector-specific configuration fields."}, +) + + +@destinations.command("create", cls=_JsonHelpCommand) @click.option( "--json", "json_str", default=None, help='JSON config: {"name": "...", "destinationType": "...", ...}', ) -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def destinations_create(ctx: click.Context, json_str: str | None, describe: bool) -> None: # noqa: FBT001 +def destinations_create(ctx: click.Context, json_str: str | None) -> None: """Create a new destination in the workspace.""" - if describe: - _describe_output( - description="Create a new destination in the workspace.", - required_params={ - "workspace_id": "The workspace ID.", - "name": "Display name for the destination.", - "destinationType": "The destination connector type (e.g. 'bigquery').", - }, - optional_params={ - "...": "Additional connector-specific configuration fields.", - }, - ) if not json_str: _error_json("--json is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -488,29 +570,27 @@ def destinations_create(ctx: click.Context, json_str: str | None, describe: bool _json_output(_destination_to_dict(result)) -@destinations.command("delete") +_register_schema( + "destinations_delete", + description="Delete a destination by ID.", + required_params={ + "workspace_id": "The workspace ID.", + "destination_id": "The destination ID to delete.", + }, + optional_params={"force": "Skip delete safety checks (default: false)."}, +) + + +@destinations.command("delete", cls=_JsonHelpCommand) @click.option("--destination-id", default=None, help="The destination ID to delete.") @click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context def destinations_delete( ctx: click.Context, destination_id: str | None, force: bool, # noqa: FBT001 - describe: bool, # noqa: FBT001 ) -> None: """Delete a destination.""" - if describe: - _describe_output( - description="Delete a destination by ID.", - required_params={ - "workspace_id": "The workspace ID.", - "destination_id": "The destination ID to delete.", - }, - optional_params={ - "force": "Skip delete safety checks (default: false).", - }, - ) if not destination_id: _error_json("--destination-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -531,23 +611,24 @@ def destinations_delete( # --------------------------------------------------------------------------- -@cli.group() +@cli.group(cls=_JsonHelpGroup) @click.pass_context def connections(ctx: click.Context) -> None: """Manage Airbyte connections.""" pass -@connections.command("list") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") +_register_schema( + "connections_list", + description="List all connections in the workspace.", + required_params={"workspace_id": "The workspace ID."}, +) + + +@connections.command("list", cls=_JsonHelpCommand) @click.pass_context -def connections_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT001 +def connections_list(ctx: click.Context) -> None: """List connections in the workspace.""" - if describe: - _describe_output( - description="List all connections in the workspace.", - required_params={"workspace_id": "The workspace ID."}, - ) api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) results = api_util.list_connections( workspace_id=workspace_id, @@ -559,20 +640,21 @@ def connections_list(ctx: click.Context, describe: bool) -> None: # noqa: FBT00 _json_output([_connection_to_dict(c) for c in results]) -@connections.command("get") +_register_schema( + "connections_get", + description="Get details of a specific connection.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID to retrieve.", + }, +) + + +@connections.command("get", cls=_JsonHelpCommand) @click.option("--connection-id", default=None, help="The connection ID to retrieve.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_get(ctx: click.Context, connection_id: str | None, describe: bool) -> None: # noqa: FBT001 +def connections_get(ctx: click.Context, connection_id: str | None) -> None: """Get details of a specific connection.""" - if describe: - _describe_output( - description="Get details of a specific connection.", - required_params={ - "workspace_id": "The workspace ID.", - "connection_id": "The connection ID to retrieve.", - }, - ) if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -587,31 +669,32 @@ def connections_get(ctx: click.Context, connection_id: str | None, describe: boo _json_output(_connection_to_dict(result)) -@connections.command("create") +_register_schema( + "connections_create", + description="Create a new connection between a source and destination.", + required_params={ + "workspace_id": "The workspace ID.", + "name": "Display name for the connection.", + "source_id": "The source ID.", + "destination_id": "The destination ID.", + }, + optional_params={ + "selected_stream_names": "List of stream names to sync.", + "prefix": "Optional table prefix for destination.", + }, +) + + +@connections.command("create", cls=_JsonHelpCommand) @click.option( "--json", "json_str", default=None, help='JSON config: {"name": "...", "source_id": "...", "destination_id": "...", ...}', ) -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_create(ctx: click.Context, json_str: str | None, describe: bool) -> None: # noqa: FBT001 +def connections_create(ctx: click.Context, json_str: str | None) -> None: """Create a new connection.""" - if describe: - _describe_output( - description="Create a new connection between a source and destination.", - required_params={ - "workspace_id": "The workspace ID.", - "name": "Display name for the connection.", - "source_id": "The source ID.", - "destination_id": "The destination ID.", - }, - optional_params={ - "selected_stream_names": "List of stream names to sync.", - "prefix": "Optional table prefix for destination.", - }, - ) if not json_str: _error_json("--json is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -638,29 +721,27 @@ def connections_create(ctx: click.Context, json_str: str | None, describe: bool) _json_output(_connection_to_dict(result)) -@connections.command("delete") +_register_schema( + "connections_delete", + description="Delete a connection by ID.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID to delete.", + }, + optional_params={"force": "Skip delete safety checks (default: false)."}, +) + + +@connections.command("delete", cls=_JsonHelpCommand) @click.option("--connection-id", default=None, help="The connection ID to delete.") @click.option("--force", is_flag=True, default=False, help="Skip delete safety checks.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context def connections_delete( ctx: click.Context, connection_id: str | None, force: bool, # noqa: FBT001 - describe: bool, # noqa: FBT001 ) -> None: """Delete a connection.""" - if describe: - _describe_output( - description="Delete a connection by ID.", - required_params={ - "workspace_id": "The workspace ID.", - "connection_id": "The connection ID to delete.", - }, - optional_params={ - "force": "Skip delete safety checks (default: false).", - }, - ) if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -676,20 +757,21 @@ def connections_delete( _json_output({"deleted": True, "connection_id": connection_id}) -@connections.command("sync") +_register_schema( + "connections_sync", + description="Trigger a sync job for a connection.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID to sync.", + }, +) + + +@connections.command("sync", cls=_JsonHelpCommand) @click.option("--connection-id", default=None, help="The connection ID to sync.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def connections_sync(ctx: click.Context, connection_id: str | None, describe: bool) -> None: # noqa: FBT001 +def connections_sync(ctx: click.Context, connection_id: str | None) -> None: """Trigger a sync for a connection.""" - if describe: - _describe_output( - description="Trigger a sync job for a connection.", - required_params={ - "workspace_id": "The workspace ID.", - "connection_id": "The connection ID to sync.", - }, - ) if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -709,31 +791,30 @@ def connections_sync(ctx: click.Context, connection_id: str | None, describe: bo # --------------------------------------------------------------------------- -@cli.group() +@cli.group(cls=_JsonHelpGroup) @click.pass_context def jobs(ctx: click.Context) -> None: """View Airbyte sync jobs.""" pass -@jobs.command("list") +_register_schema( + "jobs_list", + description="List recent sync jobs for a connection.", + required_params={ + "workspace_id": "The workspace ID.", + "connection_id": "The connection ID.", + }, + optional_params={"limit": "Maximum number of jobs to return (default: 20)."}, +) + + +@jobs.command("list", cls=_JsonHelpCommand) @click.option("--connection-id", default=None, help="The connection ID to list jobs for.") @click.option("--limit", default=20, help="Maximum number of jobs to return.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def jobs_list(ctx: click.Context, connection_id: str | None, limit: int, describe: bool) -> None: # noqa: FBT001 +def jobs_list(ctx: click.Context, connection_id: str | None, limit: int) -> None: """List recent jobs for a connection.""" - if describe: - _describe_output( - description="List recent sync jobs for a connection.", - required_params={ - "workspace_id": "The workspace ID.", - "connection_id": "The connection ID.", - }, - optional_params={ - "limit": "Maximum number of jobs to return (default: 20).", - }, - ) if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) @@ -749,19 +830,18 @@ def jobs_list(ctx: click.Context, connection_id: str | None, limit: int, describ _json_output([_job_to_dict(j) for j in results]) -@jobs.command("get") +_register_schema( + "jobs_get", + description="Get details of a specific job by ID.", + required_params={"job_id": "The job ID to retrieve."}, +) + + +@jobs.command("get", cls=_JsonHelpCommand) @click.option("--job-id", default=None, type=int, help="The job ID to retrieve.") -@click.option("--describe", is_flag=True, help="Print operation schema and exit.") @click.pass_context -def jobs_get(ctx: click.Context, job_id: int | None, describe: bool) -> None: # noqa: FBT001 +def jobs_get(ctx: click.Context, job_id: int | None) -> None: """Get details of a specific job.""" - if describe: - _describe_output( - description="Get details of a specific job by ID.", - required_params={ - "job_id": "The job ID to retrieve.", - }, - ) if job_id is None: _error_json("--job-id is required.", type="MissingParameter") api_url, client_id, client_secret = _get_auth_no_workspace(ctx) From 2f2ded618f0e1746d82748303d3c89a108103c20 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:43:29 +0000 Subject: [PATCH 09/11] fix(cli): NoReturn annotation, workspace_id fallback, catch-all handler - Change _error_json return type to NoReturn for type narrowing - Always call resolve_workspace_id() in workspaces_list for env/creds fallback - Add catch-all except Exception handler in main() for JSON error contract --- airbyte/cli/cloud_cli.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index da03d17a8..fa031504f 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -17,7 +17,7 @@ import json import sys -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, NoReturn import click @@ -47,7 +47,7 @@ def _json_output(data: list[dict[str, object]] | dict[str, object]) -> None: click.echo(json.dumps(data, indent=2, default=str)) -def _error_json(message: str, **extra: object) -> None: +def _error_json(message: str, **extra: object) -> NoReturn: """Print an error object to stderr and exit.""" payload: dict[str, object] = {"error": message, **extra} click.echo(json.dumps(payload, indent=2, default=str), err=True) @@ -294,16 +294,7 @@ def workspaces(ctx: click.Context) -> None: @click.pass_context def workspaces_list(ctx: click.Context) -> None: """List workspaces accessible with the current credentials.""" - api_url, client_id, client_secret = _get_auth_no_workspace(ctx) - raw_ws: str | None = ctx.obj["_raw_workspace_id"] - workspace_id: str | None = resolve_workspace_id(raw_ws) if raw_ws else None - if workspace_id is None: - _error_json( - "workspace_id is required. Provide --workspace-id, set AIRBYTE_WORKSPACE_ID, " - "or add workspace_id to ~/.airbyte/credentials.", - type="MissingWorkspaceId", - ) - return # unreachable; _error_json calls sys.exit(1) + api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) results = api_util.list_workspaces( workspace_id=workspace_id, api_root=api_url, @@ -879,6 +870,8 @@ def main() -> None: _error_json(str(exc), type="JSONDecodeError") except PyAirbyteInputError as exc: _error_json(str(exc), type="PyAirbyteInputError") + except Exception as exc: + _error_json(str(exc), type=exc.__class__.__name__) if __name__ == "__main__": From 5f09e588631bd0ebb0af9b2652f7300fe02f7ad1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:45:07 +0000 Subject: [PATCH 10/11] fix(cli): remove broad except Exception per coding standards Broad exception catching violates team coding standards. Only catch specific exceptions that can be handled meaningfully. --- airbyte/cli/cloud_cli.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index fa031504f..d478db9e3 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -870,8 +870,6 @@ def main() -> None: _error_json(str(exc), type="JSONDecodeError") except PyAirbyteInputError as exc: _error_json(str(exc), type="PyAirbyteInputError") - except Exception as exc: - _error_json(str(exc), type=exc.__class__.__name__) if __name__ == "__main__": From f090fa80baf1329e0f6c101036c428bb6d5a1e06 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 3 Apr 2026 00:47:48 +0000 Subject: [PATCH 11/11] fix(cli): add return after _error_json for pyrefly type narrowing Pyrefly does not narrow Optional types after NoReturn calls without an explicit return statement in the guard clause. --- airbyte/cli/cloud_cli.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airbyte/cli/cloud_cli.py b/airbyte/cli/cloud_cli.py index d478db9e3..4ddac3b71 100644 --- a/airbyte/cli/cloud_cli.py +++ b/airbyte/cli/cloud_cli.py @@ -377,6 +377,7 @@ def sources_get(ctx: click.Context, source_id: str | None) -> None: """Get details of a specific source.""" if not source_id: _error_json("--source-id is required.", type="MissingParameter") + return api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_source( source_id=source_id, @@ -448,6 +449,7 @@ def sources_delete(ctx: click.Context, source_id: str | None, force: bool) -> No """Delete a source.""" if not source_id: _error_json("--source-id is required.", type="MissingParameter") + return api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_source( source_id=source_id, @@ -509,6 +511,7 @@ def destinations_get(ctx: click.Context, destination_id: str | None) -> None: """Get details of a specific destination.""" if not destination_id: _error_json("--destination-id is required.", type="MissingParameter") + return api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_destination( destination_id=destination_id, @@ -584,6 +587,7 @@ def destinations_delete( """Delete a destination.""" if not destination_id: _error_json("--destination-id is required.", type="MissingParameter") + return api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_destination( destination_id=destination_id, @@ -648,6 +652,7 @@ def connections_get(ctx: click.Context, connection_id: str | None) -> None: """Get details of a specific connection.""" if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") + return api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) result = api_util.get_connection( workspace_id=workspace_id, @@ -735,6 +740,7 @@ def connections_delete( """Delete a connection.""" if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") + return api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) api_util.delete_connection( connection_id, @@ -765,6 +771,7 @@ def connections_sync(ctx: click.Context, connection_id: str | None) -> None: """Trigger a sync for a connection.""" if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") + return api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) result = api_util.run_connection( workspace_id, @@ -808,6 +815,7 @@ def jobs_list(ctx: click.Context, connection_id: str | None, limit: int) -> None """List recent jobs for a connection.""" if not connection_id: _error_json("--connection-id is required.", type="MissingParameter") + return api_url, client_id, client_secret, workspace_id = _get_auth_context(ctx) results = api_util.get_job_logs( workspace_id, @@ -835,6 +843,7 @@ def jobs_get(ctx: click.Context, job_id: int | None) -> None: """Get details of a specific job.""" if job_id is None: _error_json("--job-id is required.", type="MissingParameter") + return api_url, client_id, client_secret = _get_auth_no_workspace(ctx) result = api_util.get_job_info( job_id=job_id,