diff --git a/how_to_guides/api_company_volume_coverage/README.md b/how_to_guides/api_company_volume_coverage/README.md new file mode 100644 index 0000000..4c9af22 --- /dev/null +++ b/how_to_guides/api_company_volume_coverage/README.md @@ -0,0 +1,76 @@ +# Get Company Volume Coverage + +## Overview + +Once you know which companies you're tracking, the next question is usually: **how much coverage does Bigdata.com actually have on them?** A company's `ravenpack_id` is the key to that — it unlocks every document and chunk in the knowledge graph mentioning that entity. This script takes a list of `ravenpack_id`s and, for each one, asks the API how many distinct documents and chunks mention that company across three rolling time windows: the last 30 days, 6 months, and 12 months. + +The result is a per-company coverage profile you can use to: + +- Decide which names in a portfolio are well-covered enough to build analytics on. +- Compare coverage between candidates before adding them to a watchlist. +- Spot coverage gaps — entities with `ravenpack_id`s but little or no recent volume. + +The script handles the rest: + +- **Computes consistent windows.** Each window starts at 00:00 local time on the first day and ends at 23:59:59.999 local time today; both bounds are then converted to UTC for the API. Calendar months are handled correctly (e.g. March 31 minus 1 month → February 28/29). +- **Parallelizes requests.** Each (company, window) pair is one API call, so the script runs 5 concurrent threads to keep things fast. +- **Respects rate limits.** A built-in limiter caps requests at 400 per minute, preventing throttling even at full parallelism. +- **Skips already-populated rows.** If the input CSV already has values for a given window, that call is skipped — useful for resuming a previous run. +- **Writes timestamped output.** Each run writes to a new file (`company_coverage_YYYYMMDD_HHMMSS.csv`) so prior results are never overwritten. 
+ +## Prerequisites + +- Python 3.10+ +- Install dependencies: + ```bash + pip install -r requirements.txt + ``` +- Set your API key in a `.env` file: + ``` + BIGDATA_API_KEY=your_api_key_here + ``` + +## Input + +The script reads a CSV with one row per company. The only **required** column is `ravenpack_id`; any other columns are preserved in the output. + +| Column | Description | +|----------------|-------------| +| `ravenpack_id` | Bigdata.com entity ID (required) | + +Don't have `ravenpack_id`s yet? Generate them from tickers, ISINs, website URLs, or company names using the [`api_get_company_ids`](../api_get_company_ids/) how-to guide. Its output (`output/public_company_ids.csv` or `output/private_company_ids.csv`) can be fed directly into this script. + +Example: + +```csv +name,ravenpack_id +Apple Inc.,D8442A +NVIDIA Corporation,E09E2B +``` + +## Usage + +```bash +python get_company_coverage.py [input.csv] +``` + +If no argument is provided, the script defaults to `input/public_company_ids.csv`. + +## Output + +Results are written to `output/company_coverage_YYYYMMDD_HHMMSS.csv` (the timestamp is set when the run starts). 
The output contains every column from the input plus six new ones — one document count and one chunk count per window: + +| Column | Description | +|------------------------------|-------------| +| `distinct_documents_30_days` | Distinct documents in the last 30 days | +| `distinct_chunks_30_days` | Distinct chunks in the last 30 days | +| `distinct_documents_6_months`| Distinct documents in the last 6 months | +| `distinct_chunks_6_months` | Distinct chunks in the last 6 months | +| `distinct_documents_12_months`| Distinct documents in the last 12 months | +| `distinct_chunks_12_months` | Distinct chunks in the last 12 months | + +To resume from a previous run, pass that file as the input — already-populated rows are skipped: + +```bash +python get_company_coverage.py output/company_coverage_20260522_143015.csv +``` diff --git a/how_to_guides/api_company_volume_coverage/get_company_coverage.py b/how_to_guides/api_company_volume_coverage/get_company_coverage.py new file mode 100644 index 0000000..9f8194d --- /dev/null +++ b/how_to_guides/api_company_volume_coverage/get_company_coverage.py @@ -0,0 +1,198 @@ +from __future__ import annotations + +import calendar +import csv +import os +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import date, datetime, time, timedelta, timezone + +import requests +from dotenv import load_dotenv + +from logging_config import setup_logging +from rate_limiter import RateLimiter + +load_dotenv() +logger = setup_logging(log_file="company_coverage.log") +API_KEY = os.getenv("BIGDATA_API_KEY") +VOLUME_URL = "https://api.bigdata.com/v1/search/volume" +HEADERS = {"Content-Type": "application/json", "x-api-key": API_KEY} +MAX_WORKERS = 5 +REQUIRED_INPUT_FIELDS = {"ravenpack_id"} + +# Each window adds two columns: distinct_documents_{label} and distinct_chunks_{label}. 
def _window_bounds(kind: str, value: int) -> tuple[str, str]:
    """Return (start, end) ISO-8601 UTC timestamps for a window ending today.

    The window runs from 00:00:00.000 local time on its first day to
    23:59:59.999 local time today; both bounds are converted to UTC.

    Args:
        kind: Either "days" or "months".
        value: Window length. For "days" the window covers `value` calendar
            days including today. For "months" the first day is the result
            of `_months_ago(today, value)`.

    Returns:
        A (start, end) tuple of strings shaped like
        ``YYYY-MM-DDTHH:MM:SS.000Z`` and ``YYYY-MM-DDTHH:MM:SS.999Z``.

    Raises:
        ValueError: If `kind` is neither "days" nor "months".
    """
    today = datetime.now().date()  # current date in the system time zone
    if kind == "days":
        # `value` days inclusive of today, hence the `- 1`.
        start_date = today - timedelta(days=value - 1)
    elif kind == "months":
        start_date = _months_ago(today, value)
    else:
        raise ValueError(f"Unknown window kind: {kind!r}")

    # A naive datetime is treated by astimezone() as system-local time and is
    # converted with the UTC offset in force on *that* date. Reusing today's
    # fixed offset for the start date would move the start away from local
    # midnight whenever the window crosses a daylight-saving change.
    start_utc = datetime.combine(start_date, time.min).astimezone(timezone.utc)
    end_utc = datetime.combine(today, time(23, 59, 59, 999000)).astimezone(timezone.utc)

    # strftime has no millisecond directive, so the fractional part is
    # appended literally.
    fmt = "%Y-%m-%dT%H:%M:%S"
    return (
        start_utc.strftime(fmt) + ".000Z",
        end_utc.strftime(fmt) + ".999Z",
    )
Add it to your .env file.") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + output_path = f"output/company_coverage_{timestamp}.csv" + default_input = "input/public_company_ids.csv" + csv_path = sys.argv[1] if len(sys.argv) == 2 else default_input + if not os.path.isfile(csv_path): + sys.exit(f"Error: input file not found: {csv_path}") + + logger.info(f"Reading {csv_path}") + headers, rows = _read_csv(csv_path) + fieldnames = headers + [f for f in NEW_FIELDS if f not in headers] + logger.info(f"Read {len(rows)} rows from CSV") + + windows = [(w["label"], *_window_bounds(w["kind"], w["value"])) for w in WINDOWS] + for label, start, end in windows: + logger.info(f"Volume window [{label}]: {start} to {end}") + + work_items: list[tuple[int, str, str, str]] = [] # (row_idx, label, start, end) + no_id = 0 + for idx, row in enumerate(rows): + ravenpack_id = row.get("ravenpack_id", "") + if not ravenpack_id: + logger.warning(f"Row {idx + 2}: no ravenpack_id, skipping") + no_id += 1 + continue + for label, start, end in windows: + doc_field, chunk_field = _coverage_fields(label) + if row.get(doc_field, "").strip() and row.get(chunk_field, "").strip(): + logger.debug(f"Row {idx + 2} [{label}]: already populated, skipping") + continue + work_items.append((idx, label, start, end)) + logger.info(f"Queued {len(work_items)} requests across {MAX_WORKERS} workers") + + processed = failed = 0 + try: + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = { + executor.submit(_fetch_volume, rows[idx]["ravenpack_id"], start, end): + (idx, label) + for idx, label, start, end in work_items + } + try: + for fut in as_completed(futures): + idx, label = futures[fut] + ravenpack_id = rows[idx]["ravenpack_id"] + try: + documents, chunks = fut.result() + doc_field, chunk_field = _coverage_fields(label) + rows[idx][doc_field] = str(documents) + rows[idx][chunk_field] = str(chunks) + processed += 1 + logger.info( + f"Row {idx + 2}: {ravenpack_id} [{label}] -> " + 
f"documents={documents} chunks={chunks}" + ) + except Exception as exc: + failed += 1 + logger.error( + f"Row {idx + 2}: {ravenpack_id} [{label}] failed: {exc}" + ) + except KeyboardInterrupt: + logger.warning("Interrupted by user — cancelling pending requests") + for f in futures: + f.cancel() + finally: + _write_csv(rows, output_path, fieldnames) + logger.info( + f"Wrote {len(rows)} rows to {output_path} " + f"(api_calls={processed}, failed={failed}, rows_without_id={no_id})" + ) + + +if __name__ == "__main__": + main() diff --git a/how_to_guides/api_company_volume_coverage/input/private_company_ids.csv b/how_to_guides/api_company_volume_coverage/input/private_company_ids.csv new file mode 100644 index 0000000..e12ebfd --- /dev/null +++ b/how_to_guides/api_company_volume_coverage/input/private_company_ids.csv @@ -0,0 +1,4 @@ +name,webpage,ravenpack_id,country,industry +Anthropic,,IWGWUR,US,Software +Mistral AI,,IUNEGY,FR,Software +,https://www.mercadona.es/,JKE6GA,ES,Food Retailers and Wholesalers diff --git a/how_to_guides/api_company_volume_coverage/input/public_company_ids.csv b/how_to_guides/api_company_volume_coverage/input/public_company_ids.csv new file mode 100644 index 0000000..386ad45 --- /dev/null +++ b/how_to_guides/api_company_volume_coverage/input/public_company_ids.csv @@ -0,0 +1,7 @@ +name,mic,ticker,isin,cusip,sedol,ravenpack_id,country,industry +Micron Technology Inc.,,,US5951121038,,,49BBBC,US,Semiconductors +NVIDIA Corporation,,,,,2379504,E09E2B,US,Semiconductors +Figma Inc.,,,,316841105,,BA9E0C,US,Internet Services +Microsoft Corporation,XNAS,MSFT,,,,228D42,US,Software +Netflix Inc.,,,US64110L1061,,,ECD263,US,Broadcasting +Apple Inc.,XNAS,AAPL,US0378331005,,,D8442A,US,Computer Hardware diff --git a/how_to_guides/api_company_volume_coverage/logging_config.py b/how_to_guides/api_company_volume_coverage/logging_config.py new file mode 100644 index 0000000..0b3a302 --- /dev/null +++ b/how_to_guides/api_company_volume_coverage/logging_config.py @@ 
import logging
import threading
from collections import deque
from time import monotonic, sleep, time

logger = logging.getLogger(__name__)

MAX_REQUESTS_PER_MINUTE = 400
RATE_LIMIT_WINDOW = 60
RATE_LIMIT_COOLDOWN = 5


class RateLimiter:
    """Sliding-window limiter shared by all worker threads.

    At most `max_requests` calls to `wait()` are admitted within any
    `window`-second span; callers beyond that block in `cooldown`-second
    steps until a slot frees up.
    """

    def __init__(self, max_requests: int = MAX_REQUESTS_PER_MINUTE,
                 window: int = RATE_LIMIT_WINDOW,
                 cooldown: int = RATE_LIMIT_COOLDOWN) -> None:
        """Create a limiter.

        Args:
            max_requests: Maximum admissions per window; must be at least 1.
            window: Length of the sliding window, in seconds.
            cooldown: Seconds to sleep between re-checks once the limit is hit.

        Raises:
            ValueError: If `max_requests` is below 1. With such a value
                `wait()` could never admit a caller and would spin forever
                while holding the lock.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self._max_requests = max_requests
        self._window = window
        self._cooldown = cooldown
        # Admission timestamps (monotonic clock), oldest first.
        self._times: deque[float] = deque()
        self._lock = threading.Lock()

    def wait(self) -> None:
        """Block until a request slot is available, then claim it."""
        # Sleeping inside the lock is intentional: all threads share the same
        # cooldown window, preventing bursts when the limit is hit.
        with self._lock:
            while True:
                # monotonic() never jumps with system clock adjustments, so
                # the window cannot stall or empty early on a clock step.
                cutoff = monotonic() - self._window
                while self._times and self._times[0] < cutoff:
                    self._times.popleft()
                if len(self._times) < self._max_requests:
                    break
                logger.warning(
                    "Rate limit reached (%s requests in %ss), sleeping %ss",
                    self._max_requests, self._window, self._cooldown,
                )
                sleep(self._cooldown)
            self._times.append(monotonic())