Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions list_pending_activities/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# List Pending Activities

A command-line tool that queries a Temporal Cloud namespace to find all workflows with pending activities. Supports optional filters and saves results to a local JSON file.

## How it works

1. Builds a [visibility query](https://docs.temporal.io/visibility#list-filter) from the optional filters you provide
2. Calls `client.list_workflows()` to retrieve matching workflows
3. Calls `handle.describe()` on each workflow to check for pending activities
4. Prints results to the console and saves them to `output/pending_activities_<timestamp>.json`

Both parent and child workflows are found — child workflows are independent executions in the visibility store and are queried the same way.

## Authentication

The script supports two auth modes. If `TEMPORAL_API_KEY` is set, it uses API key auth via the regional endpoint. Otherwise it falls back to mTLS via the namespace endpoint.

**API key:**
```bash
export TEMPORAL_API_KEY="your-api-key"
python find_pending.py
```

**mTLS (default):**
```bash
python find_pending.py
```

Requires `client.pem` and `client.key` in the certs directory.

### Environment variables

| Variable | Default | Description |
|---|---|---|
| `TEMPORAL_API_KEY` | (not set) | API key for auth. If set, uses the regional API endpoint. |
| `TEMPORAL_NAMESPACE` | `deepika-test-namespace.a2dd6` | Namespace to query. |
| `TEMPORAL_ADDRESS` | Regional or namespace endpoint | Overrides the target host for either auth mode. |
| `TEMPORAL_CERTS_DIR` | `/Users/deepikaawasthi/temporal/temporal-certs` | Directory containing `client.pem` and `client.key` for mTLS. |

## Usage

All flags are optional — use any combination to narrow the search.

```bash
# No filters — scans all workflows in the namespace
python find_pending.py

# Filter by task queue
python find_pending.py --task-queue my-queue

# Filter by workflow type
python find_pending.py --workflow-type MyWorkflow

# Filter by execution status
python find_pending.py --status Running

# Filter by start time range
python find_pending.py --start-time-after "2026-03-01T00:00:00Z" --start-time-before "2026-03-25T00:00:00Z"

# Filter by close time range
python find_pending.py --close-time-after "2026-03-20T00:00:00Z" --close-time-before "2026-03-25T00:00:00Z"

# Combine any filters
python find_pending.py --task-queue my-queue --workflow-type MyWorkflow --status Running --start-time-after "2026-03-20T00:00:00Z"
```

### Available filters

| Flag | Visibility Query | Description |
|---|---|---|
| `--task-queue` | `TaskQueue="..."` | Filter by task queue name |
| `--workflow-type` | `WorkflowType="..."` | Filter by workflow type name |
| `--status` | `ExecutionStatus="..."` | Filter by status: `Running`, `Completed`, `Failed`, `Canceled`, `Terminated`, `ContinuedAsNew`, `TimedOut` |
| `--start-time-after` | `StartTime>="..."` | Workflows started at or after this time |
| `--start-time-before` | `StartTime<="..."` | Workflows started at or before this time |
| `--close-time-after` | `CloseTime>="..."` | Workflows closed at or after this time |
| `--close-time-before` | `CloseTime<="..."` | Workflows closed at or before this time |

All times are in ISO 8601 format (e.g. `2026-03-01T00:00:00Z`).

## Output

Results are printed to the console and saved to `output/pending_activities_<timestamp>.json`:

```json
{
"generated_at": "2026-03-25T10:04:12.832303",
"query_used": "WorkflowType=\"PendingActivitiesWorkflow\" AND ExecutionStatus=\"Running\"",
"total_workflows": 1,
"workflows": [
{
"workflow_id": "hello-pending-activities-workflow",
"run_id": "019d25f3-65f4-7c71-9c86-acfb68faec15",
"pending_activity_count": 3,
"pending_activities": [
{
"activity_id": "1",
"activity_type": "say_hello",
"state": "1",
"attempt": 1
}
]
}
]
}
```

## Notes

- With no filters the script scans **all** workflows in the namespace. Use filters to narrow the scope for large namespaces.
- Only workflows with at least one pending activity appear in the output.
- The `output/` directory is created automatically on first run.
Empty file.
247 changes: 247 additions & 0 deletions list_pending_activities/find_pending.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
"""Find workflows with pending activities and save results locally.

All filters are optional — use any combination to narrow the search.

Authentication:
API key: export TEMPORAL_API_KEY="your-api-key"
mTLS: Falls back to certs if TEMPORAL_API_KEY is not set.

Usage:
python find_pending.py
python find_pending.py --task-queue my-queue
python find_pending.py --workflow-type MyWorkflow --status Running
python find_pending.py --start-time-after "2026-03-01T00:00:00Z" --start-time-before "2026-03-25T00:00:00Z"
python find_pending.py --close-time-after "2026-03-20T00:00:00Z"
"""

import argparse
import asyncio
import json
import os
from datetime import datetime

from temporalio.client import Client
from temporalio.service import TLSConfig

DEFAULT_NAMESPACE = "<ns>.<account-id>" # namespace - <ns>.<account-id>
DEFAULT_API_HOST = "<region>.<cloud-provider>.api.temporal.io:7233" # regional endpoint for your namespace
DEFAULT_MTLS_HOST = "<ns>.<account-id>.tmprl.cloud:7233" # namespace endpoint for your namespace
DEFAULT_CERTS_DIR = "directory path containing client.pem and client.key, keep the name as it is" # certs directory


def resolve_api_key() -> str | None:
"""Read API key from TEMPORAL_API_KEY env var, or return None to fall back to mTLS."""
return os.environ.get("TEMPORAL_API_KEY")


async def create_client(api_key: str | None = None) -> Client:
namespace = os.environ.get("TEMPORAL_NAMESPACE", DEFAULT_NAMESPACE)

if api_key:
target_host = os.environ.get("TEMPORAL_ADDRESS", DEFAULT_API_HOST)
print(f"Authenticating with API key to {target_host}")
return await Client.connect(
target_host,
namespace=namespace,
api_key=api_key,
tls=True,
)

# Fall back to mTLS
target_host = os.environ.get("TEMPORAL_ADDRESS", DEFAULT_MTLS_HOST)
certs_dir = os.environ.get("TEMPORAL_CERTS_DIR", DEFAULT_CERTS_DIR)
print(f"Authenticating with mTLS to {target_host}")

with open(os.path.join(certs_dir, "client.pem"), "rb") as f:
client_cert = f.read()
with open(os.path.join(certs_dir, "client.key"), "rb") as f:
client_key = f.read()

return await Client.connect(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use the TOML file to configure the client, there are several examples in this repo. https://docs.temporal.io/develop/environment-configuration

target_host,
namespace=namespace,
tls=TLSConfig(
client_cert=client_cert,
client_private_key=client_key,
),
)


def build_query(
task_queue: str | None = None,
workflow_type: str | None = None,
status: str | None = None,
start_time_after: str | None = None,
start_time_before: str | None = None,
close_time_after: str | None = None,
close_time_before: str | None = None,
) -> str:
"""Build a visibility query from optional filters."""
clauses = []

if task_queue:
clauses.append(f'TaskQueue="{task_queue}"')
if workflow_type:
clauses.append(f'WorkflowType="{workflow_type}"')
if status:
clauses.append(f'ExecutionStatus="{status}"')
if start_time_after:
clauses.append(f'StartTime>="{start_time_after}"')
if start_time_before:
clauses.append(f'StartTime<="{start_time_before}"')
if close_time_after:
clauses.append(f'CloseTime>="{close_time_after}"')
if close_time_before:
clauses.append(f'CloseTime<="{close_time_before}"')

return " AND ".join(clauses) if clauses else ""


async def find_workflows_with_pending_activities(
client: Client,
query: str,
) -> list[dict]:
"""List workflows matching the query, describe each, return those with pending activities."""

results = []

async for wf in client.list_workflows(query=query or None):
handle = client.get_workflow_handle(wf.id, run_id=wf.run_id)
desc = await handle.describe()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With thousands or millions of workflows, this can cause rate-limit issues. I wonder whether we should add a delay every X iterations or mention it in the README.


pending = desc.raw_description.pending_activities
if not pending:
continue

activities_info = []
for pa in pending:
activities_info.append(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PendingActivityInfo contains other information that I think could be useful for the user, like lastFailure, lastWorkerIdentity... I wonder if we should add everything here

            activities_info.append(MessageToDict(pa))

{
"activity_id": pa.activity_id,
"activity_type": pa.activity_type.name,
"state": str(pa.state),
"attempt": pa.attempt,
}
)

parent_exec = desc.raw_description.parent_execution
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unsure whether this information is relevant for the user to find workflows with pending activities, but this line throws an error when I test the code.

parent_id = parent_exec.workflow_id if parent_exec else None

results.append(
{
"workflow_id": wf.id,
"run_id": wf.run_id,
"workflow_type": str(getattr(wf, "workflow_type", "")),
"parent_workflow_id": parent_id,
"pending_activity_count": len(pending),
"pending_activities": activities_info,
}
)

return results


def save_results(results: list[dict], query: str) -> str:
"""Save results to a JSON file in the output/ directory. Returns the file path."""
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output")
os.makedirs(output_dir, exist_ok=True)

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filepath = os.path.join(output_dir, f"pending_activities_{timestamp}.json")

with open(filepath, "w") as f:
json.dump(
{
"generated_at": datetime.now().isoformat(),
"query_used": query,
"total_workflows": len(results),
"workflows": results,
},
f,
indent=2,
)

return filepath


def print_results(results: list[dict]) -> None:
print("-" * 80)
for entry in results:
print(f"Workflow ID : {entry['workflow_id']}")
print(f"Run ID : {entry['run_id']}")
print(f"Workflow Type : {entry['workflow_type']}")
print(f"Parent WF ID : {entry['parent_workflow_id'] or '(none — top-level)'}")
print(f"Pending Count : {entry['pending_activity_count']}")
for act in entry["pending_activities"]:
print(
f" - Activity ID: {act['activity_id']}, "
f"Type: {act['activity_type']}, "
f"State: {act['state']}, "
f"Attempt: {act['attempt']}"
)
print("-" * 80)


async def main():
parser = argparse.ArgumentParser(
description="Find workflows with pending activities. All filters are optional."
)
parser.add_argument("--task-queue", default=None, help="Filter by task queue name")
parser.add_argument("--workflow-type", default=None, help="Filter by workflow type name")
parser.add_argument(
"--status",
default=None,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine that most of the time users will use this script to filter running workflows, maybe setting the default to "Running" is not a bad idea

default='Running',

choices=["Running", "Completed", "Failed", "Canceled", "Terminated", "ContinuedAsNew", "TimedOut"],
help="Filter by execution status (default: all statuses)",
)
parser.add_argument(
"--start-time-after",
default=None,
help='Workflows started at or after this time (ISO 8601, e.g. "2026-03-01T00:00:00Z")',
)
parser.add_argument(
"--start-time-before",
default=None,
help='Workflows started at or before this time (ISO 8601, e.g. "2026-03-25T00:00:00Z")',
)
parser.add_argument(
"--close-time-after",
default=None,
help='Workflows closed at or after this time (ISO 8601, e.g. "2026-03-20T00:00:00Z")',
)
parser.add_argument(
"--close-time-before",
default=None,
help='Workflows closed at or before this time (ISO 8601, e.g. "2026-03-25T00:00:00Z")',
)
args = parser.parse_args()

query = build_query(
task_queue=args.task_queue,
workflow_type=args.workflow_type,
status=args.status,
start_time_after=args.start_time_after,
start_time_before=args.start_time_before,
close_time_after=args.close_time_after,
close_time_before=args.close_time_before,
)

print(f"Query: {query or '(no filters — scanning all workflows)'}\n")

api_key = resolve_api_key()
client = await create_client(api_key=api_key)
results = await find_workflows_with_pending_activities(client, query)

if not results:
print("No workflows with pending activities found.")
return

print(f"Found {len(results)} workflow(s) with pending activities:\n")
print_results(results)

filepath = save_results(results, query)
print(f"\nResults saved to: {filepath}")


if __name__ == "__main__":
asyncio.run(main())
Loading