Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions how_to_guides/api_company_volume_coverage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Get Company Volume Coverage

## Overview

Once you know which companies you're tracking, the next question is usually: **how much coverage does Bigdata.com actually have on them?** A company's `ravenpack_id` is the key to that — it unlocks every document and chunk in the knowledge graph mentioning that entity. This script takes a list of `ravenpack_id`s and, for each one, asks the API how many distinct documents and chunks mention that company across three rolling time windows: the last 30 days, 6 months, and 12 months.

The result is a per-company coverage profile you can use to:

- Decide which names in a portfolio are well-covered enough to build analytics on.
- Compare coverage between candidates before adding them to a watchlist.
- Spot coverage gaps — entities with `ravenpack_id`s but little or no recent volume.

The script handles the rest:

- **Computes consistent windows.** Each window starts at 00:00 local time on the first day and ends at 23:59:59.999 local time on today, then is converted to UTC for the API. Calendar months are handled correctly (e.g. March 31 minus 1 month → February 28/29).
- **Parallelizes requests.** Each (company, window) pair is one API call, so the script runs 5 concurrent threads to keep things fast.
- **Respects rate limits.** A built-in limiter caps requests, preventing throttling even at full parallelism.
- **Skips already-populated rows.** If the input CSV already has values for a given window, that call is skipped — useful for resuming a previous run.
- **Writes timestamped output.** Each run writes to a new file (`company_coverage_YYYYMMDD_HHMMSS.csv`) so prior results are never overwritten.

## Prerequisites

- Python 3.10+
- Install dependencies:
```bash
pip install -r requirements.txt
```
- Set your API key in a `.env` file:
```
BIGDATA_API_KEY=your_api_key_here
```

## Input

The script reads a CSV with one row per company. The only **required** column is `ravenpack_id`; any other columns are preserved in the output.

| Column | Description |
|----------------|-------------|
| `ravenpack_id` | Bigdata.com entity ID (required) |

Don't have `ravenpack_id`s yet? Generate them from tickers, ISINs, website URLs, or company names using the [`api_get_company_ids`](../api_get_company_ids/) how-to guide. Its output (`output/public_company_ids.csv` or `output/private_company_ids.csv`) can be fed directly into this script.

Example:

```csv
name,ravenpack_id
Apple Inc.,D8442A
NVIDIA Corporation,E09E2B
```

## Usage

```bash
python get_company_coverage.py [input.csv]
```

If no argument is provided, the script defaults to `input/public_company_ids.csv`.

## Output

Results are written to `output/company_coverage_YYYYMMDD_HHMMSS.csv` (the timestamp is set when the run starts). The output contains every column from the input plus six new ones — one document count and one chunk count per window:

| Column | Description |
|------------------------------|-------------|
| `distinct_documents_30_days` | Distinct documents in the last 30 days |
| `distinct_chunks_30_days` | Distinct chunks in the last 30 days |
| `distinct_documents_6_months`| Distinct documents in the last 6 months |
| `distinct_chunks_6_months` | Distinct chunks in the last 6 months |
| `distinct_documents_12_months`| Distinct documents in the last 12 months |
| `distinct_chunks_12_months` | Distinct chunks in the last 12 months |

To resume from a previous run, pass that file as the input — already-populated rows are skipped:

```bash
python get_company_coverage.py output/company_coverage_20260522_143015.csv
```
198 changes: 198 additions & 0 deletions how_to_guides/api_company_volume_coverage/get_company_coverage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
from __future__ import annotations

import calendar
import csv
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, time, timedelta, timezone

import requests
from dotenv import load_dotenv

from logging_config import setup_logging
from rate_limiter import RateLimiter

load_dotenv()
logger = setup_logging(log_file="company_coverage.log")
API_KEY = os.getenv("BIGDATA_API_KEY")
VOLUME_URL = "https://api.bigdata.com/v1/search/volume"
HEADERS = {"Content-Type": "application/json", "x-api-key": API_KEY}
MAX_WORKERS = 5
REQUIRED_INPUT_FIELDS = {"ravenpack_id"}

# Each window adds two columns: distinct_documents_{label} and distinct_chunks_{label}.
WINDOWS = [
{"label": "30_days", "kind": "days", "value": 30},
{"label": "6_months", "kind": "months", "value": 6},
{"label": "12_months", "kind": "months", "value": 12},
]

rate_limiter = RateLimiter()


def _coverage_fields(label: str) -> tuple[str, str]:
return f"distinct_documents_{label}", f"distinct_chunks_{label}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It ends up being a very long name, what do you think about:

distinct_documents_XXX -> docs_XXX
distinct_chunks_XXX -> chunks_XXX

We can still mention the distinct word in the README



NEW_FIELDS = [field for w in WINDOWS for field in _coverage_fields(w["label"])]


def _read_csv(path: str) -> tuple[list[str], list[dict[str, str]]]:
"""Read a CSV preserving column order. Validate required headers are present."""
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
headers = [h.strip().lower() for h in (reader.fieldnames or [])]
missing = REQUIRED_INPUT_FIELDS - set(headers)
if missing:
raise SystemExit(f"CSV missing required columns: {', '.join(sorted(missing))}")
rows = [{k.strip().lower(): (v or "").strip() for k, v in row.items() if k} for row in reader]
return headers, rows


def _write_csv(rows: list[dict[str, str]], path: str, fieldnames: list[str]) -> None:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
with open(path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)


def _months_ago(d: date, months: int) -> date:
"""Subtract calendar months, clamping the day if the target month is shorter
(e.g. March 31 minus 1 month -> Feb 28/29)."""
total = d.year * 12 + (d.month - 1) - months
year, month = divmod(total, 12)
month += 1
day = min(d.day, calendar.monthrange(year, month)[1])
return date(year, month, day)


def _window_bounds(kind: str, value: int) -> tuple[str, str]:
"""Return (start, end) ISO-8601 UTC timestamps spanning the requested window
in local time — 00:00:00.000 local on the first day to 23:59:59.999 local
on today. `kind` is either "days" or "months"."""
local_tz = datetime.now().astimezone().tzinfo
today = datetime.now(local_tz).date()
if kind == "days":
start_date = today - timedelta(days=value - 1)
elif kind == "months":
start_date = _months_ago(today, value)
else:
raise ValueError(f"Unknown window kind: {kind!r}")
start_local = datetime.combine(start_date, time.min, tzinfo=local_tz)
end_local = datetime.combine(today, time(23, 59, 59, 999000), tzinfo=local_tz)
fmt = "%Y-%m-%dT%H:%M:%S"
return (
start_local.astimezone(timezone.utc).strftime(fmt) + ".000Z",
end_local.astimezone(timezone.utc).strftime(fmt) + ".999Z",
)


def _build_payload(ravenpack_id: str, start: str, end: str) -> dict:
return {
"query": {
"filters": {
"timestamp": {"start": start, "end": end},
"entity": {"search_in": "ALL", "any_of": [ravenpack_id]},
},
"entity_details": False,
}
}


def _fetch_volume(ravenpack_id: str, start: str, end: str) -> tuple[int, int]:
"""Return (distinct_documents, distinct_chunks) for the given window."""
try:
rate_limiter.wait()
response = requests.post(
VOLUME_URL, headers=HEADERS, json=_build_payload(ravenpack_id, start, end)
)
logger.debug(f"POST {VOLUME_URL} entity={ravenpack_id} status={response.status_code}")
response.raise_for_status()
totals = (response.json().get("results") or {}).get("total") or {}
return int(totals.get("documents", 0)), int(totals.get("chunks", 0))
except Exception as exc:
logger.error(f"Volume lookup failed for {ravenpack_id}: {exc}")
raise


def main() -> None:
if len(sys.argv) > 2:
sys.exit("Usage: python get_company_coverage.py [input.csv]")
if not API_KEY:
sys.exit("Error: BIGDATA_API_KEY not set. Add it to your .env file.")

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"output/company_coverage_{timestamp}.csv"
default_input = "input/public_company_ids.csv"
csv_path = sys.argv[1] if len(sys.argv) == 2 else default_input
if not os.path.isfile(csv_path):
sys.exit(f"Error: input file not found: {csv_path}")

logger.info(f"Reading {csv_path}")
headers, rows = _read_csv(csv_path)
fieldnames = headers + [f for f in NEW_FIELDS if f not in headers]
logger.info(f"Read {len(rows)} rows from CSV")

windows = [(w["label"], *_window_bounds(w["kind"], w["value"])) for w in WINDOWS]
for label, start, end in windows:
logger.info(f"Volume window [{label}]: {start} to {end}")

work_items: list[tuple[int, str, str, str]] = [] # (row_idx, label, start, end)
no_id = 0
for idx, row in enumerate(rows):
ravenpack_id = row.get("ravenpack_id", "")
if not ravenpack_id:
logger.warning(f"Row {idx + 2}: no ravenpack_id, skipping")
no_id += 1
continue
for label, start, end in windows:
doc_field, chunk_field = _coverage_fields(label)
if row.get(doc_field, "").strip() and row.get(chunk_field, "").strip():
logger.debug(f"Row {idx + 2} [{label}]: already populated, skipping")
continue
work_items.append((idx, label, start, end))
logger.info(f"Queued {len(work_items)} requests across {MAX_WORKERS} workers")

processed = failed = 0
try:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {
executor.submit(_fetch_volume, rows[idx]["ravenpack_id"], start, end):
(idx, label)
for idx, label, start, end in work_items
}
try:
for fut in as_completed(futures):
idx, label = futures[fut]
ravenpack_id = rows[idx]["ravenpack_id"]
try:
documents, chunks = fut.result()
doc_field, chunk_field = _coverage_fields(label)
rows[idx][doc_field] = str(documents)
rows[idx][chunk_field] = str(chunks)
processed += 1
logger.info(
f"Row {idx + 2}: {ravenpack_id} [{label}] -> "
f"documents={documents} chunks={chunks}"
)
except Exception as exc:
failed += 1
logger.error(
f"Row {idx + 2}: {ravenpack_id} [{label}] failed: {exc}"
)
except KeyboardInterrupt:
logger.warning("Interrupted by user — cancelling pending requests")
for f in futures:
f.cancel()
finally:
_write_csv(rows, output_path, fieldnames)
logger.info(
f"Wrote {len(rows)} rows to {output_path} "
f"(api_calls={processed}, failed={failed}, rows_without_id={no_id})"
)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
name,webpage,ravenpack_id,country,industry
Anthropic,,IWGWUR,US,Software
Mistral AI,,IUNEGY,FR,Software
,https://www.mercadona.es/,JKE6GA,ES,Food Retailers and Wholesalers
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name,mic,ticker,isin,cusip,sedol,ravenpack_id,country,industry
Micron Technology Inc.,,,US5951121038,,,49BBBC,US,Semiconductors
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can replace these input files with only one

name,ravenpack_id
Apple Inc.,4A6F00
NVIDIA Corporation,52258B

`python

NVIDIA Corporation,,,,,2379504,E09E2B,US,Semiconductors
Figma Inc.,,,,316841105,,BA9E0C,US,Internet Services
Microsoft Corporation,XNAS,MSFT,,,,228D42,US,Software
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this script doesn't care about public vs private companies so I would rename the default input file to just company_ids.csv and make a note saying that you can pass the files from the other how to guide

Netflix Inc.,,,US64110L1061,,,ECD263,US,Broadcasting
Apple Inc.,XNAS,AAPL,US0378331005,,,D8442A,US,Computer Hardware
18 changes: 18 additions & 0 deletions how_to_guides/api_company_volume_coverage/logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging
import os
import sys


def setup_logging(log_dir="logs", log_file="company_ids.log"):
os.makedirs(log_dir, exist_ok=True)
# stderr is typically line-buffered on TTYs; avoids INFO appearing only after a full batch.
console = logging.StreamHandler(sys.stderr)
console.setLevel(logging.INFO)
file_handler = logging.FileHandler(os.path.join(log_dir, log_file))
file_handler.setLevel(logging.DEBUG)
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[console, file_handler],
)
return logging.getLogger(__name__)
38 changes: 38 additions & 0 deletions how_to_guides/api_company_volume_coverage/rate_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import threading
from collections import deque
from time import time, sleep

logger = logging.getLogger(__name__)

MAX_REQUESTS_PER_MINUTE = 400
RATE_LIMIT_WINDOW = 60
RATE_LIMIT_COOLDOWN = 5


class RateLimiter:
def __init__(self, max_requests: int = MAX_REQUESTS_PER_MINUTE,
window: int = RATE_LIMIT_WINDOW,
cooldown: int = RATE_LIMIT_COOLDOWN) -> None:
self._max_requests = max_requests
self._window = window
self._cooldown = cooldown
self._times: deque[float] = deque()
self._lock = threading.Lock()

def wait(self) -> None:
# Sleeping inside the lock is intentional: all threads share the same
# cooldown window, preventing bursts when the limit is hit.
with self._lock:
while True:
now = time()
while self._times and self._times[0] < now - self._window:
self._times.popleft()
if len(self._times) < self._max_requests:
break
logger.warning(
f"Rate limit reached ({self._max_requests} requests "
f"in {self._window}s), sleeping {self._cooldown}s"
)
sleep(self._cooldown)
self._times.append(time())
2 changes: 2 additions & 0 deletions how_to_guides/api_company_volume_coverage/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
requests==2.31.0
python-dotenv==1.0.0
2 changes: 1 addition & 1 deletion how_to_guides/api_get_company_ids/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The script handles the rest:

## Prerequisites

- Python 3.7+
- Python 3.10+
- Install dependencies:
```bash
pip install -r requirements.txt
Expand Down
4 changes: 2 additions & 2 deletions how_to_guides/api_get_company_ids/get_company_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ def main() -> None:
if mode == "public":
companies = resolve_public(csv_path)
output_fields = ["name", "mic", "ticker", "isin", "cusip", "sedol",
"ravenpack_id", "country", "industry", "description"]
"ravenpack_id", "country", "industry"]
else:
companies = resolve_private(csv_path)
output_fields = ["name", "webpage", "ravenpack_id", "country", "industry", "description"]
output_fields = ["name", "webpage", "ravenpack_id", "country", "industry"]

output_path = f"output/{mode}_company_ids.csv"
write_csv(companies=companies, path=output_path, output_fields=output_fields)
Expand Down