This document describes how to use Primr programmatically as a Python library.
pip install -e .

import asyncio
from primr.core.research_orchestrator import ResearchOrchestrator, ResearchMode
async def main():
orchestrator = ResearchOrchestrator()
result = await orchestrator.research(
"Acme Corp",
"https://acme.example",
mode=ResearchMode.COMPLETE
)
if result.success:
print(f"Generated {len(result.section_results)} sections")
print(f"Duration: {result.duration_seconds:.0f}s")
else:
print(f"Error: {result.error}")
asyncio.run(main())

The main entry point for running research.
from primr.core.research_orchestrator import (
ResearchOrchestrator,
ResearchMode,
ResearchConfig,
OrchestratorResult
)

orchestrator = ResearchOrchestrator()

No arguments required. The orchestrator lazy-loads clients as needed.
Execute company research using the specified mode.
async def research(
company_name: str,
website: str | None = None,
mode: ResearchMode = ResearchMode.STRUCTURED,
config: ResearchConfig | None = None,
on_progress: Callable[[str], None] | None = None,
context_files: list[str] | None = None,
) -> OrchestratorResult

Parameters:
| Name | Type | Description |
|---|---|---|
| company_name | str | Name of the company to research |
| website | str or None | Company website URL |
| mode | ResearchMode | Research mode (STRUCTURED, DEEP_RESEARCH, COMPLETE) |
| config | ResearchConfig or None | Optional configuration overrides |
| on_progress | Callable or None | Progress callback function |
| context_files | list or None | Additional context files (PDFs, docs) |
Returns: OrchestratorResult
Example with progress callback:
def on_progress(message: str):
print(f"Progress: {message}")
result = await orchestrator.research(
"Acme Corp",
"https://acme.example",
mode=ResearchMode.COMPLETE,
on_progress=on_progress
)

Available research modes.
from primr.core.research_orchestrator import ResearchMode
ResearchMode.STRUCTURED # Website scraping + web search
ResearchMode.DEEP_RESEARCH # Autonomous web research
ResearchMode.COMPLETE     # Two-step: structured then deep

Configuration for a research task.
from primr.core.research_orchestrator import ResearchConfig
config = ResearchConfig(
mode=ResearchMode.COMPLETE,
timeout=3600, # Max duration in seconds
poll_interval=10, # Seconds between status checks
include_website_scrape=True,
include_web_search=True,
sections=None # Specific sections to research (or None for all)
)

Result from the research orchestrator.
@dataclass
class OrchestratorResult:
company_name: str
website: str | None
mode: ResearchMode
section_results: dict[str, str] # Section key -> content
raw_content: str
citations: list
duration_seconds: float
success: bool
error: str | None
timestamp: datetime

Accessing results:
result = await orchestrator.research("Acme Corp", "https://acme.example")
if result.success:
# Access individual sections
overview = result.section_results.get("company_overview", "")
products = result.section_results.get("detailed_products_services", "")
# Get all section keys
print(result.section_results.keys())
# Access raw content (for Complete mode)
full_report = result.raw_content
# Access citations
for citation in result.citations:
print(f"{citation['number']}: {citation['url']}")

The primr.core package contains specialized modules for different aspects of research. These can be imported directly for more granular control.
Working folder creation and file operations.
from primr.core.workspace import (
create_working_folder,
consolidate_working_folder,
save_section_output,
validate_context_files,
WorkspaceConfig,
ConsolidationResult,
)# Create a working folder for research
folder = create_working_folder("Acme Corp")
print(f"Working folder: {folder}")
# Save section output
save_section_output(folder, "company_overview", "Acme Corp is a technology company...")
# Consolidate all sections into a single file
result = consolidate_working_folder(folder)
print(f"Consolidated {result.section_count} sections")Generate AI strategy recommendations with cloud vendor context.
from primr.core.ai_strategy import (
generate_ai_strategy_sync,
CloudVendor,
AIStrategyConfig,
AIStrategyResult,
)# Generate AI strategy for a company
config = AIStrategyConfig(
company_name="Acme Corp",
cloud_vendor=CloudVendor.AWS,
working_folder=Path("working/Acme Corp"),
)
result = generate_ai_strategy_sync(config)
if result.success:
print(result.content)CloudVendor enum:
CloudVendor.AWS # Amazon Web Services
CloudVendor.AZURE # Microsoft Azure
CloudVendor.GCP # Google Cloud PlatformExecute Deep Research with preflight validation.
from primr.core.deep_research_runner import (
perform_deep_research,
validate_preflight,
DeepResearchConfig,
DeepResearchMode,
PreflightResult,
PreflightStatus,
)# Validate before running expensive operations
preflight = validate_preflight()
if preflight.status == PreflightStatus.READY:
config = DeepResearchConfig(
company_name="Acme Corp",
prompt="Research Acme Corp's competitive position in the market",
mode=DeepResearchMode.STANDARD,
)
result = await perform_deep_research(config)
else:
print(f"Preflight failed: {preflight.message}")Command-line interface components for programmatic use.
from primr.core.cli import (
main,
run_doctor,
parse_args,
process_csv,
Command,
CLIConfig,
)# Run system check programmatically
success = run_doctor()
# Parse CLI arguments
config = parse_args(["Acme Corp", "https://acme.example", "--mode", "deep"])
print(f"Company: {config.company_name}")
print(f"Mode: {config.mode}")Website scraping pipeline with section-by-section analysis.
from primr.core.structured_research import (
run_research,
research_section,
generate_initial_overview,
ScrapedData,
AnalysisResult,
ResearchContext,
)Cloud vendor AI capabilities research.
from primr.core.vendor_research import (
get_or_generate_vendor_research,
get_or_generate_vendor_research_sync,
VendorResearchFile,
VendorResearchResult,
)For existing code, all functions remain available from research_agent.py:
# These imports still work (delegate to new modules internally)
from primr.core.research_agent import (
main,
run_doctor,
create_working_folder,
consolidate_working_folder,
run_research,
research_section,
CloudVendor,
DeepResearchConfig,
DeepResearchMode,
)Direct access to the AI client for custom prompts.
from primr.ai import AIClient, get_client

client = get_client()
response = client.generate("What is Python?")

client = AIClient(api_key="your-api-key")
response = client.generate(
"Analyze this company",
model_type="research",
thinking_level="high"
)Generate content with automatic retries.
def generate(
prompt: str,
model_type: str = "research",
temperature: float = 1.0,
thinking_level: str = "high",
max_retries: int | None = None,
timeout: float | None = None,
) -> str

Parameters:
| Name | Type | Description |
|---|---|---|
| prompt | str | The prompt to send |
| model_type | str | "research" or "report" |
| temperature | float | Sampling temperature (0.0-2.0) |
| thinking_level | str | "low" or "high" |
| max_retries | int or None | Override default retry count |
| timeout | float or None | Request timeout in seconds |
Fast generation with minimal thinking.
response = client.generate_fast("Summarize this text")

client = AIClient(track_usage=True)
# Make some calls
client.generate("First prompt")
client.generate("Second prompt")
# Get usage summary
usage = client.get_usage_summary()
print(f"Total tokens: {usage['total_tokens']}")
print(f"Estimated cost: ${usage['total_cost']:.4f}")
# Reset counters
client.reset_usage()

Direct access to Gemini's Deep Research Agent.
from primr.ai import DeepResearchClient, ResearchResult, ResearchProgress

client = DeepResearchClient()
result = await client.research(
"Research Acme Corp's competitive position in the market"
)
print(result.content)
for citation in result.citations:
print(f"Source: {citation['url']}")

def on_progress(progress: ResearchProgress):
print(f"Status: {progress.status.value}")
print(f"Message: {progress.message}")
if progress.thought:
print(f"Thinking: {progress.thought}")
result = await client.research(
"Research Acme Corp",
on_progress=on_progress
)# Company profile format
result = await client.research(
"Research Acme Corp",
output_format="company_profile"
)
# Executive summary format
result = await client.research(
"Research Acme Corp",
output_format="executive_summary"
)
# Competitive analysis format
result = await client.research(
"Research Acme Corp",
output_format="competitive_analysis"
)result = await client.research(
"Research Acme Corp",
priority_urls=["https://acme.example", "https://ir.acme.example"]
)Deep Research jobs run asynchronously. If a connection drops, the job continues on Google's servers.
from primr.ai.deep_research import (
get_deep_research_client,
get_pending_jobs,
save_pending_job,
remove_pending_job,
)
# Check status of a specific job
client = get_deep_research_client()
result = client.check_job("v1_abc123...")
print(f"Status: {result['status']}") # in_progress, completed, failed
if result['content']:
print(f"Content: {result['content'][:500]}...")
# List all pending jobs
jobs = get_pending_jobs()
for job_id, info in jobs.items():
print(f"{job_id}: {info['description']} ({info['status']})")
# Manually save a job for later recovery
save_pending_job(
interaction_id="v1_abc123...",
job_type="ai_strategy",
description="AI Strategy for Acme Corp"
)
# Remove a completed job from tracking
remove_pending_job("v1_abc123...")CLI commands for job management:
primr --check-jobs # Check status of all pending jobs
primr --clear-jobs   # Clear stale/old pending jobs

Direct access to the scraping engine.
from primr.data.scrape import (
scrape_with_requests,
scrape_with_httpx,
scrape_with_playwright,
scrape_with_playwright_aggressive,
get_cached_content,
cache_content,
clear_cache
)url = "https://example.com"
# Try each tier in order
content, error = scrape_with_requests(url)
if content is None:
content, error = scrape_with_httpx(url)
if content is None:
content, error = scrape_with_playwright(url)
if content is None:
content, error = scrape_with_playwright_aggressive(url)
if content:
print(f"Scraped {len(content)} characters")
else:
print(f"All tiers failed: {error}")url = "https://example.com"
# Check cache first
cached = get_cached_content(url)
if cached:
print("Using cached content")
else:
content, error = scrape_with_requests(url)
if content:
cache_content(url, content)
# Clear old cache entries
clear_cache(max_age_hours=24)
# Clear all cache
clear_cache()

from primr.data import ParallelScraper, get_parallel_scraper
scraper = get_parallel_scraper()
results = await scraper.scrape_urls([
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
])
for result in results:
if result.success:
print(f"{result.url}: {len(result.content)} chars")
else:
print(f"{result.url}: {result.error}")Generate reports from research results.
from primr.output import DocumentBuilder
from pathlib import Path
builder = DocumentBuilder()
# Build DOCX from sections
doc_path = builder.build_docx(
sections=result.section_results,
company_name="Acme Corp",
output_dir=Path("output")
)
print(f"Report saved to: {doc_path}")from primr.config import get_settings, configure
# Get current settings
settings = get_settings()
print(f"Research model: {settings.ai.research_model}")
print(f"Scrape timeout: {settings.scraping.timeout}")
# Validate settings
settings.validate_all(include_api_keys=True)from primr.config import configure
from pathlib import Path
settings = configure(
project_root=Path("/custom/path"),
verbose=True,
debug=True
)from primr.config import (
TimeoutConfig,
CacheConfig,
ScrapingConfig,
AIConfig,
SearchConfig,
PathConfig,
PricingConfig
)
# Example: Custom timeout config
timeout = TimeoutConfig(
connect=10.0,
read=30.0,
total=60.0
)
timeout.validate() # Raises ValueError if invalidPrimr provides comprehensive type definitions for type-safe code.
from primr.types import (
# Type aliases
URL,
FilePath,
HTMLContent,
TextContent,
# Enums
AIModelType,
ThinkingLevel,
ScrapeTier,
OutputFormat,
# TypedDicts
SearchResult,
ScrapedPage,
GradeResult,
ReportSection,
CompanyInfo,
ResearchContext,
# Protocols
AIClientProtocol,
ScraperProtocol,
SearchProtocol,
CacheProtocol,
# Generic types
Result,
# Type guards
is_valid_url,
is_search_result,
is_scraped_page
)from primr.types import Result
def fetch_data(url: str) -> Result[str]:
try:
content = scrape(url)
return Result.ok(content)
except Exception as e:
return Result.err(e)
result = fetch_data("https://example.com")
if result.is_ok:
print(result.value)
else:
print(f"Error: {result.error}")
# Or use unwrap_or for default value
content = result.unwrap_or("No content available")from primr.utils.errors import (
ResearchError,
AIError,
ScrapingError,
ConfigurationError,
ValidationError,
retry_on_failure,
safe_call
)from primr.utils.errors import AIError, retry_on_failure
@retry_on_failure(max_retries=3, delay=1.0)
def call_api():
# Your code here
pass
# Or use safe_call for exception wrapping
result = safe_call(risky_function, default_value="fallback")from primr.utils.errors import error_context
with error_context("fetching company data", company="Acme Corp"):
# Operations here will have context in error messages
data = fetch_data()from primr.utils.logging_config import get_logger, setup_logging
# Get a module-specific logger
logger = get_logger("my_module")
logger.info("Starting operation")
logger.debug("Debug details")
logger.error("Something went wrong")
# Configure logging
setup_logging(level="DEBUG", log_file="primr.log")from primr.utils.console import console
console.step("Starting research...")
console.ok("Research complete")
console.warn("Some sections had low quality")
console.error("Failed to scrape website")
console.progress(5, 10, "Processing sections")
console.progress_done()from primr.utils.observability import (
operation_context,
timed,
Metrics,
emit_metrics
)
# Track operation duration
with operation_context("research", company="Acme Corp"):
# Operations here are tracked
pass
# Decorator for timing
@timed("my_operation")
def slow_function():
pass
# Emit custom metrics
metrics = Metrics(
operation="custom_op",
duration_seconds=5.0,
success=True,
metadata={"key": "value"}
)
emit_metrics(metrics)

import asyncio
from pathlib import Path
from primr.core.research_orchestrator import ResearchOrchestrator, ResearchMode
from primr.output import DocumentBuilder
from primr.utils.console import console
from primr.utils.logging_config import setup_logging
async def research_company(name: str, website: str):
# Setup
setup_logging(level="INFO")
# Progress callback
def on_progress(msg: str):
console.step(msg)
# Run research
console.step(f"Starting research for {name}")
orchestrator = ResearchOrchestrator()
result = await orchestrator.research(
name,
website,
mode=ResearchMode.COMPLETE,
on_progress=on_progress
)
if not result.success:
console.error(f"Research failed: {result.error}")
return None
console.ok(f"Research complete in {result.duration_seconds:.0f}s")
# Generate report
builder = DocumentBuilder()
output_path = builder.build_docx(
sections=result.section_results,
company_name=name,
output_dir=Path("output")
)
console.ok(f"Report saved to {output_path}")
return output_path
if __name__ == "__main__":
asyncio.run(research_company("Acme Corp", "https://acme.example"))The prompt system (v1.2.5+) externalizes prompts to YAML configuration files.
from primr.prompts import (
PromptComposer,
PromptContext,
ComposedPrompt,
StrategyModuleRegistry,
get_registry,
load_prompt_config,
build_company_overview_prompt,
build_ai_strategy_prompt,
get_available_prompts,
)Build prompts from YAML configurations with variable substitution.
from primr.prompts import PromptComposer, PromptContext
composer = PromptComposer()
context = PromptContext(
company_name="Acme Corp",
website_url="https://acme.example",
cloud_vendor="azure",
)
# Compose a standard prompt
result = composer.compose("company_overview", context)
print(result.content)
print(f"Sections: {result.section_count}")
print(f"Words: {result.word_count}")
# Compose a strategy prompt
result = composer.compose_strategy("ai", context)
print(result.content)Discover and manage strategy modules.
from primr.prompts import get_registry
registry = get_registry()
# List available strategies
for name in registry.list_names():
print(name) # ai, cloud, data
# Get strategy details
strategy = registry.get("ai")
print(strategy.display_name) # "AI Strategy"
print(strategy.description)
# Get context files for a strategy (for File Search Store)
files = registry.get_context_files("ai", vendor="azure")
for f in files:
print(f) # vendor-research/vendor-research-azure-2025-12.txtFor backward compatibility, the original functions still work:
from primr.prompts import (
build_company_overview_prompt,
build_ai_strategy_prompt,
get_available_prompts,
)
# List available prompts
prompts = get_available_prompts() # ['ai_strategy', 'company_overview', ...]
# Build prompts (delegates to PromptComposer internally)
prompt = build_company_overview_prompt("Acme Corp", website_url="https://acme.example")
prompt = build_ai_strategy_prompt("Acme Corp", cloud_vendor="azure")from primr.prompts import (
PromptConfigError,
PromptConfigNotFoundError,
PromptConfigValidationError,
StrategyModuleNotFoundError,
DataSourceNotFoundError,
)
try:
composer.compose("nonexistent", context)
except PromptConfigNotFoundError as e:
print(f"Not found: {e.prompt_name}")
print(f"Searched: {e.searched_paths}")
print(f"Available: {e.available_prompts}")Most components use thread-safe singletons. For testing or custom configurations, you can reset them:
from primr.ai import reset_client
from primr.data import reset_cache
from primr.config import reset_settings
# Reset all singletons
reset_client()
reset_cache()
reset_settings()Primr includes a Model Context Protocol (MCP) server that enables AI agents to drive company research programmatically. The MCP server exposes Primr's functionality through a standardized protocol that AI assistants like Claude Desktop can use.
# Run with stdio transport (for Claude Desktop)
primr-mcp --stdio
# Run with HTTP transport
primr-mcp --http --port 8000
# Development mode (no auth)
primr-mcp --http --port 8000 --no-auth --allow-plaintext

Add to your claude_desktop_config.json:
{
"mcpServers": {
"primr": {
"command": "primr-mcp",
"args": ["--stdio"]
}
}
}import asyncio
from primr.mcp_server import create_mcp_server
async def main():
# Create server with stdio transport
server = create_mcp_server(transport="stdio")
await server.run()
# Or with HTTP transport
server = create_mcp_server(
transport="streamable-http",
port=8000,
host="127.0.0.1",
require_auth=True,
)
await server.run()
asyncio.run(main())

The MCP server exposes 9 tools for research operations:
Get cost and time estimates before running research.
{
"name": "estimate_run",
"arguments": {
"company_url": "https://acme.example",
"mode": "full"
}
}Response:
{
"estimated_cost_usd": 0.75,
"estimated_time_minutes": 30,
"planned_pages": 20,
"mode": "full"
}Initiate company research (async - returns job_id immediately).
{
"name": "research_company",
"arguments": {
"company_name": "Acme Corp",
"company_url": "https://acme.example",
"mode": "full",
"cloud_vendor": "azure",
"skip_qa": false
}
}Response:
{
"job_id": "job_abc123",
"accepted": true,
"status_uri": "primr://research/status"
}Generate strategy document from existing report.
{
"name": "generate_strategy",
"arguments": {
"report_path": "output/Acme_Corp_Strategic_Overview.md",
"strategy_type": "customer_experience",
"cloud_vendor": "azure"
}
}Strategy types: ai_strategy, customer_experience, modern_security_compliance, data_fabric_strategy
Check status of research jobs.
{
"name": "check_jobs",
"arguments": {
"job_id": "job_abc123"
}
}Response:
{
"jobs": [
{
"job_id": "job_abc123",
"status": "in_progress",
"company_name": "Acme Corp",
"output_path": null
}
]
}Run quality assessment on a report.
{
"name": "run_qa",
"arguments": {
"report_path": "output/Acme_Corp_Strategic_Overview.md"
}
}Check system health and configuration.
{
"name": "doctor",
"arguments": {}
}Clear stale pending jobs.
{
"name": "clear_jobs",
"arguments": {
"older_than_hours": 24
}
}Cancel an active research job.
{
"name": "cancel_job",
"arguments": {
"job_id": "job_abc123"
}
}Delegate a task to an external A2A agent. Requires pip install primr[a2a].
{
"name": "delegate_to_agent",
"arguments": {
"agent_url": "https://remote-agent.example.com",
"message": "Research Acme Corp competitive landscape",
"skill_id": "research_company"
}
}Response (success):
{
"status": {"state": "completed"},
"artifacts": [...]
}Response (error):
{
"error": true,
"error_type": "a2a_delegation_failed",
"message": "Connection refused"
}SSRF protection validates all agent URLs. Private IPs, metadata endpoints, and non-HTTP schemes are blocked.
Primr can also expose its capabilities via the A2A (Agent-to-Agent) protocol, allowing other agents to discover and invoke Primr's research tools.
# Install A2A support
pip install primr[a2a]
# Standalone A2A server
primr-a2a --no-auth
# Co-hosted with MCP server
primr-mcp --http --a2a --a2a-port 9000Agent Card (served at /.well-known/agent.json):
curl http://localhost:9000/.well-known/agent.jsonSkills available via A2A:
| Skill ID | Description |
|---|---|
| estimate_research | Cost/time estimate for a research run |
| research_company | Start async research (SSE streaming progress) |
| check_jobs | Current job status |
| run_qa | Quality assessment on completed reports |
| system_health | System diagnostics |
Example A2A message:
{
"jsonrpc": "2.0",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "{\"url\": \"https://acme.com\", \"mode\": \"full\"}"}],
"metadata": {"skillId": "research_company"}
}
}
}The A2A server shares the MCP server's SingleJobStore, rate limiter, and security middleware. The single-job model is enforced across both protocols.
The MCP server exposes 4 read-only resources:
Current research job status with progress information.
{
"status": "in_progress",
"job_id": "job_abc123",
"company_name": "Acme Corp",
"mode": "full",
"current_stage": "deep_research",
"stage_progress_percent": 45,
"stage_expected_minutes": 15,
"possibly_stuck": false
}Most recent research output. Add ?full_content=true for complete content.
{
"report_path": "output/Acme_Corp_Strategic_Overview.md",
"company_name": "Acme Corp",
"generation_timestamp": "2026-02-02T10:30:00",
"report_type": "markdown",
"content_preview": "# Acme Corp Strategic Overview..."
}Pipeline stage artifacts (scraped_content, insights, dossier, reports).
{
"job_id": "job_abc123",
"job_status": "completed",
"artifacts": [
{
"artifact_type": "scraped_content",
"file_path": "output/acme_corp/scraped_content.txt",
"size_bytes": 125000,
"preview": "Acme Corp designs...",
"content_hash": "sha256:abc123..."
}
]
}Current configuration (no secrets exposed).
{
"available_modes": ["scrape", "deep", "full"],
"available_strategies": {
"ai_strategy": "AI/ML transformation roadmap",
"customer_experience": "CX improvement plan",
"modern_security_compliance": "Security posture assessment",
"data_fabric_strategy": "Data platform modernization"
},
"configured_vendors": ["azure", "aws", "gcp", "private"]
}List of available strategy types with metadata for Open Claw integration.
{
"schema_version": "1.0",
"strategies": [
{
"id": "ai_strategy",
"name": "AI Strategy",
"description": "AI/ML transformation roadmap with quick wins and bigger bets",
"requires_cloud_vendor": true,
"estimated_time_minutes": 15,
"estimated_cost_usd": 0.30
},
{
"id": "customer_experience",
"name": "Customer Experience Strategy",
"description": "CX transformation and digital experience improvement plan",
"requires_cloud_vendor": false,
"estimated_time_minutes": 12,
"estimated_cost_usd": 0.25
},
{
"id": "modern_security_compliance",
"name": "Security & Compliance Strategy",
"description": "Zero Trust architecture and compliance posture assessment",
"requires_cloud_vendor": false,
"estimated_time_minutes": 12,
"estimated_cost_usd": 0.25
},
{
"id": "data_fabric_strategy",
"name": "Data Fabric Strategy",
"description": "Modern data platform for agentic AI and semantic layers",
"requires_cloud_vendor": false,
"estimated_time_minutes": 12,
"estimated_cost_usd": 0.25
}
]
}Job-scoped artifact retrieval for provenance tracking. Ensures the returned report corresponds to a specific approved job.
{
"job_id": "abc123",
"report_path": "output/acme_corp/report.md",
"company_name": "Acme Corp",
"generation_timestamp": "2026-02-15T10:31:00Z",
"report_type": "markdown",
"content_preview": "# Acme Corp Strategic Overview...",
"manifest_path": "output/acme_corp/run_manifest.json"
}Run manifest for the most recent completed job. Provides audit trail for compliance and debugging.
{
"schema_version": "1.0",
"job_id": "abc123",
"company_name": "Acme Corp",
"company_url": "https://acme.example",
"mode": "full",
"estimate": {
"cost_usd": 0.75,
"time_minutes": 30,
"estimated_at": "2026-02-15T10:00:00Z"
},
"approval": {
"token": "ABC123",
"approved_at": "2026-02-15T10:01:00Z",
"approved_by": "stdio",
"bound_to_estimate": true
},
"execution": {
"started_at": "2026-02-15T10:01:05Z",
"completed_at": "2026-02-15T10:31:00Z",
"status": "completed",
"actual_cost_usd": 0.72,
"actual_time_minutes": 30
},
"artifacts": [
"output/acme_corp/report.md",
"output/acme_corp/scraped_content.txt",
"output/acme_corp/insights.txt"
]
}Two prompt templates are available for guided workflows:
Guides through the complete research process.
Helps select appropriate strategy types based on company context.
The MCP server includes comprehensive security features:
All file paths are validated to prevent path traversal attacks:
- Paths must be relative to workspace root
- Symlinks are resolved and checked
- `..` sequences are blocked
URLs are validated to prevent SSRF attacks:
- Only HTTP/HTTPS schemes allowed
- Private/internal IPs blocked (10.x, 172.16-31.x, 192.168.x, 127.x)
- DNS rebinding protection
Per-tool rate limits prevent abuse:
- estimate_run: 30 requests/minute
- research_company: 2 requests/minute
- Other tools: 10 requests/minute
JWT authentication for HTTP transport:
- Tokens verified via JWKS endpoint or shared secret
- Admin policy: `role=admin` claim or `MCP_ADMIN_TOKENS` env var
# Configure auth via environment
MCP_JWT_SECRET=your-secret-key
MCP_ADMIN_TOKENS=token1,token2The MCP server uses a single-job model with journal persistence:
from primr.mcp_server.job_store import SingleJobStore
store = SingleJobStore(journal_path="logs/mcp_journal.json")
# Create a job
job = store.create(company_name="Acme Corp", mode="full", owner_client_id="client1")
# Update progress
job.advance_stage(ResearchStage.DEEP_RESEARCH)
job.heartbeat(progress=50)
store.update(job)
# Check for stuck jobs
if job.is_possibly_stuck():
print("Job may be stuck - no heartbeat in 5+ minutes")
# Get active job
active = store.get_active()
# Get latest completed job
terminal = store.get_latest_terminal()The server handles shutdown gracefully:
- Waits up to 5 seconds for current work to complete
- Force-cancels remaining tasks after timeout
- Marks in-progress jobs as failed with `error_type="server_shutdown"`
- Total shutdown timeout: 10 seconds
Standard error codes returned by tools:
| Code | Description |
|---|---|
| INVALID_URL | URL format invalid |
| SSRF_BLOCKED | URL blocked for security |
| URL_UNREACHABLE | URL could not be reached |
| PATH_TRAVERSAL_BLOCKED | Path traversal attempt blocked |
| REPORT_NOT_FOUND | Report file not found |
| JOB_NOT_FOUND | Job ID not found |
| JOB_IN_PROGRESS | Another job already running |
| RATE_LIMIT_EXCEEDED | Rate limit exceeded |
| CANCEL_NOT_AUTHORIZED | Not authorized to cancel job |
Primr v1.7.0 introduces an agentic architecture that enables AI agents to drive research workflows with persistent memory, hypothesis tracking, and governance hooks.
Track hypotheses across research sessions with confidence levels.
from primr.agentic.memory import ResearchMemory, Hypothesis, ConfidenceLevel
from pathlib import Path
# Initialize memory
memory = ResearchMemory(storage_path=Path("./logs/research_memory"))
# Save a hypothesis
hypothesis = Hypothesis(
id="h_001",
company="Acme Corp",
statement="Acme is expanding into AI-powered logistics",
confidence=ConfidenceLevel.MEDIUM,
evidence="CEO mentioned AI initiatives in Q3 earnings call",
source="https://acme.example/investor-relations",
topic="strategy",
)
memory.save_hypothesis(hypothesis)
# Retrieve hypotheses
hypotheses = memory.get_hypotheses("Acme Corp")
for h in hypotheses:
print(f"[{h.confidence.value}] {h.statement}")
# Update confidence as evidence emerges
hypothesis.confidence = ConfidenceLevel.HIGH
hypothesis.evidence += "; Confirmed by press release 2026-01-15"
memory.save_hypothesis(hypothesis)
# List all companies with memory
companies = memory.list_companies()

| Level | Description |
|---|---|
| LOW | Initial hypothesis, needs validation |
| MEDIUM | Some supporting evidence found |
| HIGH | Strong evidence from multiple sources |
| VALIDATED | Confirmed through direct sources |
Query the development roadmap programmatically.
from primr.agentic.roadmap_api import RoadmapAPI
from primr.agentic.models import VersionStatus
api = RoadmapAPI()
# Get current version
current = api.get_current_version()
print(f"Current: v{current.number} - {current.title}")
# Get next planned version
next_ver = api.get_next_version()
# List versions by status
completed = api.list_by_status(VersionStatus.COMPLETED)
planned = api.list_by_status(VersionStatus.PLANNED)
# Get specific version details
v170 = api.get_version("1.7.0")
for feature in v170.features:
print(f" - {feature.name}: {feature.description}")
# Search features
results = api.search_features("memory")
for version, feature in results:
print(f"v{version.number}: {feature.name}")Register governance hooks for cost control and security.
from primr.agentic.hooks import (
HookSystem,
CostGuardHook,
SSRFGuardHook,
HookContext,
HookResult,
)
# Create hook system
hooks = HookSystem()
# Register cost guard (blocks operations exceeding budget)
hooks.register(CostGuardHook(max_cost_usd=5.0))
# Register SSRF guard (blocks internal URLs)
hooks.register(SSRFGuardHook())
# Execute hooks before an operation
context = HookContext(
operation="scrape",
target_url="https://acme.example",
estimated_cost=2.50,
)
result = await hooks.execute_pre_hooks(context)
if result.blocked:
print(f"Operation blocked: {result.reason}")
else:
# Proceed with operation
passfrom primr.agentic.hooks import Hook, HookContext, HookResult
class AuditHook(Hook):
"""Log all operations for audit trail."""
name = "audit"
async def pre_execute(self, context: HookContext) -> HookResult:
logger.info(f"Operation: {context.operation} on {context.target_url}")
return HookResult(blocked=False)
async def post_execute(self, context: HookContext, result: Any) -> None:
logger.info(f"Completed: {context.operation}, success={result.success}")
hooks.register(AuditHook())Specialized subagents for different research tasks.
from primr.agentic.subagents import (
ScraperSubagent,
AnalystSubagent,
WriterSubagent,
QASubagent,
)
from primr.agentic.subagents.base import SubagentContext, SubagentResult
# Create subagent
scraper = ScraperSubagent()
# Execute with context
context = SubagentContext(
company_name="Acme Corp",
company_url="https://acme.example",
working_dir=Path("./working/acme"),
)
result: SubagentResult = await scraper.execute(context)
if result.success:
print(f"Scraped {result.artifacts['page_count']} pages")
else:
print(f"Failed: {result.error}")

| Subagent | Purpose |
|---|---|
| ScraperSubagent | Website scraping with tier escalation |
| AnalystSubagent | Deep research and hypothesis generation |
| WriterSubagent | Report generation from research data |
| QASubagent | Quality assessment and scoring |
Coordinate subagents through the research pipeline.
from primr.agentic.orchestrator import (
ResearchOrchestrator,
OrchestratorConfig,
OrchestratorResult,
)
from primr.agentic.memory import ResearchMemory
from primr.agentic.hooks import HookSystem, CostGuardHook
# Configure orchestrator
config = OrchestratorConfig(
output_dir=Path("./output"),
fail_fast=False, # Continue on non-critical failures
)
# Initialize with memory and hooks
memory = ResearchMemory(storage_path=Path("./logs/research_memory"))
hooks = HookSystem()
hooks.register(CostGuardHook(max_cost_usd=10.0))
orchestrator = ResearchOrchestrator(
config=config,
memory=memory,
hook_system=hooks,
)
# Run orchestrated research
result: OrchestratorResult = await orchestrator.research(
company_name="Acme Corp",
company_url="https://acme.example",
mode="full",
)
if result.is_success:
print(f"Report: {result.report_path}")
print(f"Hypotheses: {len(result.hypotheses)}")
print(f"Stages: {result.completed_stages}")
else:
print(f"Errors: {result.errors}")

Additional MCP tools for agentic workflows.
Query roadmap versions and features.
{
"name": "query_roadmap",
"arguments": {
"version": "1.7.0"
}
}

Response:
{
"version": "1.7.0",
"status": "completed",
"title": "Agentic Architecture",
"features": [
{"name": "Research Memory", "description": "Persistent hypothesis tracking"},
{"name": "Hook System", "description": "Governance and cost control"},
{"name": "Subagent Architecture", "description": "Specialized research agents"}
]
}

Retrieve hypotheses for a company from research memory.
{
"name": "get_hypotheses",
"arguments": {
"company": "Acme Corp",
"min_confidence": "medium"
}
}

Response:
{
"company": "Acme Corp",
"hypotheses": [
{
"id": "h_001",
"statement": "Acme is expanding into AI-powered logistics",
"confidence": "high",
"evidence": "CEO mentioned AI initiatives in Q3 earnings call",
"topic": "strategy"
}
],
"count": 1
}

Save a hypothesis to research memory.
{
"name": "save_hypothesis",
"arguments": {
"company": "Acme Corp",
"statement": "Acme plans to acquire a logistics startup",
"confidence": "low",
"evidence": "Rumored in industry newsletter",
"topic": "m&a"
}
}

Additional MCP resources for agentic workflows.
Current roadmap with versions and features.
{
"current_version": "1.7.0",
"next_version": "1.8.0",
"versions": [
{
"number": "1.7.0",
"status": "completed",
"title": "Agentic Architecture",
"feature_count": 6
},
{
"number": "1.8.0",
"status": "planned",
"title": "QA-Driven Research",
"feature_count": 4
}
]
}

Research memory for a specific company.
{
"company": "Acme Corp",
"hypothesis_count": 5,
"hypotheses": [
{
"id": "h_001",
"statement": "Acme is expanding into AI-powered logistics",
"confidence": "high",
"topic": "strategy",
"created_at": "2026-01-15T10:30:00Z"
}
],
"confidence_distribution": {
"low": 1,
"medium": 2,
"high": 2,
"validated": 0
}
}

CLAUDE.md context map for AI agents.
{
"quick_start": {
"research": "primr \"Company\" https://company.com",
"doctor": "primr doctor",
"memory": "primr memory \"Company\""
},
"architecture": {
"entry_point": "src/primr/core/cli.py",
"orchestrator": "src/primr/core/research_orchestrator.py",
"agentic": "src/primr/agentic/"
},
"verification": {
"tests": "python -m pytest tests/ -v",
"types": "python -m mypy src/primr/",
"lint": "python -m ruff check src/primr/"
}
}

New CLI commands for agentic workflows:
# Research memory
primr memory "Acme Corp" # View hypotheses for a company
primr --memory-list # List all companies with memory
# Orchestrated research
primr orchestrate "Acme Corp" https://acme.example
primr --orchestrate --max-cost 5.0 # With cost budget
# Roadmap
primr roadmap # Show roadmap overview
primr --roadmap-version v1.7.0   # Show version details

Pre-built workflow definitions in skills/:
| Skill | Description |
|---|---|
| `company-research` | Full pipeline with memory integration |
| `scrape-strategy` | Tier selection and error handling |
| `hypothesis-tracking` | Confidence level management |
| `qa-iteration` | Section refinement workflow |

Each skill includes a SKILL.md with workflow steps, decision points, and example usage.