diff --git a/examples/market_data/market_data_example.py b/examples/market_data/market_data_example.py new file mode 100644 index 0000000..bc7f7f5 --- /dev/null +++ b/examples/market_data/market_data_example.py @@ -0,0 +1,69 @@ +"""Example demonstrating the zone-addressed market_data API. + +Shows how to query European power market data (renewable generation, load, +day-ahead forecasts, and prices) by market zone using a single unified +vocabulary, without caring which underlying source serves each zone. +""" + +from datetime import datetime, timedelta, timezone + +from jua import JuaClient + + +def main(): + client = JuaClient() + md = client.market_data + + # --- Discovery --- + print("Available zones:") + print(f" {md.get_zones()}") + print() + + print("Available variables (all zones):") + print(f" {md.get_variables()}") + print() + + print("Available variables for GB:") + print(f" {md.get_variables(market_zone='GB')}") + print() + + end = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) + start = end - timedelta(days=2) + + # --- Germany: renewables + day-ahead price (served from ENTSOE) --- + print("DE solar/wind + day-ahead prices:") + de = md.get_data( + market_zone="DE", + variables=["solar", "wind", "day_ahead_prices"], + start_time=start, + end_time=end, + time_zone="Europe/Berlin", + ) + print(de.head()) + print() + + # --- Great Britain: renewables (served from the UK power feed) --- + print("GB solar/wind:") + gb = md.get_data( + market_zone="GB", + variables=["solar", "wind"], + start_time=start, + end_time=end, + time_zone="Europe/London", + ) + print(gb.head()) + print() + + # --- Combined request across zones in one call --- + print("Combined DE + GB solar:") + combined = md.get_data( + market_zone=["DE", "GB"], + variables=["solar"], + start_time=start, + end_time=end, + ) + print(combined.groupby("market_zone")["value"].describe()) + + +if __name__ == "__main__": + main() diff --git a/examples/market_data/query_actuals.py b/examples/market_data/query_actuals.py new file mode 100644 index 0000000..1cd05e7 --- /dev/null +++ b/examples/market_data/query_actuals.py @@ -0,0 +1,50 @@ +"""Query realised (actual) power market data for Germany and Great Britain. + +Uses the zone-addressed ``market_data`` API to pull observed solar generation, +wind generation, and load for ``DE`` and ``GB`` over a recent window. The same +unified call works for both zones even though they are served by different +underlying sources (ENTSO-E for DE, the UK-power feed for GB). +""" + +from datetime import datetime, timedelta, timezone + +from jua import JuaClient + +# Realised quantities (no forecasts, no prices) and the local time zone to +# return each zone's ``time`` column in. +ACTUAL_VARIABLES = ["solar", "wind", "load"] +ZONE_TIME_ZONE = { + "DE": "Europe/Berlin", + "GB": "Europe/London", +} + + +def main(): + client = JuaClient() + md = client.market_data + + end = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) + start = end - timedelta(days=2) + + for zone, time_zone in ZONE_TIME_ZONE.items(): + print(f"=== {zone} actuals ({time_zone}) ===") + df = md.get_data( + market_zone=zone, + variables=ACTUAL_VARIABLES, + start_time=start, + end_time=end, + time_zone=time_zone, + ) + if df.empty: + print(" No data returned.\n") + continue + + print(df.head()) + print() + print("Mean by variable:") + print(df.groupby("variable")["value"].mean().round(1)) + print() + + +if __name__ == "__main__": + main() diff --git a/src/jua/client.py b/src/jua/client.py index 028aa24..b21c8ae 100644 --- a/src/jua/client.py +++ b/src/jua/client.py @@ -14,6 +14,8 @@ class JuaClient: settings: Configuration settings for the API client. weather: Property that provides access to weather data services. market_aggregates: Property that provides access to market aggregate services. + market_data: Property that provides access to zone-addressed market data + (renewable generation, load, day-ahead forecasts, and prices). Examples: >>> from jua import JuaClient @@ -21,7 +23,9 @@ class JuaClient: >>> # Access weather services >>> forecast_model = client.weather.get_model(...) >>> # Access market aggregates - >>> market_data = client.market_aggregates.compare_runs(...) + >>> aggregates = client.market_aggregates.compare_runs(...) + >>> # Access market data by zone + >>> df = client.market_data.get_data(market_zone="DE", ...) """ @validate_call @@ -45,6 +49,7 @@ def __init__( self._weather = None self._market_aggregates = None self._power_forecast = None + self._market_data = None if jua_log_level is not None: logging.getLogger("jua").setLevel(jua_log_level) @@ -89,6 +94,20 @@ def power_forecast(self): self._power_forecast = PowerForecast(self) return self._power_forecast + @property + def market_data(self): + """Access to Jua's zone-addressed market data services. + + Returns: + MarketData client interface for querying renewable generation, + load, day-ahead forecasts, and prices by market zone. + """ + if self._market_data is None: + from jua.market_data import MarketData + + self._market_data = MarketData(self) + return self._market_data + def __enter__(self): return self diff --git a/src/jua/market_data/__init__.py b/src/jua/market_data/__init__.py new file mode 100644 index 0000000..5cf3b1b --- /dev/null +++ b/src/jua/market_data/__init__.py @@ -0,0 +1,5 @@ +from jua.market_data.market_data import MarketData + +__all__ = [ + "MarketData", +] diff --git a/src/jua/market_data/_entsoe.py b/src/jua/market_data/_entsoe.py new file mode 100644 index 0000000..b2a0c05 --- /dev/null +++ b/src/jua/market_data/_entsoe.py @@ -0,0 +1,168 @@ +"""Internal ENTSOE backend for the unified market_data interface. + +Wraps the Query Engine ``/v1/entsoe`` endpoints and normalizes responses into +the unified ``[time, market_zone, variable, value, unit]`` schema. SDK users do +not interact with this module; they go through :class:`jua.market_data.MarketData`. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING + +import pandas as pd + +from jua._api import QueryEngineAPI +from jua._utils.remove_none_from_dict import remove_none_from_dict +from jua.market_data import _mapping +from jua.market_data._frame import UNIFIED_COLUMNS, decode_columnar, parse_time + +if TYPE_CHECKING: + from jua.client import JuaClient + + +class _EntsoeBackend: + """Fetches and normalizes ENTSOE timeseries for a single market zone.""" + + def __init__(self, client: JuaClient) -> None: + self._client = client + self._api = QueryEngineAPI(jua_client=client) + + def fetch( + self, + market_zone: str, + variables: list[_mapping.MarketVariable], + *, + start_time: datetime, + end_time: datetime | None, + time_zone: str | None, + ) -> pd.DataFrame: + """Fetch unified ``variables`` for one zone from ENTSOE. + + Variables that resolve to the same native ENTSOE variable (e.g. solar + and wind both come from ``generation_actual``) are fetched in a single + request and split apart afterwards. PSR-based variables are summed per + timestamp (wind = onshore + offshore). + + Returns: + DataFrame with the unified columns, or empty if no data. + """ + default_zone = _mapping.entsoe_zone(market_zone) + + # Group unified variables by (native ENTSOE variable, effective zone) + # so each native query is issued at most once. Most variables use the + # zone's default code, but some are published under a different + # control-area code (e.g. DE imbalance prices live under "DE"). + groups: dict[tuple[str, str], list[_mapping.MarketVariable]] = {} + for variable in variables: + binding = _mapping.resolve(market_zone, variable.value).entsoe + assert binding is not None # routed here, so always set + effective_zone = binding.zone_override or default_zone + groups.setdefault((binding.variable, effective_zone), []).append(variable) + + frames: list[pd.DataFrame] = [] + for (native_variable, effective_zone), unified_vars in groups.items(): + raw = self._fetch_native( + native_variable=native_variable, + unified_vars=unified_vars, + entsoe_zone=effective_zone, + start_time=start_time, + end_time=end_time, + time_zone=time_zone, + ) + if raw.empty: + continue + for variable in unified_vars: + frame = self._normalize_variable(raw, market_zone, variable) + if not frame.empty: + frames.append(frame) + + if not frames: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + return pd.concat(frames, ignore_index=True) + + def _fetch_native( + self, + *, + native_variable: str, + unified_vars: list[_mapping.MarketVariable], + entsoe_zone: str, + start_time: datetime, + end_time: datetime | None, + time_zone: str | None, + ) -> pd.DataFrame: + """Request a single native ENTSOE variable and parse the response.""" + psr_types: list[str] = [] + for variable in unified_vars: + binding = _mapping._ENTSOE_BINDINGS[variable] + psr_types.extend(binding.psr_types) + + body = remove_none_from_dict( + { + "variables": [native_variable], + "zone_keys": [entsoe_zone], + "psr_types": sorted(set(psr_types)) or None, + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat() if end_time else None, + "time_zone": time_zone, + } + ) + response = self._api.post("entsoe/data", data=body, requires_auth=True) + df = decode_columnar(response.json()) + return parse_time(df, time_zone) + + @staticmethod + def _normalize_variable( + raw: pd.DataFrame, + market_zone: str, + variable: _mapping.MarketVariable, + ) -> pd.DataFrame: + """Filter to the variable's components and rename to the unified schema. + + PSR-based variables (wind = onshore + offshore) are summed per + timestamp. Direction-split variables (imbalance Long/Short) are filtered + to a single ``other_type`` so prices are never summed together. + """ + binding = _mapping._ENTSOE_BINDINGS[variable] + + df = raw + if binding.psr_types and "psr_type" in df.columns: + df = df[df["psr_type"].isin(binding.psr_types)] + if binding.other_type is not None and "other_type" in df.columns: + df = df[df["other_type"] == binding.other_type] + if df.empty: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + + unit = _first_unit(df) + if binding.psr_types: + # PSR-based variables aggregate their components per timestamp + # (e.g. wind = onshore + offshore). + agg = df.groupby("time", as_index=False, sort=True)["value"].sum( + min_count=1 + ) + else: + # Non-PSR variables (load, prices) carry exactly one value per + # timestamp once any other_type filter is applied. ENTSO-E can + # publish revision duplicates, so collapse to the last row rather + # than summing them, which would otherwise multiply the price. + agg = df.groupby("time", as_index=False, sort=True)["value"].last() + agg["market_zone"] = market_zone.upper() + agg["variable"] = variable.value + agg["unit"] = unit + return agg[UNIFIED_COLUMNS] + + # ------------------------------------------------------------------ + # Discovery passthrough (used to validate / surface availability) + # ------------------------------------------------------------------ + def available_zones(self) -> list[str]: + """Return raw ENTSOE zone codes that have data (diagnostic).""" + response = self._api.get("entsoe/zones", requires_auth=True) + return response.json().get("zones", []) + + +def _first_unit(df: pd.DataFrame) -> str | None: + """Return the first non-null unit in the frame, if any.""" + if "unit" not in df.columns: + return None + units = df["unit"].dropna() + return str(units.iloc[0]) if not units.empty else None diff --git a/src/jua/market_data/_frame.py b/src/jua/market_data/_frame.py new file mode 100644 index 0000000..9a80a39 --- /dev/null +++ b/src/jua/market_data/_frame.py @@ -0,0 +1,63 @@ +"""Shared response-decoding helpers for the market data backends. + +Internal module. Centralizes columnar-JSON decoding and DST-safe parsing of +the ``time`` column so the ENTSOE and UK-power backends behave identically. +""" + +from __future__ import annotations + +from zoneinfo import ZoneInfo + +import pandas as pd + +#: Column order of the normalized, unified market-data frame. +UNIFIED_COLUMNS: list[str] = ["time", "market_zone", "variable", "value", "unit"] + + +def decode_columnar(payload: dict) -> pd.DataFrame: + """Decode a Query Engine columnar-JSON body into a DataFrame. + + The default ``format=json`` responses are columnar (``{column: [values]}``). + When ``include_units=true`` the body is wrapped as ``{"data": ..., "units": + ...}``; we unwrap defensively even though this client never requests it. + + Args: + payload: Parsed JSON response body. + + Returns: + A DataFrame (empty when the response carries no rows). + """ + if isinstance(payload, dict) and "data" in payload and "units" in payload: + payload = payload["data"] + + if not payload or all(len(v) == 0 for v in payload.values()): + return pd.DataFrame() + + return pd.DataFrame(payload) + + +def parse_time(df: pd.DataFrame, time_zone: str | None) -> pd.DataFrame: + """Parse the ``time`` column to a DST-safe, timezone-aware dtype. + + The server emits ISO-8601 timestamps whose UTC offset changes across a + daylight-saving transition. Parsing those naively yields an ``object`` + column (mixed offsets), so we always normalize through UTC first and then + convert to the requested zone, preserving local wall-clock without dropping + or duplicating the transition hour. + + Args: + df: Frame containing a ``time`` column (no-op if absent or empty). + time_zone: IANA zone the caller requested, or ``None`` for UTC. + + Returns: + The same frame with ``time`` as a tz-aware datetime column. + """ + if df.empty or "time" not in df.columns: + return df + + times = pd.to_datetime(df["time"], utc=True, format="ISO8601") + if time_zone is not None: + times = times.dt.tz_convert(ZoneInfo(time_zone)) + df = df.copy() + df["time"] = times + return df diff --git a/src/jua/market_data/_mapping.py b/src/jua/market_data/_mapping.py new file mode 100644 index 0000000..ca738b0 --- /dev/null +++ b/src/jua/market_data/_mapping.py @@ -0,0 +1,254 @@ +"""Unified vocabulary, zone mapping, and capability matrix for market data. + +This module is the single source of truth for how the public, zone-addressed +``market_data`` vocabulary maps onto the underlying Query Engine backends +(ENTSOE and UK power). It is internal: SDK users never import from here. + +The design goal is that a caller addresses observational market data purely by +``market_zone`` (e.g. ``"DE"``, ``"GB"``) and a small curated set of variable +names that mean the same thing everywhere. The routing from a +``(market_zone, variable)`` pair to a concrete backend query lives here so the +public surface and the backends stay thin. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import StrEnum + + +class MarketBackend(StrEnum): + """Underlying data backend a unified variable is served from.""" + + ENTSOE = "entsoe" + UK_POWER = "uk_power" + + +class MarketVariable(StrEnum): + """Curated, backend-agnostic market data variables. + + The same name resolves to the appropriate native query for every + supported zone (see :data:`CAPABILITY_MATRIX`). + """ + + SOLAR = "solar" + WIND = "wind" + LOAD = "load" + SOLAR_FORECAST = "solar_forecast" + WIND_FORECAST = "wind_forecast" + LOAD_FORECAST = "load_forecast" + DAY_AHEAD_PRICES = "day_ahead_prices" + # ENTSO-E publishes imbalance prices per direction ("Long" = surplus, + # "Short" = shortfall). They are equal in single-price markets (e.g. DE, BE) + # and differ in dual-pricing markets (e.g. FR, NL), so we expose both + # rather than collapsing them, which would be lossy or simply wrong. + IMBALANCE_PRICE_LONG = "imbalance_price_long" + IMBALANCE_PRICE_SHORT = "imbalance_price_short" + + +# Market zone -> ENTSOE bidding/control zone code. Most market zones use the +# same code on the ENTSO-E Transparency Platform; Germany is published under +# the joint Germany-Luxembourg bidding zone. +MARKET_ZONE_TO_ENTSOE: dict[str, str] = { + "DE": "DE_LU", + "FR": "FR", + "NL": "NL", + "BE": "BE", + "GB": "GB", +} + +# ENTSOE PSR types that make up each unified renewable variable. Wind is the +# onshore + offshore total per the agreed "total wind only" vocabulary. +ENTSOE_SOLAR_PSR: tuple[str, ...] = ("Solar",) +ENTSOE_WIND_PSR: tuple[str, ...] = ("Wind Onshore", "Wind Offshore") + + +@dataclass(frozen=True) +class EntsoeBinding: + """How a unified variable maps onto an ENTSOE ``/data`` query. + + Attributes: + variable: Native ``EntsoeVariable`` name (e.g. ``"generation_actual"``). + psr_types: PSR types to request and sum into the unified value. + Empty for non-PSR variables (load, prices). + zone_override: ENTSOE zone code to use instead of the market zone's + default (see :data:`MARKET_ZONE_TO_ENTSOE`). Needed when a single + variable is published under a different control-area code than the + rest of the zone (e.g. DE imbalance prices live under ``"DE"``, + not the ``"DE_LU"`` bidding zone). + other_type: Value of the ENTSOE ``other_type`` column to select (e.g. + ``"Long"`` / ``"Short"`` for imbalance prices). When set, rows are + filtered to this single component instead of being summed. + """ + + variable: str + psr_types: tuple[str, ...] = () + zone_override: str | None = None + other_type: str | None = None + + +@dataclass(frozen=True) +class Capability: + """Resolution of a single ``(market_zone, variable)`` pair. + + Exactly one of ``entsoe`` / ``uk_power_variable`` is populated, matching + ``backend``. + """ + + backend: MarketBackend + entsoe: EntsoeBinding | None = None + uk_power_variable: str | None = None + + +# ENTSOE bindings for the unified variables, reused across EU zones and the +# GB price/forecast fallbacks. +_ENTSOE_BINDINGS: dict[MarketVariable, EntsoeBinding] = { + MarketVariable.SOLAR: EntsoeBinding("generation_actual", ENTSOE_SOLAR_PSR), + MarketVariable.WIND: EntsoeBinding("generation_actual", ENTSOE_WIND_PSR), + MarketVariable.SOLAR_FORECAST: EntsoeBinding( + "wind_solar_forecast_da", ENTSOE_SOLAR_PSR + ), + MarketVariable.WIND_FORECAST: EntsoeBinding( + "wind_solar_forecast_da", ENTSOE_WIND_PSR + ), + MarketVariable.LOAD: EntsoeBinding("load_actual"), + MarketVariable.LOAD_FORECAST: EntsoeBinding("load_forecast_da"), + MarketVariable.DAY_AHEAD_PRICES: EntsoeBinding("day_ahead_prices"), + MarketVariable.IMBALANCE_PRICE_LONG: EntsoeBinding( + "imbalance_prices", other_type="Long" + ), + MarketVariable.IMBALANCE_PRICE_SHORT: EntsoeBinding( + "imbalance_prices", other_type="Short" + ), +} + + +def _entsoe_capability(variable: MarketVariable) -> Capability: + return Capability(backend=MarketBackend.ENTSOE, entsoe=_ENTSOE_BINDINGS[variable]) + + +# Every unified variable is available for EU zones via ENTSOE. +_EU_CAPABILITIES: dict[MarketVariable, Capability] = { + variable: _entsoe_capability(variable) for variable in MarketVariable +} + +# GB serves renewables + load actual + renewable day-ahead forecasts from the +# richer UK-power feed (Elexon / PV_Live / NESO). GB prices and load forecast +# are intentionally not advertised: the /v1/uk-power endpoint exposes neither, +# and ENTSOE's GB zone has no usable price/load-forecast feed, so requesting +# them raises a clear "not supported" error instead of returning empty data. +# (GB day-ahead/imbalance prices will be re-added once exposed by the Query +# Engine.) +_GB_CAPABILITIES: dict[MarketVariable, Capability] = { + MarketVariable.SOLAR: Capability(MarketBackend.UK_POWER, uk_power_variable="solar"), + MarketVariable.WIND: Capability(MarketBackend.UK_POWER, uk_power_variable="wind"), + MarketVariable.LOAD: Capability(MarketBackend.UK_POWER, uk_power_variable="load"), + MarketVariable.SOLAR_FORECAST: Capability( + MarketBackend.UK_POWER, uk_power_variable="solar_forecast" + ), + MarketVariable.WIND_FORECAST: Capability( + MarketBackend.UK_POWER, uk_power_variable="wind_forecast" + ), +} + +# DE mirrors the EU defaults except for imbalance prices, which ENTSO-E +# publishes under the "DE" control area rather than the "DE_LU" bidding zone +# used for the rest of Germany's data. +_DE_CAPABILITIES: dict[MarketVariable, Capability] = dict(_EU_CAPABILITIES) +for _imb_var, _imb_dir in ( + (MarketVariable.IMBALANCE_PRICE_LONG, "Long"), + (MarketVariable.IMBALANCE_PRICE_SHORT, "Short"), +): + _DE_CAPABILITIES[_imb_var] = Capability( + backend=MarketBackend.ENTSOE, + entsoe=EntsoeBinding( + "imbalance_prices", zone_override="DE", other_type=_imb_dir + ), + ) + +# market_zone -> {unified variable -> capability} +CAPABILITY_MATRIX: dict[str, dict[MarketVariable, Capability]] = { + "DE": _DE_CAPABILITIES, + "FR": dict(_EU_CAPABILITIES), + "NL": dict(_EU_CAPABILITIES), + "BE": dict(_EU_CAPABILITIES), + "GB": _GB_CAPABILITIES, +} + + +def supported_zones() -> list[str]: + """Return the market zones the SDK can serve, sorted.""" + return sorted(CAPABILITY_MATRIX) + + +def supported_variables(market_zone: str | None = None) -> list[str]: + """Return the unified variables available, optionally for one zone. + + Args: + market_zone: When given, restrict to variables available for that + zone. When ``None``, return the full unified vocabulary. + + Returns: + Sorted list of unified variable names. + + Raises: + ValueError: If ``market_zone`` is given but not supported. + """ + if market_zone is None: + return sorted(v.value for v in MarketVariable) + zone = _normalize_zone(market_zone) + return sorted(v.value for v in CAPABILITY_MATRIX[zone]) + + +def _normalize_zone(market_zone: str) -> str: + """Validate and canonicalize a market zone code (upper-cased).""" + zone = market_zone.strip().upper() + if zone not in CAPABILITY_MATRIX: + available = ", ".join(supported_zones()) + raise ValueError( + f"Unsupported market zone '{market_zone}'. Available zones: {available}." + ) + return zone + + +def _normalize_variable(variable: str) -> MarketVariable: + """Validate and parse a unified variable name.""" + try: + return MarketVariable(variable) + except ValueError as exc: + available = ", ".join(sorted(v.value for v in MarketVariable)) + raise ValueError( + f"Unknown market variable '{variable}'. Available variables: {available}." + ) from exc + + +def resolve(market_zone: str, variable: str) -> Capability: + """Resolve a ``(market_zone, variable)`` pair to a backend capability. + + Args: + market_zone: Market zone code (e.g. ``"DE"``, ``"GB"``). Case-insensitive. + variable: Unified variable name (e.g. ``"solar"``). + + Returns: + The :class:`Capability` describing which backend and native query to use. + + Raises: + ValueError: If the zone or variable is unknown, or if the variable is + not available for that zone. + """ + zone = _normalize_zone(market_zone) + market_variable = _normalize_variable(variable) + + zone_caps = CAPABILITY_MATRIX[zone] + if market_variable not in zone_caps: + available = ", ".join(sorted(v.value for v in zone_caps)) + raise ValueError( + f"Variable '{variable}' is not supported for zone '{zone}'. " + f"Supported for {zone}: {available}." + ) + return zone_caps[market_variable] + + +def entsoe_zone(market_zone: str) -> str: + """Map a market zone to its ENTSOE zone code (e.g. ``DE -> DE_LU``).""" + return MARKET_ZONE_TO_ENTSOE[_normalize_zone(market_zone)] diff --git a/src/jua/market_data/_uk_power.py b/src/jua/market_data/_uk_power.py new file mode 100644 index 0000000..a5fe260 --- /dev/null +++ b/src/jua/market_data/_uk_power.py @@ -0,0 +1,106 @@ +"""Internal UK-power backend for the unified market_data interface. + +Wraps the Query Engine ``/v1/uk-power`` endpoints (GB actuals from Elexon / +PV_Live and NESO day-ahead forecasts) and normalizes responses into the unified +``[time, market_zone, variable, value, unit]`` schema. Internal module. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING + +import pandas as pd + +from jua._api import QueryEngineAPI +from jua._utils.remove_none_from_dict import remove_none_from_dict +from jua.market_data import _mapping +from jua.market_data._frame import UNIFIED_COLUMNS, decode_columnar, parse_time + +if TYPE_CHECKING: + from jua.client import JuaClient + +#: UK power is GB only. +_MARKET_ZONE = "GB" + + +class _UkPowerBackend: + """Fetches and normalizes UK power timeseries (GB only).""" + + def __init__(self, client: JuaClient) -> None: + self._client = client + self._api = QueryEngineAPI(jua_client=client) + + def fetch( + self, + market_zone: str, + variables: list[_mapping.MarketVariable], + *, + start_time: datetime, + end_time: datetime | None, + time_zone: str | None, + temporal_resolution_minutes: int | None = None, + ) -> pd.DataFrame: + """Fetch unified ``variables`` for GB from the UK-power feed. + + Returns: + DataFrame with the unified columns, or empty if no data. + """ + # unified variable -> native uk-power variable name + native_by_unified: dict[_mapping.MarketVariable, str] = {} + for variable in variables: + native = _mapping.resolve(market_zone, variable.value).uk_power_variable + assert native is not None # routed here, so always set + native_by_unified[variable] = native + + body = remove_none_from_dict( + { + "variables": sorted(set(native_by_unified.values())), + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat() if end_time else None, + "temporal_resolution_minutes": temporal_resolution_minutes, + "time_zone": time_zone, + } + ) + response = self._api.post("uk-power/data", data=body, requires_auth=True) + raw = parse_time(decode_columnar(response.json()), time_zone) + if raw.empty: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + + frames: list[pd.DataFrame] = [] + for variable, native in native_by_unified.items(): + frame = self._normalize_variable(raw, variable, native) + if not frame.empty: + frames.append(frame) + + if not frames: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + return pd.concat(frames, ignore_index=True) + + @staticmethod + def _normalize_variable( + raw: pd.DataFrame, + variable: _mapping.MarketVariable, + native: str, + ) -> pd.DataFrame: + """Select the native variable's rows and rename to the unified schema.""" + if "variable_name" not in raw.columns: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + + df = raw[raw["variable_name"] == native].copy() + if df.empty: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + + df["market_zone"] = _MARKET_ZONE + df["variable"] = variable.value + if "unit" not in df.columns: + df["unit"] = None + return df[UNIFIED_COLUMNS] + + # ------------------------------------------------------------------ + # Discovery passthrough + # ------------------------------------------------------------------ + def available_sources(self) -> list[str]: + """Return raw UK-power data source names (diagnostic).""" + response = self._api.get("uk-power/sources", requires_auth=True) + return response.json().get("sources", []) diff --git a/src/jua/market_data/market_data.py b/src/jua/market_data/market_data.py new file mode 100644 index 0000000..70796cc --- /dev/null +++ b/src/jua/market_data/market_data.py @@ -0,0 +1,223 @@ +"""Zone-addressed market data for the Jua SDK. + +``MarketData`` is the single public entry point for observational European +power-market data: renewable generation, load, day-ahead forecasts, and prices. +Callers address data purely by ``market_zone`` and a small, backend-agnostic +vocabulary; the SDK routes each ``(zone, variable)`` to the right underlying +data source and returns a single tidy ``pandas`` DataFrame. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING + +import pandas as pd + +from jua.market_data import _mapping +from jua.market_data._entsoe import _EntsoeBackend +from jua.market_data._frame import UNIFIED_COLUMNS +from jua.market_data._mapping import MarketBackend, MarketVariable +from jua.market_data._uk_power import _UkPowerBackend + +if TYPE_CHECKING: + from jua.client import JuaClient + + +class MarketData: + """Interface for Jua's zone-addressed market data. + + Data is addressed by ``market_zone`` (e.g. ``"DE"``, ``"FR"``, ``"GB"``) + and a curated set of variables that mean the same thing in every zone: + + - ``solar`` / ``wind`` - actual generation (MW) + - ``load`` - actual demand (MW) + - ``solar_forecast`` / ``wind_forecast`` - day-ahead generation forecast (MW) + - ``load_forecast`` - day-ahead demand forecast (MW) + - ``day_ahead_prices`` - day-ahead market price (EUR/MWh) + - ``imbalance_price_long`` / ``imbalance_price_short`` - imbalance + settlement price per direction (equal in single-price markets like + DE/BE; different in dual-pricing markets like FR/NL) + + ``wind`` is the total of all wind sub-types (onshore + offshore). The + underlying data sources differ by zone and variable, but that is an + implementation detail - the same call works everywhere. Not every variable + is served in every zone; use :meth:`get_variables` to see what a zone + supports. Requesting an unsupported ``(zone, variable)`` raises a clear + error (e.g. GB currently serves renewables and load only, not prices or + load forecast). + + Examples: + >>> from datetime import datetime, timezone + >>> from jua import JuaClient + >>> + >>> client = JuaClient() + >>> md = client.market_data + >>> + >>> md.get_zones() + ['BE', 'DE', 'FR', 'GB', 'NL'] + >>> md.get_variables(market_zone="GB") + ['load', 'solar', 'solar_forecast', 'wind', 'wind_forecast'] + >>> + >>> df = md.get_data( + ... market_zone="DE", + ... variables=["solar", "wind", "day_ahead_prices"], + ... start_time=datetime(2025, 12, 1, tzinfo=timezone.utc), + ... end_time=datetime(2025, 12, 3, tzinfo=timezone.utc), + ... ) + >>> df.columns.tolist() + ['time', 'market_zone', 'variable', 'value', 'unit'] + """ + + def __init__(self, client: JuaClient) -> None: + self._client = client + self._entsoe = _EntsoeBackend(client) + self._uk_power = _UkPowerBackend(client) + + def get_zones(self) -> list[str]: + """Return the market zones available for querying. + + Returns: + Sorted list of market zone codes (e.g. ``["BE", "DE", "FR", ...]``). + """ + return _mapping.supported_zones() + + def get_variables(self, market_zone: str | None = None) -> list[str]: + """Return the available unified variables, optionally for one zone. + + Args: + market_zone: When given, restrict to the variables available for + that zone. When ``None``, return the full vocabulary. + + Returns: + Sorted list of variable names. + + Raises: + ValueError: If ``market_zone`` is given but not supported. + """ + return _mapping.supported_variables(market_zone) + + def get_data( + self, + market_zone: str | list[str], + variables: list[str] | None = None, + *, + start_time: datetime, + end_time: datetime | None = None, + time_zone: str | None = None, + ) -> pd.DataFrame: + """Query market data for one or more zones. + + Args: + market_zone: Market zone code or list of codes (case-insensitive), + e.g. ``"DE"`` or ``["DE", "GB"]``. + variables: Unified variable names to fetch. When ``None``, every + variable available for each zone is returned. + start_time: Start of the time range, inclusive. Naive datetimes are + interpreted as UTC by the server; pass timezone-aware datetimes + to avoid ambiguity. + end_time: End of the time range, exclusive. When ``None``, no upper + bound is applied (useful for day-ahead forecasts that extend + into tomorrow). + time_zone: IANA time zone name for the returned ``time`` column + (e.g. ``"Europe/Berlin"``). Defaults to UTC. + + Returns: + A long-format ``pandas.DataFrame`` with columns + ``[time, market_zone, variable, value, unit]``. ``time`` is a + timezone-aware datetime column. Empty (with those columns) when no + data matches. + + Raises: + ValueError: If a zone is unsupported, or a requested variable is not + available for a requested zone. + RuntimeError: If an underlying API request fails. + + Examples: + >>> df = md.get_data( + ... market_zone="DE", + ... variables=["solar", "wind"], + ... start_time=datetime(2025, 12, 1, tzinfo=timezone.utc), + ... end_time=datetime(2025, 12, 2, tzinfo=timezone.utc), + ... time_zone="Europe/Berlin", + ... ) + """ + zones = [market_zone] if isinstance(market_zone, str) else list(market_zone) + if not zones: + raise ValueError("market_zone must be a non-empty string or list.") + + frames: list[pd.DataFrame] = [] + for zone in zones: + frames.extend( + self._fetch_zone( + market_zone=zone, + variables=variables, + start_time=start_time, + end_time=end_time, + time_zone=time_zone, + ) + ) + + if not frames: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + + result = pd.concat(frames, ignore_index=True) + if result.empty: + return pd.DataFrame(columns=UNIFIED_COLUMNS) + return result.sort_values(["market_zone", "variable", "time"]).reset_index( + drop=True + ) + + def _fetch_zone( + self, + *, + market_zone: str, + variables: list[str] | None, + start_time: datetime, + end_time: datetime | None, + time_zone: str | None, + ) -> list[pd.DataFrame]: + """Resolve a single zone's variables and dispatch to the backends.""" + if variables is None: + requested = [ + MarketVariable(v) for v in _mapping.supported_variables(market_zone) + ] + else: + requested = [_mapping._normalize_variable(v) for v in variables] + + # Split requested variables by backend (validates availability). + by_backend: dict[MarketBackend, list[MarketVariable]] = {} + for variable in requested: + capability = _mapping.resolve(market_zone, variable.value) + by_backend.setdefault(capability.backend, []).append(variable) + + frames: list[pd.DataFrame] = [] + try: + if MarketBackend.ENTSOE in by_backend: + frames.append( + self._entsoe.fetch( + market_zone, + by_backend[MarketBackend.ENTSOE], + start_time=start_time, + end_time=end_time, + time_zone=time_zone, + ) + ) + if MarketBackend.UK_POWER in by_backend: + frames.append( + self._uk_power.fetch( + market_zone, + by_backend[MarketBackend.UK_POWER], + start_time=start_time, + end_time=end_time, + time_zone=time_zone, + ) + ) + except ValueError: + raise + except Exception as exc: # noqa: BLE001 - surface a uniform error + raise RuntimeError( + f"Failed to fetch market data for zone '{market_zone}': {exc}" + ) from exc + + return [f for f in frames if not f.empty] diff --git a/src/jua/power_forecast/power_forecast.py b/src/jua/power_forecast/power_forecast.py index 627924a..0026258 100644 --- a/src/jua/power_forecast/power_forecast.py +++ b/src/jua/power_forecast/power_forecast.py @@ -325,7 +325,7 @@ def get_data( requires_auth=True, ) data = response.json() - return self._to_dataset(data) + return self._to_dataset(data, time_zone=time_zone) except Exception as e: raise RuntimeError(f"Failed to fetch power forecast data: {e}") from e @@ -777,14 +777,28 @@ def _build_query_body( return remove_none_from_dict(body) @staticmethod - def _to_dataset(data: dict) -> xr.Dataset: - """Convert columnar JSON response to an xarray Dataset.""" + def _to_dataset(data: dict, time_zone: str | None = None) -> xr.Dataset: + """Convert columnar JSON response to an xarray Dataset. + + Timestamps are parsed DST-safely: the server emits ISO-8601 strings + whose UTC offset changes across a daylight-saving transition, so we + always normalize through UTC first (avoiding an ``object`` column of + mixed offsets) and then convert to ``time_zone`` when requested. This + keeps ``time`` / ``init_time`` as proper datetime dtypes so downstream + arithmetic (e.g. the day-ahead stitching lead computation) works for + ranges that span a DST boundary. + """ if not data or all(len(v) == 0 for v in data.values()): return xr.Dataset(attrs={"unit": "MW"}) df = pd.DataFrame(data) - df["time"] = pd.to_datetime(df["time"]) - df["init_time"] = pd.to_datetime(df["init_time"]) + for column in ("time", "init_time"): + if column not in df.columns: + continue + parsed = pd.to_datetime(df[column], utc=True, format="ISO8601") + if time_zone is not None: + parsed = parsed.dt.tz_convert(ZoneInfo(time_zone)) + df[column] = parsed index_cols = ["zone_key", "psr_type", "init_time", "time"] present_cols = [c for c in index_cols if c in df.columns] diff --git a/tests/functional/test_data_consistency.py b/tests/functional/test_data_consistency.py new file mode 100644 index 0000000..fd6d270 --- /dev/null +++ b/tests/functional/test_data_consistency.py @@ -0,0 +1,171 @@ +"""Functional data-consistency sanity checks against the live API. + +These guard the highest-risk failure mode for downstream users: silently +comparing a forecast against the wrong data because of a time-zone, DST, or +stitching error. They hit the real API and are marked ``functional`` so they +run via ``just test-functional`` rather than the default unit run. +""" + +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo + +import numpy as np +import pandas as pd +import pytest + +from jua import JuaClient + +pytestmark = pytest.mark.functional + + +@pytest.fixture +def client() -> JuaClient: + return JuaClient() + + +def _frame(ds) -> pd.DataFrame: + if "value" not in ds: + return pd.DataFrame() + return ds.to_dataframe().reset_index().dropna(subset=["value"]) + + +def test_power_forecast_timezone_invariant_values(client): + """The same forecast in two zones must carry identical values per instant. + + A tz/DST bug that shifted the data would surface here as differing values + once both series are aligned on their UTC instant. + """ + pf = client.power_forecast + kwargs = dict( + zone_keys=["DE"], + psr_types=["Solar", "Wind Onshore"], + init_time="latest", + max_prediction_timedelta=1440, + ) + utc = _frame(pf.get_data(time_zone="UTC", **kwargs)) + berlin = _frame(pf.get_data(time_zone="Europe/Berlin", **kwargs)) + assert not utc.empty and not berlin.empty + + utc["utc"] = pd.to_datetime(utc["time"], utc=True) + berlin["utc"] = pd.to_datetime(berlin["time"], utc=True) + merged = utc.merge( + berlin, on=["zone_key", "psr_type", "utc"], suffixes=("_u", "_b") + ) + + assert len(merged) == len(utc) == len(berlin) + assert np.allclose(merged["value_u"], merged["value_b"]) + + +def test_day_ahead_stitch_matches_direct_run(client): + """Each stitched day-ahead value equals a direct query of its source run. + + For ``init_hour=18`` the forecast for valid day D is the D-1 18:00 run. + Re-fetching that single run directly must reproduce the stitched values. + """ + pf = client.power_forecast + tz = ZoneInfo("Europe/Berlin") + midnight = ( + datetime.now(timezone.utc) + .astimezone(tz) + .replace(hour=0, minute=0, second=0, microsecond=0) + ) + start = midnight - timedelta(days=3) + + stitched = _frame( + pf.get_day_ahead_timeseries( + zone_keys=["DE"], + psr_types=["Solar"], + init_hour=18, + time_zone="Europe/Berlin", + start_date=start, + end_date=midnight, + ) + ) + if stitched.empty: + pytest.skip("no stitched day-ahead data available") + + stitched["utc"] = pd.to_datetime(stitched["time"], utc=True) + probe_day = stitched["utc"].iloc[len(stitched) // 2].tz_convert(tz).date() + init_local = datetime( + probe_day.year, probe_day.month, probe_day.day, 18, tzinfo=tz + ) - timedelta(days=1) + + direct = _frame( + pf.get_data( + zone_keys=["DE"], + psr_types=["Solar"], + init_time=init_local.astimezone(timezone.utc), + max_prediction_timedelta=39 * 60, + time_zone="Europe/Berlin", + ) + ) + if direct.empty: + pytest.skip("source run unavailable for probe day") + direct["utc"] = pd.to_datetime(direct["time"], utc=True) + + lo = pd.Timestamp(probe_day, tz=tz).tz_convert("UTC") + hi = lo + pd.Timedelta(days=1) + merged = stitched.merge( + direct, on=["zone_key", "psr_type", "utc"], suffixes=("_s", "_d") + ) + merged = merged[(merged["utc"] >= lo) & (merged["utc"] < hi)] + + assert len(merged) > 0 + assert np.allclose(merged["value_s"], merged["value_d"]) + + +def test_market_forecast_and_actual_share_time_grid(client): + """Day-ahead forecast and actual must sit on the same timestamps. + + A misaligned grid would invite comparing each forecast point against the + wrong actual. + """ + md = client.market_data + end = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) + start = end - timedelta(days=2) + + fc = md.get_data( + market_zone="DE", + variables=["solar_forecast"], + start_time=start, + end_time=end, + time_zone="Europe/Berlin", + ) + actual = md.get_data( + market_zone="DE", + variables=["solar"], + start_time=start, + end_time=end, + time_zone="Europe/Berlin", + ) + assert not fc.empty and not actual.empty + + grid_fc = set(pd.to_datetime(fc["time"], utc=True)) + grid_actual = set(pd.to_datetime(actual["time"], utc=True)) + overlap = grid_fc & grid_actual + assert len(overlap) >= 0.9 * min(len(grid_fc), len(grid_actual)) + + +def test_market_solar_actual_is_physically_plausible(client): + """Solar actual is non-negative and ~0 overnight (catches unit/label slips).""" + md = client.market_data + end = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) + start = end - timedelta(days=2) + + df = md.get_data( + market_zone="DE", + variables=["solar"], + start_time=start, + end_time=end, + time_zone="Europe/Berlin", + ) + assert not df.empty + + df = df.copy() + df["hour"] = pd.to_datetime(df["time"]).dt.hour + night = df[(df["hour"] <= 2) | (df["hour"] >= 23)]["value"] + midday = df[(df["hour"] >= 11) & (df["hour"] <= 14)]["value"] + + assert (df["value"] >= -1).all() + assert night.max() < 1000 + assert midday.max() > night.max() diff --git a/tests/functional/test_market_data.py b/tests/functional/test_market_data.py new file mode 100644 index 0000000..9050a53 --- /dev/null +++ b/tests/functional/test_market_data.py @@ -0,0 +1,126 @@ +"""Functional tests for the market_data module. + +These perform real API calls to verify zone-addressed market data retrieval +against live data. +""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from jua import JuaClient + +pytestmark = pytest.mark.functional + +_UNIFIED_COLUMNS = ["time", "market_zone", "variable", "value", "unit"] + + +@pytest.fixture +def md(): + return JuaClient().market_data + + +class TestMetadata: + def test_get_zones(self, md): + zones = md.get_zones() + assert isinstance(zones, list) + assert "DE" in zones + assert "GB" in zones + + def test_get_variables(self, md): + variables = md.get_variables() + assert "solar" in variables + assert "wind" in variables + assert "day_ahead_prices" in variables + + def test_get_variables_for_zone(self, md): + gb_vars = md.get_variables(market_zone="GB") + assert "solar" in gb_vars + # GB prices/load_forecast are not served and must not be advertised. + assert "day_ahead_prices" not in gb_vars + assert "load_forecast" not in gb_vars + + +class TestGetData: + def _window(self): + end = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) + start = end - timedelta(days=3) + return start, end + + def test_de_renewables(self, md): + start, end = self._window() + df = md.get_data( + market_zone="DE", + variables=["solar", "wind"], + start_time=start, + end_time=end, + ) + assert df.columns.tolist() == _UNIFIED_COLUMNS + assert not df.empty + assert set(df["variable"]) <= {"solar", "wind"} + assert set(df["market_zone"]) == {"DE"} + + def test_de_day_ahead_prices(self, md): + start, end = self._window() + df = md.get_data( + market_zone="DE", + variables=["day_ahead_prices"], + start_time=start, + end_time=end, + ) + assert not df.empty + assert set(df["unit"]) == {"EUR/MWh"} + + def test_gb_renewables_via_uk_power(self, md): + start, end = self._window() + df = md.get_data( + market_zone="GB", + variables=["solar", "wind"], + start_time=start, + end_time=end, + ) + assert not df.empty + assert set(df["market_zone"]) == {"GB"} + + def test_gb_prices_not_supported(self, md): + """GB prices/load forecast are not served: raise a clear error.""" + start, end = self._window() + for variable in [ + "day_ahead_prices", + "imbalance_price_long", + "imbalance_price_short", + "load_forecast", + ]: + with pytest.raises(ValueError, match="not supported for zone 'GB'"): + md.get_data( + market_zone="GB", + variables=[variable], + start_time=start, + end_time=end, + ) + + def test_time_zone_returns_tz_aware(self, md): + import pandas as pd + + start, end = self._window() + df = md.get_data( + market_zone="DE", + variables=["solar"], + start_time=start, + end_time=end, + time_zone="Europe/Berlin", + ) + if df.empty: + pytest.skip("No solar data for this window") + assert isinstance(df["time"].dtype, pd.DatetimeTZDtype) + + def test_multi_zone(self, md): + start, end = self._window() + df = md.get_data( + market_zone=["DE", "GB"], + variables=["solar"], + start_time=start, + end_time=end, + ) + assert not df.empty + assert set(df["market_zone"]) <= {"DE", "GB"} diff --git a/tests/market_data/test_mapping.py b/tests/market_data/test_mapping.py new file mode 100644 index 0000000..d266bb3 --- /dev/null +++ b/tests/market_data/test_mapping.py @@ -0,0 +1,102 @@ +"""Unit tests for the market_data capability matrix and routing.""" + +import pytest + +from jua.market_data import _mapping +from jua.market_data._mapping import MarketBackend, MarketVariable + + +def test_supported_zones(): + zones = _mapping.supported_zones() + assert zones == ["BE", "DE", "FR", "GB", "NL"] + + +def test_supported_variables_full_vocabulary(): + variables = _mapping.supported_variables() + assert set(variables) == {v.value for v in MarketVariable} + + +def test_supported_variables_for_zone(): + gb_vars = _mapping.supported_variables(market_zone="GB") + # GB exposes renewables + load actual + renewable day-ahead forecasts. + # Prices and load_forecast are intentionally not advertised (not served). + assert set(gb_vars) == { + "solar", + "wind", + "load", + "solar_forecast", + "wind_forecast", + } + + +def test_unsupported_zone_raises(): + with pytest.raises(ValueError, match="Unsupported market zone"): + _mapping.supported_variables(market_zone="ZZ") + + +def test_zone_is_case_insensitive(): + assert _mapping.entsoe_zone("de") == "DE_LU" + + +def test_entsoe_zone_mapping(): + assert _mapping.entsoe_zone("DE") == "DE_LU" + assert _mapping.entsoe_zone("FR") == "FR" + assert _mapping.entsoe_zone("GB") == "GB" + + +def test_eu_zone_routes_to_entsoe(): + cap = _mapping.resolve("DE", "solar") + assert cap.backend == MarketBackend.ENTSOE + assert cap.entsoe is not None + assert cap.entsoe.variable == "generation_actual" + assert cap.entsoe.psr_types == ("Solar",) + + +def test_eu_wind_sums_onshore_and_offshore(): + cap = _mapping.resolve("DE", "wind") + assert cap.entsoe is not None + assert set(cap.entsoe.psr_types) == {"Wind Onshore", "Wind Offshore"} + + +def test_gb_renewables_route_to_uk_power(): + for variable in ["solar", "wind", "load", "solar_forecast", "wind_forecast"]: + cap = _mapping.resolve("GB", variable) + assert cap.backend == MarketBackend.UK_POWER, variable + assert cap.uk_power_variable is not None + + +def test_gb_prices_and_load_forecast_not_supported(): + # These are valid vocabulary but not served for GB, so they must raise a + # clear "not supported" error rather than silently returning empty data. + for variable in [ + "day_ahead_prices", + "imbalance_price_long", + "imbalance_price_short", + "load_forecast", + ]: + with pytest.raises(ValueError, match="not supported for zone 'GB'"): + _mapping.resolve("GB", variable) + + +def test_de_imbalance_overrides_entsoe_zone_to_de(): + # DE imbalance prices are published under the "DE" control area, not the + # "DE_LU" bidding zone used for the rest of Germany's data. Both direction + # components carry the override and a single other_type filter. + for variable, direction in [ + ("imbalance_price_long", "Long"), + ("imbalance_price_short", "Short"), + ]: + cap = _mapping.resolve("DE", variable) + assert cap.backend == MarketBackend.ENTSOE + assert cap.entsoe is not None + assert cap.entsoe.variable == "imbalance_prices" + assert cap.entsoe.zone_override == "DE" + assert cap.entsoe.other_type == direction + # Everything else for DE still uses the default DE_LU zone (no override). + assert _mapping.resolve("DE", "day_ahead_prices").entsoe.zone_override is None + assert _mapping.entsoe_zone("DE") == "DE_LU" + + +def test_unknown_variable_raises(): + with pytest.raises(ValueError, match="Unknown market variable"): + _mapping.resolve("DE", "not_a_variable") diff --git a/tests/market_data/test_market_data.py b/tests/market_data/test_market_data.py new file mode 100644 index 0000000..14387a4 --- /dev/null +++ b/tests/market_data/test_market_data.py @@ -0,0 +1,362 @@ +"""Unit tests for MarketData routing, normalization, and DST-safe parsing.""" + +from datetime import datetime, timezone + +import pandas as pd +import pytest + +from jua import JuaClient +from jua.market_data._frame import parse_time + + +class _FakeResponse: + def __init__(self, payload): + self._payload = payload + + def json(self): + return self._payload + + +def _generation_payload(): + """ENTSOE generation_actual rows for Solar + Wind Onshore + Wind Offshore.""" + times = ["2025-12-01T00:00:00Z", "2025-12-01T01:00:00Z"] + rows_time = [] + rows_psr = [] + rows_value = [] + # solar=10/20, onshore=100/110, offshore=50/60 + values = { + "Solar": [10.0, 20.0], + "Wind Onshore": [100.0, 110.0], + "Wind Offshore": [50.0, 60.0], + } + for i, t in enumerate(times): + for psr in ("Solar", "Wind Onshore", "Wind Offshore"): + rows_time.append(t) + rows_psr.append(psr) + rows_value.append(values[psr][i]) + return { + "time": rows_time, + "variable_name": ["generation_actual"] * len(rows_time), + "zone_key": ["DE_LU"] * len(rows_time), + "psr_type": rows_psr, + "value": rows_value, + "unit": ["MW"] * len(rows_time), + } + + +def _prices_payload(zone_key: str): + times = ["2025-12-01T00:00:00Z", "2025-12-01T01:00:00Z"] + return { + "time": times, + "variable_name": ["day_ahead_prices"] * 2, + "zone_key": [zone_key] * 2, + "psr_type": ["", ""], + "value": [85.5, 82.3], + "unit": ["EUR/MWh", "EUR/MWh"], + } + + +def _imbalance_payload(zone_key="DE"): + """ENTSOE imbalance_prices rows: two timestamps, each with Long + Short. + + Long != Short (dual-pricing-like) so summing would be detectable, and t0 + carries a duplicate "Long" revision row (identical value) so de-duplication + can be distinguished from summing. + """ + return { + "time": [ + "2025-12-01T00:00:00Z", # Long + "2025-12-01T00:00:00Z", # Long (revision duplicate, same value) + "2025-12-01T00:00:00Z", # Short + "2025-12-01T01:00:00Z", # Long + "2025-12-01T01:00:00Z", # Short + ], + "variable_name": ["imbalance_prices"] * 5, + "zone_key": [zone_key] * 5, + "psr_type": [""] * 5, + "other_type": ["Long", "Long", "Short", "Long", "Short"], + "value": [10.0, 10.0, 99.0, -5.0, 20.0], + "unit": ["EUR/MWh"] * 5, + } + + +def _uk_payload(): + times = ["2025-12-01T00:00:00Z", "2025-12-01T01:00:00Z"] + return { + "time": times * 2, + "variable_name": ["solar", "solar", "wind", "wind"], + "value": [5.0, 8.0, 200.0, 210.0], + "unit": ["MW"] * 4, + } + + +class _Recorder: + """Captures posts and returns canned payloads keyed by native variable.""" + + def __init__(self): + self.calls = [] + + def make(self, payload_for): + def fake_post(path, data=None, requires_auth=True, **kwargs): + self.calls.append({"path": path, "body": data}) + return _FakeResponse(payload_for(data)) + + return fake_post + + +def _patch_entsoe(monkeypatch, md, payload_for): + rec = _Recorder() + monkeypatch.setattr(md._entsoe._api, "post", rec.make(payload_for)) + return rec + + +def _patch_uk(monkeypatch, md, payload_for): + rec = _Recorder() + monkeypatch.setattr(md._uk_power._api, "post", rec.make(payload_for)) + return rec + + +_START = datetime(2025, 12, 1, tzinfo=timezone.utc) +_END = datetime(2025, 12, 2, tzinfo=timezone.utc) + + +def test_de_routes_to_entsoe_with_mapped_zone(monkeypatch): + md = JuaClient().market_data + + def payload_for(body): + return _generation_payload() + + rec = _patch_entsoe(monkeypatch, md, payload_for) + + df = md.get_data( + market_zone="DE", + variables=["solar", "wind"], + start_time=_START, + end_time=_END, + ) + + # Solar + wind both come from generation_actual -> single request. + assert len(rec.calls) == 1 + body = rec.calls[0]["body"] + assert rec.calls[0]["path"] == "entsoe/data" + assert body["zone_keys"] == ["DE_LU"] + assert body["variables"] == ["generation_actual"] + assert set(body["psr_types"]) == {"Solar", "Wind Onshore", "Wind Offshore"} + + assert df.columns.tolist() == ["time", "market_zone", "variable", "value", "unit"] + assert set(df["variable"]) == {"solar", "wind"} + assert set(df["market_zone"]) == {"DE"} + + +def test_entsoe_wind_sums_onshore_and_offshore(monkeypatch): + md = JuaClient().market_data + _patch_entsoe(monkeypatch, md, lambda body: _generation_payload()) + + df = md.get_data( + market_zone="DE", + variables=["wind"], + start_time=_START, + end_time=_END, + ) + + wind = df[df["variable"] == "wind"].sort_values("time") + # onshore + offshore: 100+50=150, 110+60=170 + assert wind["value"].tolist() == [150.0, 170.0] + + +def test_gb_renewables_route_to_uk_power(monkeypatch): + md = JuaClient().market_data + + entsoe_rec = _patch_entsoe(monkeypatch, md, lambda body: _prices_payload("GB")) + uk_rec = _patch_uk(monkeypatch, md, lambda body: _uk_payload()) + + df = md.get_data( + market_zone="GB", + variables=["solar", "wind"], + start_time=_START, + end_time=_END, + ) + + # GB renewables come from uk-power; ENTSOE is not involved for GB. + assert len(uk_rec.calls) == 1 + assert uk_rec.calls[0]["path"] == "uk-power/data" + assert set(uk_rec.calls[0]["body"]["variables"]) == {"solar", "wind"} + assert len(entsoe_rec.calls) == 0 + + assert set(df["variable"]) == {"solar", "wind"} + assert set(df["market_zone"]) == {"GB"} + + +def test_gb_prices_not_supported_raises(monkeypatch): + md = JuaClient().market_data + entsoe_rec = _patch_entsoe(monkeypatch, md, lambda body: _prices_payload("GB")) + uk_rec = _patch_uk(monkeypatch, md, lambda body: _uk_payload()) + + # GB prices/load_forecast are not served: fail clearly, hit no backend. + for variable in [ + "day_ahead_prices", + "imbalance_price_long", + "imbalance_price_short", + "load_forecast", + ]: + with pytest.raises(ValueError, match="not supported for zone 'GB'"): + md.get_data( + market_zone="GB", + variables=[variable], + start_time=_START, + end_time=_END, + ) + + assert len(entsoe_rec.calls) == 0 + assert len(uk_rec.calls) == 0 + + +def test_imbalance_long_filters_dedups_and_uses_de_zone(monkeypatch): + md = JuaClient().market_data + rec = _patch_entsoe(monkeypatch, md, lambda body: _imbalance_payload("DE")) + + df = md.get_data( + market_zone="DE", + variables=["imbalance_price_long"], + start_time=_START, + end_time=_END, + ) + + # DE imbalance is published under control area "DE", not the DE_LU bidding + # zone used for everything else. + body = rec.calls[0]["body"] + assert body["zone_keys"] == ["DE"] + assert body["variables"] == ["imbalance_prices"] + + out = df.sort_values("time") + # Long only, with the duplicate t0 row collapsed (10.0, NOT 20.0), and the + # Short rows (99.0/20.0) excluded entirely. Never summed with Short. + assert out["value"].tolist() == [10.0, -5.0] + assert set(out["variable"]) == {"imbalance_price_long"} + assert set(out["unit"]) == {"EUR/MWh"} + + +def test_imbalance_short_returns_short_side(monkeypatch): + md = JuaClient().market_data + _patch_entsoe(monkeypatch, md, lambda body: _imbalance_payload("DE")) + + df = md.get_data( + market_zone="DE", + variables=["imbalance_price_short"], + start_time=_START, + end_time=_END, + ) + out = df.sort_values("time") + assert out["value"].tolist() == [99.0, 20.0] + + +def test_imbalance_long_and_short_single_fetch_split(monkeypatch): + md = JuaClient().market_data + rec = _patch_entsoe(monkeypatch, md, lambda body: _imbalance_payload("DE")) + + df = md.get_data( + market_zone="DE", + variables=["imbalance_price_long", "imbalance_price_short"], + start_time=_START, + end_time=_END, + ) + + # Both directions share one native variable + zone, so a single request is + # issued and split locally into two distinct series. + assert len(rec.calls) == 1 + long = df[df["variable"] == "imbalance_price_long"].sort_values("time") + short = df[df["variable"] == "imbalance_price_short"].sort_values("time") + assert long["value"].tolist() == [10.0, -5.0] + assert short["value"].tolist() == [99.0, 20.0] + # The two series must not be identical (proves no accidental collapse). + assert long["value"].tolist() != short["value"].tolist() + + +def test_multi_zone_fan_out_and_concat(monkeypatch): + md = JuaClient().market_data + _patch_entsoe(monkeypatch, md, lambda body: _generation_payload()) + _patch_uk(monkeypatch, md, lambda body: _uk_payload()) + + df = md.get_data( + market_zone=["DE", "GB"], + variables=["solar"], + start_time=_START, + end_time=_END, + ) + + assert set(df["market_zone"]) == {"DE", "GB"} + assert set(df["variable"]) == {"solar"} + + +def test_unknown_variable_raises(monkeypatch): + md = JuaClient().market_data + # No backend should be called; resolution fails first. + with pytest.raises(ValueError, match="Unknown market variable"): + md.get_data( + market_zone="DE", + variables=["nonexistent_variable"], + start_time=_START, + end_time=_END, + ) + + +def test_start_time_serialized_isoformat(monkeypatch): + md = JuaClient().market_data + rec = _patch_entsoe(monkeypatch, md, lambda body: _generation_payload()) + + md.get_data( + market_zone="DE", + variables=["solar"], + start_time=_START, + end_time=_END, + ) + assert rec.calls[0]["body"]["start_time"] == _START.isoformat() + assert rec.calls[0]["body"]["end_time"] == _END.isoformat() + + +def test_empty_response_returns_empty_unified_frame(monkeypatch): + md = JuaClient().market_data + _patch_entsoe(monkeypatch, md, lambda body: {}) + + df = md.get_data( + market_zone="DE", + variables=["solar"], + start_time=_START, + end_time=_END, + ) + assert df.empty + assert df.columns.tolist() == ["time", "market_zone", "variable", "value", "unit"] + + +# --------------------------------------------------------------------------- +# DST-safe time parsing +# --------------------------------------------------------------------------- +def test_parse_time_handles_fall_back_day(): + """A 25-hour fall-back day must parse to a full-length tz-aware column.""" + tz = "Europe/Berlin" + # 2025-10-26 is the European fall-back day (25 hours): 25 hourly stamps + # starting at local midnight, crossing the +02:00 -> +01:00 transition. + local_index = pd.date_range("2025-10-26 00:00", periods=25, freq="h", tz=tz) + assert len(local_index) == 25 # sanity: the 25-hour day + + # Server emits ISO strings whose offset changes across the transition. + iso_strings = [ts.isoformat() for ts in local_index] + assert any("+02:00" in s for s in iso_strings) + assert any("+01:00" in s for s in iso_strings) + + df = pd.DataFrame({"time": iso_strings, "value": range(25)}) + parsed = parse_time(df, time_zone=tz) + + assert len(parsed) == 25 + assert parsed["time"].notna().all() + assert isinstance(parsed["time"].dtype, pd.DatetimeTZDtype) + assert str(parsed["time"].dt.tz) == tz + + +def test_parse_time_defaults_to_utc(): + df = pd.DataFrame( + {"time": ["2025-12-01T00:00:00Z", "2025-12-01T01:00:00Z"], "value": [1, 2]} + ) + parsed = parse_time(df, time_zone=None) + assert isinstance(parsed["time"].dtype, pd.DatetimeTZDtype) + assert str(parsed["time"].dt.tz) == "UTC" diff --git a/tests/power_forecast/test_day_ahead_timeseries.py b/tests/power_forecast/test_day_ahead_timeseries.py index 175b941..9559164 100644 --- a/tests/power_forecast/test_day_ahead_timeseries.py +++ b/tests/power_forecast/test_day_ahead_timeseries.py @@ -1,10 +1,11 @@ from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo import pandas as pd import xarray as xr from jua import JuaClient -from jua.power_forecast.power_forecast import InitTimeInfo +from jua.power_forecast.power_forecast import InitTimeInfo, PowerForecast def _make_ds(zone: str, psr: str, init_times: list[datetime]) -> xr.Dataset: @@ -175,3 +176,336 @@ def fail_init_times(*a, **k): tmin = tmin.tz_localize("UTC") if tmin.tzinfo is None else tmin tmax = tmax.tz_localize("UTC") if tmax.tzinfo is None else tmax assert tmin >= lo and tmax < hi + + +# ---------------------------------------------------------------------- +# DST / mixed-offset parsing (regression for the day-ahead stitch bug) +# ---------------------------------------------------------------------- + + +def test_to_dataset_parses_mixed_offsets_across_dst(): + """Mixed UTC offsets (across a DST boundary) must parse to a single + tz-aware dtype, not an ``object`` column. + + Regression test: parsing such a response naively yields ``object`` dtype, + which breaks the ``time - init_time`` subtraction used by the day-ahead + stitching with ``TypeError: cannot subtract DatetimeArray from ndarray``. + """ + # Europe/Berlin springs forward on 2026-03-29: +01:00 before, +02:00 after. + data = { + "zone_key": ["GB", "GB"], + "psr_type": ["Solar", "Solar"], + "init_time": [ + "2026-03-28T09:00:00+01:00", + "2026-03-29T09:00:00+02:00", + ], + "time": [ + "2026-03-29T00:00:00+01:00", + "2026-03-30T00:00:00+02:00", + ], + "value": [1.0, 2.0], + } + + ds = PowerForecast._to_dataset(data, time_zone="Europe/Berlin") + df = ds.to_dataframe().reset_index() + + assert isinstance(df["time"].dtype, pd.DatetimeTZDtype) + assert isinstance(df["init_time"].dtype, pd.DatetimeTZDtype) + assert str(df["time"].dt.tz) == "Europe/Berlin" + # The exact operation that used to raise must now succeed and stay typed. + lead = df["time"] - df["init_time"] + assert str(lead.dtype).startswith("timedelta64") + # On the populated diagonal each value is 15h after its own init. + valid = df.dropna(subset=["value"]) + assert ((valid["time"] - valid["init_time"]) == pd.Timedelta(hours=15)).all() + + +def test_to_dataset_timezone_invariant_values(): + """The requested ``time_zone`` only relabels instants - never the values. + + Guards the failure mode where a tz/DST bug would shift the data so a + forecast lines up against the wrong hour's actual. The same raw payload + parsed in two zones must describe identical instants and identical values. + """ + data = { + "zone_key": ["DE", "DE"], + "psr_type": ["Solar", "Solar"], + "init_time": ["2026-06-01T00:00:00Z", "2026-06-01T00:00:00Z"], + "time": ["2026-06-01T10:00:00Z", "2026-06-01T11:00:00Z"], + "value": [100.0, 200.0], + } + + du = ( + PowerForecast._to_dataset(data, time_zone="UTC") + .to_dataframe() + .reset_index() + .dropna(subset=["value"]) + ) + db = ( + PowerForecast._to_dataset(data, time_zone="Europe/Berlin") + .to_dataframe() + .reset_index() + .dropna(subset=["value"]) + ) + + assert str(db["time"].dt.tz) == "Europe/Berlin" + du["utc"] = du["time"].dt.tz_convert("UTC") + db["utc"] = db["time"].dt.tz_convert("UTC") + merged = du.merge(db, on=["zone_key", "psr_type", "utc"], suffixes=("_u", "_b")) + assert len(merged) == len(du) == len(db) + assert (merged["value_u"] == merged["value_b"]).all() + + +def test_day_ahead_stitch_value_comes_from_correct_init_and_lead(monkeypatch): + """Stitched values must come from the right run at the right lead. + + With ``value = init_index * 1000 + lead_hours`` per the synthetic dataset, + the day-ahead window for ``init_hour=9`` starts 15h after each init, so the + first valid hour of the first stitched day must equal ``0 * 1000 + 15``. + Guards against off-by-one lead selection / picking the wrong init. + """ + client = JuaClient() + pf = client.power_forecast + + zone, psr = "DE", "Solar" + t1 = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc) + t2 = datetime(2025, 1, 2, 9, 0, tzinfo=timezone.utc) + init_infos = [ + InitTimeInfo(init_time=t1, max_prediction_timedelta=40 * 60), + InitTimeInfo(init_time=t2, max_prediction_timedelta=40 * 60), + ] + monkeypatch.setattr( + pf, + "get_init_times", + lambda zone_key=None, psr_type=None, limit=96: init_infos, + ) + monkeypatch.setattr(pf, "get_data", lambda **kwargs: _make_ds(zone, psr, [t1, t2])) + + stitched = pf.get_day_ahead_timeseries( + zone_keys=[zone], + psr_types=[psr], + init_hour=9, + time_zone="UTC", + max_init_times=10, + ) + + df = stitched.to_dataframe().reset_index().dropna(subset=["value"]) + df = df.sort_values("time") + # First stitched valid hour is 2025-01-02 00:00 from the 2025-01-01 09:00 + # run at +15h lead -> value 0*1000 + 15 = 15. + assert float(df.iloc[0]["value"]) == 15.0 + # Day two starts from the second init (index 1) at +15h -> 1015. + day2 = df[ + pd.to_datetime(df["time"]).dt.tz_localize(None) >= datetime(2025, 1, 3, 0, 0) + ] + assert float(day2.iloc[0]["value"]) == 1015.0 + + +def test_to_dataset_defaults_to_utc_for_mixed_offsets(): + """Without a ``time_zone`` the frame is normalized to tz-aware UTC.""" + data = { + "zone_key": ["GB", "GB"], + "psr_type": ["Solar", "Solar"], + "init_time": [ + "2026-03-28T09:00:00+01:00", + "2026-03-29T09:00:00+02:00", + ], + "time": [ + "2026-03-28T10:00:00+01:00", + "2026-03-29T11:00:00+02:00", + ], + "value": [1.0, 2.0], + } + + ds = PowerForecast._to_dataset(data) + df = ds.to_dataframe().reset_index() + + assert isinstance(df["time"].dtype, pd.DatetimeTZDtype) + assert str(df["time"].dt.tz) == "UTC" + + +class _FakeResponse: + def __init__(self, payload): + self._payload = payload + + def json(self): + return self._payload + + +def _columnar_response( + *, + zone: str, + psr_types: list[str], + init_iso_list: list[str], + tz: ZoneInfo, + step_minutes: int = 60, + horizon_hours: int = 40, +): + """Build a columnar power-forecast response for the requested inits. + + Times and init_times are emitted as zone-local ISO strings (mirroring the + server), so a range crossing a DST transition naturally yields mixed UTC + offsets - exactly the shape that exercises the parsing fix. + """ + cols: dict[str, list] = { + "zone_key": [], + "psr_type": [], + "init_time": [], + "time": [], + "value": [], + } + steps = int(horizon_hours * 60 / step_minutes) + for it_iso in init_iso_list: + it = datetime.fromisoformat(it_iso) # tz-aware UTC + for psr in psr_types: + for s in range(steps): + t = it + timedelta(minutes=step_minutes * s) + cols["zone_key"].append(zone) + cols["psr_type"].append(psr) + cols["init_time"].append(it.astimezone(tz).isoformat()) + cols["time"].append(t.astimezone(tz).isoformat()) + cols["value"].append(float(s)) + return _FakeResponse(cols) + + +def _patch_post(monkeypatch, pf, zone, psr_types, tz): + """Route the data endpoint through a synthetic columnar response.""" + + def fake_post(path, data=None, requires_auth=True): + return _columnar_response( + zone=zone, + psr_types=psr_types, + init_iso_list=list(data["init_time"]), + tz=tz, + ) + + monkeypatch.setattr(pf._api, "post", fake_post) + + +def _run_date_range(pf, *, zone, psr_types, init_hour, tz_name, start, end): + return pf.get_day_ahead_timeseries( + zone_keys=[zone], + psr_types=psr_types, + init_hour=init_hour, + time_zone=tz_name, + start_date=start, + end_date=end, + ) + + +def test_day_ahead_date_range_handles_spring_forward_dst(monkeypatch): + """End-to-end stitch across the spring-forward transition (clocks +1h). + + This reproduces Max's GB/init_hour=9 case: the response spans + 2026-03-29 where Europe/Berlin jumps +01:00 -> +02:00. + """ + pf = JuaClient().power_forecast + tz_name = "Europe/Berlin" + tz = ZoneInfo(tz_name) + _patch_post(monkeypatch, pf, "GB", ["Solar", "Wind"], tz) + + start = datetime(2026, 3, 27, tzinfo=tz) + end = datetime(2026, 3, 31, tzinfo=tz) + ds = _run_date_range( + pf, + zone="GB", + psr_types=["Solar", "Wind"], + init_hour=9, + tz_name=tz_name, + start=start, + end=end, + ) + + times = ds.indexes["time"] + assert isinstance(times.dtype, pd.DatetimeTZDtype) + assert times.is_monotonic_increasing + assert times.is_unique + # The DST transition day is present and the series is continuous. + days = {t.date() for t in times} + assert datetime(2026, 3, 29).date() in days + + +def test_day_ahead_date_range_handles_fall_back_dst(monkeypatch): + """End-to-end stitch across the fall-back transition (clocks -1h). + + Europe/Berlin falls back on 2025-10-26 (+02:00 -> +01:00), repeating the + 02:00 hour. The stitched index must stay unique and ordered. + """ + pf = JuaClient().power_forecast + tz_name = "Europe/Berlin" + tz = ZoneInfo(tz_name) + _patch_post(monkeypatch, pf, "DE", ["Solar"], tz) + + start = datetime(2025, 10, 24, tzinfo=tz) + end = datetime(2025, 10, 28, tzinfo=tz) + ds = _run_date_range( + pf, + zone="DE", + psr_types=["Solar"], + init_hour=9, + tz_name=tz_name, + start=start, + end=end, + ) + + times = ds.indexes["time"] + assert times.is_unique, "fall-back repeated hour must not duplicate the index" + assert times.is_monotonic_increasing + days = {t.date() for t in times} + assert datetime(2025, 10, 26).date() in days + + +def test_day_ahead_date_range_includes_leap_day(monkeypatch): + """A range spanning Feb 29 (2024) builds and stitches the leap day.""" + pf = JuaClient().power_forecast + tz_name = "UTC" + tz = ZoneInfo(tz_name) + _patch_post(monkeypatch, pf, "DE", ["Solar"], tz) + + start = datetime(2024, 2, 28, tzinfo=tz) + end = datetime(2024, 3, 2, tzinfo=tz) # valid days: 28, 29, 1 -> 3 days + ds = _run_date_range( + pf, + zone="DE", + psr_types=["Solar"], + init_hour=9, + tz_name=tz_name, + start=start, + end=end, + ) + + times = ds.indexes["time"] + assert times.is_unique + days = {t.date() for t in times} + assert datetime(2024, 2, 29).date() in days + assert days == { + datetime(2024, 2, 28).date(), + datetime(2024, 2, 29).date(), + datetime(2024, 3, 1).date(), + } + + +def test_day_ahead_date_range_crosses_year_boundary(monkeypatch): + """A range spanning New Year stitches across the Dec 31 -> Jan 1 rollover.""" + pf = JuaClient().power_forecast + tz_name = "UTC" + tz = ZoneInfo(tz_name) + _patch_post(monkeypatch, pf, "DE", ["Solar"], tz) + + start = datetime(2025, 12, 31, tzinfo=tz) + end = datetime(2026, 1, 2, tzinfo=tz) # valid days: 12-31, 01-01 -> 2 days + ds = _run_date_range( + pf, + zone="DE", + psr_types=["Solar"], + init_hour=9, + tz_name=tz_name, + start=start, + end=end, + ) + + times = ds.indexes["time"] + assert times.is_unique + assert times.is_monotonic_increasing + years = {t.year for t in times} + assert years == {2025, 2026}