Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 119 additions & 20 deletions src/bedrock_agentcore/memory/controlplane.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,68 @@
"""AgentCore Memory SDK - Control Plane Client.
"""AgentCore Memory SDK - Control Plane Client (DEPRECATED).

This module provides a simplified interface for Bedrock AgentCore Memory control plane operations.
It handles memory resource management, strategy operations, and status monitoring.
This module is deprecated. Use MemoryClient from bedrock_agentcore.memory.client instead,
which provides all control plane operations plus data plane features.
"""

import logging
import os
import time
import uuid
import warnings
from typing import Any, Dict, List, Optional

import boto3
from botocore.exceptions import ClientError

from .constants import (
MemoryStatus,
)
from .constants import MemoryStatus

logger = logging.getLogger(__name__)


def _deprecation(method_name: str, suggestion: str) -> None:
"""Emit a deprecation warning for a MemoryControlPlaneClient method."""
warnings.warn(
f"MemoryControlPlaneClient.{method_name}() is deprecated. Use MemoryClient.{suggestion} instead.",
DeprecationWarning,
stacklevel=3,
)


class MemoryControlPlaneClient:
"""Client for Bedrock AgentCore Memory control plane operations."""
"""Client for Bedrock AgentCore Memory control plane operations.

.. deprecated::
Use :class:`MemoryClient` instead, which provides all control plane operations
plus data plane features.
"""

def __init__(self, region_name: str = "us-west-2", environment: str = "prod"):
def __init__(self, region_name: str = "us-west-2", environment: str = "prod", client=None):
"""Initialize the Memory Control Plane client.

Args:
region_name: AWS region name
environment: Environment name (prod, gamma, etc.)
environment: Environment name (unused, kept for backward compatibility)
client: Optional pre-built boto3 client (for testing/backward compatibility)
"""
warnings.warn(
"MemoryControlPlaneClient is deprecated and will be removed in v1.4.0. "
"Use MemoryClient instead, which provides all control plane operations "
"plus data plane features.",
DeprecationWarning,
stacklevel=2,
)
self.region_name = region_name
self.environment = environment

self.endpoint = os.getenv(
"BEDROCK_AGENTCORE_CONTROL_ENDPOINT", f"https://bedrock-agentcore-control.{region_name}.amazonaws.com"
)

service_name = os.getenv("BEDROCK_AGENTCORE_CONTROL_SERVICE", "bedrock-agentcore-control")
self.client = boto3.client(service_name, region_name=self.region_name, endpoint_url=self.endpoint)
if client is not None:
self.client = client
else:
self.endpoint = os.getenv(
"BEDROCK_AGENTCORE_CONTROL_ENDPOINT",
f"https://bedrock-agentcore-control.{region_name}.amazonaws.com",
)
service_name = os.getenv("BEDROCK_AGENTCORE_CONTROL_SERVICE", "bedrock-agentcore-control")
self.client = boto3.client(service_name, region_name=self.region_name, endpoint_url=self.endpoint)

logger.info("Initialized MemoryControlPlaneClient for %s in %s", environment, region_name)

Expand All @@ -57,6 +81,9 @@ def create_memory(
) -> Dict[str, Any]:
"""Create a memory resource with optional strategies.

.. deprecated::
Use :meth:`MemoryClient.create_memory` or :meth:`MemoryClient.create_memory_and_wait`.

Args:
name: Name for the memory resource
event_expiry_days: How long to retain events (default: 90 days)
Expand All @@ -70,6 +97,8 @@ def create_memory(
Returns:
Created memory object
"""
_deprecation("create_memory", "create_memory() or create_memory_and_wait()")

params = {
"name": name,
"eventExpiryDuration": event_expiry_days,
Expand Down Expand Up @@ -104,13 +133,20 @@ def create_memory(
def get_memory(self, memory_id: str, include_strategies: bool = True) -> Dict[str, Any]:
"""Get a memory resource by ID.

.. deprecated::
Use ``MemoryClient.get_memory(memoryId=memory_id)`` via the client's
``__getattr__`` forwarding, or :meth:`MemoryClient.get_memory_strategies`
for strategy details.

Args:
memory_id: Memory resource ID
include_strategies: Whether to include strategy details in response

Returns:
Memory resource details
"""
_deprecation("get_memory", "get_memory(memoryId=...) or get_memory_strategies()")

try:
response = self.client.get_memory(memoryId=memory_id)
memory = response["memory"]
Expand All @@ -132,12 +168,17 @@ def get_memory(self, memory_id: str, include_strategies: bool = True) -> Dict[st
def list_memories(self, max_results: int = 100) -> List[Dict[str, Any]]:
"""List all memories for the account with pagination support.

.. deprecated::
Use :meth:`MemoryClient.list_memories`.

Args:
max_results: Maximum number of memories to return

Returns:
List of memory summaries
"""
_deprecation("list_memories", "list_memories()")

try:
memories = []
next_token = None
Expand Down Expand Up @@ -180,6 +221,10 @@ def update_memory(
) -> Dict[str, Any]:
"""Update a memory resource properties and/or strategies.

.. deprecated::
Use :meth:`MemoryClient.update_memory_strategies` for strategy changes,
or call ``MemoryClient.update_memory(...)`` directly for property updates.

Args:
memory_id: Memory resource ID
description: Optional new description
Expand All @@ -195,6 +240,11 @@ def update_memory(
Returns:
Updated memory object
"""
_deprecation(
"update_memory",
"update_memory_strategies() or update_memory_strategies_and_wait()",
)

params: Dict = {
"memoryId": memory_id,
"clientToken": str(uuid.uuid4()),
Expand Down Expand Up @@ -245,12 +295,15 @@ def delete_memory(
self,
memory_id: str,
wait_for_deletion: bool = False,
wait_for_strategies: bool = False, # Changed default to False
wait_for_strategies: bool = False,
max_wait: int = 300,
poll_interval: int = 10,
) -> Dict[str, Any]:
"""Delete a memory resource.

.. deprecated::
Use :meth:`MemoryClient.delete_memory` or :meth:`MemoryClient.delete_memory_and_wait`.

Args:
memory_id: Memory resource ID to delete
wait_for_deletion: Whether to wait for complete deletion
Expand All @@ -261,6 +314,8 @@ def delete_memory(
Returns:
Deletion response
"""
_deprecation("delete_memory", "delete_memory() or delete_memory_and_wait()")

try:
# If requested, wait for all strategies to become ACTIVE before deletion
if wait_for_strategies:
Expand All @@ -277,7 +332,8 @@ def delete_memory(

if transitional_strategies:
logger.info(
"Waiting for %d strategies to become ACTIVE before deletion", len(transitional_strategies)
"Waiting for %d strategies to become ACTIVE before deletion",
len(transitional_strategies),
)
self._wait_for_status(
memory_id=memory_id,
Expand Down Expand Up @@ -327,6 +383,10 @@ def add_strategy(
) -> Dict[str, Any]:
"""Add a strategy to a memory resource.

.. deprecated::
Use :meth:`MemoryClient.update_memory_strategies` with ``add_strategies``,
or one of the typed helpers like :meth:`MemoryClient.add_semantic_strategy`.

Args:
memory_id: Memory resource ID
strategy: Strategy configuration dictionary
Expand All @@ -337,6 +397,11 @@ def add_strategy(
Returns:
Updated memory object with strategyId field
"""
_deprecation(
"add_strategy",
"update_memory_strategies(add_strategies=[...]) or add_semantic_strategy() / add_summary_strategy()",
)

# Get the strategy type and name for identification
strategy_type = list(strategy.keys())[0] # e.g., 'semanticMemoryStrategy'
strategy_name = strategy[strategy_type].get("name")
Expand Down Expand Up @@ -376,13 +441,18 @@ def add_strategy(
def get_strategy(self, memory_id: str, strategy_id: str) -> Dict[str, Any]:
"""Get a specific strategy from a memory resource.

.. deprecated::
Use :meth:`MemoryClient.get_memory_strategies` and filter by strategy ID.

Args:
memory_id: Memory resource ID
strategy_id: Strategy ID

Returns:
Strategy details
"""
_deprecation("get_strategy", "get_memory_strategies()")

try:
memory = self.get_memory(memory_id)
strategies = memory.get("strategies", [])
Expand Down Expand Up @@ -410,6 +480,10 @@ def update_strategy(
) -> Dict[str, Any]:
"""Update a strategy in a memory resource.

.. deprecated::
Use :meth:`MemoryClient.update_memory_strategies` with ``modify_strategies``,
or :meth:`MemoryClient.modify_strategy`.

Args:
memory_id: Memory resource ID
strategy_id: Strategy ID to update
Expand All @@ -423,6 +497,11 @@ def update_strategy(
Returns:
Updated memory object
"""
_deprecation(
"update_strategy",
"update_memory_strategies(modify_strategies=[...]) or modify_strategy()",
)

# Note: API expects memoryStrategyId for input but returns strategyId in response
modify_config: Dict = {"memoryStrategyId": strategy_id}

Expand Down Expand Up @@ -458,6 +537,10 @@ def remove_strategy(
) -> Dict[str, Any]:
"""Remove a strategy from a memory resource.

.. deprecated::
Use :meth:`MemoryClient.update_memory_strategies` with ``delete_strategy_ids``,
or :meth:`MemoryClient.delete_strategy`.

Args:
memory_id: Memory resource ID
strategy_id: Strategy ID to remove
Expand All @@ -468,6 +551,11 @@ def remove_strategy(
Returns:
Updated memory object
"""
_deprecation(
"remove_strategy",
"update_memory_strategies(delete_strategy_ids=[...]) or delete_strategy()",
)

# For remove_strategy, we only need to wait for memory to be active
# since the strategy will be gone
return self.update_memory(
Expand All @@ -484,7 +572,10 @@ def _wait_for_memory_active(self, memory_id: str, max_wait: int, poll_interval:
"""Wait for memory to return to ACTIVE state."""
logger.info("Waiting for memory %s to become ACTIVE...", memory_id)
return self._wait_for_status(
memory_id=memory_id, target_status=MemoryStatus.ACTIVE.value, max_wait=max_wait, poll_interval=poll_interval
memory_id=memory_id,
target_status=MemoryStatus.ACTIVE.value,
max_wait=max_wait,
poll_interval=poll_interval,
)

def _wait_for_strategy_active(
Expand Down Expand Up @@ -535,7 +626,12 @@ def _wait_for_strategy_active(
)

def _wait_for_status(
self, memory_id: str, target_status: str, max_wait: int, poll_interval: int, check_strategies: bool = True
self,
memory_id: str,
target_status: str,
max_wait: int,
poll_interval: int,
check_strategies: bool = True,
) -> Dict[str, Any]:
"""Generic method to wait for a memory to reach a specific status.

Expand Down Expand Up @@ -606,7 +702,10 @@ def _wait_for_status(

elapsed = time.time() - start_time
logger.info(
"Memory %s and all strategies are now %s (took %.1f seconds)", memory_id, target_status, elapsed
"Memory %s and all strategies are now %s (took %.1f seconds)",
memory_id,
target_status,
elapsed,
)
return memory
elif status == MemoryStatus.FAILED.value:
Expand Down
Loading
Loading