Skip to content
69 changes: 69 additions & 0 deletions examples/market_data/market_data_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Example demonstrating the zone-addressed market_data API.

Shows how to query European power market data (renewable generation, load,
day-ahead forecasts, and prices) by market zone using a single unified
vocabulary, without caring which underlying source serves each zone.
"""

from datetime import datetime, timedelta, timezone

from jua import JuaClient


def main():
    """Walk through zone discovery, two per-zone queries, and a combined query.

    Everything is printed to stdout; nothing is returned.
    """
    jua = JuaClient()
    market = jua.market_data

    # --- Discovery ---
    print("Available zones:")
    print(f" {market.get_zones()}")
    print()

    print("Available variables (all zones):")
    print(f" {market.get_variables()}")
    print()

    print("Available variables for GB:")
    print(f" {market.get_variables(market_zone='GB')}")
    print()

    # Shared query window: the two days ending at the top of the current
    # UTC hour.
    window_end = datetime.now(timezone.utc).replace(
        minute=0, second=0, microsecond=0
    )
    window = {
        "start_time": window_end - timedelta(days=2),
        "end_time": window_end,
    }

    # (heading, market zone, variables, local time zone) for each
    # single-zone request.
    per_zone_queries = (
        # Germany: renewables + day-ahead price (served from ENTSOE)
        (
            "DE solar/wind + day-ahead prices:",
            "DE",
            ["solar", "wind", "day_ahead_prices"],
            "Europe/Berlin",
        ),
        # Great Britain: renewables (served from the UK power feed)
        (
            "GB solar/wind:",
            "GB",
            ["solar", "wind"],
            "Europe/London",
        ),
    )
    for heading, zone, variables, local_tz in per_zone_queries:
        print(heading)
        frame = market.get_data(
            market_zone=zone,
            variables=variables,
            time_zone=local_tz,
            **window,
        )
        print(frame.head())
        print()

    # --- Combined request across zones in one call ---
    print("Combined DE + GB solar:")
    both_zones = market.get_data(
        market_zone=["DE", "GB"],
        variables=["solar"],
        **window,
    )
    per_zone_values = both_zones.groupby("market_zone")["value"]
    print(per_zone_values.describe())


if __name__ == "__main__":
    main()
50 changes: 50 additions & 0 deletions examples/market_data/query_actuals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Query realised (actual) power market data for Germany and Great Britain.

Uses the zone-addressed ``market_data`` API to pull observed solar generation,
wind generation, and load for ``DE`` and ``GB`` over a recent window. The same
unified call works for both zones even though they are served by different
underlying sources (ENTSO-E for DE, the UK-power feed for GB).
"""

from datetime import datetime, timedelta, timezone

from jua import JuaClient

# Realised quantities (no forecasts, no prices) and the local time zone to
# return each zone's ``time`` column in.
#
# ACTUAL_VARIABLES is passed unchanged as the ``variables`` argument of every
# ``get_data`` call made by ``main``.
ACTUAL_VARIABLES = ["solar", "wind", "load"]
# ZONE_TIME_ZONE drives the loop in ``main``: each key is used as the
# ``market_zone`` argument and its value as the ``time_zone`` argument.
ZONE_TIME_ZONE = {
    "DE": "Europe/Berlin",
    "GB": "Europe/London",
}


def main():
    """Print recent actuals for every zone listed in ``ZONE_TIME_ZONE``.

    For each zone the ``ACTUAL_VARIABLES`` are requested over the last two
    days (ending at the top of the current UTC hour). The head of the result
    and the per-variable mean are printed, or a short notice when the result
    is empty.
    """
    jua = JuaClient()
    market = jua.market_data

    window_end = datetime.now(timezone.utc).replace(
        minute=0, second=0, microsecond=0
    )
    window = {
        "start_time": window_end - timedelta(days=2),
        "end_time": window_end,
    }

    for zone in ZONE_TIME_ZONE:
        local_tz = ZONE_TIME_ZONE[zone]
        print(f"=== {zone} actuals ({local_tz}) ===")
        actuals = market.get_data(
            market_zone=zone,
            variables=ACTUAL_VARIABLES,
            time_zone=local_tz,
            **window,
        )
        if actuals.empty:
            print(" No data returned.\n")
        else:
            print(actuals.head())
            print()
            print("Mean by variable:")
            mean_by_variable = actuals.groupby("variable")["value"].mean()
            print(mean_by_variable.round(1))
            print()


if __name__ == "__main__":
    main()
21 changes: 20 additions & 1 deletion src/jua/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ class JuaClient:
settings: Configuration settings for the API client.
weather: Property that provides access to weather data services.
market_aggregates: Property that provides access to market aggregate services.
market_data: Property that provides access to zone-addressed market data
(renewable generation, load, day-ahead forecasts, and prices).

Examples:
>>> from jua import JuaClient
>>> client = JuaClient()
>>> # Access weather services
>>> forecast_model = client.weather.get_model(...)
>>> # Access market aggregates
>>> aggregates = client.market_aggregates.compare_runs(...)
>>> # Access market data by zone
>>> df = client.market_data.get_data(market_zone="DE", ...)
"""

@validate_call
Expand All @@ -45,6 +49,7 @@ def __init__(
self._weather = None
self._market_aggregates = None
self._power_forecast = None
self._market_data = None

if jua_log_level is not None:
logging.getLogger("jua").setLevel(jua_log_level)
Expand Down Expand Up @@ -89,6 +94,20 @@ def power_forecast(self):
self._power_forecast = PowerForecast(self)
return self._power_forecast

@property
def market_data(self):
    """Return the zone-addressed market data client, creating it on first use.

    The instance is stored on ``self._market_data`` and reused by every
    later access.

    Returns:
        MarketData client interface for querying renewable generation,
        load, day-ahead forecasts, and prices by market zone.
    """
    cached = self._market_data
    if cached is not None:
        return cached

    # Imported on first access rather than at module import time.
    from jua.market_data import MarketData

    self._market_data = MarketData(self)
    return self._market_data

def __enter__(self):
return self

Expand Down
5 changes: 5 additions & 0 deletions src/jua/market_data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from jua.market_data.market_data import MarketData

__all__ = [
"MarketData",
]
168 changes: 168 additions & 0 deletions src/jua/market_data/_entsoe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Internal ENTSOE backend for the unified market_data interface.

Wraps the Query Engine ``/v1/entsoe`` endpoints and normalizes responses into
the unified ``[time, market_zone, variable, value, unit]`` schema. SDK users do
not interact with this module; they go through :class:`jua.market_data.MarketData`.
"""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

import pandas as pd

from jua._api import QueryEngineAPI
from jua._utils.remove_none_from_dict import remove_none_from_dict
from jua.market_data import _mapping
from jua.market_data._frame import UNIFIED_COLUMNS, decode_columnar, parse_time

if TYPE_CHECKING:
from jua.client import JuaClient


class _EntsoeBackend:
    """Fetches and normalizes ENTSOE timeseries for a single market zone.

    Requests are issued through :class:`QueryEngineAPI` (``entsoe/data`` for
    timeseries, ``entsoe/zones`` for discovery) and results are reshaped to
    the ``UNIFIED_COLUMNS`` layout.
    """

    def __init__(self, client: JuaClient) -> None:
        """Keep a reference to ``client`` and build the Query Engine API handle."""
        self._client = client
        self._api = QueryEngineAPI(jua_client=client)

    def fetch(
        self,
        market_zone: str,
        variables: list[_mapping.MarketVariable],
        *,
        start_time: datetime,
        end_time: datetime | None,
        time_zone: str | None,
    ) -> pd.DataFrame:
        """Fetch unified ``variables`` for one zone from ENTSOE.

        Variables that resolve to the same native ENTSOE variable (e.g. solar
        and wind both come from ``generation_actual``) are fetched in a single
        request and split apart afterwards. PSR-based variables are summed per
        timestamp (wind = onshore + offshore).

        A variable listed more than once in ``variables`` is processed only
        once, so its rows are not duplicated in the result.

        Args:
            market_zone: Unified market zone code.
            variables: Unified variables to fetch for that zone.
            start_time: Start of the requested window.
            end_time: End of the requested window; omitted from the request
                when ``None``.
            time_zone: Time zone name forwarded to the API and to
                ``parse_time``; omitted from the request when ``None``.

        Returns:
            DataFrame with the unified columns, or empty if no data.
        """
        default_zone = _mapping.entsoe_zone(market_zone)

        # Group unified variables by (native ENTSOE variable, effective zone)
        # so each native query is issued at most once. Most variables use the
        # zone's default code, but a binding may carry a ``zone_override``
        # when the series is published under a different control-area code.
        groups: dict[tuple[str, str], list[_mapping.MarketVariable]] = {}
        for variable in variables:
            binding = _mapping.resolve(market_zone, variable.value).entsoe
            assert binding is not None  # routed here, so always set
            effective_zone = binding.zone_override or default_zone
            members = groups.setdefault((binding.variable, effective_zone), [])
            # Skip repeats: normalizing the same variable twice would append
            # its rows twice to the concatenated result.
            if variable not in members:
                members.append(variable)

        frames: list[pd.DataFrame] = []
        for (native_variable, effective_zone), unified_vars in groups.items():
            raw = self._fetch_native(
                native_variable=native_variable,
                unified_vars=unified_vars,
                entsoe_zone=effective_zone,
                start_time=start_time,
                end_time=end_time,
                time_zone=time_zone,
            )
            if raw.empty:
                continue
            for variable in unified_vars:
                frame = self._normalize_variable(raw, market_zone, variable)
                if not frame.empty:
                    frames.append(frame)

        if not frames:
            return pd.DataFrame(columns=UNIFIED_COLUMNS)
        return pd.concat(frames, ignore_index=True)

    def _fetch_native(
        self,
        *,
        native_variable: str,
        unified_vars: list[_mapping.MarketVariable],
        entsoe_zone: str,
        start_time: datetime,
        end_time: datetime | None,
        time_zone: str | None,
    ) -> pd.DataFrame:
        """Request a single native ENTSOE variable and parse the response.

        The PSR types of all ``unified_vars`` are unioned into one filter so
        that a single request serves every variable in the group. ``None``
        entries are stripped from the request body before it is posted.

        Returns:
            The decoded response after ``parse_time`` has been applied.
        """
        # NOTE(review): bindings are read from ``_ENTSOE_BINDINGS`` here while
        # ``fetch`` goes through ``_mapping.resolve`` -- confirm both always
        # agree on ``psr_types`` for every zone.
        # NOTE(review): if a group ever mixes PSR-based and non-PSR variables,
        # the unioned filter below also restricts the non-PSR one -- confirm
        # this cannot happen with the current mapping.
        requested_psr = {
            psr_type
            for variable in unified_vars
            for psr_type in _mapping._ENTSOE_BINDINGS[variable].psr_types
        }

        body = remove_none_from_dict(
            {
                "variables": [native_variable],
                "zone_keys": [entsoe_zone],
                # An empty filter is sent as "no filter" (None is removed).
                "psr_types": sorted(requested_psr) or None,
                "start_time": start_time.isoformat(),
                "end_time": end_time.isoformat() if end_time is not None else None,
                "time_zone": time_zone,
            }
        )
        response = self._api.post("entsoe/data", data=body, requires_auth=True)
        df = decode_columnar(response.json())
        return parse_time(df, time_zone)

    @staticmethod
    def _normalize_variable(
        raw: pd.DataFrame,
        market_zone: str,
        variable: _mapping.MarketVariable,
    ) -> pd.DataFrame:
        """Filter to the variable's components and rename to the unified schema.

        PSR-based variables (wind = onshore + offshore) are summed per
        timestamp. Direction-split variables (imbalance Long/Short) are filtered
        to a single ``other_type`` so prices are never summed together.

        Args:
            raw: Parsed response for the native variable; may contain rows
                belonging to other unified variables of the same group.
            market_zone: Unified zone code; written upper-cased to the
                ``market_zone`` column.
            variable: Unified variable to extract from ``raw``.

        Returns:
            DataFrame restricted to ``UNIFIED_COLUMNS``; empty when no row
            survives the filters.
        """
        binding = _mapping._ENTSOE_BINDINGS[variable]

        # Filters are applied only when the response carries the column.
        df = raw
        if binding.psr_types and "psr_type" in df.columns:
            df = df[df["psr_type"].isin(binding.psr_types)]
        if binding.other_type is not None and "other_type" in df.columns:
            df = df[df["other_type"] == binding.other_type]
        if df.empty:
            return pd.DataFrame(columns=UNIFIED_COLUMNS)

        unit = _first_unit(df)
        if binding.psr_types:
            # PSR-based variables aggregate their components per timestamp
            # (e.g. wind = onshore + offshore). ``min_count=1`` keeps a
            # timestamp whose components are all missing as NaN, not 0.
            agg = df.groupby("time", as_index=False, sort=True)["value"].sum(
                min_count=1
            )
        else:
            # Non-PSR variables (load, prices) carry exactly one value per
            # timestamp once any other_type filter is applied. ENTSO-E can
            # publish revision duplicates, so collapse to the last row rather
            # than summing them, which would otherwise multiply the price.
            agg = df.groupby("time", as_index=False, sort=True)["value"].last()
        agg["market_zone"] = market_zone.upper()
        agg["variable"] = variable.value
        agg["unit"] = unit
        return agg[UNIFIED_COLUMNS]

    # ------------------------------------------------------------------
    # Discovery passthrough (used to validate / surface availability)
    # ------------------------------------------------------------------
    def available_zones(self) -> list[str]:
        """Return raw ENTSOE zone codes that have data (diagnostic).

        Returns:
            The ``"zones"`` entry of the ``entsoe/zones`` response, or an
            empty list when that key is absent.
        """
        response = self._api.get("entsoe/zones", requires_auth=True)
        return response.json().get("zones", [])


def _first_unit(df: pd.DataFrame) -> str | None:
"""Return the first non-null unit in the frame, if any."""
if "unit" not in df.columns:
return None
units = df["unit"].dropna()
return str(units.iloc[0]) if not units.empty else None
Loading
Loading