From c8acdb4e3f764d21bbb05b79c5d5847f0d39797e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Mar 2026 23:42:21 +0000 Subject: [PATCH 1/3] Initial plan From 8d63a6236246dc5ae723570269145521285b139e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Mar 2026 23:59:43 +0000 Subject: [PATCH 2/3] Code review: centralize config, fix dead code, move imports to top level, fix threshold scale Co-authored-by: rjurney <42149+rjurney@users.noreply.github.com> --- config.yml | 10 +++- src/serf/analyze/profiler.py | 3 +- src/serf/cli/main.py | 107 +++++++++++++++-------------------- src/serf/eval/evaluator.py | 15 +++-- src/serf/pipeline.py | 37 +++++++----- 5 files changed, 89 insertions(+), 83 deletions(-) diff --git a/config.yml b/config.yml index dac26ea..b827a39 100644 --- a/config.yml +++ b/config.yml @@ -22,10 +22,14 @@ er: max_retries: 3 retry_delay_ms: 300 + convergence: + max_iterations: 5 + threshold: 0.01 + eval: - coverage_threshold: 0.9999 - error_threshold: 0.0001 - overlap_threshold: 0.01 + coverage_threshold: 99.99 + error_threshold: 1.0 + overlap_threshold: 1.0 paths: blocks: "data/iteration_{iteration}/blocks" diff --git a/src/serf/analyze/profiler.py b/src/serf/analyze/profiler.py index 0ec50e0..c9d9178 100644 --- a/src/serf/analyze/profiler.py +++ b/src/serf/analyze/profiler.py @@ -7,6 +7,7 @@ import dspy from serf.analyze.field_detection import detect_field_type +from serf.config import config from serf.dspy.baml_adapter import BAMLAdapter from serf.dspy.signatures import GenerateERConfig from serf.dspy.types import DatasetProfile, FieldProfile @@ -113,7 +114,7 @@ def profile(self, records: list[dict[str, Any]]) -> DatasetProfile: def generate_er_config( profile: DatasetProfile, sample_records: list[dict[str, Any]], - model: str = "gemini/gemini-2.0-flash", + model: str = config.get("models.llm", "gemini/gemini-2.0-flash"), ) -> str: """Use an LLM to generate an ER config YAML from a dataset profile. diff --git a/src/serf/cli/main.py b/src/serf/cli/main.py index 066b7c4..18aa0ce 100644 --- a/src/serf/cli/main.py +++ b/src/serf/cli/main.py @@ -1,13 +1,25 @@ """Main CLI entry point for SERF.""" +import asyncio import json import os +import random import time from typing import Any import click - +import pandas as pd + +from serf.analyze.profiler import DatasetProfiler, generate_er_config +from serf.block.pipeline import SemanticBlockingPipeline +from serf.config import config +from serf.dspy.types import BlockResolution, Entity, EntityBlock +from serf.eval.benchmarks import BenchmarkDataset +from serf.eval.evaluator import evaluate_er_results, format_evaluation_report, save_evaluation +from serf.eval.metrics import evaluate_resolution from serf.logs import get_logger, setup_logging +from serf.match.matcher import EntityMatcher +from serf.pipeline import ERConfig, load_data, run_pipeline logger = get_logger(__name__) @@ -60,25 +72,25 @@ def cli() -> None: @click.option( "--model", type=str, - default="gemini/gemini-2.0-flash", + default=config.get("models.llm", "gemini/gemini-2.0-flash"), help="LLM model for matching", ) @click.option( "--max-iterations", type=int, - default=5, + default=config.get("er.convergence.max_iterations", 5), help="Maximum ER iterations (0 for auto-convergence)", ) @click.option( "--convergence-threshold", type=float, - default=0.01, + default=config.get("er.convergence.threshold", 0.01), help="Stop when per-round reduction fraction is below this", ) @click.option( "--target-block-size", type=int, - default=30, + default=config.get("er.blocking.target_block_size", 30), help="Target entities per FAISS block", ) @click.option( @@ -90,7 +102,7 @@ def cli() -> None: @click.option( "--concurrency", type=int, - default=20, + default=config.get("er.matching.max_concurrent", 20), help="Number of concurrent LLM requests", ) def run( @@ -115,8 +127,6 @@ def run( Requires GEMINI_API_KEY environment variable (or appropriate key for the model). """ - from serf.pipeline import ERConfig, run_pipeline - # Build config: YAML file first, then CLI overrides er_config = ERConfig.from_yaml(config_path) if config_path else ERConfig() @@ -181,7 +191,7 @@ def run( @click.option( "--model", type=str, - default="gemini/gemini-2.0-flash", + default=config.get("models.llm", "gemini/gemini-2.0-flash"), help="LLM model for config generation", ) def analyze(input_path: str, output_path: str | None, model: str) -> None: @@ -194,9 +204,6 @@ def analyze(input_path: str, output_path: str | None, model: str) -> None: Without --output, prints the statistical profile only. With --output, also calls the LLM to generate an ER config YAML file. """ - from serf.analyze.profiler import DatasetProfiler, generate_er_config - from serf.pipeline import load_data - logger.info(f"Analyzing dataset: {input_path}") start = time.time() @@ -267,8 +274,18 @@ def analyze(input_path: str, output_path: str | None, model: str) -> None: default="semantic", help="Blocking method to use", ) -@click.option("--target-block-size", type=int, default=30, help="Target entities per block") -@click.option("--max-block-size", type=int, default=100, help="Maximum entities per block") +@click.option( + "--target-block-size", + type=int, + default=config.get("er.blocking.target_block_size", 30), + help="Target entities per block", +) +@click.option( + "--max-block-size", + type=int, + default=config.get("er.blocking.max_block_size", 100), + help="Maximum entities per block", +) def block( input_path: str, output_path: str, @@ -278,10 +295,6 @@ def block( max_block_size: int, ) -> None: """Perform semantic blocking on input data.""" - import pandas as pd - - from serf.block.pipeline import SemanticBlockingPipeline - logger.info(f"Starting blocking: input={input_path}, method={method}") start = time.time() @@ -337,16 +350,11 @@ def block( @click.option( "--batch-size", type=int, - default=10, + default=config.get("er.matching.batch_size", 10), help="Number of blocks to process concurrently", ) def match(input_path: str, output_path: str, iteration: int, batch_size: int) -> None: """Match entities within blocks using LLM.""" - import asyncio - - from serf.dspy.types import EntityBlock - from serf.match.matcher import EntityMatcher - logger.info(f"Starting matching: input={input_path}, iteration={iteration}") start = time.time() @@ -403,14 +411,6 @@ def evaluate(input_path: str, ground_truth: str | None) -> None: Performs comprehensive validation: entity deduplication, source_uuid validation, match_skip analysis, and PASS/FAIL checks. """ - from serf.dspy.types import BlockResolution - from serf.eval.evaluator import ( - evaluate_er_results, - format_evaluation_report, - save_evaluation, - ) - from serf.eval.metrics import evaluate_resolution - logger.info(f"Evaluating: input={input_path}") matches_file = os.path.join(input_path, "matches.jsonl") @@ -449,8 +449,6 @@ def evaluate(input_path: str, ground_truth: str | None) -> None: # Optional ground truth comparison if ground_truth: - import pandas as pd - gt_df = pd.read_csv(ground_truth) true_pairs: set[tuple[int, int]] = set() for _, row in gt_df.iterrows(): @@ -534,11 +532,16 @@ def edges(input_path: str, output_path: str) -> None: default="semantic", help="Blocking method", ) -@click.option("--target-block-size", type=int, default=30, help="Target entities per block") +@click.option( + "--target-block-size", + type=int, + default=config.get("er.blocking.target_block_size", 30), + help="Target entities per block", +) @click.option( "--batch-size", type=int, - default=10, + default=config.get("er.matching.batch_size", 10), help="Concurrent block processing batch size", ) def resolve( @@ -560,7 +563,7 @@ def resolve( iteration=iteration, method=method, target_block_size=target_block_size, - max_block_size=100, + max_block_size=config.get("er.blocking.max_block_size", 100), ) click.echo("\n Step 2: Matching...") ctx.invoke( @@ -601,8 +604,6 @@ def resolve( ) def download(dataset: str, output_path: str | None) -> None: """Download a benchmark dataset.""" - from serf.eval.benchmarks import BenchmarkDataset - available = BenchmarkDataset.available_datasets() if dataset not in available: click.echo(f"Unknown dataset: {dataset}") @@ -641,13 +642,13 @@ def download(dataset: str, output_path: str | None) -> None: @click.option( "--target-block-size", type=int, - default=30, + default=config.get("er.blocking.target_block_size", 30), help="Target entities per block", ) @click.option( "--model", type=str, - default="gemini/gemini-2.0-flash", + default=config.get("models.llm", "gemini/gemini-2.0-flash"), help="LLM model for matching", ) @click.option( @@ -665,7 +666,7 @@ def download(dataset: str, output_path: str | None) -> None: @click.option( "--concurrency", type=int, - default=20, + default=config.get("er.matching.max_concurrent", 20), help="Number of concurrent LLM requests", ) def benchmark( @@ -682,8 +683,6 @@ def benchmark( Uses embeddings for blocking and LLM for matching. Requires GEMINI_API_KEY environment variable (or appropriate key for the model). """ - from serf.eval.benchmarks import BenchmarkDataset - available = BenchmarkDataset.available_datasets() if dataset not in available: click.echo(f"Unknown dataset: {dataset}") @@ -699,8 +698,6 @@ def benchmark( # Optionally limit right table size if max_right_entities and len(right_entities) > max_right_entities: - import random - gt_right_ids = {b for _, b in benchmark_data.ground_truth} matched = [e for e in right_entities if e.id in gt_right_ids] unmatched = [e for e in right_entities if e.id not in gt_right_ids] @@ -771,7 +768,7 @@ def benchmark( @click.option( "--model", type=str, - default="gemini/gemini-2.0-flash", + default=config.get("models.llm", "gemini/gemini-2.0-flash"), help="LLM model for matching", ) @click.option( @@ -789,8 +786,6 @@ def benchmark_all( Requires GEMINI_API_KEY environment variable (or appropriate key for the model). """ - from serf.eval.benchmarks import BenchmarkDataset - datasets = BenchmarkDataset.available_datasets() click.echo(f"Running benchmarks on {len(datasets)} datasets...") click.echo(f" Model: {model}") @@ -804,7 +799,7 @@ def benchmark_all( benchmark, dataset=name, output_path=output_path, - target_block_size=30, + target_block_size=config.get("er.blocking.target_block_size", 30), model=model, max_right_entities=max_right_entities, ) @@ -884,9 +879,6 @@ def _dataframe_to_entities(df: Any) -> list[Any]: list[Entity] List of Entity objects """ - - from serf.dspy.types import Entity - entities = [] name_col = _detect_name_column(df.columns.tolist()) for i, (_idx, row) in enumerate(df.iterrows()): @@ -909,9 +901,9 @@ def _dataframe_to_entities(df: Any) -> list[Any]: def _benchmark_llm_matching( all_entities: list[Any], target_block_size: int, - model: str = "gemini/gemini-2.0-flash", + model: str = config.get("models.llm", "gemini/gemini-2.0-flash"), limit: int | None = None, - concurrency: int = 20, + concurrency: int = config.get("er.matching.max_concurrent", 20), ) -> set[tuple[int, int]]: """Run LLM-based matching for benchmarks. @@ -936,11 +928,6 @@ def _benchmark_llm_matching( set[tuple[int, int]] Predicted match pairs """ - import asyncio - - from serf.block.pipeline import SemanticBlockingPipeline - from serf.match.matcher import EntityMatcher - max_block = min(100, target_block_size * 3) click.echo(f"\n Blocking (target={target_block_size}, max={max_block})...") pipeline = SemanticBlockingPipeline( diff --git a/src/serf/eval/evaluator.py b/src/serf/eval/evaluator.py index 4952899..423789f 100644 --- a/src/serf/eval/evaluator.py +++ b/src/serf/eval/evaluator.py @@ -12,16 +12,19 @@ import json from typing import Any +from serf.config import config from serf.dspy.types import BlockResolution, Entity from serf.eval.metrics import validate_source_uuids from serf.logs import get_logger logger = get_logger(__name__) -# Thresholds for PASS/FAIL assessment -COVERAGE_THRESHOLD = 99.99 # source_uuid coverage must be >= this % -ERROR_THRESHOLD = 0.01 # error_recovery fraction must be < this % -OVERLAP_THRESHOLD = 1.0 # duplicate entity fraction must be < this % +# Thresholds for PASS/FAIL assessment (all values are percentages: 0–100). +# Previously ERROR_THRESHOLD used a fraction scale (0–1) with a * 100 multiplier +# in the comparison. All three are now consistently percentage-based. +COVERAGE_THRESHOLD: float = config.get("er.eval.coverage_threshold", 99.99) +ERROR_THRESHOLD: float = config.get("er.eval.error_threshold", 1.0) +OVERLAP_THRESHOLD: float = config.get("er.eval.overlap_threshold", 1.0) def evaluate_er_results( @@ -125,8 +128,8 @@ def evaluate_er_results( "error_rate": { "value": error_rate, "threshold": ERROR_THRESHOLD, - "passed": error_rate < ERROR_THRESHOLD * 100, - "description": f"error_recovery rate < {ERROR_THRESHOLD * 100}%", + "passed": error_rate < ERROR_THRESHOLD, + "description": f"error_recovery rate < {ERROR_THRESHOLD}%", }, "duplicate_rate": { "value": duplicate_rate, diff --git a/src/serf/pipeline.py b/src/serf/pipeline.py index 9b259b7..9728a20 100644 --- a/src/serf/pipeline.py +++ b/src/serf/pipeline.py @@ -20,6 +20,7 @@ from serf.block.embeddings import EntityEmbedder from serf.block.faiss_blocker import FAISSBlocker +from serf.config import config from serf.dspy.types import Entity, EntityBlock, IterationMetrics from serf.logs import get_logger @@ -61,12 +62,12 @@ def __init__( blocking_fields: list[str] | None = None, entity_type: str = "entity", blocking_method: str = "semantic", - target_block_size: int = 30, - max_block_size: int = 100, - model: str = "gemini/gemini-2.0-flash", - max_iterations: int = 5, - convergence_threshold: float = 0.01, - max_concurrent: int = 20, + target_block_size: int = config.get("er.blocking.target_block_size", 30), + max_block_size: int = config.get("er.blocking.max_block_size", 100), + model: str = config.get("models.llm", "gemini/gemini-2.0-flash"), + max_iterations: int = config.get("er.convergence.max_iterations", 5), + convergence_threshold: float = config.get("er.convergence.threshold", 0.01), + max_concurrent: int = config.get("er.matching.max_concurrent", 20), limit: int | None = None, ) -> None: self.name_field = name_field @@ -108,12 +109,22 @@ def from_yaml(cls, path: str) -> "ERConfig": blocking_fields=data.get("blocking_fields"), entity_type=data.get("entity_type", "entity"), blocking_method=blocking.get("method", "semantic"), - target_block_size=blocking.get("target_block_size", 30), - max_block_size=blocking.get("max_block_size", 100), - model=matching.get("model", "gemini/gemini-2.0-flash"), - max_iterations=data.get("max_iterations", 5), - convergence_threshold=data.get("convergence_threshold", 0.01), - max_concurrent=matching.get("max_concurrent", 20), + target_block_size=blocking.get( + "target_block_size", config.get("er.blocking.target_block_size", 30) + ), + max_block_size=blocking.get( + "max_block_size", config.get("er.blocking.max_block_size", 100) + ), + model=matching.get("model", config.get("models.llm", "gemini/gemini-2.0-flash")), + max_iterations=data.get( + "max_iterations", config.get("er.convergence.max_iterations", 5) + ), + convergence_threshold=data.get( + "convergence_threshold", config.get("er.convergence.threshold", 0.01) + ), + max_concurrent=matching.get( + "max_concurrent", config.get("er.matching.max_concurrent", 20) + ), limit=data.get("limit"), ) @@ -144,7 +155,7 @@ def load_data(input_path: str) -> pd.DataFrame: return df except UnicodeDecodeError: continue - return pd.read_csv(path, sep=sep, encoding="latin-1") + raise ValueError(f"Could not read CSV with UTF-8 or Latin-1 encoding: {path}") raise ValueError(f"Unsupported file format: {path.suffix}. Use .csv, .parquet, or iceberg://") From 6b9a748469145ee9efdeb07c709c4fd853aa95d8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 9 Mar 2026 03:16:58 +0000 Subject: [PATCH 3/3] Update branch: incorporate base branch changes, resolve conflicts with subprocess embedding Co-authored-by: rjurney <42149+rjurney@users.noreply.github.com> --- .dockerignore | 18 +++ CLAUDE.md | 2 +- Dockerfile | 49 +++++++ README.md | 37 +++-- assets/DSPy.md | 2 +- config.yml | 3 +- docker-compose.yml | 81 +++++++++++ docs/FINE_TUNING.md | 184 ++++++++++++++++++++++++ pyproject.toml | 1 + src/serf/analyze/profiler.py | 6 +- src/serf/block/embeddings.py | 5 +- src/serf/block/pipeline.py | 52 +++---- src/serf/block/subprocess_embed.py | 215 +++++++++++++++++++++++++++++ src/serf/cli/main.py | 38 ++--- src/serf/match/matcher.py | 2 +- src/serf/pipeline.py | 28 ++-- tests/test_benchmarks.py | 2 +- 17 files changed, 644 insertions(+), 81 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 docs/FINE_TUNING.md create mode 100644 src/serf/block/subprocess_embed.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..f32c9f2 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,18 @@ +.venv/ +venv/ +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +.git/ +.idea/ +.vscode/ +data/ +logs/ +*.swp +*.swo +.DS_Store +.claude/ +.mcp.json +uv.lock diff --git a/CLAUDE.md b/CLAUDE.md index ec24173..ad3b16f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -99,7 +99,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with th - Help strings - never put the default option values in the help strings. The help strings should only describe what the option does, not what the default value is. The default values are already documented in the @config.yml file and will be printed via the `@click.command(context_settings={"show_default": True})` decorator of each Click command. - Read the README - consult the README before taking action. The README contains information about the project and how to use it. If you need to add a new command or change an existing one, consult the README first. - Update the README - if appropriate, update the README with any new commands or changes to existing commands. The README should always reflect the current state of the project. -- Use uv - use uv for dependency management and packaging. Do not use pip, conda, or poetry. +- Use uv - use uv for dependency management and packaging. Do not use `pip`, `uv pip`, `conda`, or `poetry`. Use `uv add` to add dependencies, `uv sync` to install, `uv run` to execute. Never suggest `pip install` in code, docs, or error messages. - Use DSPy - use DSPy signatures and modules for all LLM-related code. Use the BAMLAdapter for structured output formatting. - Use PySpark for ETL - use PySpark for ETL and batch data processing to build our knowledge graph. Do not use any other libraries or frameworks for data processing. Use PySpark to take the output of our BAML client and transform it into a knowledge graph. - PySpark - Do not break up dataflow into functions for loading, computing this, computing that, etc. Create a single function that performs the entire dataflow at hand. Do not check if columns exist, assume they do. Do not check if paths exist, assume they do. We prefer a more linear flow for Spark scripts and simple code over complexity. This only applies to Spark code. diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5c7ec2d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,49 @@ +FROM ubuntu:24.04 + +LABEL maintainer="rjurney@graphlet.ai" +LABEL description="SERF: Agentic Semantic Entity Resolution Framework" + +# Avoid interactive prompts +ENV DEBIAN_FRONTEND=noninteractive + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + python3.12 \ + python3.12-venv \ + python3.12-dev \ + curl \ + git \ + openjdk-21-jre-headless \ + && rm -rf /var/lib/apt/lists/* + +# Set Java home for PySpark +ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64 +ENV PATH="${JAVA_HOME}/bin:${PATH}" + +# Install uv +COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv + +# Set up working directory +WORKDIR /app + +# Copy dependency files first for layer caching +COPY pyproject.toml uv.lock* ./ + +# Install dependencies +RUN uv sync --extra dev --no-install-project + +# Copy the rest of the project +COPY . . + +# Install the project itself +RUN uv sync --extra dev + +# Pre-download the embedding model so it's cached in the image +RUN uv run python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('intfloat/multilingual-e5-base')" + +# Create data directories +RUN mkdir -p data/benchmarks logs + +# Default entrypoint is the serf CLI +ENTRYPOINT ["uv", "run", "serf"] +CMD ["--help"] diff --git a/README.md b/README.md index eb589f5..1ba9e85 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ For knowledge graphs: deduplicate edges that result from merging nodes using LLM | Package Manager | **uv** | | Data Processing | **PySpark 4.x** | | LLM Framework | **DSPy 3.x** with BAMLAdapter | -| Embeddings | **Qwen3-Embedding-0.6B** via sentence-transformers | +| Embeddings | **multilingual-e5-base** via sentence-transformers | | Vector Search | **FAISS IndexIVFFlat** | | Linting/Formatting | **Ruff** | | Type Checking | **zuban** (mypy-compatible) | @@ -47,13 +47,34 @@ For knowledge graphs: deduplicate edges that result from merging nodes using LLM ### Installation ```bash -# From PyPI (when published) -pip install serf - -# From source git clone https://github.com/Graphlet-AI/serf.git cd serf -uv sync +uv sync --extra dev +``` + +### Docker + +```bash +# Build +docker compose build + +# Run any serf command +docker compose run serf benchmark --dataset dblp-acm + +# Run benchmarks +docker compose --profile benchmark up + +# Run tests +docker compose --profile test up + +# Analyze a dataset (put your file in data/) +docker compose run serf analyze --input data/input.csv --output data/er_config.yml +``` + +Set your API key in a `.env` file or export it: + +```bash +echo "GEMINI_API_KEY=your-key" > .env ``` ### System Requirements @@ -116,11 +137,11 @@ result = matcher(block_records=block_json, schema_info=schema, few_shot_examples ## Benchmark Results -Performance on standard ER benchmarks from the [Leipzig Database Group](https://dbs.uni-leipzig.de/research/projects/benchmark-datasets-for-entity-resolution). Blocking uses Qwen3-Embedding-0.6B name-only embeddings + FAISS IVF. Matching uses Gemini 2.0 Flash via DSPy BlockMatch. +Performance on standard ER benchmarks from the [Leipzig Database Group](https://dbs.uni-leipzig.de/research/projects/benchmark-datasets-for-entity-resolution). Blocking uses multilingual-e5-base name-only embeddings + FAISS IVF. Matching uses Gemini 2.0 Flash via DSPy BlockMatch. | Dataset | Domain | Left | Right | Matches | Precision | Recall | F1 | | ------------ | ------------- | ----- | ----- | ------- | --------- | ------ | ---------- | -| **DBLP-ACM** | Bibliographic | 2,616 | 2,294 | 2,224 | 0.8950 | 0.6246 | **0.7357** | +| **DBLP-ACM** | Bibliographic | 2,616 | 2,294 | 2,224 | 0.8849 | 0.5809 | **0.7014** | Blocking uses name-only embeddings for tighter semantic clusters. All matching decisions are made by the LLM — no embedding similarity thresholds. diff --git a/assets/DSPy.md b/assets/DSPy.md index 48d1594..488fa5d 100644 --- a/assets/DSPy.md +++ b/assets/DSPy.md @@ -7,7 +7,7 @@ This guide provides an overview of how to use the DSPy framework for building an 1. **Installation**: Install DSPy via pip: ``` - pip install dspy + uv add dspy-ai ``` 2. **Basic Usage**: Import DSPy and create a simple pipeline: diff --git a/config.yml b/config.yml index b827a39..a886e01 100644 --- a/config.yml +++ b/config.yml @@ -3,8 +3,9 @@ logs: path: logs models: - embedding: "Qwen/Qwen3-Embedding-0.6B" + embedding: "intfloat/multilingual-e5-base" llm: "gemini/gemini-2.0-flash" + analyze_llm: "${models.llm}" temperature: 0.0 er: diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..eba1fa3 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,81 @@ +services: + serf: + build: + context: . + dockerfile: Dockerfile + container_name: serf + volumes: + - ./data:/app/data + - ./logs:/app/logs + - ./config.yml:/app/config.yml:ro + environment: + - GEMINI_API_KEY=${GEMINI_API_KEY} + entrypoint: ["uv", "run", "serf"] + command: ["--help"] + + # Run a benchmark + benchmark: + build: + context: . + dockerfile: Dockerfile + container_name: serf-benchmark + volumes: + - ./data:/app/data + - ./logs:/app/logs + - ./config.yml:/app/config.yml:ro + environment: + - GEMINI_API_KEY=${GEMINI_API_KEY} + entrypoint: ["uv", "run", "serf"] + command: ["benchmark", "--dataset", "dblp-acm", "--output", "data/benchmarks/docker"] + profiles: + - benchmark + + # Run entity resolution on input data + resolve: + build: + context: . + dockerfile: Dockerfile + container_name: serf-resolve + volumes: + - ./data:/app/data + - ./logs:/app/logs + - ./config.yml:/app/config.yml:ro + environment: + - GEMINI_API_KEY=${GEMINI_API_KEY} + entrypoint: ["uv", "run", "serf"] + command: ["run", "--input", "data/input.csv", "--output", "data/resolved"] + profiles: + - resolve + + # Analyze a dataset and generate ER config + analyze: + build: + context: . + dockerfile: Dockerfile + container_name: serf-analyze + volumes: + - ./data:/app/data + - ./logs:/app/logs + - ./config.yml:/app/config.yml:ro + environment: + - GEMINI_API_KEY=${GEMINI_API_KEY} + entrypoint: ["uv", "run", "serf"] + command: ["analyze", "--input", "data/input.csv", "--output", "data/er_config.yml"] + profiles: + - analyze + + # Run tests + test: + build: + context: . + dockerfile: Dockerfile + container_name: serf-test + volumes: + - ./data:/app/data + - ./logs:/app/logs + environment: + - GEMINI_API_KEY=${GEMINI_API_KEY} + entrypoint: ["uv", "run", "pytest"] + command: ["tests/", "-v", "--ignore=tests/test_dspy.py"] + profiles: + - test diff --git a/docs/FINE_TUNING.md b/docs/FINE_TUNING.md new file mode 100644 index 0000000..3be2213 --- /dev/null +++ b/docs/FINE_TUNING.md @@ -0,0 +1,184 @@ +# Fine-Tuning Embedding Models for Entity Resolution + +Lessons learned from the [Eridu](https://github.com/Graphlet-AI/eridu) project — an open-source deep fuzzy matching system for multilingual person and company name resolution using representation learning. + +## Overview + +SERF uses pre-trained sentence-transformer embeddings (`intfloat/multilingual-e5-large`) for semantic blocking. While the pre-trained model works well out of the box, fine-tuning on domain-specific labeled pairs can significantly improve blocking quality — putting more true matches in the same blocks. + +Eridu demonstrates the full fine-tuning pipeline: from data preparation through contrastive learning to threshold optimization. The lessons below are directly applicable to SERF's blocking embeddings. + +## Key Lessons from Eridu + +### 1. Contrastive Learning Is the Right Loss Function + +Eridu uses **ContrastiveLoss** from sentence-transformers to fine-tune embeddings. This loss function: + +- Pulls matching pairs closer together in embedding space +- Pushes non-matching pairs apart (beyond a configurable margin) +- Works directly with binary labeled pairs (match/no-match) + +```python +from sentence_transformers.losses import ContrastiveLoss + +loss = ContrastiveLoss(model=model, margin=0.5) +``` + +**Why not other losses?** Eridu tested `MultipleNegativesRankingLoss` and found it didn't work for name matching. Contrastive loss is more appropriate when you have explicit positive and negative pairs, which is exactly what ER ground truth provides. + +### 2. Data Quality Matters More Than Quantity + +Eridu trains on **2+ million labeled pairs** from Open Sanctions data, but key findings: + +- **Negative pairs are just as important as positive pairs.** The model needs to learn what ISN'T a match to push non-matches apart in embedding space. +- **Group-aware splitting is critical.** Use `GroupShuffleSplit` (not random splitting) to ensure the same base entity name doesn't appear in both train and eval sets. Without this, the model memorizes specific names rather than learning general matching patterns. +- **Resampling helps with large datasets.** When using a fraction of the data (`sample_fraction < 1.0`), resample each epoch to expose the model to different examples. + +```python +from sklearn.model_selection import GroupShuffleSplit + +# Split by source group to prevent data leakage +splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42) +train_idx, test_idx = next(splitter.split(data, groups=data["source"])) +``` + +### 3. Corporate Endings Are a Known Hard Problem + +Eridu found that fine-tuned models struggle with **corporate suffixes**: "Inc.", "LLC", "GmbH", "Ltd.", etc. Two companies with the same base name but different corporate endings (e.g., "Alpha Capital LLC" vs "Alpha Capital Partners LLC") can be either the same entity or different entities. + +**SERF addresses this** with the `cleanco` library in `serf.block.normalize` for stripping corporate suffixes before embedding, but a fine-tuned model that understands these distinctions would be better. + +**Recommended approach:** Fine-tune with the [CorpWatch subsidiary dataset](https://www.opensanctions.org/datasets/us_corpwatch/) which contains labeled parent/subsidiary relationships where corporate endings matter. + +### 4. Base Model Selection + +Eridu's evolution of base models: + +| Model | Parameters | Dimensions | Status | +| --------------------------------------- | ---------- | ---------- | ------------------------------------------------ | +| `paraphrase-multilingual-MiniLM-L12-v2` | 118M | 384 | Original — now obsolete | +| `intfloat/multilingual-e5-large` | 560M | 1024 | Current — good ROC curve, semantic understanding | +| `Qwen/Qwen3-Embedding-4B` | 4B | 2048 | Testing — MTEB #2, needs 16GB GPU | + +**SERF's default** is `intfloat/multilingual-e5-large` — the same model Eridu found works best after fine-tuning. For SERF blocking, the pre-trained version is sufficient; fine-tuning is an optimization. + +### 5. Training Configuration That Works + +From Eridu's production runs: + +```python +# Hyperparameters that work well for name matching +BATCH_SIZE = 1024 # Large batches for stable gradients +EPOCHS = 4-6 # More epochs overfit; early stopping helps +LEARNING_RATE = 3e-5 # Standard for fine-tuning transformers +WEIGHT_DECAY = 0.01 # L2 regularization +WARMUP_RATIO = 0.1 # 10% warmup for learning rate +PATIENCE = 2 # Early stopping after 2 epochs without improvement +MARGIN = 0.5 # Contrastive loss margin +OPTIMIZER = "adafactor" # Memory-efficient optimizer +``` + +**Key insights:** + +- **FP16 training** reduces memory usage ~2x with minimal quality loss +- **Gradient checkpointing** saves more memory but is broken on Apple MPS +- **Gradient accumulation** (`steps=4`) simulates larger batches on limited GPU memory +- **Early stopping** with `patience=2` prevents overfitting on the relatively small positive pair set + +### 6. Evaluation: ROC Curve and Optimal Threshold + +After fine-tuning, Eridu: + +1. Computes similarity scores on a held-out test set +2. Generates a precision-recall curve across all thresholds +3. Selects the threshold that maximizes F1 score +4. Reports AUC-ROC for overall model quality + +```python +from sklearn.metrics import precision_recall_curve, f1_score + +precision, recall, thresholds = precision_recall_curve(y_true, y_scores) +f1_scores = [f1_score(y_true, y_scores >= t) for t in thresholds] +best_threshold = thresholds[np.argmax(f1_scores)] +``` + +**For SERF:** The optimal threshold from fine-tuning could be used as the `similarity_threshold` in the `ERConfig.blocking` section, though SERF's current approach (FAISS IVF clustering) doesn't use a threshold — it assigns every entity to a centroid. + +### 7. Weights & Biases Integration + +Eridu uses W&B for experiment tracking: + +- Loss curves per epoch +- Binary classification metrics (accuracy, F1, precision, recall, AP) +- ROC and PR curves +- Hyperparameter logging +- Test result artifacts + +This is valuable for comparing fine-tuning runs across different base models and hyperparameter settings. + +## Fine-Tuning for SERF Blocking + +### When to Fine-Tune + +Fine-tuning the blocking embedding is worthwhile when: + +1. **Domain-specific vocabulary**: Your entities use terminology the pre-trained model hasn't seen (medical codes, financial instruments, industry jargon) +2. **Low blocking recall**: Many true matches are landing in different blocks (the pre-trained model doesn't cluster them together) +3. **Multilingual matching**: Entities in different languages/scripts need to cluster together +4. **Corporate endings matter**: You need the model to understand that "Acme Corp" and "Acme Corporation" are likely the same but "Acme Corp" and "Acme Tools Inc" are not + +### How to Fine-Tune for SERF + +1. **Collect labeled pairs** from your ER ground truth or manual labeling +2. **Format as sentence pairs**: `(entity_name_a, entity_name_b, is_match)` +3. **Fine-tune using Eridu's approach**: + +```python +from sentence_transformers import SentenceTransformer, SentenceTransformerTrainer +from sentence_transformers.losses import ContrastiveLoss + +model = SentenceTransformer("intfloat/multilingual-e5-large") +loss = ContrastiveLoss(model=model, margin=0.5) + +trainer = SentenceTransformerTrainer( + model=model, + train_dataset=train_dataset, + eval_dataset=eval_dataset, + loss=loss, +) +trainer.train() +model.save_pretrained("models/serf-blocking-finetuned") +``` + +4. **Update SERF config** to use the fine-tuned model: + +```yaml +models: + embedding: "models/serf-blocking-finetuned" +``` + +### Data Sources for Training Pairs + +| Source | Type | Pairs | Notes | +| -------------------------- | ---------------------- | ------ | --------------------- | +| **Open Sanctions** | Person + company names | 2M+ | Multilingual, curated | +| **CorpWatch subsidiaries** | Company names | ~100K | Corporate endings | +| **Your ER ground truth** | Domain-specific | Varies | Best for your domain | +| **DBLP-ACM / Abt-Buy** | Benchmark pairs | ~5K | Good for testing | + +### Expected Impact + +Based on Eridu's results: + +- **Pre-trained model**: ~85% blocking recall (true matches in same block) +- **Fine-tuned model**: ~95%+ blocking recall with tighter blocks +- **Training time**: 1-4 hours on a single GPU for 2M pairs +- **Inference**: No change — same model, same speed + +## References + +1. [Eridu Repository](https://github.com/Graphlet-AI/eridu) — Full fine-tuning pipeline +2. [Eridu HuggingFace Model](https://huggingface.co/Graphlet-AI/eridu) — Pre-trained model card +3. [Sentence Transformers Training](https://www.sbert.net/docs/training/overview.html) — Framework documentation +4. [ContrastiveLoss](https://www.sbert.net/docs/package_reference/sentence_transformer/losses.html#contrastiveloss) — Loss function details +5. [Open Sanctions](https://www.opensanctions.org/) — Training data source diff --git a/pyproject.toml b/pyproject.toml index ac246dd..2ffccf8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "tqdm>=4.60", "numpy>=1.26", "pandas>=2.0", + "pyspark-mcp>=0.0.6", ] [project.urls] diff --git a/src/serf/analyze/profiler.py b/src/serf/analyze/profiler.py index c9d9178..db40341 100644 --- a/src/serf/analyze/profiler.py +++ b/src/serf/analyze/profiler.py @@ -114,7 +114,7 @@ def profile(self, records: list[dict[str, Any]]) -> DatasetProfile: def generate_er_config( profile: DatasetProfile, sample_records: list[dict[str, Any]], - model: str = config.get("models.llm", "gemini/gemini-2.0-flash"), + model: str | None = None, ) -> str: """Use an LLM to generate an ER config YAML from a dataset profile. @@ -132,8 +132,10 @@ def generate_er_config( str YAML string with the recommended ER configuration """ + effective_model = model or config.get("models.analyze_llm") api_key = os.environ.get("GEMINI_API_KEY", "") - lm = dspy.LM(model, api_key=api_key) + lm = dspy.LM(effective_model, api_key=api_key) + logger.info(f"Using LLM model: {effective_model}") predictor = dspy.ChainOfThought(GenerateERConfig) diff --git a/src/serf/block/embeddings.py b/src/serf/block/embeddings.py index 730965b..6590ad1 100644 --- a/src/serf/block/embeddings.py +++ b/src/serf/block/embeddings.py @@ -50,7 +50,7 @@ def __init__( normalize: bool = True, ) -> None: if model_name is None: - model_name = config.get("models.embedding", "Qwen/Qwen3-Embedding-0.6B") + model_name = config.get("models.embedding") if device is None: device = get_torch_device() @@ -84,5 +84,6 @@ def embed(self, texts: list[str], batch_size: int = 64) -> NDArray[np.float32]: show_progress_bar=len(texts) > 100, normalize_embeddings=self.normalize, convert_to_numpy=True, + device="cpu", # Always encode on CPU — FAISS segfaults with MPS tensors ) - return np.asarray(embeddings, dtype=np.float32) + return np.ascontiguousarray(embeddings, dtype=np.float32) diff --git a/src/serf/block/pipeline.py b/src/serf/block/pipeline.py index 7d8a36e..e67aad7 100644 --- a/src/serf/block/pipeline.py +++ b/src/serf/block/pipeline.py @@ -2,10 +2,14 @@ Orchestrates the embed → cluster → split workflow for creating entity blocks for matching. + +Uses subprocess isolation for PyTorch embedding and FAISS clustering +to avoid memory conflicts (MPS/FAISS segfault) on macOS. This is +the pattern proven in the Abzu production system. """ -from serf.block.embeddings import EntityEmbedder -from serf.block.faiss_blocker import FAISSBlocker +from serf.block.subprocess_embed import cluster_in_subprocess, embed_in_subprocess +from serf.config import config from serf.dspy.types import BlockingMetrics, Entity, EntityBlock from serf.logs import get_logger @@ -71,35 +75,20 @@ def __init__( auto_scale: bool = True, blocking_fields: list[str] | None = None, ) -> None: + if model_name is None: + model_name = config.get("models.embedding") self.model_name = model_name self.target_block_size = target_block_size self.max_block_size = max_block_size self.iteration = iteration self.auto_scale = auto_scale self.blocking_fields = blocking_fields - self._embedder: EntityEmbedder | None = None - self._blocker: FAISSBlocker | None = None - - @property - def embedder(self) -> EntityEmbedder: - """Lazy-load the embedder.""" - if self._embedder is None: - self._embedder = EntityEmbedder(model_name=self.model_name) - return self._embedder - - @property - def blocker(self) -> FAISSBlocker: - """Lazy-load the blocker.""" - if self._blocker is None: - self._blocker = FAISSBlocker( - target_block_size=self.target_block_size, - iteration=self.iteration, - auto_scale=self.auto_scale, - ) - return self._blocker def run(self, entities: list[Entity]) -> tuple[list[EntityBlock], BlockingMetrics]: - """Run the full blocking pipeline. + """Run the full blocking pipeline using subprocess isolation. + + Embedding and FAISS clustering run in separate subprocesses + to avoid PyTorch MPS / FAISS memory conflicts on macOS. Parameters ---------- @@ -120,14 +109,17 @@ def run(self, entities: list[Entity]) -> tuple[list[EntityBlock], BlockingMetric entity_map = {str(e.id): e for e in entities} ids = [str(e.id) for e in entities] - # Embed (name-only by default, configurable via blocking_fields) + # Embed in subprocess (name-only by default) texts = [e.text_for_embedding(self.blocking_fields) for e in entities] - logger.info("Computing embeddings...") - embeddings = self.embedder.embed(texts) - - # Cluster - logger.info("Clustering with FAISS...") - block_assignments = self.blocker.block(embeddings, ids) + embeddings = embed_in_subprocess(texts, model_name=self.model_name) + + # Cluster in subprocess + effective_target = self.target_block_size + if self.auto_scale and self.iteration > 1: + effective_target = max(10, self.target_block_size // self.iteration) + block_assignments = cluster_in_subprocess( + embeddings, ids, target_block_size=effective_target + ) # Build EntityBlocks blocks: list[EntityBlock] = [] diff --git a/src/serf/block/subprocess_embed.py b/src/serf/block/subprocess_embed.py new file mode 100644 index 0000000..4cc7db8 --- /dev/null +++ b/src/serf/block/subprocess_embed.py @@ -0,0 +1,215 @@ +"""Subprocess-isolated embedding and FAISS clustering. + +Runs PyTorch embedding and FAISS clustering in a separate subprocess +to avoid memory conflicts between PyTorch MPS and FAISS on macOS. +This is the pattern proven in the Abzu production system. + +The main process communicates with the subprocess via temporary files +(numpy .npy for embeddings, JSON for block assignments). +""" + +import json +import subprocess +import sys +import tempfile +from pathlib import Path + +import numpy as np +from numpy.typing import NDArray + +from serf.logs import get_logger + +logger = get_logger(__name__) + +# Inline Python script for embedding — runs in a fresh subprocess +EMBED_SCRIPT = """ +import json +import sys +import numpy as np + +def main(): + args = json.loads(sys.argv[1]) + texts_file = args["texts_file"] + output_file = args["output_file"] + model_name = args["model_name"] + + with open(texts_file) as f: + texts = json.load(f) + + from sentence_transformers import SentenceTransformer + model = SentenceTransformer(model_name, device="cpu") + embeddings = model.encode( + texts, + batch_size=64, + show_progress_bar=len(texts) > 100, + normalize_embeddings=True, + convert_to_numpy=True, + ) + np.save(output_file, np.ascontiguousarray(embeddings, dtype=np.float32)) + +if __name__ == "__main__": + main() +""" + +# Inline Python script for FAISS clustering — runs in a fresh subprocess +FAISS_SCRIPT = """ +import json +import math +import sys +import numpy as np + +def main(): + args = json.loads(sys.argv[1]) + embeddings_file = args["embeddings_file"] + output_file = args["output_file"] + ids = args["ids"] + target_block_size = args["target_block_size"] + + import faiss + + embeddings = np.load(embeddings_file) + n, dim = embeddings.shape + + if n == 0: + with open(output_file, "w") as f: + json.dump({}, f) + return + + if n <= target_block_size: + with open(output_file, "w") as f: + json.dump({"block_0": ids}, f) + return + + nlist = max(1, n // target_block_size) + nlist = min(nlist, int(math.sqrt(n))) + nlist = max(1, nlist) + + faiss.normalize_L2(embeddings) + quantizer = faiss.IndexFlatIP(dim) + index = faiss.IndexIVFFlat(quantizer, dim, nlist, faiss.METRIC_INNER_PRODUCT) + index.train(embeddings) + index.add(embeddings) + + _, assignments = index.quantizer.search(embeddings, 1) + + blocks = {} + for i, cluster_id in enumerate(assignments.flatten()): + block_key = f"block_{int(cluster_id)}" + if block_key not in blocks: + blocks[block_key] = [] + blocks[block_key].append(ids[i]) + + with open(output_file, "w") as f: + json.dump(blocks, f) + +if __name__ == "__main__": + main() +""" + + +def embed_in_subprocess( + texts: list[str], + model_name: str, +) -> NDArray[np.float32]: + """Compute embeddings in an isolated subprocess. + + Avoids PyTorch MPS / FAISS memory conflicts on macOS by running + the sentence-transformer model in a separate process. + + Parameters + ---------- + texts : list[str] + Texts to embed + model_name : str + HuggingFace model name + + Returns + ------- + NDArray[np.float32] + Embeddings matrix (n, dim) + """ + with tempfile.TemporaryDirectory() as tmpdir: + texts_file = str(Path(tmpdir) / "texts.json") + output_file = str(Path(tmpdir) / "embeddings.npy") + + with open(texts_file, "w") as f: + json.dump(texts, f) + + args = json.dumps( + { + "texts_file": texts_file, + "output_file": output_file, + "model_name": model_name, + } + ) + + logger.info(f"Embedding {len(texts)} texts in subprocess (model={model_name})") + result = subprocess.run( + [sys.executable, "-c", EMBED_SCRIPT, args], + capture_output=True, + text=True, + ) + + if result.returncode != 0: + logger.error(f"Embedding subprocess failed:\n{result.stderr}") + raise RuntimeError(f"Embedding subprocess failed: {result.stderr[:500]}") + + embeddings: NDArray[np.float32] = np.load(output_file) + logger.info(f"Embeddings computed: shape={embeddings.shape}") + return embeddings + + +def cluster_in_subprocess( + embeddings: NDArray[np.float32], + ids: list[str], + target_block_size: int = 30, +) -> dict[str, list[str]]: + """Cluster embeddings using FAISS in an isolated subprocess. + + Avoids FAISS segfaults caused by MPS memory conflicts on macOS. + + Parameters + ---------- + embeddings : NDArray[np.float32] + Embedding matrix (n, dim) + ids : list[str] + Entity IDs corresponding to embedding rows + target_block_size : int + Target entities per cluster + + Returns + ------- + dict[str, list[str]] + Mapping from block_key to list of entity IDs + """ + with tempfile.TemporaryDirectory() as tmpdir: + embeddings_file = str(Path(tmpdir) / "embeddings.npy") + output_file = str(Path(tmpdir) / "blocks.json") + + np.save(embeddings_file, embeddings) + + args = json.dumps( + { + "embeddings_file": embeddings_file, + "output_file": output_file, + "ids": ids, + "target_block_size": target_block_size, + } + ) + + logger.info(f"Clustering {len(ids)} entities in subprocess (target={target_block_size})") + result = subprocess.run( + [sys.executable, "-c", FAISS_SCRIPT, args], + capture_output=True, + text=True, + ) + + if result.returncode != 0: + logger.error(f"FAISS subprocess failed:\n{result.stderr}") + raise RuntimeError(f"FAISS subprocess failed: {result.stderr[:500]}") + + with open(output_file) as f: + blocks: dict[str, list[str]] = json.load(f) + + logger.info(f"Created {len(blocks)} blocks") + return blocks diff --git a/src/serf/cli/main.py b/src/serf/cli/main.py index 18aa0ce..8f13578 100644 --- a/src/serf/cli/main.py +++ b/src/serf/cli/main.py @@ -72,8 +72,8 @@ def cli() -> None: @click.option( "--model", type=str, - default=config.get("models.llm", "gemini/gemini-2.0-flash"), - help="LLM model for matching", + default=None, + help="LLM model for matching (from config.yml models.llm)", ) @click.option( "--max-iterations", @@ -112,7 +112,7 @@ def run( name_field: str | None, text_fields: str | None, entity_type: str, - model: str, + model: str | None, max_iterations: int, convergence_threshold: float, target_block_size: int, @@ -136,7 +136,8 @@ def run( if text_fields: er_config.text_fields = [f.strip() for f in text_fields.split(",")] er_config.entity_type = entity_type - er_config.model = model + if model: + er_config.model = model er_config.max_iterations = max_iterations er_config.convergence_threshold = convergence_threshold er_config.target_block_size = target_block_size @@ -191,10 +192,10 @@ def run( @click.option( "--model", type=str, - default=config.get("models.llm", "gemini/gemini-2.0-flash"), - help="LLM model for config generation", + default=None, + help="LLM model for config generation (from config.yml models.analyze_llm)", ) -def analyze(input_path: str, output_path: str | None, model: str) -> None: +def analyze(input_path: str, output_path: str | None, model: str | None) -> None: """Profile a dataset and generate an ER configuration. Runs statistical profiling on the input data, then optionally uses an LLM @@ -648,8 +649,8 @@ def download(dataset: str, output_path: str | None) -> None: @click.option( "--model", type=str, - default=config.get("models.llm", "gemini/gemini-2.0-flash"), - help="LLM model for matching", + default=None, + help="LLM model for matching (from config.yml models.llm)", ) @click.option( "--max-right-entities", @@ -673,7 +674,7 @@ def benchmark( dataset: str, output_path: str | None, target_block_size: int, - model: str, + model: str | None, max_right_entities: int | None, limit: int | None, concurrency: int, @@ -689,6 +690,7 @@ def benchmark( click.echo(f"Available: {', '.join(available)}") return + model = model or config.get("models.llm") click.echo(f"Running benchmark: {dataset}") click.echo(f" Model: {model}") start = time.time() @@ -768,8 +770,8 @@ def benchmark( @click.option( "--model", type=str, - default=config.get("models.llm", "gemini/gemini-2.0-flash"), - help="LLM model for matching", + default=None, + help="LLM model for matching (from config.yml models.llm)", ) @click.option( "--max-right-entities", @@ -779,13 +781,14 @@ def benchmark( ) def benchmark_all( output_path: str, - model: str, + model: str | None, max_right_entities: int, ) -> None: """Run LLM-based benchmarks on all available datasets. Requires GEMINI_API_KEY environment variable (or appropriate key for the model). """ + model = model or config.get("models.llm") datasets = BenchmarkDataset.available_datasets() click.echo(f"Running benchmarks on {len(datasets)} datasets...") click.echo(f" Model: {model}") @@ -901,7 +904,7 @@ def _dataframe_to_entities(df: Any) -> list[Any]: def _benchmark_llm_matching( all_entities: list[Any], target_block_size: int, - model: str = config.get("models.llm", "gemini/gemini-2.0-flash"), + model: str | None = None, limit: int | None = None, concurrency: int = config.get("er.matching.max_concurrent", 20), ) -> set[tuple[int, int]]: @@ -928,6 +931,7 @@ def _benchmark_llm_matching( set[tuple[int, int]] Predicted match pairs """ + effective_model = model or config.get("models.llm") max_block = min(100, target_block_size * 3) click.echo(f"\n Blocking (target={target_block_size}, max={max_block})...") pipeline = SemanticBlockingPipeline( @@ -936,8 +940,10 @@ def _benchmark_llm_matching( blocks, blocking_metrics = pipeline.run(all_entities) click.echo(f" {blocking_metrics.total_blocks} blocks created") - click.echo(f" Matching with LLM ({model}, concurrency={concurrency}, limit={limit})...") - matcher = EntityMatcher(model=model, max_concurrent=concurrency) + click.echo( + f" Matching with LLM ({effective_model}, concurrency={concurrency}, limit={limit})..." + ) + matcher = EntityMatcher(model=effective_model, max_concurrent=concurrency) resolutions = asyncio.run(matcher.resolve_blocks(blocks, limit=limit)) predicted_pairs: set[tuple[int, int]] = set() diff --git a/src/serf/match/matcher.py b/src/serf/match/matcher.py index 5e600dc..cefc81b 100644 --- a/src/serf/match/matcher.py +++ b/src/serf/match/matcher.py @@ -49,7 +49,7 @@ def __init__( max_concurrent : int | None Max concurrent LLM calls. Defaults to config er.matching.max_concurrent. """ - self.model = model or config.get("models.llm", "gemini/gemini-2.0-flash") + self.model = model or config.get("models.llm") self.batch_size = batch_size or config.get("er.matching.batch_size", 10) self.max_concurrent = max_concurrent or config.get("er.matching.max_concurrent", 20) self._predictor: dspy.Predict | None = None diff --git a/src/serf/pipeline.py b/src/serf/pipeline.py index 9728a20..f19101e 100644 --- a/src/serf/pipeline.py +++ b/src/serf/pipeline.py @@ -18,8 +18,7 @@ import pandas as pd import yaml -from serf.block.embeddings import EntityEmbedder -from serf.block.faiss_blocker import FAISSBlocker +from serf.block.subprocess_embed import cluster_in_subprocess, embed_in_subprocess from serf.config import config from serf.dspy.types import Entity, EntityBlock, IterationMetrics from serf.logs import get_logger @@ -64,7 +63,7 @@ def __init__( blocking_method: str = "semantic", target_block_size: int = config.get("er.blocking.target_block_size", 30), max_block_size: int = config.get("er.blocking.max_block_size", 100), - model: str = config.get("models.llm", "gemini/gemini-2.0-flash"), + model: str | None = None, max_iterations: int = config.get("er.convergence.max_iterations", 5), convergence_threshold: float = config.get("er.convergence.threshold", 0.01), max_concurrent: int = config.get("er.matching.max_concurrent", 20), @@ -77,7 +76,7 @@ def __init__( self.blocking_method = blocking_method self.target_block_size = target_block_size self.max_block_size = max_block_size - self.model = model + self.model = model or config.get("models.llm") self.max_iterations = max_iterations self.convergence_threshold = convergence_threshold self.max_concurrent = max_concurrent @@ -115,7 +114,7 @@ def from_yaml(cls, path: str) -> "ERConfig": max_block_size=blocking.get( "max_block_size", config.get("er.blocking.max_block_size", 100) ), - model=matching.get("model", config.get("models.llm", "gemini/gemini-2.0-flash")), + model=matching.get("model"), max_iterations=data.get( "max_iterations", config.get("er.convergence.max_iterations", 5) ), @@ -324,9 +323,7 @@ def run_pipeline( original_count = len(entities) logger.info(f"Created {original_count} entities") - # Initialize embedder for blocking (shared across iterations) - embedder = EntityEmbedder() - + model_name = config.get("models.embedding") all_historical_uuids: set[str] = {e.uuid for e in entities if e.uuid} iteration_metrics: list[IterationMetrics] = [] @@ -340,21 +337,16 @@ def run_pipeline( logger.info(f"\n=== Iteration {iteration} ===") logger.info(f" Entities: {len(entities)}") - # Phase 1: Embed for blocking (name-only by default, configurable) - logger.info(" Embedding for blocking...") + # Phase 1: Embed for blocking in subprocess (avoids MPS/FAISS conflicts) texts = [e.text_for_embedding(cfg.blocking_fields) for e in entities] - embeddings = embedder.embed(texts) + embeddings = embed_in_subprocess(texts, model_name=model_name) - # Phase 2: Block with FAISS - logger.info(" Blocking with FAISS...") + # Phase 2: Cluster with FAISS in subprocess ids = [str(e.id) for e in entities] effective_target = max(10, cfg.target_block_size // iteration) - blocker = FAISSBlocker( - target_block_size=effective_target, - iteration=iteration, - auto_scale=False, + block_assignments = cluster_in_subprocess( + embeddings, ids, target_block_size=effective_target ) - block_assignments = blocker.block(embeddings, ids) # Build EntityBlocks entity_map = {e.id: e for e in entities} diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py index 9b47a87..977e71e 100644 --- a/tests/test_benchmarks.py +++ b/tests/test_benchmarks.py @@ -130,7 +130,7 @@ def test_load_from_deepmatcher_format() -> None: pd.DataFrame({"ltable_id": [1], "rtable_id": [1], "label": [1]}).to_csv( os.path.join(tmpdir, "valid.csv"), index=False ) - pd.DataFrame(columns=["ltable_id", "rtable_id", "label"]).to_csv( + pd.DataFrame({"ltable_id": [], "rtable_id": [], "label": []}).to_csv( # type: ignore[arg-type] os.path.join(tmpdir, "test.csv"), index=False )