Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions internal/server/dummy/dataserverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,32 @@ func (d *DataPlatformDataServiceServerImpl) GetObservationsAsTimeseries(
}, nil
}

func (d *DataPlatformDataServiceServerImpl) GetObservationsAtTimestamp(
ctx context.Context,
req *pb.GetObservationsAtTimestampRequest,
) (*pb.GetObservationsAtTimestampResponse, error) {
values := make([]*pb.GetObservationsAtTimestampResponse_Value, len(req.LocationUuids))
for i := range values {
ll := randomUkLngLat()
sd := determineIrradiance(req.TimestampUtc.AsTime(), ll)

values[i] = &pb.GetObservationsAtTimestampResponse_Value{
LocationUuid: req.LocationUuids[i],
Latlng: &pb.LatLng{
Latitude: float32(ll.latDegs),
Longitude: float32(ll.lonDegs),
},
ValueFraction: float32(sd.normalizedIrradiance()),
EffectiveCapacityWatts: 150e6,
}
}

return &pb.GetObservationsAtTimestampResponse{
TimestampUtc: req.TimestampUtc,
Values: values,
}, nil
}

// GetWeekAverageDeltas implements dp.DataPlatformDataServiceServer.
func (d *DataPlatformDataServiceServerImpl) GetWeekAverageDeltas(
ctx context.Context,
Expand Down
69 changes: 69 additions & 0 deletions internal/server/postgres/dataserverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,75 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAtTimestamp(
}, nil
}

func (s *DataPlatformDataServiceServerImpl) GetObservationsAtTimestamp(
ctx context.Context,
req *pb.GetObservationsAtTimestampRequest,
) (*pb.GetObservationsAtTimestampResponse, error) {
l := zerolog.Ctx(ctx)

querier := db.New(ix.GetTxFromContext(ctx))

// Set default timestamp to now if not provided
if req.TimestampUtc == nil {
req.TimestampUtc = timestamppb.New(time.Now().UTC().Truncate(time.Minute))
}

locUuids := make([]uuid.UUID, len(req.LocationUuids))
for i, locStr := range req.LocationUuids {
locUuids[i] = uuid.MustParse(locStr)
}

// Check that the observer exists
obprms := db.GetObserverByNameParams{ObserverName: req.ObserverName}

dbObserver, err := querier.GetObserverByName(ctx, obprms)
if err != nil {
l.Err(err).Msgf("querier.GetObserverByName(%+v)", obprms)

return nil, status.Errorf(
codes.NotFound,
"No observer of name '%s' found. Choose an existing observer or create a new one.",
req.ObserverName,
)
}

loprms := db.ListObservationsAtTimeForLocationsParams{
GeometryUuids: locUuids,
SourceTypeID: int16(req.EnergySource),
ObserverUuid: dbObserver.ObserverUuid,
TargetTimestampUtc: pgtype.Timestamp{Time: req.TimestampUtc.AsTime(), Valid: true},
}

dbObs, err := querier.ListObservationsAtTimeForLocations(ctx, loprms)
if err != nil {
l.Err(err).Msgf("querier.ListObservationsAtTimeForLocations(%+v)", loprms)

return nil, status.Errorf(
codes.NotFound,
"No observations found for the specified locations at the given time. "+
"Ensure the observer exists, and that the location is operational.",
)
}

observations := make([]*pb.GetObservationsAtTimestampResponse_Value, len(dbObs))
for i, obs := range dbObs {
observations[i] = &pb.GetObservationsAtTimestampResponse_Value{
ValueFraction: float32(obs.ValueSip) / 30000.0,
EffectiveCapacityWatts: uint64(obs.CapacityWatts),
LocationUuid: obs.GeometryUuid.String(),
Latlng: &pb.LatLng{
Latitude: obs.Latitude,
Longitude: obs.Longitude,
},
}
}

return &pb.GetObservationsAtTimestampResponse{
TimestampUtc: req.TimestampUtc,
Values: observations,
}, nil
}

func (s *DataPlatformDataServiceServerImpl) GetLocation(
ctx context.Context,
req *pb.GetLocationRequest,
Expand Down
118 changes: 118 additions & 0 deletions internal/server/postgres/dataserverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,124 @@ func TestGetForecastAtTimestamp(t *testing.T) {
}
}

func TestGetObservationsAtTimestamp(t *testing.T) {
pivotTime := time.Date(2025, 2, 26, 12, 0, 0, 0, time.UTC)
// Create an observer
observerResp, err := dc.CreateObserver(t.Context(), &pb.CreateObserverRequest{
Name: "test_get_observations_at_timestamp_observer",
})
require.NoError(t, err)
// Create a few sites to attach the observations to
metadata, err := structpb.NewStruct(map[string]any{"source": "test"})
require.NoError(t, err)

siteUuids := make([]string, 3)
for i := range siteUuids {
capacity := uint64(1000000 + i*100000)
siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{
LocationName: fmt.Sprintf("test_get_observations_at_timestamp_site_%d", i),
GeometryWkt: fmt.Sprintf(
"POINT(%f %f)",
-0.1+float32(i)*0.01,
51.5+float32(i)*0.01,
),
EffectiveCapacityWatts: capacity,
Metadata: metadata,
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
LocationType: pb.LocationType_LOCATION_TYPE_SITE,
ValidFromUtc: timestamppb.New(pivotTime.Add(-time.Hour * 1)),
})
require.NoError(t, err)
siteUuids[i] = siteResp.LocationUuid

req := &pb.CreateObservationsRequest{
LocationUuid: siteResp.LocationUuid,
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
ObserverName: observerResp.ObserverName,
Values: []*pb.CreateObservationsRequest_Value{
{
ValueWatts: uint64(capacity / 10),
TimestampUtc: timestamppb.New(pivotTime),
},
},
}
_, err = dc.CreateObservations(t.Context(), req)
require.NoError(t, err)
}

testcases := []struct {
name string
req *pb.GetObservationsAtTimestampRequest
expectedFractions []float32
}{
{
name: "Should get observation at exact timestamp for single location",
req: &pb.GetObservationsAtTimestampRequest{
LocationUuids: []string{siteUuids[0]},
ObserverName: observerResp.ObserverName,
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
TimestampUtc: timestamppb.New(pivotTime),
},
expectedFractions: []float32{0.1},
},
{
name: "Should get observation at exact timestamp for multiple locations",
req: &pb.GetObservationsAtTimestampRequest{
LocationUuids: siteUuids,
ObserverName: observerResp.ObserverName,
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
TimestampUtc: timestamppb.New(pivotTime),
},
expectedFractions: []float32{0.1, 0.1, 0.1},
},
{
name: "Should return no observations where no values exist at timestamp",
req: &pb.GetObservationsAtTimestampRequest{
LocationUuids: siteUuids,
ObserverName: observerResp.ObserverName,
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
TimestampUtc: timestamppb.New(pivotTime.Add(5 * time.Minute)),
},
},
{
name: "Should return no observations for non-existent location",
req: &pb.GetObservationsAtTimestampRequest{
LocationUuids: []string{uuid.New().String()},
ObserverName: observerResp.ObserverName,
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
TimestampUtc: timestamppb.New(pivotTime),
},
},
{
name: "Shouldn't return observations for non-existent observer",
req: &pb.GetObservationsAtTimestampRequest{
LocationUuids: siteUuids,
ObserverName: "non_existent_observer",
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
TimestampUtc: timestamppb.New(pivotTime),
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
resp, err := dc.GetObservationsAtTimestamp(t.Context(), tc.req)
if strings.Contains(tc.name, "Shouldn't") {
require.Error(t, err)
} else {

require.NoError(t, err)
require.NotNil(t, resp)
require.Len(t, resp.Values, len(tc.expectedFractions))

for i, obs := range resp.Values {
require.Equal(t, tc.expectedFractions[i], obs.ValueFraction)
}
}
})
}
}

func TestGetLocation(t *testing.T) {
// Create a location to fetch
metadata, err := structpb.NewStruct(map[string]any{"source": "test"})
Expand Down
24 changes: 24 additions & 0 deletions internal/server/postgres/sql/queries/observations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,27 @@ SELECT
capacity_watts
FROM ranked_observations
WHERE rn = 1;

-- name: ListObservationsAtTimeForLocations :many
/* ListObservationsAtTimeForLocations retrieves observed generation values as percentages
* of capacity for a specific time.
* This is useful for comparing observations across multiple locations.
* Observed values are 16-bit integers, with 0 representing 0% and 30000 representing 100% of capacity.
*/
SELECT
og.geometry_uuid,
og.source_type_id,
og.observation_timestamp_utc,
og.value_sip,
sh.capacity_watts,
sh.latitude,
sh.longitude,
sh.geometry_name
FROM obs.observed_generation_values AS og
INNER JOIN loc.sources_mv AS sh USING (geometry_uuid, source_type_id)
WHERE
og.geometry_uuid = ANY(sqlc.arg(geometry_uuids)::UUID [])
AND og.source_type_id = $1
AND og.observer_uuid = $2
AND og.observation_timestamp_utc = sqlc.arg(target_timestamp_utc)::TIMESTAMP
AND sh.sys_period @> og.observation_timestamp_utc;
38 changes: 38 additions & 0 deletions proto/ocf/dp/dp-data.messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,44 @@ message GetForecastAtTimestampResponse {
}


message GetObservationsAtTimestampRequest {
repeated string location_uuids = 1 [
(buf.validate.field).repeated.min_items = 1,
(buf.validate.field).repeated.max_items = 1000,
(buf.validate.field).repeated.unique = true,
(buf.validate.field).repeated.items = {
string: {uuid: true}
}
];
EnergySource energy_source = 2 [
(buf.validate.field).required = true
];
string observer_name = 3 [
(buf.validate.field).required = true,
(buf.validate.field).string.(valid_observer_name) = true
];
/* The time to fetch observations for.
* If not specified, the current time will be used.
*/
optional google.protobuf.Timestamp timestamp_utc = 4 [
(buf.validate.field).timestamp = { gt: { seconds: 112000000} }
];
}

message GetObservationsAtTimestampResponse {
message Value {
string location_uuid = 1;
float value_fraction = 2;
uint64 effective_capacity_watts = 3;
LatLng latlng = 4;
google.protobuf.Struct metadata = 5;
}

google.protobuf.Timestamp timestamp_utc = 1;
repeated Value values = 2;
}


message GetLatestForecastsRequest {
string location_uuid = 1 [
(buf.validate.field).required = true,
Expand Down
4 changes: 4 additions & 0 deletions proto/ocf/dp/dp-data.service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ service DataPlatformDataService {
* (i.e. many points in time, one single location).
*/
rpc GetObservationsAsTimeseries(GetObservationsAsTimeseriesRequest) returns (GetObservationsAsTimeseriesResponse) {}
/* GetObservationAtTimestamp fetches a vertical slice of observed data.
* (i.e. many locations, at a single point in time). This is handy for e.g. display on a map.
*/
rpc GetObservationsAtTimestamp(GetObservationsAtTimestampRequest) returns (GetObservationsAtTimestampResponse) {}

/* GetLocation fetches detailed information about a specific location. */
rpc GetLocation(GetLocationRequest) returns (GetLocationResponse) {}
Expand Down
Loading