Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
run: |
# Skip GPU tests as GitHub Actions runners don't have CUDA
# To run GPU tests locally: pytest tests/ -v -m "gpu"
pytest tests/ -v --tb=short -m "not slow and not gpu and not integration"
python -m pytest tests/ -v --tb=short -m "not slow and not gpu and not integration"

lint:
runs-on: ubuntu-latest
Expand Down
16 changes: 16 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,19 @@ dist/
*/.DS_Store
*.DS_Store

# Evaluation/Profiling ignores
*.prof
evaluation/sandbox/results/*
!evaluation/sandbox/results/.gitkeep
!evaluation/sandbox/results/run_local_eval.py

# Sandbox SIF images
*.sif

# Sandbox Cache
evaluation/sandbox/cache/

# Slurm and Apptainer temporary files
*.err
*.out
build-temp-*/
33 changes: 27 additions & 6 deletions contextpilot/context_index/compute_distance_cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
start = time.time()
chunk_ids, original_positions, lengths, offsets = prepare_contexts_for_cpu(contexts)
prep_time = time.time() - start
print(f" Prepared in {prep_time:.1f}s")
print(f"+ Prepared in {prep_time:.1f}s")

# Generate batches of pair indices
print(f"\nGenerating pair batches...")
Expand All @@ -290,7 +290,7 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
if current_batch:
batches.append(current_batch)

print(f" Generated {len(batches):,} batches")
print(f"+ Generated {len(batches):,} batches")

# Prepare arguments for workers
worker_args = [
Expand All @@ -306,13 +306,13 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
start_time = time.time()
processed = 0

with Pool(num_workers) as pool:
for batch_results in pool.imap_unordered(compute_batch_worker, worker_args):
if num_workers == 1:
# Bypass multiprocessing Pool entirely to save initialization overhead
for args in worker_args:
batch_results = compute_batch_worker(args)
for i, j, dist in batch_results:
# Convert (i, j) to condensed index
condensed_idx = n * i - i * (i + 1) // 2 + j - i - 1
condensed_distances[condensed_idx] = dist

processed += 1

# Progress update
Expand All @@ -326,6 +326,27 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
f"Rate: {rate:,.0f} pairs/sec | "
f"Elapsed: {elapsed:.1f}s | "
f"ETA: {eta:.1f}s ({eta/60:.1f} min)")
else:
with Pool(num_workers) as pool:
for batch_results in pool.imap_unordered(compute_batch_worker, worker_args):
for i, j, dist in batch_results:
# Convert (i, j) to condensed index
condensed_idx = n * i - i * (i + 1) // 2 + j - i - 1
condensed_distances[condensed_idx] = dist

processed += 1

# Progress update
if processed % 100000 == 0 or processed == num_pairs:
elapsed = time.time() - start_time
rate = processed / elapsed if elapsed > 0 else 0
eta = (num_pairs - processed) / rate if rate > 0 else 0
progress_pct = processed / num_pairs * 100

print(f" {processed:,}/{num_pairs:,} ({progress_pct:.1f}%) | "
f"Rate: {rate:,.0f} pairs/sec | "
f"Elapsed: {elapsed:.1f}s | "
f"ETA: {eta:.1f}s ({eta/60:.1f} min)")

compute_time = time.time() - start_time
total_time = compute_time + prep_time
Expand Down
32 changes: 16 additions & 16 deletions contextpilot/server/live_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ def build_and_schedule(self, contexts: List[List[int]],
print("\n1. Building static index...")
self.initial_result = self.fit_transform(contexts)

print(f" Built tree with {self.initial_result.stats['total_nodes']} nodes")
print(f" Leaf nodes: {self.initial_result.stats['leaf_nodes']}")
print(f" + Built tree with {self.initial_result.stats['total_nodes']} nodes")
print(f" + Leaf nodes: {self.initial_result.stats['leaf_nodes']}")

# Step 2: Inter-context scheduling
print("\n2. Scheduling contexts for optimal execution...")
scheduled_reordered, scheduled_originals, final_mapping, groups = \
self.inter_scheduler.schedule_contexts(self.initial_result)

print(f" Created {len(groups)} execution groups")
print(f" + Created {len(groups)} execution groups")

self.scheduled_result = {
'reordered_contexts': scheduled_reordered,
Expand All @@ -197,8 +197,8 @@ def build_and_schedule(self, contexts: List[List[int]],
num_input_contexts=len(contexts)
)

print(f" Initialized {len(self.metadata)} nodes with metadata")
print(f" Auto-assigned {len(request_id_mapping)} request IDs")
print(f" + Initialized {len(self.metadata)} nodes with metadata")
print(f" + Auto-assigned {len(request_id_mapping)} request IDs")

# Add request_id mapping to result (dict and ordered list)
self.scheduled_result['request_id_mapping'] = request_id_mapping
Expand All @@ -208,7 +208,7 @@ def build_and_schedule(self, contexts: List[List[int]],
self.is_live = True

print("\n" + "=" * 80)
print(" INDEX IS NOW LIVE - Ready for dynamic operations")
print("+ INDEX IS NOW LIVE - Ready for dynamic operations")
print("=" * 80 + "\n")

return self.scheduled_result
Expand Down Expand Up @@ -534,8 +534,8 @@ def build_incremental(self, contexts: List[List[int]],
# No match - will build new index for these
unmatched_contexts.append((i, context))

print(f" Found {len(matched_contexts)} contexts with matches")
print(f" Found {len(unmatched_contexts)} contexts without matches")
print(f" + Found {len(matched_contexts)} contexts with matches")
print(f" + Found {len(unmatched_contexts)} contexts without matches")

# Prepare result arrays (will fill in order)
request_ids = [None] * len(contexts)
Expand Down Expand Up @@ -585,7 +585,7 @@ def build_incremental(self, contexts: List[List[int]],
)
temp_result = temp_index.fit_transform(unmatched_only)

print(f" Built temp index with {temp_result.stats['total_nodes']} nodes")
print(f" + Built temp index with {temp_result.stats['total_nodes']} nodes")

# Step 4: Merge temp index into global index
print("\n4. Merging temp index into global index...")
Expand All @@ -606,16 +606,16 @@ def build_incremental(self, contexts: List[List[int]],
context_info.append((orig_idx, merged_request_ids[i], merged_search_paths[i]))

merged_count = len(unmatched_contexts)
print(f" Merged {merged_count} new subtrees under global root")
print(f" + Merged {merged_count} new subtrees under global root")

# Step 5: Schedule execution order
print("\n5. Scheduling execution order for cache reuse...")
scheduled_order = self._schedule_incremental(context_info)
groups = self._group_by_path_prefix(context_info)
print(f" Scheduled {len(scheduled_order)} contexts into {len(groups)} groups")
print(f" + Scheduled {len(scheduled_order)} contexts into {len(groups)} groups")

print("\n" + "=" * 80)
print(f" INCREMENTAL BUILD COMPLETE")
print(f"+ INCREMENTAL BUILD COMPLETE")
print(f" Matched & inserted: {len(matched_contexts)}")
print(f" Built & merged: {merged_count}")
print("=" * 80 + "\n")
Expand Down Expand Up @@ -906,15 +906,15 @@ def schedule_only(self, contexts: List[List[int]]) -> Dict:
print("\n1. Building static index...")
result = self.fit_transform(contexts)

print(f" Built tree with {result.stats['total_nodes']} nodes")
print(f" Leaf nodes: {result.stats['leaf_nodes']}")
print(f" + Built tree with {result.stats['total_nodes']} nodes")
print(f" + Leaf nodes: {result.stats['leaf_nodes']}")

# Step 2: Inter-context scheduling
print("\n2. Scheduling contexts for optimal execution...")
scheduled_reordered, scheduled_originals, final_mapping, groups = \
self.inter_scheduler.schedule_contexts(result)

print(f" Created {len(groups)} execution groups")
print(f" + Created {len(groups)} execution groups")

# Return results without going live (stateless)
scheduled_result = {
Expand All @@ -931,7 +931,7 @@ def schedule_only(self, contexts: List[List[int]]) -> Dict:
}

print("\n" + "=" * 80)
print(" BATCH SCHEDULED (Stateless - no cache tracking)")
print("+ BATCH SCHEDULED (Stateless - no cache tracking)")
print("=" * 80 + "\n")

return scheduled_result
Expand Down
8 changes: 4 additions & 4 deletions docs/guides/multi_turn.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,10 @@ print(f"New docs: {result['new_docs']}") # [2]

| Operation | `/reorder` | `/deduplicate` |
|-----------|----------|----------------|
| Index build | | ✗ |
| Clustering | | ✗ |
| Search | | ✗ |
| Deduplication | | |
| Index build | + | ✗ |
| Clustering | + | ✗ |
| Search | + | ✗ |
| Deduplication | + | + |
| **Latency** | ~50-200ms | ~1-5ms |

For multi-turn conversations, Turn 2+ typically doesn't need index operations — just deduplication against conversation history. The `/deduplicate` endpoint is **10-100x faster**.
Expand Down
147 changes: 147 additions & 0 deletions evaluation/benchmarks/run_bigcodebench_elm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import asyncio
import json
import logging
import os
import re

# pip install datasets
from datasets import load_dataset
from openai import AsyncOpenAI

# Set PYTHONPATH in the environment before running
from refactored_plugins.skill_index import SkillAwareContextPlugin
from refactored_plugins.dedup import ContextDedupPlugin

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger(__name__)

# Create a registry of 10 dummy tools to trigger the Skill plugin
DUMMY_TOOL_REGISTRY = {
f"tool_{i}": {
"type": "function",
"function": {
"name": f"tool_{i}",
"description": f"Dummy tool number {i}"
}
}
for i in range(1, 11)
}

async def process_task(task, skill_plugin, dedup_plugin, client, semaphore, output_file, turn_1_id):
"""
Processes a single BigCodeBench task through our ContextPilot plugins and ELM API.
"""
async with semaphore:
task_id = task.get("task_id", "unknown_task")
# BigCodeBench prompts are usually in 'complete_prompt' or 'instruction'
prompt = task.get("complete_prompt", task.get("instruction", "No prompt found."))

# Mock heavy agent request with redundant history and bloated tools
request = {
"user_id": "evaluator_1",
"parent_id": turn_1_id,
"_required_skills": ["tool_1", "tool_3", "tool_7"], # Require only 3 tools out of 10
"messages": [
{"role": "system", "content": "You are a senior python developer. Always wrap your code in ```python blocks."},
{"role": "user", "content": "Please help me write some code."},
{"role": "assistant", "content": "Of course! I can help you with that."},
{"role": "user", "content": prompt}
],
"tools": list(DUMMY_TOOL_REGISTRY.values())
}

# Pass through ContextPilot local plugins
optimized_request = await dedup_plugin.process(request)
optimized_request = await skill_plugin.process(optimized_request)

# Prepare ELM API request (OpenAI-compatible)
api_kwargs = {
"model": "gpt-5.5",
"messages": optimized_request.get("messages", [])
}
if "tools" in optimized_request and optimized_request["tools"]:
api_kwargs["tools"] = optimized_request["tools"]

try:
logger.info(f"Sending optimized task {task_id} to ELM API...")
response = await client.chat.completions.create(**api_kwargs)
response_content = response.choices[0].message.content
except Exception as e:
logger.error(f"API Error for {task_id}: {str(e)}")
response_content = ""

# Extract code block using regex
extracted_code = ""
if response_content:
match = re.search(r"```python\s*(.*?)\s*```", response_content, re.DOTALL)
if match:
extracted_code = match.group(1).strip()
else:
# Fallback if the LLM didn't use the markdown block
extracted_code = response_content.strip()

# Append result to JSONL
with open(output_file, "a", encoding="utf-8") as f:
f.write(json.dumps({"task_id": task_id, "solution": extracted_code}) + "\n")

logger.info(f"Finished {task_id}")

async def main():
api_key = os.environ.get("OPENAI_API_KEY", "dummy-elm-key")
base_url = os.environ.get("BASE_URL", "https://api.openai.com/v1")

client = AsyncOpenAI(api_key=api_key, base_url=base_url)

skill_plugin = SkillAwareContextPlugin(tool_registry=DUMMY_TOOL_REGISTRY)
dedup_plugin = ContextDedupPlugin()

# Pre-warm Dedup plugin with the initial messages to simulate conversation history
turn_1 = {
"user_id": "evaluator_1",
"messages": [
{"role": "system", "content": "You are a senior python developer. Always wrap your code in ```python blocks."},
{"role": "user", "content": "Please help me write some code."},
{"role": "assistant", "content": "Of course! I can help you with that."}
]
}
turn_1_res = await dedup_plugin.process(turn_1)
turn_1_id = turn_1_res.get("current_id")

# Load BigCodeBench dataset
logger.info("Loading BigCodeBench dataset...")
try:
dataset = load_dataset("bigcode/bigcodebench", split="train")
except Exception as e:
logger.warning(f"Failed to load split='train'. Trying standard default split. Error: {e}")
# Fallback to the common default split format if 'train' split does not exist
try:
dataset = load_dataset("bigcode/bigcodebench", split="v0.1.2")
except Exception:
dataset = load_dataset("bigcode/bigcodebench", split="v0.1.0_240822")

# Select first 5 tasks for a smoke test
tasks = list(dataset)[:5]
logger.info(f"Loaded {len(tasks)} tasks for smoke test.")

output_file = os.path.join(os.path.dirname(__file__), "elm_samples.jsonl")
if os.path.exists(output_file):
os.remove(output_file)

# Use a Semaphore with 1 to process sequentially and avoid early rate limits
semaphore = asyncio.Semaphore(1)

coroutines = [process_task(t, skill_plugin, dedup_plugin, client, semaphore, output_file, turn_1_id) for t in tasks]
await asyncio.gather(*coroutines)

print("\n=== Phase 2 Dataset Smoke Test Complete ===")
print(f"Results saved to {output_file}")

print("\n=== Combined Cost-Savings Telemetry ===")
metrics = {
"skill_plugin_metrics": skill_plugin.get_plugin_metrics(),
"dedup_plugin_metrics": dedup_plugin.get_plugin_metrics()
}
print(json.dumps(metrics, indent=2))

if __name__ == "__main__":
asyncio.run(main())
Loading