A Python SDK for interacting with RationAI pathology image analysis services. This library provides both synchronous and asynchronous clients for image classification, segmentation, and quality control operations.
pip install git+https://github.com/RationAI/rationai-sdk-python.gitimport rationai
import numpy as np
from PIL import Image
# Initialize the client
client = rationai.Client()
# Load an image
image = Image.open("path/to/image.jpg")
# Classify the image
result = client.models.classify_image("model-name", image)
print(result)
# Segment the image
segmentation = client.models.segment_image("segmentation-model", image)
print(segmentation.shape) # (num_classes, height, width)
client.close()import asyncio
import rationai
async def main():
async with rationai.AsyncClient() as client:
# Your async operations here
result = await client.models.classify_image("model-name", image)
print(result)
asyncio.run(main())Classify an image using the specified model.
Parameters:
model: The name of the model to use for classificationimage: The image to classify (must be uint8 RGB image)timeout: Optional timeout for the request (defaults to 100 seconds)
Returns: Classification result as a float (binary) or dict of probabilities per class
Segment an image using the specified model.
Parameters:
model: The name of the model to use for segmentationimage: The image to segment (must be uint8 RGB image)timeout: Optional timeout for the request (defaults to 100 seconds)
Returns: Segmentation mask as numpy array with shape (num_classes, height, width)
heatmap(model: str, slide_path: str, tissue_mask_path: str, output_path: str, stride_fraction: float = 0.5, output_bigtiff_tile_height: int = 512, output_bigtiff_tile_width: int = 512, timeout: int = 1000) -> str
Generate a heatmap for a whole slide image using the specified model.
Parameters:
model: The name of the model to use for heatmap generationslide_path: Path to the whole slide imagetissue_mask_path: Path to the tissue mask for the slideoutput_path: Directory to save the generated heatmap tilesstride_fraction: Fraction of tile size to use as stride between tiles (default: 0.5)output_bigtiff_tile_height: Height of output heatmap tiles in pixels (default: 512)output_bigtiff_tile_width: Width of output heatmap tiles in pixels (default: 512)timeout: Optional timeout for the request (defaults to 1000 seconds)
Returns: The path to the generated heatmap. Should match the output_path provided.
Check quality of a whole slide image.
Parameters:
wsi_path: Path to the whole slide imageoutput_path: Directory to save output masksconfig: OptionalSlideCheckConfigfor the quality checktimeout: Optional timeout for the request (defaults to 3600 seconds)
Returns: xOpat link containing the processed WSI for visual inspection of generated masks
Check quality of multiple slides concurrently.
Parameters:
wsi_paths: List of paths to whole slide imagesoutput_path: Directory to save output masksconfig: OptionalSlideCheckConfigfor the quality checktimeout: Optional timeout for each request (defaults to 3600 seconds)max_concurrent: Maximum number of concurrent slide checks (defaults to 4)
Yields: SlideCheckResult for each slide containing the path, xOpat URL (if successful), and any error information
Generate a QC report from processed slides.
Parameters:
backgrounds: List of paths to background (slide) imagesmask_dir: Directory containing generated maskssave_location: Path where the report HTML will be savedcompute_metrics: Whether to compute quality metrics (default: True)timeout: Optional timeout for the request
For more details, refer to the QC documentation.
To avoid overloading the server, it's important to limit concurrent requests. Here are recommended approaches:
Limit the number of concurrent requests:
import asyncio
import rationai
async def process_images_with_semaphore(image_paths, model_name, max_concurrent):
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_segment(client, path):
async with semaphore:
image = load_image(path)
return await client.models.segment_image(model_name, image)
async with rationai.AsyncClient() as client:
tasks = [bounded_segment(client, path) for path in image_paths]
results = await asyncio.gather(*tasks)
return results
# Process up to 16 images concurrently
results = asyncio.run(process_images_with_semaphore(image_paths, "model-name", max_concurrent=16))Process results as they complete:
import asyncio
from rationai import AsyncClient
async def process_with_as_completed(image_paths, model_name, max_concurrent):
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_request(client, path):
async with semaphore:
image = load_image(path)
return path, await client.models.segment_image(model_name, image)
async with AsyncClient(models_base_url="http://localhost:8000") as client:
tasks = {asyncio.create_task(bounded_request(client, path)): path
for path in image_paths}
for future in asyncio.as_completed(tasks):
path, result = await future
print(f"Processed {path}")
# Process result immediately without waiting for all tasks
asyncio.run(process_with_as_completed(image_paths, "model-name", max_concurrent=16))Start with a conservative limit and monitor server resources to find the optimal value for your setup.
from rationai import AsyncClient
async with AsyncClient(timeout=300) as client: # 300 second timeout
result = await client.models.segment_image("model", image, timeout=60)from rationai import Client
client = Client(
models_base_url="http://custom-models-server:8000",
qc_base_url="http://custom-qc-server:8000"
)