diff --git a/bitepy/__pycache__/__init__.cpython-311.pyc b/bitepy/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..e121ed2
Binary files /dev/null and b/bitepy/__pycache__/__init__.cpython-311.pyc differ
diff --git a/bitepy/__pycache__/simulation.cpython-311.pyc b/bitepy/__pycache__/simulation.cpython-311.pyc
new file mode 100644
index 0000000..d2942a7
Binary files /dev/null and b/bitepy/__pycache__/simulation.cpython-311.pyc differ
diff --git a/bitepy/data.py b/bitepy/data.py
index ba8bf52..c2681ba 100644
--- a/bitepy/data.py
+++ b/bitepy/data.py
@@ -8,13 +8,20 @@
 # Licensed under MIT License, see https://opensource.org/license/mit
 ######################################################################
 
+import gc
+import multiprocessing
 import pandas as pd
 import numpy as np
+import polars as pl
 from zipfile import ZipFile
 import os
+import time
+import zipfile
+from io import BytesIO, StringIO
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from tqdm import tqdm
 from pathlib import Path
-from datetime import datetime
+from datetime import datetime, date, timedelta
 
 try:
     from ._bitepy import Simulation_cpp
@@ -24,6 +31,284 @@
     ) from e
 
 
+# ── Constants for fast EPEX parsing ──────────────────────────────────────────
+
+_EPEX_DROP_COLS = [
+    "LinkedBasketId", "DeliveryArea", "ParentId", "DeliveryEnd", "Currency",
+    "Product", "UserDefinedBlock", "RevisionNo", "ExecutionRestriction",
+    "CreationTime", "QuantityUnit", "Volume", "VolumeUnit",
+]
+
+_EPEX_RENAME_MAP = {
+    "OrderId": "order",
+    "InitialId": "initial",
+    "DeliveryStart": "start",
+    "Side": "side",
+    "Price": "price",
+    "ValidityTime": "validity",
+    "ActionCode": "action",
+    "TransactionTime": "transaction",
+    "Quantity": "quantity",
+}
+
+_EPEX_DEDUP_COLS = ["OrderId", "InitialId", "ActionCode", "ValidityTime", "Price", "Quantity"]
+
+_EPEX_PRODUCT_FILTERS = {
+    "Hourly": ["Intraday_Hour_Power", "XBID_Hour_Power"],
+    "Quarter-Hourly": ["Intraday_Quarter_Hour_Power", "XBID_Quarter_Hour_Power"],
+}
+
+
+def _read_raw_epex_file_fast(timestamp: date, datapath: str, product: str = "Hourly") -> pl.DataFrame:
+    """Read and process a single raw EPEX zip file using Polars.
+
+    Args:
+        timestamp: Date whose ``Continuous_Orders-DE-YYYYMMDD`` archive is read.
+        datapath: Root folder containing ``<year>/<month>`` subfolders.
+        product: Key of ``_EPEX_PRODUCT_FILTERS`` selecting the products to keep.
+
+    Returns:
+        The cleaned order messages; ``start``, ``transaction`` and ``validity``
+        are formatted back to strings.
+
+    Raises:
+        FileNotFoundError: If no archive for ``timestamp`` is found in the folder.
+    """
+    year = timestamp.strftime("%Y")
+    month = timestamp.strftime("%m")
+    datestr = f"Continuous_Orders-DE-{timestamp.strftime('%Y%m%d')}"
+
+    folder = f"{datapath}/{year}/{month}"
+    # A bare next() would leak StopIteration, which carries no message when logged.
+    zip_file_name = next((f for f in os.listdir(folder) if datestr in f), None)
+    if zip_file_name is None:
+        raise FileNotFoundError(f"No file matching '{datestr}' found in {folder}")
+    csv_file_name = zip_file_name[:-4]
+
+    with ZipFile(f"{folder}/{zip_file_name}") as zf:
+        raw_bytes = zf.read(csv_file_name)
+
+    df = pl.read_csv(BytesIO(raw_bytes), skip_rows=1, infer_schema_length=10000)
+
+    # Filter and clean
+    df = (
+        df.unique(subset=_EPEX_DEDUP_COLS, keep="first", maintain_order=True)
+        .filter(pl.col("UserDefinedBlock") == "N")
+        .filter(pl.col("Product").is_in(_EPEX_PRODUCT_FILTERS[product]))
+        .filter(pl.col("ActionCode").is_in(["A", "D", "C", "I"]))
+        .drop(_EPEX_DROP_COLS)
+        .rename(_EPEX_RENAME_MAP)
+        .with_columns(
+            pl.col("start").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%SZ"),
+            pl.col("validity").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%SZ", strict=False),
+            pl.col("transaction").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.fZ"),
+        )
+    )
+
+    # Remove iceberg orders
+    iceberg_ids = df.filter(pl.col("action") == "I")["initial"].unique().to_list()
+    df = df.filter(~pl.col("initial").is_in(iceberg_ids))
+
+    df = df.with_row_index("_idx")
+
+    # Process change messages (shift instead of while loop)
+    c_orders = df.filter(pl.col("action") == "C")["order"].unique().to_list()
+    a_orders = df.filter(pl.col("action") == "A")["order"].unique().to_list()
+    orders_to_process = list(set(c_orders) & set(a_orders))
+
+    if orders_to_process:
+        chain = (
+            df.filter(
+                pl.col("order").is_in(orders_to_process)
+                & pl.col("action").is_in(["A", "C"])
+            )
+            .sort("_idx")
+        )
+
+        chain = chain.with_columns(
+            pl.col("transaction").shift(-1).over("order").alias("_new_validity")
+        )
+
+        updates = chain.filter(pl.col("_new_validity").is_not_null())
+        update_map = dict(zip(updates["_idx"].to_list(), updates["_new_validity"].to_list()))
+
+        if update_map:
+            update_indices = list(update_map.keys())
+            df = df.with_columns(
+                pl.when(pl.col("_idx").is_in(update_indices))
+                .then(
+                    pl.col("_idx").replace_strict(
+                        update_map, default=None, return_dtype=pl.Datetime("ms")
+                    )
+                )
+                .otherwise(pl.col("validity"))
+                .alias("validity")
+            )
+
+        c_indices = df.filter(
+            pl.col("order").is_in(orders_to_process) & (pl.col("action") == "C")
+        )["_idx"].to_list()
+        df = df.with_columns(
+            pl.when(pl.col("_idx").is_in(c_indices))
+            .then(pl.lit("A"))
+            .otherwise(pl.col("action"))
+            .alias("action")
+        )
+
+    # Process cancel messages
+    cancel_messages = df.filter(pl.col("action") == "D")
+    a_orders_for_cancel = df.filter(pl.col("action") == "A")["order"].unique().to_list()
+    cancel_messages = cancel_messages.filter(pl.col("order").is_in(a_orders_for_cancel))
+
+    if not cancel_messages.is_empty():
+        a_rows = (
+            df.filter(
+                (pl.col("action") == "A") & pl.col("order").is_in(cancel_messages["order"].to_list())
+            )
+            .sort("transaction", "_idx")
+            .unique(subset=["order"], keep="last", maintain_order=True)
+            .select("order", "_idx")
+            .rename({"_idx": "_a_idx"})
+        )
+
+        merged = cancel_messages.select("order", "transaction", "_idx").join(a_rows, on="order")
+
+        update_map = dict(zip(merged["_a_idx"].to_list(), merged["transaction"].to_list()))
+        if update_map:
+            df = df.with_columns(
+                pl.when(pl.col("_idx").is_in(list(update_map.keys())))
+                .then(
+                    pl.col("_idx").replace_strict(
+                        update_map, default=None, return_dtype=pl.Datetime("ms")
+                    )
+                )
+                .otherwise(pl.col("validity"))
+                .alias("validity")
+            )
+
+    df = df.filter(pl.col("action") != "D").drop("order", "action", "_idx")
+
+    df = df.with_columns(
+        pl.col("start").dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
+        pl.col("transaction").dt.strftime("%Y-%m-%dT%H:%M:%S.%3fZ"),
+        pl.col("validity").dt.strftime("%Y-%m-%dT%H:%M:%S.%3fZ"),
+    )
+
+    return df
+
+
+def _read_raw_files_parallel(
+    dates: list[date],
+    marketdatapath: str,
+    product: str = "Hourly",
+    max_workers: int = 2,
+    verbose: bool = False,
+) -> dict[date, pl.DataFrame]:
+    """Read a batch of raw EPEX files in parallel. Returns dict of date -> DataFrame."""
+    empty_schema = {
+        "initial": pl.Int64, "side": pl.Utf8, "start": pl.Utf8,
+        "transaction": pl.Utf8, "validity": pl.Utf8,
+        "price": pl.Float64, "quantity": pl.Float64,
+    }
+    raw_data: dict[date, pl.DataFrame] = {}
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = {
+            executor.submit(_read_raw_epex_file_fast, dt, marketdatapath, product): dt
+            for dt in dates
+        }
+        for future in as_completed(futures):
+            dt = futures[future]
+            try:
+                raw_data[dt] = future.result()
+                if verbose:
+                    print(f" Read raw file for {dt} ({raw_data[dt].shape[0]} rows)")
+            except Exception as e:
+                # Best-effort: a day that fails to load is logged and replaced by an empty frame.
+                print(f" ERROR reading {dt}: {e}")
+                raw_data[dt] = pl.DataFrame(schema=empty_schema)
+
+    return raw_data
+
+
+def _save_day(dt1: date, raw_data: dict[date, pl.DataFrame], savepath: str, verbose: bool):
+    """Combine raw files for dt1 and dt1+1, filter to transaction date dt1, and save."""
+    dt2 = dt1 + timedelta(days=1)
+    df1 = raw_data.get(dt1, pl.DataFrame())
+    df2 = raw_data.get(dt2, pl.DataFrame())
+
+    frames = [f for f in [df1, df2] if not f.is_empty()]
+    if not frames:
+        if verbose:
+            print(f" No data for {dt1}, skipping")
+        return
+
+    df = pl.concat(frames)
+
+    df = df.with_columns(
+        pl.col("transaction").str.slice(0, 10).alias("transaction_date"),
+        pl.col("price").cast(pl.Float64).round(2),
+        pl.col("quantity").cast(pl.Float64).round(1),
+    )
+
+    group = df.filter(pl.col("transaction_date") == dt1.isoformat())
+
+    if group.is_empty():
+        if verbose:
+            print(f" No data for {dt1}, skipping")
+        return
+
+    group = (
+        group.sort("transaction")
+        .with_row_index("")
+        .drop("transaction_date")
+    )
+
+    group = group.with_columns(pl.col("validity").fill_null(""))
+    group = group.select(["", "initial", "side", "start", "transaction", "validity", "price", "quantity"])
+
+    daily_filename = f"orderbook_{dt1}.csv"
+    # Join instead of concatenating so a savepath without a trailing separator still lands inside it.
+    zip_path = os.path.join(savepath, f"{daily_filename}.zip")
+
+    buf = StringIO()
+    group.write_csv(buf)
+    csv_bytes = buf.getvalue().encode("utf-8")
+
+    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
+        zf.writestr(daily_filename, csv_bytes)
+
+    if verbose:
+        print(f" Saved {dt1}: {group.shape[0]} rows")
+
+
+def _process_chunk(
+    chunk_start: date,
+    chunk_end: date,
+    marketdatapath: str,
+    savepath: str,
+    product: str,
+    max_workers: int,
+    verbose: bool,
+):
+    """Process a single chunk in a subprocess. All memory is freed when this exits."""
+    raw_dates = []
+    d = chunk_start
+    while d <= chunk_end + timedelta(days=1):
+        raw_dates.append(d)
+        d += timedelta(days=1)
+
+    if verbose:
+        print(f"\n Chunk {chunk_start} to {chunk_end} ({len(raw_dates)} raw files)")
+
+    raw_data = _read_raw_files_parallel(raw_dates, marketdatapath, product, max_workers, verbose)
+
+    d = chunk_start
+    while d <= chunk_end:
+        _save_day(d, raw_data, savepath, verbose)
+        d += timedelta(days=1)
+
+
 class Data:
     def __init__(self):
         """Initialize a Data instance."""
@@ -467,6 +752,76 @@ def parse_market_data(self, start_date_str: str, end_date_str: str, marketdatapa
 
         print("\nWriting CSV data completed.")
 
+    def parse_market_data_fast(self, start_date_str: str, end_date_str: str, marketdatapath: str,
+                               savepath: str, market_type: str, product: str = "Hourly",
+                               max_workers: int = 2, chunk_size: int = 3, verbose: bool = True):
+        """
+        Fast version of parse_market_data using Polars + parallel raw file reading.
+
+        Same interface as parse_market_data, but ~2x faster for EPEX data (2021+ format).
+        Currently supports EPEX only. Falls back to parse_market_data for NordPool.
+
+        Processes the date range in chunks of `chunk_size` days to limit memory usage.
+        Each chunk runs in a subprocess so memory is truly freed by the OS between chunks.
+
+        Args:
+            start_date_str (str): Start date in format "YYYY-MM-DD"
+            end_date_str (str): End date in format "YYYY-MM-DD"
+            marketdatapath (str): Path to market data folder with yearly/monthly subfolders
+            savepath (str): Directory where processed CSV files will be saved
+            market_type (str): "EPEX" or "NordPool" (NordPool falls back to original method)
+            product (str, optional): "Hourly" or "Quarter-Hourly". Defaults to "Hourly".
+            max_workers (int, optional): Number of parallel threads for reading raw files. Defaults to 2.
+            chunk_size (int, optional): Number of target days per chunk. Defaults to 3.
+            verbose (bool, optional): Print progress messages. Defaults to True.
+
+        Raises:
+            ValueError: If product is unknown, chunk_size is smaller than 1,
+                start is after end, or start is before 2020.
+            RuntimeError: If a chunk subprocess exits with a non-zero exit code.
+        """
+        if market_type != "EPEX":
+            return self.parse_market_data(start_date_str, end_date_str, marketdatapath, savepath, market_type, verbose)
+
+        if product not in _EPEX_PRODUCT_FILTERS:
+            raise ValueError(f"Unknown product '{product}'. Must be one of: {list(_EPEX_PRODUCT_FILTERS.keys())}")
+
+        # chunk_size < 1 would never advance chunk_start, looping forever.
+        if chunk_size < 1:
+            raise ValueError("Error: chunk_size must be at least 1.")
+
+        os.makedirs(savepath, exist_ok=True)
+
+        start = date.fromisoformat(start_date_str)
+        end = date.fromisoformat(end_date_str)
+
+        if start > end:
+            raise ValueError("Error: Start date is after end date.")
+        if start.year < 2020:
+            raise ValueError("Error: Years before 2020 are not supported.")
+
+        t0 = time.time()
+
+        # Process in chunks, each in a subprocess so memory is truly freed
+        chunk_start = start
+        while chunk_start <= end:
+            chunk_end = min(chunk_start + timedelta(days=chunk_size - 1), end)
+
+            p = multiprocessing.Process(
+                target=_process_chunk,
+                args=(chunk_start, chunk_end, marketdatapath, savepath, product, max_workers, verbose),
+            )
+            p.start()
+            p.join()
+
+            if p.exitcode != 0:
+                raise RuntimeError(f"Chunk {chunk_start} to {chunk_end} failed (exit code {p.exitcode})")
+
+            chunk_start = chunk_end + timedelta(days=1)
+
+        if verbose:
+            print(f"\nTotal time: {time.time() - t0:.1f}s")
+
     def create_bins_from_csv(self, csv_list: list, save_path: str, verbose: bool = True):
         """
         Convert zipped CSV files of pre-processed order book data into binary files.
diff --git a/pyproject.toml b/pyproject.toml
index c1c3916..37603ae 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,6 +39,7 @@ classifiers = [
 dependencies = [
     "numpy>=1.16.0",
     "pandas>=0.24.0",
+    "polars>=1.0.0",
     "matplotlib>=3.0.0",
     "tqdm>=4.0.0",
 ]