Manages cluster configuration and connection lifecycle.
AsyncCluster(
contact_points: Optional[List[str]] = None,
port: int = 9042,
auth_provider: Optional[AuthProvider] = None,
load_balancing_policy: Optional[LoadBalancingPolicy] = None,
reconnection_policy: Optional[ReconnectionPolicy] = None,
retry_policy: Optional[RetryPolicy] = None,
ssl_context: Optional[SSLContext] = None,
protocol_version: Optional[int] = None,
executor_threads: int = 2,
max_schema_agreement_wait: int = 10,
control_connection_timeout: float = 2.0,
idle_heartbeat_interval: float = 30.0,
schema_event_refresh_window: float = 2.0,
topology_event_refresh_window: float = 10.0,
status_event_refresh_window: float = 2.0,
**kwargs
)

Parameters:
- contact_points: List of contact points (default: ["127.0.0.1"])
- port: Port to connect to (default: 9042)
- auth_provider: Authentication provider
- load_balancing_policy: Load balancing policy
- reconnection_policy: Reconnection policy
- retry_policy: Retry policy (default: AsyncRetryPolicy)
- ssl_context: SSL context for secure connections
- protocol_version: CQL protocol version (must be 5 or higher if specified). If omitted, the driver negotiates the highest available version. Connection fails if the negotiated version is < 5.
- executor_threads: Number of threads for I/O operations (default: 2)
- max_schema_agreement_wait: Max time to wait for schema agreement (default: 10)
- control_connection_timeout: Timeout for control connection (default: 2.0)
- idle_heartbeat_interval: Interval for connection heartbeat (default: 30.0)
- schema_event_refresh_window: Window for schema event refresh (default: 2.0)
- topology_event_refresh_window: Window for topology event refresh (default: 10.0)
- status_event_refresh_window: Window for status event refresh (default: 2.0)
- **kwargs: Additional cluster options passed to underlying driver
@classmethod
def create_with_auth(
cls,
contact_points: List[str],
username: str,
password: str,
**kwargs
) -> AsyncCluster

Create cluster with username/password authentication.
async def connect(
keyspace: Optional[str] = None,
timeout: Optional[float] = None
) -> AsyncCassandraSession

Connect to the cluster and create a session.
Parameters:
- keyspace: Optional keyspace to use
- timeout: Connection timeout in seconds
Example:
# Recommended: Let driver negotiate to highest available
cluster = AsyncCluster(['localhost']) # Negotiates to v5 (highest currently supported)
session = await cluster.connect('my_keyspace') # Fails if < v5
# Explicit protocol version (must be 5+)
cluster = AsyncCluster(['localhost'], protocol_version=5)
session = await cluster.connect('my_keyspace')
# Connection to old Cassandra will fail after negotiation
try:
cluster = AsyncCluster(['cassandra-3.x-server'])
session = await cluster.connect() # Negotiates v4, then fails
except ConnectionError as e:
print(e) # "Connected with protocol v4 but v5+ is required..."

async def shutdown() -> None

Shutdown the cluster and release all resources.
def register_user_type(
keyspace: str,
user_type: str,
cls: Type
) -> None

Register a user-defined type with the cluster.
Parameters:
- keyspace: Keyspace containing the user type
- user_type: Name of the user type in Cassandra
- cls: Python class to map the type to
Properties:
- is_closed: Check if cluster is closed
- metadata: Get cluster metadata
async with AsyncCluster(['localhost']) as cluster:
session = await cluster.connect()
# Use session

Provides async interface for executing CQL queries.
The session is created by calling cluster.connect() and accepts an optional metrics middleware:
# Created internally by cluster.connect()
AsyncCassandraSession(
session: Session,
metrics: Optional[MetricsMiddleware] = None
)

async def execute(
query: Union[str, SimpleStatement, PreparedStatement],
parameters: Optional[Union[List, Dict]] = None,
trace: bool = False,
custom_payload: Optional[Dict[str, bytes]] = None,
timeout: Any = None,
execution_profile: Any = EXEC_PROFILE_DEFAULT,
paging_state: Optional[bytes] = None,
host: Optional[Any] = None,
execute_as: Optional[str] = None
) -> AsyncResultSet

Execute a CQL query asynchronously.
Example:
# Simple query
result = await session.execute("SELECT * FROM users")
# Query with parameters (must prepare first)
prepared = await session.prepare("SELECT * FROM users WHERE id = ?")
result = await session.execute(prepared, [user_id])
# Query with named parameters (must prepare first)
prepared = await session.prepare("SELECT * FROM users WHERE name = :name")
result = await session.execute(prepared, {"name": "John"})

async def execute_batch(
batch_statement: BatchStatement,
trace: bool = False,
custom_payload: Optional[Dict[str, bytes]] = None,
timeout: Any = None,
execution_profile: Any = EXEC_PROFILE_DEFAULT
) -> AsyncResultSet

Execute a batch statement asynchronously.
Example:
from cassandra.query import BatchStatement
batch = BatchStatement()
batch.add("INSERT INTO users (id, name) VALUES (?, ?)", [id1, "Alice"])
batch.add("INSERT INTO users (id, name) VALUES (?, ?)", [id2, "Bob"])
await session.execute_batch(batch)

async def execute_stream(
query: Union[str, SimpleStatement, PreparedStatement, BoundStatement],
parameters: Optional[Union[list, tuple, dict]] = None,
stream_config: Optional[StreamConfig] = None,
**kwargs
) -> AsyncStreamingResultSet

Execute a query and return results as an async stream for memory-efficient processing of large result sets.
Parameters:
- query: The CQL query to execute
- parameters: Query parameters (for prepared statements)
- stream_config: Configuration for streaming (fetch size, max pages, etc.)
- **kwargs: Additional keyword arguments passed to execute
Returns: AsyncStreamingResultSet - An async iterator over the results
Example:
from async_cassandra.streaming import StreamConfig
# ✅ BEST PRACTICE: Always use context manager
config = StreamConfig(fetch_size=1000)
async with await session.execute_stream(
"SELECT * FROM large_table",
stream_config=config
) as result:
# Process rows one at a time without loading all into memory
async for row in result:
await process_row(row)
# Result automatically closed, preventing memory leaks
# ✅ Alternative: Manual cleanup with try/finally
result = await session.execute_stream("SELECT * FROM large_table")
try:
async for row in result:
await process_row(row)
finally:
await result.close() # CRITICAL: Must close!
# ❌ NEVER DO THIS - Memory leak!
result = await session.execute_stream("SELECT * FROM large_table")
async for row in result:
process_row(row)
# Result not closed - callbacks remain in memory forever!

Processing by Pages:
# Context manager works with pages() too
async with await session.execute_stream(
"SELECT * FROM large_table",
stream_config=StreamConfig(fetch_size=5000)
) as result:
async for page in result.pages():
await process_page(page) # Process 5000 rows at a time

async def prepare(
query: str,
custom_payload: Optional[Dict[str, bytes]] = None,
timeout: Optional[float] = None
) -> PreparedStatement

Prepare a CQL statement asynchronously.
Parameters:
- query: The CQL query to prepare
- custom_payload: Optional custom payload
- timeout: Optional timeout in seconds
Example:
prepared = await session.prepare(
"INSERT INTO users (id, name, email) VALUES (?, ?, ?)"
)
# Use prepared statement multiple times
await session.execute(prepared, [id1, "Alice", "alice@example.com"])
await session.execute(prepared, [id2, "Bob", "bob@example.com"])

async def set_keyspace(keyspace: str) -> None

Set the current keyspace.
async def close() -> NoneClose the session and release resources.
Properties:
- is_closed: Check if session is closed
- keyspace: Get current keyspace
async with await cluster.connect() as session:
result = await session.execute("SELECT * FROM users")

Represents the result of a query execution.
def one() -> Optional[Any]

Get the first row or None if empty.
Example:
# Must prepare the statement first
stmt = await session.prepare("SELECT * FROM users WHERE id = ?")
result = await session.execute(stmt, [user_id])
user = result.one()
if user:
print(f"Found user: {user.name}") # Access as attribute, not dict

def all() -> List[Any]

Get all rows as a list.
Example:
result = await session.execute("SELECT * FROM users")
users = result.all()
for user in users:
print(user.name) # Access columns as attributes, not dict keys

Properties:
- rows: Get all rows as a list
result = await session.execute("SELECT * FROM users")
async for row in result:
print(row.name) # Access columns as attributes, not dict keys

result = await session.execute("SELECT * FROM users")
print(f"Found {len(result)} users")

Retry policy for async operations with idempotency safety checks.
AsyncRetryPolicy(max_retries: int = 3)

def on_read_timeout(
query, consistency, required_responses,
received_responses, data_retrieved, retry_num
) -> Tuple[int, Optional[ConsistencyLevel]]

Handle read timeout with retry logic.
def on_write_timeout(
query, consistency, write_type,
required_responses, received_responses, retry_num
) -> Tuple[int, Optional[ConsistencyLevel]]

Handle write timeout with idempotency checks.
def on_unavailable(
query, consistency, required_replicas,
alive_replicas, retry_num
) -> Tuple[int, Optional[ConsistencyLevel]]

Handle unavailable exception.
def on_request_error(
query, consistency, error, retry_num
) -> Tuple[int, Optional[ConsistencyLevel]]

Handle request errors.
- Read Timeout: Retries if data was retrieved or enough responses received
- Write Timeout: Retries for SIMPLE and BATCH writes only if marked as idempotent
- Unavailable: Tries next host on first attempt, then retries
- Request Error: Always tries next host
The retry policy includes critical safety checks for write operations:
# Safe to retry - marked as idempotent
stmt = SimpleStatement(
"INSERT INTO users (id, name) VALUES (?, ?) IF NOT EXISTS",
is_idempotent=True
)
# NOT safe to retry - will not be retried
stmt = SimpleStatement(
"INSERT INTO users (id, name) VALUES (?, ?)"
# is_idempotent defaults to None - treated as non-idempotent
)
# Prepared statements also need explicit marking
prepared = await session.prepare(
"DELETE FROM users WHERE id = ?"
)
prepared.is_idempotent = True # Mark as safe to retry
# Batch statements can be marked idempotent if all operations are safe
batch = BatchStatement()
batch.is_idempotent = True # Only if all statements in batch are idempotent

Important: Write operations (INSERT, UPDATE, DELETE) are ONLY retried if the statement is explicitly marked with is_idempotent=True. Statements without this attribute or with is_idempotent=False/None will NOT be retried. This strict policy prevents:
- Duplicate data insertions
- Multiple increments/decrements
- Unintended side effects from retrying non-idempotent operations
Note: By default, Cassandra driver statements have is_idempotent=None, which is treated as non-idempotent for safety.
Base exception for all async-cassandra errors.
class AsyncCassandraError(Exception):
cause: Optional[Exception] # Original exception if any

Raised when connection to Cassandra fails.
try:
session = await cluster.connect()
except ConnectionError as e:
print(f"Failed to connect: {e}")

Raised when a non-Cassandra exception occurs during query execution. Most Cassandra driver exceptions (like InvalidRequest, Unauthorized, AlreadyExists, etc.) are passed through directly without wrapping.
# Cassandra exceptions pass through directly
from cassandra import InvalidRequest, Unauthorized
try:
result = await session.execute("SELECT * FROM invalid_table")
except InvalidRequest as e:
print(f"Invalid query: {e}") # Cassandra exception passed through
except QueryError as e:
print(f"Unexpected error: {e}") # Only non-Cassandra exceptions wrapped
if e.cause:
print(f"Caused by: {e.cause}")

The following Cassandra driver exceptions are passed through directly without wrapping:
- InvalidRequest - Invalid query syntax or schema issues
- Unauthorized - Permission/authorization failures
- AuthenticationFailed - Authentication failures
- AlreadyExists - Schema already exists errors
- NoHostAvailable - No Cassandra hosts available
- Unavailable, ReadTimeout, WriteTimeout - Consistency/timeout errors
- OperationTimedOut - Query timeout
- Protocol exceptions like SyntaxException, ServerError
The library defines ConnectionError for connection-related issues and QueryError for wrapping unexpected non-Cassandra exceptions. Most of the time, you should catch specific Cassandra exceptions for proper error handling.
import asyncio
import uuid
from async_cassandra import AsyncCluster, AsyncCassandraSession
from async_cassandra.exceptions import ConnectionError
from cassandra import InvalidRequest, AlreadyExists
async def main():
# Create cluster with authentication
cluster = AsyncCluster.create_with_auth(
contact_points=['localhost'],
username='cassandra',
password='cassandra'
)
try:
# Connect to cluster
session = await cluster.connect()
# Create keyspace
await session.execute("""
CREATE KEYSPACE IF NOT EXISTS example
WITH REPLICATION = {
'class': 'SimpleStrategy',
'replication_factor': 1
}
""")
# Use keyspace
await session.set_keyspace('example')
# Create table
await session.execute("""
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT
)
""")
# Prepare statement
insert_stmt = await session.prepare(
"INSERT INTO users (id, name, email) VALUES (?, ?, ?)"
)
# Insert data
user_id = uuid.uuid4()
await session.execute(
insert_stmt,
[user_id, "John Doe", "john@example.com"]
)
# Query data (prepare the statement first)
select_stmt = await session.prepare("SELECT * FROM users WHERE id = ?")
result = await session.execute(select_stmt, [user_id])
user = result.one()
print(f"User: {user.name} ({user.email})")
except ConnectionError as e:
print(f"Connection failed: {e}")
except InvalidRequest as e:
print(f"Invalid query: {e}")
except AlreadyExists as e:
print(f"Schema already exists: {e.keyspace}.{e.table}")
finally:
await cluster.shutdown()
if __name__ == "__main__":
asyncio.run(main())

For documentation on monitoring, metrics, and streaming components, see:
- Monitoring and Metrics API Reference - ConnectionMonitor, MetricsMiddleware, streaming classes
- Streaming Guide - Detailed streaming usage and examples
- Metrics and Monitoring Guide - Setting up monitoring