From 796adeec2d9a12f3b91ca4a3ccd9631021464de1 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Tue, 17 Mar 2026 14:19:24 +0000 Subject: [PATCH 1/5] fix(sql): Ignore meaningless source updates --- internal/server/postgres/dataserverimpl.go | 30 ++++++++++--- .../server/postgres/dataserverimpl_test.go | 12 +++++ .../server/postgres/sql/queries/locations.sql | 45 +++++++++++++++---- 3 files changed, 72 insertions(+), 15 deletions(-) diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index 5b4c586..a043c89 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -14,6 +14,7 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -1242,6 +1243,8 @@ func (s *DataPlatformDataServiceServerImpl) UpdateLocation( } } + refreshIsRequired := req.NewLocationName != nil + // Update the source history with a new entry csprms := db.CreateSourceEntryParams{ GeometryUuid: dbSource.GeometryUuid, @@ -1253,7 +1256,18 @@ func (s *DataPlatformDataServiceServerImpl) UpdateLocation( } dbNewSource, err := querier.CreateSourceEntry(ctx, csprms) - if err != nil { + switch { + case err == nil: + refreshIsRequired = true + + case errors.Is(err, pgx.ErrNoRows): + l.Debug().Msg("capacity and metadata unchanged; skipping insert") + dbNewSource = db.CreateSourceEntryRow{ + CapacityWatts: dbSource.CapacityWatts, + ValidFromUtc: csprms.ValidFromUtc, + } + + default: l.Err(err).Msgf("querier.CreateSourceEntry(%+v)", csprms) return nil, status.Error( @@ -1269,13 +1283,15 @@ func (s *DataPlatformDataServiceServerImpl) UpdateLocation( Str("dp.source.valid_from_utc", dbNewSource.ValidFromUtc.Time.String()). Msg("updated source") - err = querier.RefreshSourcesMaterializedView(ctx) - if err != nil { - l.Err(err).Msg("querier.RefreshSourcesMaterializedView()") - return nil, status.Error(codes.Internal, "Failed to update sources materialised view") - } + if refreshIsRequired { + err = querier.RefreshSourcesMaterializedView(ctx) + if err != nil { + l.Err(err).Msg("querier.RefreshSourcesMaterializedView()") + return nil, status.Error(codes.Internal, "Failed to update sources materialised view") + } - l.Debug().Msg("refreshed sources materialised view") + l.Debug().Msg("refreshed sources materialised view") + } return &pb.UpdateLocationResponse{ LocationUuid: req.LocationUuid, diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index 00c589b..34b72a3 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -238,6 +238,18 @@ func TestUpdateLocation(t *testing.T) { expectedCapacityWatts uint64 expectedMetadata map[string]any }{ + { + name: "Should return the same when the update doesn't change anything", + req: &pb.UpdateLocationRequest{ + LocationUuid: createResp.LocationUuid, + NewEffectiveCapacityWatts: func() *uint64 { v := uint64(1234e6); return &v }(), + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + ValidFromUtc: timestamppb.New(pivotTime.Add(-4 * time.Hour)), + }, + expectedName: "test_update_location_site", + expectedCapacityWatts: 1234e6, + expectedMetadata: map[string]any{"source": "test"}, + }, { name: "Should update capacity to higher value", req: &pb.UpdateLocationRequest{ diff --git a/internal/server/postgres/sql/queries/locations.sql b/internal/server/postgres/sql/queries/locations.sql index e8aaa52..da9c42a 100644 --- a/internal/server/postgres/sql/queries/locations.sql +++ b/internal/server/postgres/sql/queries/locations.sql @@ -69,7 +69,30 @@ WHERE -- name: CreateSourceEntry :one /* CreateSourceEntry creates a new source entry for a given geometry and source type. + * It fetches the state prior to the input valid time, and only inserts the new row if it differs + * from the previous one. */ +WITH prev_state AS ( + SELECT + sh.capacity_watts, + sh.capacity_limit_sip, + sh.metadata + FROM loc.sources_history AS sh + WHERE sh.geometry_uuid = $1 + AND sh.source_type_id = $2 + AND sh.valid_from_utc <= $3 + ORDER BY sh.valid_from_utc DESC + LIMIT 1 +), +new_state AS ( + SELECT + $1::UUID AS geometry_uuid, + $2::SMALLINT AS source_type_id, + sqlc.arg(capacity_watts)::BIGINT AS capacity_watts, + sqlc.narg(capacity_limit_sip)::SMALLINT AS capacity_limit_sip, + $3::TIMESTAMP AS valid_from_utc, + CASE WHEN sqlc.arg(metadata)::JSONB = '{}'::JSONB THEN NULL ELSE sqlc.arg(metadata)::JSONB END AS metadata +) INSERT INTO loc.sources_history ( geometry_uuid, source_type_id, @@ -77,14 +100,20 @@ INSERT INTO loc.sources_history ( capacity_limit_sip, valid_from_utc, metadata -) VALUES ( - $1, - $2, - $3, - $4, - $5, - CASE WHEN sqlc.arg(metadata)::JSONB = '{}'::JSONB THEN NULL ELSE sqlc.arg(metadata)::JSONB END -) RETURNING geometry_uuid, source_type_id, capacity_watts, valid_from_utc, metadata; +) +SELECT + n.geometry_uuid, + n.source_type_id, + n.capacity_watts, + n.capacity_limit_sip, + n.valid_from_utc, + n.metadata +FROM new_state AS n + LEFT OUTER JOIN prev_state AS p ON TRUE +WHERE p.capacity_watts IS DISTINCT FROM n.capacity_watts + OR p.capacity_limit_sip IS DISTINCT FROM n.capacity_limit_sip + OR p.metadata IS DISTINCT FROM n.metadata +RETURNING geometry_uuid, source_type_id, capacity_watts, valid_from_utc, metadata; -- name: RefreshSourcesMaterializedView :exec REFRESH MATERIALIZED VIEW CONCURRENTLY loc.sources_mv; From d46d638c831d37b66d2268a0b1b841f975297256 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:34:27 +0000 Subject: [PATCH 2/5] fix(core): Return geometry as binary if asked --- internal/server/postgres/dataserverimpl.go | 19 +++++++ .../server/postgres/dataserverimpl_test.go | 54 +++++++++++++++++++ .../server/postgres/sql/queries/locations.sql | 8 +++ 3 files changed, 81 insertions(+) diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index a043c89..0e3ebb4 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -1067,6 +1067,24 @@ func (s *DataPlatformDataServiceServerImpl) GetLocation( Str("dp.source.valid_from_utc", dbSource.SysPeriod.Lower.Time.String()). Msg("found source") + geometry := make([]byte, 0) + geometry = nil + if req.IncludeGeometry { + gwkbprms := db.GetGeometryWKBParams{ + GeometryUuids: []uuid.UUID{dbSource.GeometryUuid}, + } + dbGeometry, err := querier.GetGeometryWKB(ctx, gwkbprms) + if err != nil { + l.Err(err).Msgf("querier.GetGeometryWKB(%+v)", gwkbprms) + + return nil, status.Errorf( + codes.Internal, + "Failed to retrieve geometry for location. See logs for details.", + ) + } + geometry = dbGeometry.GeomWkb + } + return &pb.GetLocationResponse{ LocationUuid: dbSource.GeometryUuid.String(), LocationName: dbSource.GeometryName, @@ -1076,6 +1094,7 @@ func (s *DataPlatformDataServiceServerImpl) GetLocation( }, EffectiveCapacityWatts: uint64(dbSource.CapacityWatts), Metadata: dbSource.MetadataJsonb, + GeometryWkb: geometry, }, nil } diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index 34b72a3..6c50a18 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -1,6 +1,7 @@ package postgres import ( + "encoding/hex" "encoding/json" "fmt" "maps" @@ -573,6 +574,59 @@ func TestGetForecastAtTimestamp(t *testing.T) { } } +func TestGetLocation(t *testing.T) { + // Create a location to fetch + metadata, err := structpb.NewStruct(map[string]any{"source": "test"}) + require.NoError(t, err) + createResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{ + LocationName: "test_get_location_site", + GeometryWkt: "POLYGON((0 0,0 1,1 1,1 0,0 0))", + EffectiveCapacityWatts: 12e6, + Metadata: metadata, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + LocationType: pb.LocationType_LOCATION_TYPE_GSP, + ValidFromUtc: timestamppb.New(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC)), + }) + require.NoError(t, err) + + testCases := []struct{ + name string + req *pb.GetLocationRequest + }{ + { + name: "Should get location without geometry", + req: &pb.GetLocationRequest{ + LocationUuid: createResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + IncludeGeometry: false, + }, + }, + { + name: "Should get location with geometry", + req: &pb.GetLocationRequest{ + LocationUuid: createResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + IncludeGeometry: true, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + resp, err := dc.GetLocation(t.Context(), tc.req) + require.NoError(t, err) + require.Equal(t, createResp.LocationUuid, resp.LocationUuid) + require.Equal(t, "test_get_location_site", resp.LocationName) + require.Equal(t, uint64(12e6), resp.EffectiveCapacityWatts) + if tc.req.IncludeGeometry{ + expected, err := hex.DecodeString("01030000000100000005000000000000000000000000000000000000000000000000000000000000000000f03f000000000000f03f000000000000f03f000000000000f03f000000000000000000000000000000000000000000000000") + require.NoError(t, err) + require.Equal(t, expected, resp.GeometryWkb) + } + }) + } +} + func TestGetLocationsAsGeoJSON(t *testing.T) { // Create some locations siteUuids := make([]string, 3) diff --git a/internal/server/postgres/sql/queries/locations.sql b/internal/server/postgres/sql/queries/locations.sql index da9c42a..f614d36 100644 --- a/internal/server/postgres/sql/queries/locations.sql +++ b/internal/server/postgres/sql/queries/locations.sql @@ -21,6 +21,14 @@ WHERE l.geometry_uuid = $1 RETURNING l.geometry_uuid, l.geometry_name, ST_X(l.associated_point)::REAL AS longitude, ST_Y(l.associated_point)::REAL AS latitude; +-- name: GetGeometryWKB :one +/* GetGeometryWKB returns the geometries in WKB format for the given geometry UUIDs. */ +SELECT + geometry_uuid, + ST_AsBinary(geom)::BYTEA AS geom_wkb +FROM loc.geometries +WHERE geometry_uuid = ANY(sqlc.arg(geometry_uuids)::UUID []); + -- name: GetGeometryGeoJSON :one /* GetLocationGeoJSON returns a GeoJSON FeatureCollection for the given geometries. * The simplification level can be adjusted via the `simplification_level` argument. From ca19a9c9cb646af606ebcc4f93b762ada92a0d78 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:35:24 +0000 Subject: [PATCH 3/5] fix(repo): linting --- internal/server/postgres/dataserverimpl.go | 7 ++++--- internal/server/postgres/dataserverimpl_test.go | 11 +++++++---- internal/server/postgres/sql/queries/locations.sql | 2 +- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index 0e3ebb4..6445d9a 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -1067,12 +1067,12 @@ func (s *DataPlatformDataServiceServerImpl) GetLocation( Str("dp.source.valid_from_utc", dbSource.SysPeriod.Lower.Time.String()). Msg("found source") - geometry := make([]byte, 0) - geometry = nil + var geometry []byte if req.IncludeGeometry { gwkbprms := db.GetGeometryWKBParams{ GeometryUuids: []uuid.UUID{dbSource.GeometryUuid}, } + dbGeometry, err := querier.GetGeometryWKB(ctx, gwkbprms) if err != nil { l.Err(err).Msgf("querier.GetGeometryWKB(%+v)", gwkbprms) @@ -1082,6 +1082,7 @@ func (s *DataPlatformDataServiceServerImpl) GetLocation( "Failed to retrieve geometry for location. See logs for details.", ) } + geometry = dbGeometry.GeomWkb } @@ -1094,7 +1095,7 @@ func (s *DataPlatformDataServiceServerImpl) GetLocation( }, EffectiveCapacityWatts: uint64(dbSource.CapacityWatts), Metadata: dbSource.MetadataJsonb, - GeometryWkb: geometry, + GeometryWkb: geometry, }, nil } diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index 6c50a18..1888737 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -589,9 +589,9 @@ func TestGetLocation(t *testing.T) { }) require.NoError(t, err) - testCases := []struct{ + testCases := []struct { name string - req *pb.GetLocationRequest + req *pb.GetLocationRequest }{ { name: "Should get location without geometry", @@ -618,8 +618,11 @@ func TestGetLocation(t *testing.T) { require.Equal(t, createResp.LocationUuid, resp.LocationUuid) require.Equal(t, "test_get_location_site", resp.LocationName) require.Equal(t, uint64(12e6), resp.EffectiveCapacityWatts) - if tc.req.IncludeGeometry{ - expected, err := hex.DecodeString("01030000000100000005000000000000000000000000000000000000000000000000000000000000000000f03f000000000000f03f000000000000f03f000000000000f03f000000000000000000000000000000000000000000000000") + + if tc.req.IncludeGeometry { + expected, err := hex.DecodeString( + "01030000000100000005000000000000000000000000000000000000000000000000000000000000000000f03f000000000000f03f000000000000f03f000000000000f03f000000000000000000000000000000000000000000000000", + ) require.NoError(t, err) require.Equal(t, expected, resp.GeometryWkb) } diff --git a/internal/server/postgres/sql/queries/locations.sql b/internal/server/postgres/sql/queries/locations.sql index f614d36..fc1a342 100644 --- a/internal/server/postgres/sql/queries/locations.sql +++ b/internal/server/postgres/sql/queries/locations.sql @@ -25,7 +25,7 @@ RETURNING /* GetGeometryWKB returns the geometries in WKB format for the given geometry UUIDs. */ SELECT geometry_uuid, - ST_AsBinary(geom)::BYTEA AS geom_wkb + ST_ASBINARY(geom)::BYTEA AS geom_wkb FROM loc.geometries WHERE geometry_uuid = ANY(sqlc.arg(geometry_uuids)::UUID []); From dc9dbf07f5494e4b43d6cee366ed384887f1f029 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Fri, 20 Mar 2026 15:30:18 +0000 Subject: [PATCH 4/5] feat(proto): Add GetObservationsAtTimestamp route --- internal/server/dummy/dataserverimpl.go | 26 ++++ internal/server/postgres/dataserverimpl.go | 55 ++++++++ .../server/postgres/dataserverimpl_test.go | 117 ++++++++++++++++++ .../postgres/sql/queries/observations.sql | 28 +++++ proto/ocf/dp/dp-data.messages.proto | 38 ++++++ proto/ocf/dp/dp-data.service.proto | 4 + 6 files changed, 268 insertions(+) diff --git a/internal/server/dummy/dataserverimpl.go b/internal/server/dummy/dataserverimpl.go index 4c7b2e8..9825521 100644 --- a/internal/server/dummy/dataserverimpl.go +++ b/internal/server/dummy/dataserverimpl.go @@ -568,6 +568,32 @@ func (d *DataPlatformDataServiceServerImpl) GetObservationsAsTimeseries( }, nil } +func (d *DataPlatformDataServiceServerImpl) GetObservationsAtTimestamp( + ctx context.Context, + req *pb.GetObservationsAtTimestampRequest, +) (*pb.GetObservationsAtTimestampResponse, error) { + values := make([]*pb.GetObservationsAtTimestampResponse_Value, len(req.LocationUuids)) + for i := range values { + ll := randomUkLngLat() + sd := determineIrradiance(req.TimestampUtc.AsTime(), ll) + + values[i] = &pb.GetObservationsAtTimestampResponse_Value{ + LocationUuid: req.LocationUuids[i], + Latlng: &pb.LatLng{ + Latitude: float32(ll.latDegs), + Longitude: float32(ll.lonDegs), + }, + ValueFraction: float32(sd.normalizedIrradiance()), + EffectiveCapacityWatts: 150e6, + } + } + + return &pb.GetObservationsAtTimestampResponse{ + TimestampUtc: req.TimestampUtc, + Values: values, + }, nil +} + // GetWeekAverageDeltas implements dp.DataPlatformDataServiceServer. func (d *DataPlatformDataServiceServerImpl) GetWeekAverageDeltas( ctx context.Context, diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index 6445d9a..f18f65d 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -1031,6 +1031,61 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAtTimestamp( }, nil } +func (s *DataPlatformDataServiceServerImpl) GetObservationsAtTimestamp( + ctx context.Context, + req *pb.GetObservationsAtTimestampRequest, +) (*pb.GetObservationsAtTimestampResponse, error) { + l := zerolog.Ctx(ctx) + + querier := db.New(ix.GetTxFromContext(ctx)) + + // Set default timestamp to now if not provided + if req.TimestampUtc == nil { + req.TimestampUtc = timestamppb.New(time.Now().UTC().Truncate(time.Minute)) + } + + locUuids := make([]uuid.UUID, len(req.LocationUuids)) + for i, locStr := range req.LocationUuids { + locUuids[i] = uuid.MustParse(locStr) + } + + loprms := db.ListObservationsAtTimeForLocationsParams{ + GeometryUuids: locUuids, + SourceTypeID: int16(req.EnergySource), + ObserverName: req.ObserverName, + TargetTimestampUtc: pgtype.Timestamp{Time: req.TimestampUtc.AsTime(), Valid: true}, + } + + dbObs, err := querier.ListObservationsAtTimeForLocations(ctx, loprms) + if err != nil { + l.Err(err).Msgf("querier.ListObservationsAtTimeForLocations(%+v)", loprms) + + return nil, status.Errorf( + codes.NotFound, + "No observations found for the specified locations at the given time. "+ + "Ensure the observer exists, and that the location is operational.", + ) + } + + observations := make([]*pb.GetObservationsAtTimestampResponse_Value, len(dbObs)) + for i, obs := range dbObs { + observations[i] = &pb.GetObservationsAtTimestampResponse_Value{ + ValueFraction: float32(obs.ValueSip) / 30000.0, + EffectiveCapacityWatts: uint64(obs.CapacityWatts), + LocationUuid: obs.GeometryUuid.String(), + Latlng: &pb.LatLng{ + Latitude: obs.Latitude, + Longitude: obs.Longitude, + }, + } + } + + return &pb.GetObservationsAtTimestampResponse{ + TimestampUtc: req.TimestampUtc, + Values: observations, + }, nil +} + func (s *DataPlatformDataServiceServerImpl) GetLocation( ctx context.Context, req *pb.GetLocationRequest, diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index 1888737..3ec888d 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -574,6 +574,123 @@ func TestGetForecastAtTimestamp(t *testing.T) { } } +func TestGetObservationsAtTimestamp(t *testing.T) { + pivotTime := time.Date(2025, 2, 26, 12, 0, 0, 0, time.UTC) + // Create an observer + observerResp, err := dc.CreateObserver(t.Context(), &pb.CreateObserverRequest{ + Name: "test_get_observations_at_timestamp_observer", + }) + require.NoError(t, err) + // Create a few sites to attach the observations to + metadata, err := structpb.NewStruct(map[string]any{"source": "test"}) + require.NoError(t, err) + + siteUuids := make([]string, 3) + for i := range siteUuids { + capacity := uint64(1000000 + i*100000) + siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{ + LocationName: fmt.Sprintf("test_get_observations_at_timestamp_site_%d", i), + GeometryWkt: fmt.Sprintf( + "POINT(%f %f)", + -0.1+float32(i)*0.01, + 51.5+float32(i)*0.01, + ), + EffectiveCapacityWatts: capacity, + Metadata: metadata, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + LocationType: pb.LocationType_LOCATION_TYPE_SITE, + ValidFromUtc: timestamppb.New(pivotTime.Add(-time.Hour * 1)), + }) + require.NoError(t, err) + siteUuids[i] = siteResp.LocationUuid + + req := &pb.CreateObservationsRequest{ + LocationUuid: siteResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + ObserverName: observerResp.ObserverName, + Values: []*pb.CreateObservationsRequest_Value{ + { + ValueWatts: uint64(capacity / 10), + TimestampUtc: timestamppb.New(pivotTime), + }, + }, + } + _, err = dc.CreateObservations(t.Context(), req) + require.NoError(t, err) + } + + testcases := []struct { + name string + req *pb.GetObservationsAtTimestampRequest + expectedFractions []float32 + }{ + { + name: "Should get observation at exact timestamp for single location", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: []string{siteUuids[0]}, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + expectedFractions: []float32{0.1}, + }, + { + name: "Should get observation at exact timestamp for multiple locations", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: siteUuids, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + expectedFractions: []float32{0.1, 0.1, 0.1}, + }, + { + name: "Should return no observations where no values exist at timestamp", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: siteUuids, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime.Add(5 * time.Minute)), + }, + }, + { + name: "Should return no observations for non-existent location", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: []string{uuid.New().String()}, + ObserverName: observerResp.ObserverName, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + }, + { + name: "Should return no observations for non-existent observer", + req: &pb.GetObservationsAtTimestampRequest{ + LocationUuids: siteUuids, + ObserverName: "non_existent_observer", + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + TimestampUtc: timestamppb.New(pivotTime), + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + resp, err := dc.GetObservationsAtTimestamp(t.Context(), tc.req) + if strings.Contains(tc.name, "Shouldn't") { + require.Error(t, err) + } + + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Values, len(tc.expectedFractions)) + + for i, obs := range resp.Values { + require.Equal(t, tc.expectedFractions[i], obs.ValueFraction) + } + }) + } +} + func TestGetLocation(t *testing.T) { // Create a location to fetch metadata, err := structpb.NewStruct(map[string]any{"source": "test"}) diff --git a/internal/server/postgres/sql/queries/observations.sql b/internal/server/postgres/sql/queries/observations.sql index c89f89f..6105225 100644 --- a/internal/server/postgres/sql/queries/observations.sql +++ b/internal/server/postgres/sql/queries/observations.sql @@ -113,3 +113,31 @@ SELECT capacity_watts FROM ranked_observations WHERE rn = 1; + +-- name: ListObservationsAtTimeForLocations :many +/* ListObservationsAtTimeForLocations retrieves observed generation values as percentages + * of capacity for a specific time. + * This is useful for comparing observations across multiple locations. + * Observed values are 16-bit integers, with 0 representing 0% and 30000 representing 100% of capacity. + */ +SELECT + og.geometry_uuid, + og.source_type_id, + og.observation_timestamp_utc, + og.value_sip, + sh.capacity_watts, + sh.latitude, + sh.longitude, + sh.geometry_name +FROM obs.observed_generation_values AS og + INNER JOIN loc.sources_mv AS sh USING (geometry_uuid, source_type_id) +WHERE + og.geometry_uuid = ANY(sqlc.arg(geometry_uuids)::UUID []) + AND og.source_type_id = $1 + AND og.observer_uuid + = ( + SELECT observer_uuid FROM obs.observers + WHERE observer_name = LOWER(sqlc.arg(observer_name)::TEXT) + ) + AND og.observation_timestamp_utc = sqlc.arg(target_timestamp_utc)::TIMESTAMP + AND sh.sys_period @> og.observation_timestamp_utc; diff --git a/proto/ocf/dp/dp-data.messages.proto b/proto/ocf/dp/dp-data.messages.proto index 59be3c8..0b9774b 100644 --- a/proto/ocf/dp/dp-data.messages.proto +++ b/proto/ocf/dp/dp-data.messages.proto @@ -161,6 +161,44 @@ message GetForecastAtTimestampResponse { } +message GetObservationsAtTimestampRequest { + repeated string location_uuids = 1 [ + (buf.validate.field).repeated.min_items = 1, + (buf.validate.field).repeated.max_items = 1000, + (buf.validate.field).repeated.unique = true, + (buf.validate.field).repeated.items = { + string: {uuid: true} + } + ]; + EnergySource energy_source = 2 [ + (buf.validate.field).required = true + ]; + string observer_name = 3 [ + (buf.validate.field).required = true, + (buf.validate.field).string.(valid_observer_name) = true + ]; + /* The time to fetch observations for. + * If not specified, the current time will be used. + */ + optional google.protobuf.Timestamp timestamp_utc = 4 [ + (buf.validate.field).timestamp = { gt: { seconds: 112000000} } + ]; +} + +message GetObservationsAtTimestampResponse { + message Value { + string location_uuid = 1; + float value_fraction = 2; + uint64 effective_capacity_watts = 3; + LatLng latlng = 4; + google.protobuf.Struct metadata = 5; + } + + google.protobuf.Timestamp timestamp_utc = 1; + repeated Value values = 2; +} + + message GetLatestForecastsRequest { string location_uuid = 1 [ (buf.validate.field).required = true, diff --git a/proto/ocf/dp/dp-data.service.proto b/proto/ocf/dp/dp-data.service.proto index df61c8b..f8349e6 100644 --- a/proto/ocf/dp/dp-data.service.proto +++ b/proto/ocf/dp/dp-data.service.proto @@ -22,6 +22,10 @@ service DataPlatformDataService { * (i.e. many points in time, one single location). */ rpc GetObservationsAsTimeseries(GetObservationsAsTimeseriesRequest) returns (GetObservationsAsTimeseriesResponse) {} + /* GetObservationAtTimestamp fetches a vertical slice of observed data. + * (i.e. many locations, at a single point in time). This is handy for e.g. display on a map. + */ + rpc GetObservationsAtTimestamp(GetObservationsAtTimestampRequest) returns (GetObservationsAtTimestampResponse) {} /* GetLocation fetches detailed information about a specific location. */ rpc GetLocation(GetLocationRequest) returns (GetLocationResponse) {} From f745458f7cd5c7fbff2a817b6d8ecfb5c9b5da0f Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Fri, 20 Mar 2026 15:56:09 +0000 Subject: [PATCH 5/5] fix(backend): Error on unknown observer --- internal/server/postgres/dataserverimpl.go | 16 +++++++++++++++- internal/server/postgres/dataserverimpl_test.go | 15 ++++++++------- .../server/postgres/sql/queries/observations.sql | 6 +----- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index f18f65d..7a0709e 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -1049,10 +1049,24 @@ func (s *DataPlatformDataServiceServerImpl) GetObservationsAtTimestamp( locUuids[i] = uuid.MustParse(locStr) } + // Check that the observer exists + obprms := db.GetObserverByNameParams{ObserverName: req.ObserverName} + + dbObserver, err := querier.GetObserverByName(ctx, obprms) + if err != nil { + l.Err(err).Msgf("querier.GetObserverByName(%+v)", obprms) + + return nil, status.Errorf( + codes.NotFound, + "No observer of name '%s' found. Choose an existing observer or create a new one.", + req.ObserverName, + ) + } + loprms := db.ListObservationsAtTimeForLocationsParams{ GeometryUuids: locUuids, SourceTypeID: int16(req.EnergySource), - ObserverName: req.ObserverName, + ObserverUuid: dbObserver.ObserverUuid, TargetTimestampUtc: pgtype.Timestamp{Time: req.TimestampUtc.AsTime(), Valid: true}, } diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index 3ec888d..905e804 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -663,7 +663,7 @@ func TestGetObservationsAtTimestamp(t *testing.T) { }, }, { - name: "Should return no observations for non-existent observer", + name: "Shouldn't return observations for non-existent observer", req: &pb.GetObservationsAtTimestampRequest{ LocationUuids: siteUuids, ObserverName: "non_existent_observer", @@ -678,14 +678,15 @@ func TestGetObservationsAtTimestamp(t *testing.T) { resp, err := dc.GetObservationsAtTimestamp(t.Context(), tc.req) if strings.Contains(tc.name, "Shouldn't") { require.Error(t, err) - } + } else { - require.NoError(t, err) - require.NotNil(t, resp) - require.Len(t, resp.Values, len(tc.expectedFractions)) + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Values, len(tc.expectedFractions)) - for i, obs := range resp.Values { - require.Equal(t, tc.expectedFractions[i], obs.ValueFraction) + for i, obs := range resp.Values { + require.Equal(t, tc.expectedFractions[i], obs.ValueFraction) + } } }) } diff --git a/internal/server/postgres/sql/queries/observations.sql b/internal/server/postgres/sql/queries/observations.sql index 6105225..c9fd58d 100644 --- a/internal/server/postgres/sql/queries/observations.sql +++ b/internal/server/postgres/sql/queries/observations.sql @@ -134,10 +134,6 @@ FROM obs.observed_generation_values AS og WHERE og.geometry_uuid = ANY(sqlc.arg(geometry_uuids)::UUID []) AND og.source_type_id = $1 - AND og.observer_uuid - = ( - SELECT observer_uuid FROM obs.observers - WHERE observer_name = LOWER(sqlc.arg(observer_name)::TEXT) - ) + AND og.observer_uuid = $2 AND og.observation_timestamp_utc = sqlc.arg(target_timestamp_utc)::TIMESTAMP AND sh.sys_period @> og.observation_timestamp_utc;