Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions src/dataplatform/forecast/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
from datetime import datetime, timedelta

from grpc_requests import Client
import betterproto
import pandas as pd
from aiocache import Cache, cached
Expand All @@ -14,6 +15,7 @@

async def get_forecast_data(
client: dp.DataPlatformDataServiceStub,
sync_client: Client,
location: dp.ListLocationsResponseLocationSummary,
start_date: datetime,
end_date: datetime,
Expand All @@ -24,7 +26,7 @@ async def get_forecast_data(

for forecaster in selected_forecasters:
forecaster_data_df = await get_forecast_data_one_forecaster(
client,
sync_client,
location,
start_date,
end_date,
Expand Down Expand Up @@ -63,7 +65,7 @@ async def get_forecast_data(

@cached(ttl=cache_seconds, cache=Cache.MEMORY, key_builder=key_builder_remove_client)
async def get_forecast_data_one_forecaster(
client: dp,
sync_client: Client,
location: dp.ListLocationsResponseLocationSummary,
start_date: datetime,
end_date: datetime,
Expand All @@ -78,20 +80,21 @@ async def get_forecast_data_one_forecaster(
temp_end_date = min(temp_start_date + timedelta(days=30), end_date)

# fetch data
stream_forecast_data_request = dp.StreamForecastDataRequest(
location_uuid=location.location_uuid,
energy_source=dp.EnergySource.SOLAR,
time_window=dp.TimeWindow(
start_timestamp_utc=temp_start_date,
end_timestamp_utc=temp_end_date,
),
forecasters=[selected_forecaster],
)
stream_forecast_data_request = {
"location_uuid": location.location_uuid,
"energy_source": dp.EnergySource.SOLAR.value,
"time_window": {
"start_timestamp_utc": temp_start_date.isoformat(),
"end_timestamp_utc": temp_end_date.isoformat(),
},
"forecasters": [selected_forecaster.to_dict()],
}

svc = sync_client.service("ocf.dp.DataPlatformDataService")

forecasts = []
async for chunk in client.stream_forecast_data(stream_forecast_data_request):
forecasts.append(
chunk.to_dict(include_default_values=True, casing=betterproto.Casing.SNAKE),
)
for chunk in svc.StreamForecastData(stream_forecast_data_request):
forecasts.append(chunk)

if len(forecasts) > 0:
all_data_list_dict.extend(forecasts)
Expand Down Expand Up @@ -199,6 +202,7 @@ async def get_all_observations(

async def get_all_data(
client: dp.DataPlatformDataServiceStub,
sync_client: Client,
selected_location: dp.ListLocationsResponseLocationSummary,
start_date: datetime,
end_date: datetime,
Expand All @@ -219,6 +223,7 @@ async def get_all_data(
time_start = time.time()
all_forecast_data_df = await get_forecast_data(
client,
sync_client,
selected_location,
start_date,
end_date,
Expand Down
5 changes: 5 additions & 0 deletions src/dataplatform/forecast/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import streamlit as st
from dp_sdk.ocf import dp
from grpclib.client import Channel
from grpc_requests import Client as GRPC_Client

from dataplatform.forecast.constant import metrics, observer_names
from dataplatform.forecast.data import align_t0, get_all_data
Expand Down Expand Up @@ -34,6 +35,9 @@ async def async_dp_forecast_page() -> None:
async with Channel(host=data_platform_host, port=data_platform_port) as channel:
client = dp.DataPlatformDataServiceStub(channel)

# this is used to streamline requests
sync_client = GRPC_Client.get_by_endpoint(f"{data_platform_host}:{data_platform_port}")

setup_page_dict = await setup_page(client)
selected_location = setup_page_dict["selected_location"]
start_date = setup_page_dict["start_date"]
Expand All @@ -51,6 +55,7 @@ async def async_dp_forecast_page() -> None:
### 1. Get all the data ###
all_data_dict = await get_all_data(
client=client,
sync_client=sync_client,
start_date=start_date,
end_date=end_date,
selected_forecasters=selected_forecasters,
Expand Down
Loading