-
Notifications
You must be signed in to change notification settings - Fork 1
Add how-to guide to analyse company coverage #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| # Get Company Volume Coverage | ||
|
|
||
| ## Overview | ||
|
|
||
| Once you know which companies you're tracking, the next question is usually: **how much coverage does Bigdata.com actually have on them?** A company's `ravenpack_id` is the key to that — it unlocks every document and chunk in the knowledge graph mentioning that entity. This script takes a list of `ravenpack_id`s and, for each one, asks the API how many distinct documents and chunks mention that company across three rolling time windows: the last 30 days, 6 months, and 12 months. | ||
|
|
||
| The result is a per-company coverage profile you can use to: | ||
|
|
||
| - Decide which names in a portfolio are well-covered enough to build analytics on. | ||
| - Compare coverage between candidates before adding them to a watchlist. | ||
| - Spot coverage gaps — entities with `ravenpack_id`s but little or no recent volume. | ||
|
|
||
| The script handles the rest: | ||
|
|
||
| - **Computes consistent windows.** Each window starts at 00:00 local time on the first day and ends at 23:59:59.999 local time on today, then is converted to UTC for the API. Calendar months are handled correctly (e.g. March 31 minus 1 month → February 28/29). | ||
| - **Parallelizes requests.** Each (company, window) pair is one API call, so the script runs 5 concurrent threads to keep things fast. | ||
| - **Respects rate limits.** A built-in limiter caps requests, preventing throttling even at full parallelism. | ||
| - **Skips already-populated rows.** If the input CSV already has values for a given window, that call is skipped — useful for resuming a previous run. | ||
| - **Writes timestamped output.** Each run writes to a new file (`company_coverage_YYYYMMDD_HHMMSS.csv`) so prior results are never overwritten. | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| - Python 3.10+ | ||
| - Install dependencies: | ||
| ```bash | ||
| pip install -r requirements.txt | ||
| ``` | ||
| - Set your API key in a `.env` file: | ||
| ``` | ||
| BIGDATA_API_KEY=your_api_key_here | ||
| ``` | ||
|
|
||
| ## Input | ||
|
|
||
| The script reads a CSV with one row per company. The only **required** column is `ravenpack_id`; any other columns are preserved in the output. | ||
|
|
||
| | Column | Description | | ||
| |----------------|-------------| | ||
| | `ravenpack_id` | Bigdata.com entity ID (required) | | ||
|
|
||
| Don't have `ravenpack_id`s yet? Generate them from tickers, ISINs, website URLs, or company names using the [`api_get_company_ids`](../api_get_company_ids/) how-to guide. Its output (`output/public_company_ids.csv` or `output/private_company_ids.csv`) can be fed directly into this script. | ||
|
|
||
| Example: | ||
|
|
||
| ```csv | ||
| name,ravenpack_id | ||
| Apple Inc.,D8442A | ||
| NVIDIA Corporation,E09E2B | ||
| ``` | ||
|
|
||
| ## Usage | ||
|
|
||
| ```bash | ||
| python get_company_coverage.py [input.csv] | ||
| ``` | ||
|
|
||
| If no argument is provided, the script defaults to `input/public_company_ids.csv`. | ||
|
|
||
| ## Output | ||
|
|
||
| Results are written to `output/company_coverage_YYYYMMDD_HHMMSS.csv` (the timestamp is set when the run starts). The output contains every column from the input plus six new ones — one document count and one chunk count per window: | ||
|
|
||
| | Column | Description | | ||
| |------------------------------|-------------| | ||
| | `distinct_documents_30_days` | Distinct documents in the last 30 days | | ||
| | `distinct_chunks_30_days` | Distinct chunks in the last 30 days | | ||
| | `distinct_documents_6_months`| Distinct documents in the last 6 months | | ||
| | `distinct_chunks_6_months` | Distinct chunks in the last 6 months | | ||
| | `distinct_documents_12_months`| Distinct documents in the last 12 months | | ||
| | `distinct_chunks_12_months` | Distinct chunks in the last 12 months | | ||
|
|
||
| To resume from a previous run, pass that file as the input — already-populated rows are skipped: | ||
|
|
||
| ```bash | ||
| python get_company_coverage.py output/company_coverage_20260522_143015.csv | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,198 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import calendar | ||
| import csv | ||
| import os | ||
| import sys | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
| from datetime import date, datetime, time, timedelta, timezone | ||
|
|
||
| import requests | ||
| from dotenv import load_dotenv | ||
|
|
||
| from logging_config import setup_logging | ||
| from rate_limiter import RateLimiter | ||
|
|
||
| load_dotenv() | ||
| logger = setup_logging(log_file="company_coverage.log") | ||
| API_KEY = os.getenv("BIGDATA_API_KEY") | ||
| VOLUME_URL = "https://api.bigdata.com/v1/search/volume" | ||
| HEADERS = {"Content-Type": "application/json", "x-api-key": API_KEY} | ||
| MAX_WORKERS = 5 | ||
| REQUIRED_INPUT_FIELDS = {"ravenpack_id"} | ||
|
|
||
| # Each window adds two columns: distinct_documents_{label} and distinct_chunks_{label}. | ||
| WINDOWS = [ | ||
| {"label": "30_days", "kind": "days", "value": 30}, | ||
| {"label": "6_months", "kind": "months", "value": 6}, | ||
| {"label": "12_months", "kind": "months", "value": 12}, | ||
| ] | ||
|
|
||
| rate_limiter = RateLimiter() | ||
|
|
||
|
|
||
| def _coverage_fields(label: str) -> tuple[str, str]: | ||
| return f"distinct_documents_{label}", f"distinct_chunks_{label}" | ||
|
|
||
|
|
||
| NEW_FIELDS = [field for w in WINDOWS for field in _coverage_fields(w["label"])] | ||
|
|
||
|
|
||
| def _read_csv(path: str) -> tuple[list[str], list[dict[str, str]]]: | ||
| """Read a CSV preserving column order. Validate required headers are present.""" | ||
| with open(path, newline="", encoding="utf-8") as f: | ||
| reader = csv.DictReader(f) | ||
| headers = [h.strip().lower() for h in (reader.fieldnames or [])] | ||
| missing = REQUIRED_INPUT_FIELDS - set(headers) | ||
| if missing: | ||
| raise SystemExit(f"CSV missing required columns: {', '.join(sorted(missing))}") | ||
| rows = [{k.strip().lower(): (v or "").strip() for k, v in row.items() if k} for row in reader] | ||
| return headers, rows | ||
|
|
||
|
|
||
| def _write_csv(rows: list[dict[str, str]], path: str, fieldnames: list[str]) -> None: | ||
| os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | ||
| with open(path, "w", newline="", encoding="utf-8") as f: | ||
| writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore") | ||
| writer.writeheader() | ||
| writer.writerows(rows) | ||
|
|
||
|
|
||
| def _months_ago(d: date, months: int) -> date: | ||
| """Subtract calendar months, clamping the day if the target month is shorter | ||
| (e.g. March 31 minus 1 month -> Feb 28/29).""" | ||
| total = d.year * 12 + (d.month - 1) - months | ||
| year, month = divmod(total, 12) | ||
| month += 1 | ||
| day = min(d.day, calendar.monthrange(year, month)[1]) | ||
| return date(year, month, day) | ||
|
|
||
|
|
||
| def _window_bounds(kind: str, value: int) -> tuple[str, str]: | ||
| """Return (start, end) ISO-8601 UTC timestamps spanning the requested window | ||
| in local time — 00:00:00.000 local on the first day to 23:59:59.999 local | ||
| on today. `kind` is either "days" or "months".""" | ||
| local_tz = datetime.now().astimezone().tzinfo | ||
| today = datetime.now(local_tz).date() | ||
| if kind == "days": | ||
| start_date = today - timedelta(days=value - 1) | ||
| elif kind == "months": | ||
| start_date = _months_ago(today, value) | ||
| else: | ||
| raise ValueError(f"Unknown window kind: {kind!r}") | ||
| start_local = datetime.combine(start_date, time.min, tzinfo=local_tz) | ||
| end_local = datetime.combine(today, time(23, 59, 59, 999000), tzinfo=local_tz) | ||
| fmt = "%Y-%m-%dT%H:%M:%S" | ||
| return ( | ||
| start_local.astimezone(timezone.utc).strftime(fmt) + ".000Z", | ||
| end_local.astimezone(timezone.utc).strftime(fmt) + ".999Z", | ||
| ) | ||
|
|
||
|
|
||
| def _build_payload(ravenpack_id: str, start: str, end: str) -> dict: | ||
| return { | ||
| "query": { | ||
| "filters": { | ||
| "timestamp": {"start": start, "end": end}, | ||
| "entity": {"search_in": "ALL", "any_of": [ravenpack_id]}, | ||
| }, | ||
| "entity_details": False, | ||
| } | ||
| } | ||
|
|
||
|
|
||
| def _fetch_volume(ravenpack_id: str, start: str, end: str) -> tuple[int, int]: | ||
| """Return (distinct_documents, distinct_chunks) for the given window.""" | ||
| try: | ||
| rate_limiter.wait() | ||
| response = requests.post( | ||
| VOLUME_URL, headers=HEADERS, json=_build_payload(ravenpack_id, start, end) | ||
| ) | ||
| logger.debug(f"POST {VOLUME_URL} entity={ravenpack_id} status={response.status_code}") | ||
| response.raise_for_status() | ||
| totals = (response.json().get("results") or {}).get("total") or {} | ||
| return int(totals.get("documents", 0)), int(totals.get("chunks", 0)) | ||
| except Exception as exc: | ||
| logger.error(f"Volume lookup failed for {ravenpack_id}: {exc}") | ||
| raise | ||
|
|
||
|
|
||
| def main() -> None: | ||
| if len(sys.argv) > 2: | ||
| sys.exit("Usage: python get_company_coverage.py [input.csv]") | ||
| if not API_KEY: | ||
| sys.exit("Error: BIGDATA_API_KEY not set. Add it to your .env file.") | ||
|
|
||
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | ||
| output_path = f"output/company_coverage_{timestamp}.csv" | ||
| default_input = "input/public_company_ids.csv" | ||
| csv_path = sys.argv[1] if len(sys.argv) == 2 else default_input | ||
| if not os.path.isfile(csv_path): | ||
| sys.exit(f"Error: input file not found: {csv_path}") | ||
|
|
||
| logger.info(f"Reading {csv_path}") | ||
| headers, rows = _read_csv(csv_path) | ||
| fieldnames = headers + [f for f in NEW_FIELDS if f not in headers] | ||
| logger.info(f"Read {len(rows)} rows from CSV") | ||
|
|
||
| windows = [(w["label"], *_window_bounds(w["kind"], w["value"])) for w in WINDOWS] | ||
| for label, start, end in windows: | ||
| logger.info(f"Volume window [{label}]: {start} to {end}") | ||
|
|
||
| work_items: list[tuple[int, str, str, str]] = [] # (row_idx, label, start, end) | ||
| no_id = 0 | ||
| for idx, row in enumerate(rows): | ||
| ravenpack_id = row.get("ravenpack_id", "") | ||
| if not ravenpack_id: | ||
| logger.warning(f"Row {idx + 2}: no ravenpack_id, skipping") | ||
| no_id += 1 | ||
| continue | ||
| for label, start, end in windows: | ||
| doc_field, chunk_field = _coverage_fields(label) | ||
| if row.get(doc_field, "").strip() and row.get(chunk_field, "").strip(): | ||
| logger.debug(f"Row {idx + 2} [{label}]: already populated, skipping") | ||
| continue | ||
| work_items.append((idx, label, start, end)) | ||
| logger.info(f"Queued {len(work_items)} requests across {MAX_WORKERS} workers") | ||
|
|
||
| processed = failed = 0 | ||
| try: | ||
| with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: | ||
| futures = { | ||
| executor.submit(_fetch_volume, rows[idx]["ravenpack_id"], start, end): | ||
| (idx, label) | ||
| for idx, label, start, end in work_items | ||
| } | ||
| try: | ||
| for fut in as_completed(futures): | ||
| idx, label = futures[fut] | ||
| ravenpack_id = rows[idx]["ravenpack_id"] | ||
| try: | ||
| documents, chunks = fut.result() | ||
| doc_field, chunk_field = _coverage_fields(label) | ||
| rows[idx][doc_field] = str(documents) | ||
| rows[idx][chunk_field] = str(chunks) | ||
| processed += 1 | ||
| logger.info( | ||
| f"Row {idx + 2}: {ravenpack_id} [{label}] -> " | ||
| f"documents={documents} chunks={chunks}" | ||
| ) | ||
| except Exception as exc: | ||
| failed += 1 | ||
| logger.error( | ||
| f"Row {idx + 2}: {ravenpack_id} [{label}] failed: {exc}" | ||
| ) | ||
| except KeyboardInterrupt: | ||
| logger.warning("Interrupted by user — cancelling pending requests") | ||
| for f in futures: | ||
| f.cancel() | ||
| finally: | ||
| _write_csv(rows, output_path, fieldnames) | ||
| logger.info( | ||
| f"Wrote {len(rows)} rows to {output_path} " | ||
| f"(api_calls={processed}, failed={failed}, rows_without_id={no_id})" | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| name,webpage,ravenpack_id,country,industry | ||
| Anthropic,,IWGWUR,US,Software | ||
| Mistral AI,,IUNEGY,FR,Software | ||
| ,https://www.mercadona.es/,JKE6GA,ES,Food Retailers and Wholesalers |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| name,mic,ticker,isin,cusip,sedol,ravenpack_id,country,industry | ||
| Micron Technology Inc.,,,US5951121038,,,49BBBC,US,Semiconductors | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can replace these input files with only one name,ravenpack_id
Apple Inc.,4A6F00
NVIDIA Corporation,52258B
`python |
||
| NVIDIA Corporation,,,,,2379504,E09E2B,US,Semiconductors | ||
| Figma Inc.,,,,316841105,,BA9E0C,US,Internet Services | ||
| Microsoft Corporation,XNAS,MSFT,,,,228D42,US,Software | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also this script doesn't care about public vs private companies so I would rename the default input file to just |
||
| Netflix Inc.,,,US64110L1061,,,ECD263,US,Broadcasting | ||
| Apple Inc.,XNAS,AAPL,US0378331005,,,D8442A,US,Computer Hardware | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| import logging | ||
| import os | ||
| import sys | ||
|
|
||
|
|
||
| def setup_logging(log_dir="logs", log_file="company_ids.log"): | ||
| os.makedirs(log_dir, exist_ok=True) | ||
| # stderr is typically line-buffered on TTYs; avoids INFO appearing only after a full batch. | ||
| console = logging.StreamHandler(sys.stderr) | ||
| console.setLevel(logging.INFO) | ||
| file_handler = logging.FileHandler(os.path.join(log_dir, log_file)) | ||
| file_handler.setLevel(logging.DEBUG) | ||
| logging.basicConfig( | ||
| level=logging.DEBUG, | ||
| format="%(asctime)s - %(levelname)s - %(message)s", | ||
| handlers=[console, file_handler], | ||
| ) | ||
| return logging.getLogger(__name__) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| import logging | ||
| import threading | ||
| from collections import deque | ||
| from time import time, sleep | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| MAX_REQUESTS_PER_MINUTE = 400 | ||
| RATE_LIMIT_WINDOW = 60 | ||
| RATE_LIMIT_COOLDOWN = 5 | ||
|
|
||
|
|
||
| class RateLimiter: | ||
| def __init__(self, max_requests: int = MAX_REQUESTS_PER_MINUTE, | ||
| window: int = RATE_LIMIT_WINDOW, | ||
| cooldown: int = RATE_LIMIT_COOLDOWN) -> None: | ||
| self._max_requests = max_requests | ||
| self._window = window | ||
| self._cooldown = cooldown | ||
| self._times: deque[float] = deque() | ||
| self._lock = threading.Lock() | ||
|
|
||
| def wait(self) -> None: | ||
| # Sleeping inside the lock is intentional: all threads share the same | ||
| # cooldown window, preventing bursts when the limit is hit. | ||
| with self._lock: | ||
| while True: | ||
| now = time() | ||
| while self._times and self._times[0] < now - self._window: | ||
| self._times.popleft() | ||
| if len(self._times) < self._max_requests: | ||
| break | ||
| logger.warning( | ||
| f"Rate limit reached ({self._max_requests} requests " | ||
| f"in {self._window}s), sleeping {self._cooldown}s" | ||
| ) | ||
| sleep(self._cooldown) | ||
| self._times.append(time()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| requests==2.31.0 | ||
| python-dotenv==1.0.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It ends up being a very long name, what do you think about:
distinct_documents_XXX->docs_XXXdistinct_chunks_XXX->chunks_XXXWe can still mention the distinct word in the README