
Primr API Reference

This document describes how to use Primr programmatically as a Python library.

Installation

pip install -e .

Quick Start

import asyncio
from primr.core.research_orchestrator import ResearchOrchestrator, ResearchMode

async def main():
    orchestrator = ResearchOrchestrator()
    result = await orchestrator.research(
        "Acme Corp",
        "https://acme.example",
        mode=ResearchMode.COMPLETE
    )
    
    if result.success:
        print(f"Generated {len(result.section_results)} sections")
        print(f"Duration: {result.duration_seconds:.0f}s")
    else:
        print(f"Error: {result.error}")

asyncio.run(main())

Core Classes

ResearchOrchestrator

The main entry point for running research.

from primr.core.research_orchestrator import (
    ResearchOrchestrator,
    ResearchMode,
    ResearchConfig,
    OrchestratorResult
)

Constructor

orchestrator = ResearchOrchestrator()

No arguments required. The orchestrator lazy-loads clients as needed.

research()

Execute company research using the specified mode.

async def research(
    company_name: str,
    website: str | None = None,
    mode: ResearchMode = ResearchMode.STRUCTURED,
    config: ResearchConfig | None = None,
    on_progress: Callable[[str], None] | None = None,
    context_files: list[str] | None = None,
) -> OrchestratorResult

Parameters:

Name Type Description
company_name str Name of the company to research
website str or None Company website URL
mode ResearchMode Research mode (STRUCTURED, DEEP_RESEARCH, COMPLETE)
config ResearchConfig or None Optional configuration overrides
on_progress Callable or None Progress callback function
context_files list or None Additional context files (PDFs, docs)

Returns: OrchestratorResult

Example with progress callback:

def on_progress(message: str):
    print(f"Progress: {message}")

result = await orchestrator.research(
    "Acme Corp",
    "https://acme.example",
    mode=ResearchMode.COMPLETE,
    on_progress=on_progress
)

ResearchMode

Available research modes.

from primr.core.research_orchestrator import ResearchMode

ResearchMode.STRUCTURED      # Website scraping + web search
ResearchMode.DEEP_RESEARCH   # Autonomous web research
ResearchMode.COMPLETE        # Two-step: structured then deep

ResearchConfig

Configuration for a research task.

from primr.core.research_orchestrator import ResearchConfig

config = ResearchConfig(
    mode=ResearchMode.COMPLETE,
    timeout=3600,              # Max duration in seconds
    poll_interval=10,          # Seconds between status checks
    include_website_scrape=True,
    include_web_search=True,
    sections=None              # Specific sections to research (or None for all)
)

OrchestratorResult

Result from the research orchestrator.

@dataclass
class OrchestratorResult:
    company_name: str
    website: str | None
    mode: ResearchMode
    section_results: dict[str, str]  # Section key -> content
    raw_content: str
    citations: list
    duration_seconds: float
    success: bool
    error: str | None
    timestamp: datetime

Accessing results:

result = await orchestrator.research("Acme Corp", "https://acme.example")

if result.success:
    # Access individual sections
    overview = result.section_results.get("company_overview", "")
    products = result.section_results.get("detailed_products_services", "")
    
    # Get all section keys
    print(result.section_results.keys())
    
    # Access raw content (for Complete mode)
    full_report = result.raw_content
    
    # Access citations
    for citation in result.citations:
        print(f"{citation['number']}: {citation['url']}")

Core Modules

The primr.core package contains specialized modules for different aspects of research. These can be imported directly for more granular control.

Workspace Management

Working folder creation and file operations.

from primr.core.workspace import (
    create_working_folder,
    consolidate_working_folder,
    save_section_output,
    validate_context_files,
    WorkspaceConfig,
    ConsolidationResult,
)
# Create a working folder for research
folder = create_working_folder("Acme Corp")
print(f"Working folder: {folder}")

# Save section output
save_section_output(folder, "company_overview", "Acme Corp is a technology company...")

# Consolidate all sections into a single file
result = consolidate_working_folder(folder)
print(f"Consolidated {result.section_count} sections")

AI Strategy Generation

Generate AI strategy recommendations with cloud vendor context.

from pathlib import Path

from primr.core.ai_strategy import (
    generate_ai_strategy_sync,
    CloudVendor,
    AIStrategyConfig,
    AIStrategyResult,
)
# Generate AI strategy for a company
config = AIStrategyConfig(
    company_name="Acme Corp",
    cloud_vendor=CloudVendor.AWS,
    working_folder=Path("working/Acme Corp"),
)

result = generate_ai_strategy_sync(config)
if result.success:
    print(result.content)

CloudVendor enum:

CloudVendor.AWS      # Amazon Web Services
CloudVendor.AZURE    # Microsoft Azure
CloudVendor.GCP      # Google Cloud Platform

Deep Research Runner

Execute Deep Research with preflight validation.

from primr.core.deep_research_runner import (
    perform_deep_research,
    validate_preflight,
    DeepResearchConfig,
    DeepResearchMode,
    PreflightResult,
    PreflightStatus,
)
# Validate before running expensive operations
preflight = validate_preflight()
if preflight.status == PreflightStatus.READY:
    config = DeepResearchConfig(
        company_name="Acme Corp",
        prompt="Research Acme Corp's competitive position in the market",
        mode=DeepResearchMode.STANDARD,
    )
    result = await perform_deep_research(config)
else:
    print(f"Preflight failed: {preflight.message}")

CLI Module

Command-line interface components for programmatic use.

from primr.core.cli import (
    main,
    run_doctor,
    parse_args,
    process_csv,
    Command,
    CLIConfig,
)
# Run system check programmatically
success = run_doctor()

# Parse CLI arguments
config = parse_args(["Acme Corp", "https://acme.example", "--mode", "deep"])
print(f"Company: {config.company_name}")
print(f"Mode: {config.mode}")

Structured Research

Website scraping pipeline with section-by-section analysis.

from primr.core.structured_research import (
    run_research,
    research_section,
    generate_initial_overview,
    ScrapedData,
    AnalysisResult,
    ResearchContext,
)

Vendor Research

Cloud vendor AI capabilities research.

from primr.core.vendor_research import (
    get_or_generate_vendor_research,
    get_or_generate_vendor_research_sync,
    VendorResearchFile,
    VendorResearchResult,
)

Backward Compatibility

For existing code, all functions remain available from research_agent.py:

# These imports still work (delegate to new modules internally)
from primr.core.research_agent import (
    main,
    run_doctor,
    create_working_folder,
    consolidate_working_folder,
    run_research,
    research_section,
    CloudVendor,
    DeepResearchConfig,
    DeepResearchMode,
)

AI Client

Direct access to the AI client for custom prompts.

from primr.ai import AIClient, get_client

Using the singleton

client = get_client()
response = client.generate("What is Python?")

Creating a new instance

client = AIClient(api_key="your-api-key")
response = client.generate(
    "Analyze this company",
    model_type="research",
    thinking_level="high"
)

generate()

Generate content with automatic retries.

def generate(
    prompt: str,
    model_type: str = "research",
    temperature: float = 1.0,
    thinking_level: str = "high",
    max_retries: int | None = None,
    timeout: float | None = None,
) -> str

Parameters:

Name Type Description
prompt str The prompt to send
model_type str "research" or "report"
temperature float Sampling temperature (0.0-2.0)
thinking_level str "low" or "high"
max_retries int or None Override default retry count
timeout float or None Request timeout in seconds

generate_fast()

Fast generation with minimal thinking.

response = client.generate_fast("Summarize this text")

Token usage tracking

client = AIClient(track_usage=True)

# Make some calls
client.generate("First prompt")
client.generate("Second prompt")

# Get usage summary
usage = client.get_usage_summary()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Estimated cost: ${usage['total_cost']:.4f}")

# Reset counters
client.reset_usage()

Deep Research Client

Direct access to Gemini's Deep Research Agent.

from primr.ai import DeepResearchClient, ResearchResult, ResearchProgress

Basic usage

client = DeepResearchClient()
result = await client.research(
    "Research Acme Corp's competitive position in the market"
)

print(result.content)
for citation in result.citations:
    print(f"Source: {citation['url']}")

With progress callback

def on_progress(progress: ResearchProgress):
    print(f"Status: {progress.status.value}")
    print(f"Message: {progress.message}")
    if progress.thought:
        print(f"Thinking: {progress.thought}")

result = await client.research(
    "Research Acme Corp",
    on_progress=on_progress
)

Output formats

# Company profile format
result = await client.research(
    "Research Acme Corp",
    output_format="company_profile"
)

# Executive summary format
result = await client.research(
    "Research Acme Corp",
    output_format="executive_summary"
)

# Competitive analysis format
result = await client.research(
    "Research Acme Corp",
    output_format="competitive_analysis"
)

With priority URLs

result = await client.research(
    "Research Acme Corp",
    priority_urls=["https://acme.example", "https://ir.acme.example"]
)

Job Management

Deep Research jobs run asynchronously. If a connection drops, the job continues on Google's servers.

from primr.ai.deep_research import (
    get_deep_research_client,
    get_pending_jobs,
    save_pending_job,
    remove_pending_job,
)

# Check status of a specific job
client = get_deep_research_client()
result = client.check_job("v1_abc123...")
print(f"Status: {result['status']}")  # in_progress, completed, failed
if result['content']:
    print(f"Content: {result['content'][:500]}...")

# List all pending jobs
jobs = get_pending_jobs()
for job_id, info in jobs.items():
    print(f"{job_id}: {info['description']} ({info['status']})")

# Manually save a job for later recovery
save_pending_job(
    interaction_id="v1_abc123...",
    job_type="ai_strategy",
    description="AI Strategy for Acme Corp"
)

# Remove a completed job from tracking
remove_pending_job("v1_abc123...")

CLI commands for job management:

primr --check-jobs   # Check status of all pending jobs
primr --clear-jobs   # Clear stale/old pending jobs

Scraping

Direct access to the scraping engine.

from primr.data.scrape import (
    scrape_with_requests,
    scrape_with_httpx,
    scrape_with_playwright,
    scrape_with_playwright_aggressive,
    get_cached_content,
    cache_content,
    clear_cache
)

Tiered scraping

url = "https://example.com"

# Try each tier in order
content, error = scrape_with_requests(url)
if content is None:
    content, error = scrape_with_httpx(url)
if content is None:
    content, error = scrape_with_playwright(url)
if content is None:
    content, error = scrape_with_playwright_aggressive(url)

if content:
    print(f"Scraped {len(content)} characters")
else:
    print(f"All tiers failed: {error}")

Caching

url = "https://example.com"

# Check cache first
cached = get_cached_content(url)
if cached:
    print("Using cached content")
else:
    content, error = scrape_with_requests(url)
    if content:
        cache_content(url, content)

# Clear old cache entries
clear_cache(max_age_hours=24)

# Clear all cache
clear_cache()

Parallel scraping

from primr.data import ParallelScraper, get_parallel_scraper

scraper = get_parallel_scraper()
results = await scraper.scrape_urls([
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3"
])

for result in results:
    if result.success:
        print(f"{result.url}: {len(result.content)} chars")
    else:
        print(f"{result.url}: {result.error}")

Report Generation

Generate reports from research results.

from primr.output import DocumentBuilder
from pathlib import Path

builder = DocumentBuilder()

# Build DOCX from sections
doc_path = builder.build_docx(
    sections=result.section_results,
    company_name="Acme Corp",
    output_dir=Path("output")
)

print(f"Report saved to: {doc_path}")

Configuration

Accessing settings

from primr.config import get_settings, configure

# Get current settings
settings = get_settings()
print(f"Research model: {settings.ai.research_model}")
print(f"Scrape timeout: {settings.scraping.timeout}")

# Validate settings
settings.validate_all(include_api_keys=True)

Custom configuration

from primr.config import configure
from pathlib import Path

settings = configure(
    project_root=Path("/custom/path"),
    verbose=True,
    debug=True
)

Configuration classes

from primr.config import (
    TimeoutConfig,
    CacheConfig,
    ScrapingConfig,
    AIConfig,
    SearchConfig,
    PathConfig,
    PricingConfig
)

# Example: Custom timeout config
timeout = TimeoutConfig(
    connect=10.0,
    read=30.0,
    total=60.0
)
timeout.validate()  # Raises ValueError if invalid

Type Definitions

Primr provides comprehensive type definitions for type-safe code.

from primr.types import (
    # Type aliases
    URL,
    FilePath,
    HTMLContent,
    TextContent,
    
    # Enums
    AIModelType,
    ThinkingLevel,
    ScrapeTier,
    OutputFormat,
    
    # TypedDicts
    SearchResult,
    ScrapedPage,
    GradeResult,
    ReportSection,
    CompanyInfo,
    ResearchContext,
    
    # Protocols
    AIClientProtocol,
    ScraperProtocol,
    SearchProtocol,
    CacheProtocol,
    
    # Generic types
    Result,
    
    # Type guards
    is_valid_url,
    is_search_result,
    is_scraped_page
)

Using the Result type

from primr.types import Result

def fetch_data(url: str) -> Result[str]:
    try:
        content = scrape(url)
        return Result.ok(content)
    except Exception as e:
        return Result.err(e)

result = fetch_data("https://example.com")
if result.is_ok:
    print(result.value)
else:
    print(f"Error: {result.error}")

# Or use unwrap_or for default value
content = result.unwrap_or("No content available")

Error Handling

from primr.utils.errors import (
    ResearchError,
    AIError,
    ScrapingError,
    ConfigurationError,
    ValidationError,
    retry_on_failure,
    safe_call
)

Custom error handling

from primr.utils.errors import AIError, retry_on_failure

@retry_on_failure(max_retries=3, delay=1.0)
def call_api():
    # Your code here
    pass

# Or use safe_call for exception wrapping
result = safe_call(risky_function, default_value="fallback")

Error context

from primr.utils.errors import error_context

with error_context("fetching company data", company="Acme Corp"):
    # Operations here will have context in error messages
    data = fetch_data()

Logging

from primr.utils.logging_config import get_logger, setup_logging

# Get a module-specific logger
logger = get_logger("my_module")
logger.info("Starting operation")
logger.debug("Debug details")
logger.error("Something went wrong")

# Configure logging
setup_logging(level="DEBUG", log_file="primr.log")

Console Output

from primr.utils.console import console

console.step("Starting research...")
console.ok("Research complete")
console.warn("Some sections had low quality")
console.error("Failed to scrape website")
console.progress(5, 10, "Processing sections")
console.progress_done()

Observability

from primr.utils.observability import (
    operation_context,
    timed,
    Metrics,
    emit_metrics
)

# Track operation duration
with operation_context("research", company="Acme Corp"):
    # Operations here are tracked
    pass

# Decorator for timing
@timed("my_operation")
def slow_function():
    pass

# Emit custom metrics
metrics = Metrics(
    operation="custom_op",
    duration_seconds=5.0,
    success=True,
    metadata={"key": "value"}
)
emit_metrics(metrics)

Complete Example

import asyncio
from pathlib import Path
from primr.core.research_orchestrator import ResearchOrchestrator, ResearchMode
from primr.output import DocumentBuilder
from primr.utils.console import console
from primr.utils.logging_config import setup_logging

async def research_company(name: str, website: str):
    # Setup
    setup_logging(level="INFO")
    
    # Progress callback
    def on_progress(msg: str):
        console.step(msg)
    
    # Run research
    console.step(f"Starting research for {name}")
    orchestrator = ResearchOrchestrator()
    
    result = await orchestrator.research(
        name,
        website,
        mode=ResearchMode.COMPLETE,
        on_progress=on_progress
    )
    
    if not result.success:
        console.error(f"Research failed: {result.error}")
        return None
    
    console.ok(f"Research complete in {result.duration_seconds:.0f}s")
    
    # Generate report
    builder = DocumentBuilder()
    output_path = builder.build_docx(
        sections=result.section_results,
        company_name=name,
        output_dir=Path("output")
    )
    
    console.ok(f"Report saved to {output_path}")
    return output_path

if __name__ == "__main__":
    asyncio.run(research_company("Acme Corp", "https://acme.example"))

Prompt Architecture

The prompt system (v1.2.5+) externalizes prompts to YAML configuration files.

from primr.prompts import (
    PromptComposer,
    PromptContext,
    ComposedPrompt,
    StrategyModuleRegistry,
    get_registry,
    load_prompt_config,
    build_company_overview_prompt,
    build_ai_strategy_prompt,
    get_available_prompts,
)

PromptComposer

Build prompts from YAML configurations with variable substitution.

from primr.prompts import PromptComposer, PromptContext

composer = PromptComposer()
context = PromptContext(
    company_name="Acme Corp",
    website_url="https://acme.example",
    cloud_vendor="azure",
)

# Compose a standard prompt
result = composer.compose("company_overview", context)
print(result.content)
print(f"Sections: {result.section_count}")
print(f"Words: {result.word_count}")

# Compose a strategy prompt
result = composer.compose_strategy("ai", context)
print(result.content)

StrategyModuleRegistry

Discover and manage strategy modules.

from primr.prompts import get_registry

registry = get_registry()

# List available strategies
for name in registry.list_names():
    print(name)  # ai, cloud, data

# Get strategy details
strategy = registry.get("ai")
print(strategy.display_name)  # "AI Strategy"
print(strategy.description)

# Get context files for a strategy (for File Search Store)
files = registry.get_context_files("ai", vendor="azure")
for f in files:
    print(f)  # vendor-research/vendor-research-azure-2025-12.txt

Legacy Prompt Builders

For backward compatibility, the original functions still work:

from primr.prompts import (
    build_company_overview_prompt,
    build_ai_strategy_prompt,
    get_available_prompts,
)

# List available prompts
prompts = get_available_prompts()  # ['ai_strategy', 'company_overview', ...]

# Build prompts (delegates to PromptComposer internally)
prompt = build_company_overview_prompt("Acme Corp", website_url="https://acme.example")
prompt = build_ai_strategy_prompt("Acme Corp", cloud_vendor="azure")

Custom Exceptions

from primr.prompts import (
    PromptConfigError,
    PromptConfigNotFoundError,
    PromptConfigValidationError,
    StrategyModuleNotFoundError,
    DataSourceNotFoundError,
)

try:
    composer.compose("nonexistent", context)
except PromptConfigNotFoundError as e:
    print(f"Not found: {e.prompt_name}")
    print(f"Searched: {e.searched_paths}")
    print(f"Available: {e.available_prompts}")

Singleton Management

Most components use thread-safe singletons. For testing or custom configurations, you can reset them:

from primr.ai import reset_client
from primr.data import reset_cache
from primr.config import reset_settings

# Reset all singletons
reset_client()
reset_cache()
reset_settings()

MCP Server

Primr includes a Model Context Protocol (MCP) server that enables AI agents to drive company research programmatically. The MCP server exposes Primr's functionality through a standardized protocol that AI assistants like Claude Desktop can use.

Quick Start

# Run with stdio transport (for Claude Desktop)
primr-mcp --stdio

# Run with HTTP transport
primr-mcp --http --port 8000

# Development mode (no auth)
primr-mcp --http --port 8000 --no-auth --allow-plaintext

Claude Desktop Integration

Add to your claude_desktop_config.json:

{
  "mcpServers": {
    "primr": {
      "command": "primr-mcp",
      "args": ["--stdio"]
    }
  }
}

Programmatic Usage

import asyncio
from primr.mcp_server import create_mcp_server

async def main():
    # Create server with stdio transport
    server = create_mcp_server(transport="stdio")
    await server.run()

    # Or with HTTP transport
    server = create_mcp_server(
        transport="streamable-http",
        port=8000,
        host="127.0.0.1",
        require_auth=True,
    )
    await server.run()

asyncio.run(main())

Tools

The MCP server exposes 9 tools for research operations:

estimate_run

Get cost and time estimates before running research.

{
  "name": "estimate_run",
  "arguments": {
    "company_url": "https://acme.example",
    "mode": "full"
  }
}

Response:

{
  "estimated_cost_usd": 0.75,
  "estimated_time_minutes": 30,
  "planned_pages": 20,
  "mode": "full"
}

research_company

Initiate company research (async - returns job_id immediately).

{
  "name": "research_company",
  "arguments": {
    "company_name": "Acme Corp",
    "company_url": "https://acme.example",
    "mode": "full",
    "cloud_vendor": "azure",
    "skip_qa": false
  }
}

Response:

{
  "job_id": "job_abc123",
  "accepted": true,
  "status_uri": "primr://research/status"
}

generate_strategy

Generate strategy document from existing report.

{
  "name": "generate_strategy",
  "arguments": {
    "report_path": "output/Acme_Corp_Strategic_Overview.md",
    "strategy_type": "customer_experience",
    "cloud_vendor": "azure"
  }
}

Strategy types: ai_strategy, customer_experience, modern_security_compliance, data_fabric_strategy

check_jobs

Check status of research jobs.

{
  "name": "check_jobs",
  "arguments": {
    "job_id": "job_abc123"
  }
}

Response:

{
  "jobs": [
    {
      "job_id": "job_abc123",
      "status": "in_progress",
      "company_name": "Acme Corp",
      "output_path": null
    }
  ]
}

run_qa

Run quality assessment on a report.

{
  "name": "run_qa",
  "arguments": {
    "report_path": "output/Acme_Corp_Strategic_Overview.md"
  }
}

doctor

Check system health and configuration.

{
  "name": "doctor",
  "arguments": {}
}

clear_jobs

Clear stale pending jobs.

{
  "name": "clear_jobs",
  "arguments": {
    "older_than_hours": 24
  }
}

cancel_job

Cancel an active research job.

{
  "name": "cancel_job",
  "arguments": {
    "job_id": "job_abc123"
  }
}

delegate_to_agent

Delegate a task to an external A2A agent. Requires pip install primr[a2a].

{
  "name": "delegate_to_agent",
  "arguments": {
    "agent_url": "https://remote-agent.example.com",
    "message": "Research Acme Corp competitive landscape",
    "skill_id": "research_company"
  }
}

Response (success):

{
  "status": {"state": "completed"},
  "artifacts": [...]
}

Response (error):

{
  "error": true,
  "error_type": "a2a_delegation_failed",
  "message": "Connection refused"
}

SSRF protection validates all agent URLs. Private IPs, metadata endpoints, and non-HTTP schemes are blocked.

A2A Server

Primr can also expose its capabilities via the A2A (Agent-to-Agent) protocol, allowing other agents to discover and invoke Primr's research tools.

# Install A2A support
pip install primr[a2a]

# Standalone A2A server
primr-a2a --no-auth

# Co-hosted with MCP server
primr-mcp --http --a2a --a2a-port 9000

Agent Card (served at /.well-known/agent.json):

curl http://localhost:9000/.well-known/agent.json

Skills available via A2A:

Skill ID Description
estimate_research Cost/time estimate for a research run
research_company Start async research (SSE streaming progress)
check_jobs Current job status
run_qa Quality assessment on completed reports
system_health System diagnostics

Example A2A message:

{
  "jsonrpc": "2.0",
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "parts": [{"kind": "text", "text": "{\"url\": \"https://acme.com\", \"mode\": \"full\"}"}],
      "metadata": {"skillId": "research_company"}
    }
  }
}

The A2A server shares the MCP server's SingleJobStore, rate limiter, and security middleware. The single-job model is enforced across both protocols.

Resources

The MCP server exposes seven read-only resources:

primr://research/status

Current research job status with progress information.

{
  "status": "in_progress",
  "job_id": "job_abc123",
  "company_name": "Acme Corp",
  "mode": "full",
  "current_stage": "deep_research",
  "stage_progress_percent": 45,
  "stage_expected_minutes": 15,
  "possibly_stuck": false
}

primr://output/latest

Most recent research output. Add ?full_content=true for complete content.

{
  "report_path": "output/Acme_Corp_Strategic_Overview.md",
  "company_name": "Acme Corp",
  "generation_timestamp": "2026-02-02T10:30:00",
  "report_type": "markdown",
  "content_preview": "# Acme Corp Strategic Overview..."
}

primr://output/artifacts

Pipeline stage artifacts (scraped_content, insights, dossier, reports).

{
  "job_id": "job_abc123",
  "job_status": "completed",
  "artifacts": [
    {
      "artifact_type": "scraped_content",
      "file_path": "output/acme_corp/scraped_content.txt",
      "size_bytes": 125000,
      "preview": "Acme Corp designs...",
      "content_hash": "sha256:abc123..."
    }
  ]
}

primr://config

Current configuration (no secrets exposed).

{
  "available_modes": ["scrape", "deep", "full"],
  "available_strategies": {
    "ai_strategy": "AI/ML transformation roadmap",
    "customer_experience": "CX improvement plan",
    "modern_security_compliance": "Security posture assessment",
    "data_fabric_strategy": "Data platform modernization"
  },
  "configured_vendors": ["azure", "aws", "gcp", "private"]
}

primr://strategies/available

List of available strategy types with metadata for Open Claw integration.

{
  "schema_version": "1.0",
  "strategies": [
    {
      "id": "ai_strategy",
      "name": "AI Strategy",
      "description": "AI/ML transformation roadmap with quick wins and bigger bets",
      "requires_cloud_vendor": true,
      "estimated_time_minutes": 15,
      "estimated_cost_usd": 0.30
    },
    {
      "id": "customer_experience",
      "name": "Customer Experience Strategy",
      "description": "CX transformation and digital experience improvement plan",
      "requires_cloud_vendor": false,
      "estimated_time_minutes": 12,
      "estimated_cost_usd": 0.25
    },
    {
      "id": "modern_security_compliance",
      "name": "Security & Compliance Strategy",
      "description": "Zero Trust architecture and compliance posture assessment",
      "requires_cloud_vendor": false,
      "estimated_time_minutes": 12,
      "estimated_cost_usd": 0.25
    },
    {
      "id": "data_fabric_strategy",
      "name": "Data Fabric Strategy",
      "description": "Modern data platform for agentic AI and semantic layers",
      "requires_cloud_vendor": false,
      "estimated_time_minutes": 12,
      "estimated_cost_usd": 0.25
    }
  ]
}

primr://output/by_job/{job_id}

Job-scoped artifact retrieval for provenance tracking. Ensures the returned report corresponds to a specific approved job.

{
  "job_id": "abc123",
  "report_path": "output/acme_corp/report.md",
  "company_name": "Acme Corp",
  "generation_timestamp": "2026-02-15T10:31:00Z",
  "report_type": "markdown",
  "content_preview": "# Acme Corp Strategic Overview...",
  "manifest_path": "output/acme_corp/run_manifest.json"
}

primr://output/manifest/latest

Run manifest for the most recent completed job. Provides audit trail for compliance and debugging.

{
  "schema_version": "1.0",
  "job_id": "abc123",
  "company_name": "Acme Corp",
  "company_url": "https://acme.example",
  "mode": "full",
  "estimate": {
    "cost_usd": 0.75,
    "time_minutes": 30,
    "estimated_at": "2026-02-15T10:00:00Z"
  },
  "approval": {
    "token": "ABC123",
    "approved_at": "2026-02-15T10:01:00Z",
    "approved_by": "stdio",
    "bound_to_estimate": true
  },
  "execution": {
    "started_at": "2026-02-15T10:01:05Z",
    "completed_at": "2026-02-15T10:31:00Z",
    "status": "completed",
    "actual_cost_usd": 0.72,
    "actual_time_minutes": 30
  },
  "artifacts": [
    "output/acme_corp/report.md",
    "output/acme_corp/scraped_content.txt",
    "output/acme_corp/insights.txt"
  ]
}

Prompt Templates

Two prompt templates are available for guided workflows:

research_workflow

Guides through the complete research process.

strategy_selection

Helps select appropriate strategy types based on company context.

Security

The MCP server includes comprehensive security features:

Path Validation

All file paths are validated to prevent path traversal attacks:

  • Paths must be relative to workspace root
  • Symlinks are resolved and checked
  • .. sequences are blocked
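The resolve-then-compare approach behind these checks can be illustrated with a minimal standard-library sketch. This is not Primr's actual validator; `is_path_allowed` and its arguments are hypothetical names:

```python
from pathlib import Path

def is_path_allowed(workspace_root: Path, candidate: str) -> bool:
    """True if `candidate` stays inside the workspace after resolution."""
    root = workspace_root.resolve()
    # resolve() expands symlinks and ".." components before comparison,
    # so "a/../../outside.txt" is rejected even though it contains no
    # literal absolute path
    target = (root / candidate).resolve()
    return target == root or root in target.parents
```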

URL Validation

URLs are validated to prevent SSRF attacks:

  • Only HTTP/HTTPS schemes allowed
  • Private/internal IPs blocked (10.x, 172.16-31.x, 192.168.x, 127.x)
  • DNS rebinding protection
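The IP checks can be approximated with the standard library. This sketch is illustrative only, not the server's actual validator, and it skips DNS resolution — a real check must resolve hostnames and re-validate the resulting addresses to defend against rebinding:

```python
import ipaddress
from urllib.parse import urlparse

def is_agent_url_allowed(url: str) -> bool:
    """Reject non-HTTP schemes and literal private/loopback/link-local IPs."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False
    host = parsed.hostname or ""
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # Hostname, not a literal IP: a real validator resolves it via
        # DNS and checks every returned address (omitted in this sketch).
        return True
    return not (addr.is_private or addr.is_loopback or addr.is_link_local)
```

Note that `169.254.169.254` (the cloud metadata endpoint) falls under the link-local check.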

Rate Limiting

Per-tool rate limits prevent abuse:

  • estimate_run: 30 requests/minute
  • research_company: 2 requests/minute
  • Other tools: 10 requests/minute
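Per-tool limits like these are typically implemented as a sliding-window counter keyed by tool name (and client ID). A minimal sketch, assuming nothing about Primr's internal limiter:

```python
import time
from collections import defaultdict, deque

class SlidingWindowLimiter:
    """Allow at most `limit` calls per `window` seconds for each key."""

    def __init__(self, limit: int, window: float = 60.0):
        self.limit = limit
        self.window = window
        self._calls: dict[str, deque] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = time.monotonic()
        q = self._calls[key]
        # Drop timestamps that have aged out of the window
        while q and now - q[0] > self.window:
            q.popleft()
        if len(q) >= self.limit:
            return False
        q.append(now)
        return True
```

With a limit of 2 for research_company, a third call within the same minute is rejected while other keys remain unaffected.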

Authentication (HTTP mode)

JWT authentication for HTTP transport:

  • Tokens verified via JWKS endpoint or shared secret
  • Admin policy: role=admin claim or MCP_ADMIN_TOKENS env var
  • Client ID extracted for rate limiting and job ownership

# Configure auth via environment
MCP_JWT_SECRET=your-secret-key
MCP_ADMIN_TOKENS=token1,token2

Job Store

The MCP server uses a single-job model with journal persistence:

from primr.mcp_server.job_store import SingleJobStore

store = SingleJobStore(journal_path="logs/mcp_journal.json")

# Create a job
job = store.create(company_name="Acme Corp", mode="full", owner_client_id="client1")

# Update progress
job.advance_stage(ResearchStage.DEEP_RESEARCH)
job.heartbeat(progress=50)
store.update(job)

# Check for stuck jobs
if job.is_possibly_stuck():
    print("Job may be stuck - no heartbeat in 5+ minutes")

# Get active job
active = store.get_active()

# Get latest completed job
terminal = store.get_latest_terminal()

Graceful Shutdown

The server handles shutdown gracefully:

  1. Waits up to 5 seconds for current work to complete
  2. Force-cancels remaining tasks after timeout
  3. Marks in-progress jobs as failed with error_type="server_shutdown"
  4. Flushes journal to disk
  5. Total shutdown timeout: 10 seconds
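The grace-then-cancel pattern in steps 1 and 2 can be sketched with asyncio. Timings are shortened for illustration and the task names are hypothetical, not the server's actual code:

```python
import asyncio

async def long_research_task():
    await asyncio.sleep(60)  # stands in for in-flight research work

async def shutdown(tasks, grace: float = 0.1):
    # Step 1: wait briefly for current work to finish on its own
    done, pending = await asyncio.wait(tasks, timeout=grace)
    # Step 2: force-cancel whatever is still running after the grace period
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)

async def main():
    tasks = [asyncio.create_task(long_research_task())]
    return await shutdown(tasks)

finished, cancelled = asyncio.run(main())
```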

Error Codes

Standard error codes returned by tools:

Code Description
INVALID_URL URL format invalid
SSRF_BLOCKED URL blocked for security
URL_UNREACHABLE URL could not be reached
PATH_TRAVERSAL_BLOCKED Path traversal attempt blocked
REPORT_NOT_FOUND Report file not found
JOB_NOT_FOUND Job ID not found
JOB_IN_PROGRESS Another job already running
RATE_LIMIT_EXCEEDED Rate limit exceeded
CANCEL_NOT_AUTHORIZED Not authorized to cancel job

Agentic Architecture (v1.7.0)

Primr v1.7.0 introduces an agentic architecture that enables AI agents to drive research workflows with persistent memory, hypothesis tracking, and governance hooks.

Research Memory

Track hypotheses across research sessions with confidence levels.

from primr.agentic.memory import ResearchMemory, Hypothesis, ConfidenceLevel
from pathlib import Path

# Initialize memory
memory = ResearchMemory(storage_path=Path("./logs/research_memory"))

# Save a hypothesis
hypothesis = Hypothesis(
    id="h_001",
    company="Acme Corp",
    statement="Acme is expanding into AI-powered logistics",
    confidence=ConfidenceLevel.MEDIUM,
    evidence="CEO mentioned AI initiatives in Q3 earnings call",
    source="https://acme.example/investor-relations",
    topic="strategy",
)
memory.save_hypothesis(hypothesis)

# Retrieve hypotheses
hypotheses = memory.get_hypotheses("Acme Corp")
for h in hypotheses:
    print(f"[{h.confidence.value}] {h.statement}")

# Update confidence as evidence emerges
hypothesis.confidence = ConfidenceLevel.HIGH
hypothesis.evidence += "; Confirmed by press release 2026-01-15"
memory.save_hypothesis(hypothesis)

# List all companies with memory
companies = memory.list_companies()
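Under the hood, a memory store like this can persist hypotheses as one JSON file per company, upserting by hypothesis id. A minimal sketch of that storage model (field names follow the Hypothesis example above; the file layout is an assumption, not the library's actual format):

```python
import json
from dataclasses import asdict, dataclass
from pathlib import Path

@dataclass
class Hypothesis:
    id: str
    company: str
    statement: str
    confidence: str  # "low" | "medium" | "high" | "validated"
    evidence: str = ""
    source: str = ""
    topic: str = ""

class FileMemory:
    """Persist hypotheses as <storage_path>/<company>.json, keyed by hypothesis id."""

    def __init__(self, storage_path: Path):
        self.storage_path = storage_path
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def _path(self, company: str) -> Path:
        return self.storage_path / f"{company.lower().replace(' ', '_')}.json"

    def save_hypothesis(self, h: Hypothesis) -> None:
        path = self._path(h.company)
        data = json.loads(path.read_text()) if path.exists() else {}
        data[h.id] = asdict(h)  # upsert: saving the same id again overwrites it
        path.write_text(json.dumps(data, indent=2))

    def get_hypotheses(self, company: str) -> list[Hypothesis]:
        path = self._path(company)
        if not path.exists():
            return []
        return [Hypothesis(**v) for v in json.loads(path.read_text()).values()]
```

Upserting by id is what makes the confidence-update pattern above work: re-saving a hypothesis replaces its stored record rather than appending a duplicate.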

Confidence Levels

| Level | Description |
|-------|-------------|
| LOW | Initial hypothesis, needs validation |
| MEDIUM | Some supporting evidence found |
| HIGH | Strong evidence from multiple sources |
| VALIDATED | Confirmed through direct sources |
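Filtering by a minimum confidence (as the get_hypotheses MCP tool does with its min_confidence argument) needs an ordering over these levels. One way to sketch it, using the enum's definition order as the rank:

```python
from enum import Enum

class ConfidenceLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    VALIDATED = "validated"

# Enum members iterate in definition order, weakest to strongest.
_RANK = {level: i for i, level in enumerate(ConfidenceLevel)}

def at_least(level: ConfidenceLevel, minimum: ConfidenceLevel) -> bool:
    """True when `level` meets or exceeds `minimum`."""
    return _RANK[level] >= _RANK[minimum]

print(at_least(ConfidenceLevel.HIGH, ConfidenceLevel.MEDIUM))  # True
print(at_least(ConfidenceLevel.LOW, ConfidenceLevel.MEDIUM))   # False
```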

Roadmap API

Query the development roadmap programmatically.

from primr.agentic.roadmap_api import RoadmapAPI
from primr.agentic.models import VersionStatus

api = RoadmapAPI()

# Get current version
current = api.get_current_version()
print(f"Current: v{current.number} - {current.title}")

# Get next planned version
next_ver = api.get_next_version()

# List versions by status
completed = api.list_by_status(VersionStatus.COMPLETED)
planned = api.list_by_status(VersionStatus.PLANNED)

# Get specific version details
v170 = api.get_version("1.7.0")
for feature in v170.features:
    print(f"  - {feature.name}: {feature.description}")

# Search features
results = api.search_features("memory")
for version, feature in results:
    print(f"v{version.number}: {feature.name}")

Hook System

Register governance hooks for cost control and security.

from primr.agentic.hooks import (
    HookSystem,
    CostGuardHook,
    SSRFGuardHook,
    HookContext,
    HookResult,
)

# Create hook system
hooks = HookSystem()

# Register cost guard (blocks operations exceeding budget)
hooks.register(CostGuardHook(max_cost_usd=5.0))

# Register SSRF guard (blocks internal URLs)
hooks.register(SSRFGuardHook())

# Execute hooks before an operation
context = HookContext(
    operation="scrape",
    target_url="https://acme.example",
    estimated_cost=2.50,
)
result = await hooks.execute_pre_hooks(context)

if result.blocked:
    print(f"Operation blocked: {result.reason}")
else:
    # Proceed with operation
    pass
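Internally, a pre-hook pass can simply run each registered hook in order and stop at the first one that blocks. A minimal sketch of that dispatch loop (illustrative, not the actual HookSystem implementation):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class HookContext:
    operation: str
    target_url: str = ""
    estimated_cost: float = 0.0

@dataclass
class HookResult:
    blocked: bool = False
    reason: str = ""

class MiniHookSystem:
    """Run pre-hooks in registration order; the first block wins."""

    def __init__(self):
        self._hooks = []

    def register(self, hook) -> None:
        self._hooks.append(hook)

    async def execute_pre_hooks(self, context: HookContext) -> HookResult:
        for hook in self._hooks:
            result = await hook.pre_execute(context)
            if result.blocked:
                return result  # short-circuit: later hooks never run
        return HookResult(blocked=False)

class CostGuard:
    def __init__(self, max_cost_usd: float):
        self.max_cost_usd = max_cost_usd

    async def pre_execute(self, context: HookContext) -> HookResult:
        if context.estimated_cost > self.max_cost_usd:
            return HookResult(True, f"cost {context.estimated_cost} exceeds budget {self.max_cost_usd}")
        return HookResult(False)

hooks = MiniHookSystem()
hooks.register(CostGuard(max_cost_usd=5.0))
result = asyncio.run(hooks.execute_pre_hooks(HookContext("scrape", estimated_cost=9.0)))
print(result.blocked)  # True
```

Short-circuiting on the first block keeps cheap guards (like the cost check) from being followed by expensive ones once the operation is already rejected.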

Custom Hooks

import logging
from typing import Any

from primr.agentic.hooks import Hook, HookContext, HookResult

logger = logging.getLogger(__name__)

class AuditHook(Hook):
    """Log all operations for audit trail."""
    
    name = "audit"
    
    async def pre_execute(self, context: HookContext) -> HookResult:
        logger.info(f"Operation: {context.operation} on {context.target_url}")
        return HookResult(blocked=False)
    
    async def post_execute(self, context: HookContext, result: Any) -> None:
        logger.info(f"Completed: {context.operation}, success={result.success}")

hooks.register(AuditHook())

Subagent Architecture

Specialized subagents for different research tasks.

from pathlib import Path

from primr.agentic.subagents import (
    ScraperSubagent,
    AnalystSubagent,
    WriterSubagent,
    QASubagent,
)
from primr.agentic.subagents.base import SubagentContext, SubagentResult

# Create subagent
scraper = ScraperSubagent()

# Execute with context
context = SubagentContext(
    company_name="Acme Corp",
    company_url="https://acme.example",
    working_dir=Path("./working/acme"),
)
result: SubagentResult = await scraper.execute(context)

if result.success:
    print(f"Scraped {result.artifacts['page_count']} pages")
else:
    print(f"Failed: {result.error}")

Available Subagents

| Subagent | Purpose |
|----------|---------|
| ScraperSubagent | Website scraping with tier escalation |
| AnalystSubagent | Deep research and hypothesis generation |
| WriterSubagent | Report generation from research data |
| QASubagent | Quality assessment and scoring |
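These subagents are typically run as a pipeline, each consuming the artifacts produced by the previous stage and stopping on the first failure. A toy sketch of that chaining (the stage classes here are stand-ins, not the real subagents):

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class Result:
    success: bool
    artifacts: dict = field(default_factory=dict)
    error: str = ""

class Stage:
    """Toy stage: merges its own artifact into the running artifact dict."""

    def __init__(self, name: str):
        self.name = name

    async def execute(self, artifacts: dict) -> Result:
        return Result(True, {**artifacts, self.name: "done"})

async def run_pipeline(stages: list[Stage]) -> Result:
    artifacts: dict = {}
    for stage in stages:
        result = await stage.execute(artifacts)
        if not result.success:
            # Stop on first failure, keeping whatever artifacts exist so far.
            return Result(False, artifacts, error=f"{stage.name} failed: {result.error}")
        artifacts = result.artifacts
    return Result(True, artifacts)

pipeline = [Stage("scraper"), Stage("analyst"), Stage("writer"), Stage("qa")]
final = asyncio.run(run_pipeline(pipeline))
print(sorted(final.artifacts))  # ['analyst', 'qa', 'scraper', 'writer']
```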

Research Orchestrator

Coordinate subagents through the research pipeline.

from pathlib import Path

from primr.agentic.orchestrator import (
    ResearchOrchestrator,
    OrchestratorConfig,
    OrchestratorResult,
)
from primr.agentic.memory import ResearchMemory
from primr.agentic.hooks import HookSystem, CostGuardHook

# Configure orchestrator
config = OrchestratorConfig(
    output_dir=Path("./output"),
    fail_fast=False,  # Continue on non-critical failures
)

# Initialize with memory and hooks
memory = ResearchMemory(storage_path=Path("./logs/research_memory"))
hooks = HookSystem()
hooks.register(CostGuardHook(max_cost_usd=10.0))

orchestrator = ResearchOrchestrator(
    config=config,
    memory=memory,
    hook_system=hooks,
)

# Run orchestrated research
result: OrchestratorResult = await orchestrator.research(
    company_name="Acme Corp",
    company_url="https://acme.example",
    mode="full",
)

if result.is_success:
    print(f"Report: {result.report_path}")
    print(f"Hypotheses: {len(result.hypotheses)}")
    print(f"Stages: {result.completed_stages}")
else:
    print(f"Errors: {result.errors}")

MCP Agentic Tools

Additional MCP tools for agentic workflows.

query_roadmap

Query roadmap versions and features.

{
  "name": "query_roadmap",
  "arguments": {
    "version": "1.7.0"
  }
}

Response:

{
  "version": "1.7.0",
  "status": "completed",
  "title": "Agentic Architecture",
  "features": [
    {"name": "Research Memory", "description": "Persistent hypothesis tracking"},
    {"name": "Hook System", "description": "Governance and cost control"},
    {"name": "Subagent Architecture", "description": "Specialized research agents"}
  ]
}

get_hypotheses

Retrieve hypotheses for a company from research memory.

{
  "name": "get_hypotheses",
  "arguments": {
    "company": "Acme Corp",
    "min_confidence": "medium"
  }
}

Response:

{
  "company": "Acme Corp",
  "hypotheses": [
    {
      "id": "h_001",
      "statement": "Acme is expanding into AI-powered logistics",
      "confidence": "high",
      "evidence": "CEO mentioned AI initiatives in Q3 earnings call",
      "topic": "strategy"
    }
  ],
  "count": 1
}

save_hypothesis

Save a hypothesis to research memory.

{
  "name": "save_hypothesis",
  "arguments": {
    "company": "Acme Corp",
    "statement": "Acme plans to acquire a logistics startup",
    "confidence": "low",
    "evidence": "Rumored in industry newsletter",
    "topic": "m&a"
  }
}

MCP Agentic Resources

Additional MCP resources for agentic workflows.

primr://roadmap

Current roadmap with versions and features.

{
  "current_version": "1.7.0",
  "next_version": "1.8.0",
  "versions": [
    {
      "number": "1.7.0",
      "status": "completed",
      "title": "Agentic Architecture",
      "feature_count": 6
    },
    {
      "number": "1.8.0",
      "status": "planned",
      "title": "QA-Driven Research",
      "feature_count": 4
    }
  ]
}

primr://memory/{company}

Research memory for a specific company.

{
  "company": "Acme Corp",
  "hypothesis_count": 5,
  "hypotheses": [
    {
      "id": "h_001",
      "statement": "Acme is expanding into AI-powered logistics",
      "confidence": "high",
      "topic": "strategy",
      "created_at": "2026-01-15T10:30:00Z"
    }
  ],
  "confidence_distribution": {
    "low": 1,
    "medium": 2,
    "high": 2,
    "validated": 0
  }
}
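The confidence_distribution field can be computed with a simple counter over the stored hypotheses, filling in zero for unused levels. A sketch:

```python
from collections import Counter

def confidence_distribution(hypotheses: list[dict]) -> dict[str, int]:
    """Count hypotheses per confidence level, including levels with zero entries."""
    counts = Counter(h["confidence"] for h in hypotheses)
    return {level: counts.get(level, 0) for level in ("low", "medium", "high", "validated")}

hypotheses = [
    {"id": "h_001", "confidence": "high"},
    {"id": "h_002", "confidence": "high"},
    {"id": "h_003", "confidence": "medium"},
    {"id": "h_004", "confidence": "medium"},
    {"id": "h_005", "confidence": "low"},
]
print(confidence_distribution(hypotheses))
# {'low': 1, 'medium': 2, 'high': 2, 'validated': 0}
```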

primr://context

CLAUDE.md context map for AI agents.

{
  "quick_start": {
    "research": "primr \"Company\" https://company.com",
    "doctor": "primr doctor",
    "memory": "primr memory \"Company\""
  },
  "architecture": {
    "entry_point": "src/primr/core/cli.py",
    "orchestrator": "src/primr/core/research_orchestrator.py",
    "agentic": "src/primr/agentic/"
  },
  "verification": {
    "tests": "python -m pytest tests/ -v",
    "types": "python -m mypy src/primr/",
    "lint": "python -m ruff check src/primr/"
  }
}

CLI Commands

New CLI commands for agentic workflows:

# Research memory
primr memory "Acme Corp"              # View hypotheses for a company
primr --memory-list                   # List all companies with memory

# Orchestrated research
primr orchestrate "Acme Corp" https://acme.example
primr --orchestrate --max-cost 5.0    # With cost budget

# Roadmap
primr roadmap                         # Show roadmap overview
primr --roadmap-version v1.7.0        # Show version details

Skills Directory

Pre-built workflow definitions in skills/:

| Skill | Description |
|-------|-------------|
| company-research | Full pipeline with memory integration |
| scrape-strategy | Tier selection and error handling |
| hypothesis-tracking | Confidence level management |
| qa-iteration | Section refinement workflow |

Each skill includes a SKILL.md with workflow steps, decision points, and example usage.