diff --git a/internal/server/dummy/dataserverimpl.go b/internal/server/dummy/dataserverimpl.go index 4c7b2e8..9825521 100644 --- a/internal/server/dummy/dataserverimpl.go +++ b/internal/server/dummy/dataserverimpl.go @@ -568,6 +568,32 @@ func (d *DataPlatformDataServiceServerImpl) GetObservationsAsTimeseries( }, nil } +func (d *DataPlatformDataServiceServerImpl) GetObservationsAtTimestamp( + ctx context.Context, + req *pb.GetObservationsAtTimestampRequest, +) (*pb.GetObservationsAtTimestampResponse, error) { + values := make([]*pb.GetObservationsAtTimestampResponse_Value, len(req.LocationUuids)) + for i := range values { + ll := randomUkLngLat() + sd := determineIrradiance(req.TimestampUtc.AsTime(), ll) + + values[i] = &pb.GetObservationsAtTimestampResponse_Value{ + LocationUuid: req.LocationUuids[i], + Latlng: &pb.LatLng{ + Latitude: float32(ll.latDegs), + Longitude: float32(ll.lonDegs), + }, + ValueFraction: float32(sd.normalizedIrradiance()), + EffectiveCapacityWatts: 150e6, + } + } + + return &pb.GetObservationsAtTimestampResponse{ + TimestampUtc: req.TimestampUtc, + Values: values, + }, nil +} + // GetWeekAverageDeltas implements dp.DataPlatformDataServiceServer. func (d *DataPlatformDataServiceServerImpl) GetWeekAverageDeltas( ctx context.Context, diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index 6445d9a..7a0709e 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -1031,6 +1031,75 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAtTimestamp( }, nil } +func (s *DataPlatformDataServiceServerImpl) GetObservationsAtTimestamp( + ctx context.Context, + req *pb.GetObservationsAtTimestampRequest, +) (*pb.GetObservationsAtTimestampResponse, error) { + l := zerolog.Ctx(ctx) + + querier := db.New(ix.GetTxFromContext(ctx)) + + // Set default timestamp to now if not provided + if req.TimestampUtc == nil { + req.TimestampUtc = timestamppb.New(time.Now().UTC().Truncate(time.Minute)) + } + + locUuids := make([]uuid.UUID, len(req.LocationUuids)) + for i, locStr := range req.LocationUuids { + locUuids[i] = uuid.MustParse(locStr) + } + + // Check that the observer exists + obprms := db.GetObserverByNameParams{ObserverName: req.ObserverName} + + dbObserver, err := querier.GetObserverByName(ctx, obprms) + if err != nil { + l.Err(err).Msgf("querier.GetObserverByName(%+v)", obprms) + + return nil, status.Errorf( + codes.NotFound, + "No observer of name '%s' found. Choose an existing observer or create a new one.", + req.ObserverName, + ) + } + + loprms := db.ListObservationsAtTimeForLocationsParams{ + GeometryUuids: locUuids, + SourceTypeID: int16(req.EnergySource), + ObserverUuid: dbObserver.ObserverUuid, + TargetTimestampUtc: pgtype.Timestamp{Time: req.TimestampUtc.AsTime(), Valid: true}, + } + + dbObs, err := querier.ListObservationsAtTimeForLocations(ctx, loprms) + if err != nil { + l.Err(err).Msgf("querier.ListObservationsAtTimeForLocations(%+v)", loprms) + + return nil, status.Errorf( + codes.NotFound, + "No observations found for the specified locations at the given time. "+ + "Ensure the observer exists, and that the location is operational.", + ) + } + + observations := make([]*pb.GetObservationsAtTimestampResponse_Value, len(dbObs)) + for i, obs := range dbObs { + observations[i] = &pb.GetObservationsAtTimestampResponse_Value{ + ValueFraction: float32(obs.ValueSip) / 30000.0, + EffectiveCapacityWatts: uint64(obs.CapacityWatts), + LocationUuid: obs.GeometryUuid.String(), + Latlng: &pb.LatLng{ + Latitude: obs.Latitude, + Longitude: obs.Longitude, + }, + } + } + + return &pb.GetObservationsAtTimestampResponse{ + TimestampUtc: req.TimestampUtc, + Values: observations, + }, nil +} + func (s *DataPlatformDataServiceServerImpl) GetLocation( ctx context.Context, req *pb.GetLocationRequest, diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index 1888737..905e804 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -574,6 +574,124 @@ func TestGetForecastAtTimestamp(t *testing.T) { } } +func TestGetObservationsAtTimestamp(t *testing.T) { + pivotTime := time.Date(2025, 2, 26, 12, 0, 0, 0, time.UTC) + // Create an observer + observerResp, err := dc.CreateObserver(t.Context(), &pb.CreateObserverRequest{ + Name: "test_get_observations_at_timestamp_observer", + }) + require.NoError(t, err) + // Create a few sites to attach the observations to + metadata, err := structpb.NewStruct(map[string]any{"source": "test"}) + require.NoError(t, err) + + siteUuids := make([]string, 3) + for i := range siteUuids { + capacity := uint64(1000000 + i*100000) + siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{ + LocationName: fmt.Sprintf("test_get_observations_at_timestamp_site_%d", i), + GeometryWkt: fmt.Sprintf( + "POINT(%f %f)", + -0.1+float32(i)*0.01, + 51.5+float32(i)*0.01, + ), + EffectiveCapacityWatts: capacity, + Metadata: metadata, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + LocationType: pb.LocationType_LOCATION_TYPE_SITE, + ValidFromUtc: timestamppb.New(pivotTime.Add(-time.Hour * 1)), + }) + require.NoError(t, err) + siteUuids[i] = siteResp.LocationUuid + + req := &pb.CreateObservationsRequest{ + LocationUuid: siteResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + ObserverName: observerResp.ObserverName, + Values: []*pb.CreateObservationsRequest_Value{ + { + ValueWatts: uint64(capacity / 10), + TimestampUtc: timestamppb.New(pivotTime), + }, + }, + } + _, err = dc.CreateObservations(t.Context(), req) + require.NoError(t, err) + } + + testcases := []struct { + name string + req *pb.GetObservationsAtTimestampRequest + expectedFractions []float32 + }{ + { + name: "Should get observation at exact timestamp for single location", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: []string{siteUuids[0]}, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + expectedFractions: []float32{0.1}, + }, + { + name: "Should get observation at exact timestamp for multiple locations", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: siteUuids, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + expectedFractions: []float32{0.1, 0.1, 0.1}, + }, + { + name: "Should return no observations where no values exist at timestamp", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: siteUuids, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime.Add(5 * time.Minute)), + }, + }, + { + name: "Should return no observations for non-existent location", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: []string{uuid.New().String()}, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + }, + { + name: "Shouldn't return observations for non-existent observer", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: siteUuids, + ObserverName: "non_existent_observer", + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + resp, err := dc.GetObservationsAtTimestamp(t.Context(), tc.req) + if strings.Contains(tc.name, "Shouldn't") { + require.Error(t, err) + } else { + + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Values, len(tc.expectedFractions)) + + for i, obs := range resp.Values { + require.Equal(t, tc.expectedFractions[i], obs.ValueFraction) + } + } + }) + } +} + func TestGetLocation(t *testing.T) { // Create a location to fetch metadata, err := structpb.NewStruct(map[string]any{"source": "test"}) diff --git a/internal/server/postgres/sql/queries/observations.sql b/internal/server/postgres/sql/queries/observations.sql index c89f89f..c9fd58d 100644 --- a/internal/server/postgres/sql/queries/observations.sql +++ b/internal/server/postgres/sql/queries/observations.sql @@ -113,3 +113,27 @@ SELECT capacity_watts FROM ranked_observations WHERE rn = 1; + +-- name: ListObservationsAtTimeForLocations :many +/* ListObservationsAtTimeForLocations retrieves observed generation values as percentages + * of capacity for a specific time. + * This is useful for comparing observations across multiple locations. + * Observed values are 16-bit integers, with 0 representing 0% and 30000 representing 100% of capacity. + */ +SELECT + og.geometry_uuid, + og.source_type_id, + og.observation_timestamp_utc, + og.value_sip, + sh.capacity_watts, + sh.latitude, + sh.longitude, + sh.geometry_name +FROM obs.observed_generation_values AS og + INNER JOIN loc.sources_mv AS sh USING (geometry_uuid, source_type_id) +WHERE + og.geometry_uuid = ANY(sqlc.arg(geometry_uuids)::UUID []) + AND og.source_type_id = $1 + AND og.observer_uuid = $2 + AND og.observation_timestamp_utc = sqlc.arg(target_timestamp_utc)::TIMESTAMP + AND sh.sys_period @> og.observation_timestamp_utc; diff --git a/proto/ocf/dp/dp-data.messages.proto b/proto/ocf/dp/dp-data.messages.proto index 59be3c8..0b9774b 100644 --- a/proto/ocf/dp/dp-data.messages.proto +++ b/proto/ocf/dp/dp-data.messages.proto @@ -161,6 +161,44 @@ message GetForecastAtTimestampResponse { } +message GetObservationsAtTimestampRequest { + repeated string location_uuids = 1 [ + (buf.validate.field).repeated.min_items = 1, + (buf.validate.field).repeated.max_items = 1000, + (buf.validate.field).repeated.unique = true, + (buf.validate.field).repeated.items = { + string: {uuid: true} + } + ]; + EnergySource energy_source = 2 [ + (buf.validate.field).required = true + ]; + string observer_name = 3 [ + (buf.validate.field).required = true, + (buf.validate.field).string.(valid_observer_name) = true + ]; + /* The time to fetch observations for. + * If not specified, the current time will be used. + */ + optional google.protobuf.Timestamp timestamp_utc = 4 [ + (buf.validate.field).timestamp = { gt: { seconds: 112000000} } + ]; +} + +message GetObservationsAtTimestampResponse { + message Value { + string location_uuid = 1; + float value_fraction = 2; + uint64 effective_capacity_watts = 3; + LatLng latlng = 4; + google.protobuf.Struct metadata = 5; + } + + google.protobuf.Timestamp timestamp_utc = 1; + repeated Value values = 2; +} + + message GetLatestForecastsRequest { string location_uuid = 1 [ (buf.validate.field).required = true, diff --git a/proto/ocf/dp/dp-data.service.proto b/proto/ocf/dp/dp-data.service.proto index df61c8b..f8349e6 100644 --- a/proto/ocf/dp/dp-data.service.proto +++ b/proto/ocf/dp/dp-data.service.proto @@ -22,6 +22,10 @@ service DataPlatformDataService { * (i.e. many points in time, one single location). */ rpc GetObservationsAsTimeseries(GetObservationsAsTimeseriesRequest) returns (GetObservationsAsTimeseriesResponse) {} + /* GetObservationAtTimestamp fetches a vertical slice of observed data. + * (i.e. many locations, at a single point in time). This is handy for e.g. display on a map. + */ + rpc GetObservationsAtTimestamp(GetObservationsAtTimestampRequest) returns (GetObservationsAtTimestampResponse) {} /* GetLocation fetches detailed information about a specific location. */ rpc GetLocation(GetLocationRequest) returns (GetLocationResponse) {}