Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions api/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package api

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/microsoft/durabletask-go/internal/helpers"
"github.com/microsoft/durabletask-go/internal/protos"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// EntityID uniquely identifies an entity by its name and key.
type EntityID struct {
Name string
Key string
}

// NewEntityID creates a new EntityID with the specified name and key.
func NewEntityID(name string, key string) EntityID {
if err := helpers.ValidateEntityName(name); err != nil {
panic(err)
}
return EntityID{Name: strings.ToLower(name), Key: key}
}

// String returns the entity instance ID in the format "@<name>@<key>".
func (e EntityID) String() string {
return fmt.Sprintf("@%s@%s", strings.ToLower(e.Name), e.Key)
}

// EntityIDFromString parses an entity instance ID string in the format "@<name>@<key>".
func EntityIDFromString(s string) (EntityID, error) {
name, key, err := helpers.ParseEntityInstanceID(s)
if err != nil {
return EntityID{}, err
}
return EntityID{Name: name, Key: key}, nil
}

// EntityMetadata contains metadata about an entity instance.
type EntityMetadata struct {
InstanceID EntityID
LastModifiedTime time.Time
BacklogQueueSize int32
LockedBy string
SerializedState string
}

// SignalEntityOptions is a functional option type for signaling an entity.
type SignalEntityOptions func(*protos.SignalEntityRequest) error

// WithSignalInput configures the input for an entity signal.
func WithSignalInput(input any) SignalEntityOptions {
return func(req *protos.SignalEntityRequest) error {
bytes, err := json.Marshal(input)
if err != nil {
return err
}
req.Input = wrapperspb.String(string(bytes))
return nil
}
}

// WithRawSignalInput configures a raw string input for an entity signal.
func WithRawSignalInput(input string) SignalEntityOptions {
return func(req *protos.SignalEntityRequest) error {
req.Input = wrapperspb.String(input)
return nil
}
}

// WithSignalScheduledTime configures a scheduled time for the entity signal.
func WithSignalScheduledTime(t time.Time) SignalEntityOptions {
return func(req *protos.SignalEntityRequest) error {
req.ScheduledTime = timestamppb.New(t)
return nil
}
}

// EntityQuery defines filter criteria for querying entities.
type EntityQuery struct {
// InstanceIDStartsWith filters entities whose instance ID starts with this prefix.
InstanceIDStartsWith string
// LastModifiedFrom filters entities modified on or after this time.
LastModifiedFrom time.Time
// LastModifiedTo filters entities modified before this time.
LastModifiedTo time.Time
// IncludeState whether to include entity state in the results.
IncludeState bool
// IncludeTransient whether to include transient (stateless) entities.
IncludeTransient bool
// PageSize limits the number of entities returned per page.
PageSize int32
// ContinuationToken for fetching the next page of results.
ContinuationToken string
}

// EntityQueryResults contains the results of an entity query.
type EntityQueryResults struct {
Entities []*EntityMetadata
ContinuationToken string
}

// CleanEntityStorageRequest contains options for cleaning entity storage.
type CleanEntityStorageRequest struct {
// ContinuationToken for resuming a previous cleanup operation.
ContinuationToken string
// RemoveEmptyEntities removes entities with no state and no locks.
RemoveEmptyEntities bool
// ReleaseOrphanedLocks releases locks held by non-running orchestrations.
ReleaseOrphanedLocks bool
}

// CleanEntityStorageResult contains the results of a cleanup operation.
type CleanEntityStorageResult struct {
// EmptyEntitiesRemoved is the number of empty entities removed.
EmptyEntitiesRemoved int32
// OrphanedLocksReleased is the number of orphaned locks released.
OrphanedLocksReleased int32
// ContinuationToken for resuming cleanup. Empty if complete.
ContinuationToken string
}
45 changes: 45 additions & 0 deletions api/entity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package api

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_API_EntityID_String(t *testing.T) {
id := NewEntityID("Counter", "myCounter")
assert.Equal(t, "@counter@myCounter", id.String())
}

func Test_API_EntityIDFromString(t *testing.T) {
tests := []struct {
name string
input string
want EntityID
wantErr bool
}{
{name: "valid", input: "@counter@key1", want: EntityID{Name: "counter", Key: "key1"}},
{name: "empty key", input: "@entity@", want: EntityID{Name: "entity", Key: ""}},
{name: "invalid empty name", input: "@@key1", wantErr: true},
{name: "invalid no prefix", input: "no-at-sign", wantErr: true},
{name: "invalid no second @", input: "@onlyone", wantErr: true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := EntityIDFromString(tt.input)
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}

func Test_API_NewEntityID_InvalidNamePanics(t *testing.T) {
assert.Panics(t, func() { NewEntityID("", "key") })
assert.Panics(t, func() { NewEntityID("bad@name", "key") })
}
3 changes: 3 additions & 0 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type PurgeOptions func(*protos.PurgeInstancesRequest) error
// a random UUID value will be used for the orchestration instance ID.
func WithInstanceID(id InstanceID) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
if err := helpers.ValidateOrchestrationInstanceID(string(id)); err != nil {
return err
}
req.InstanceId = string(id)
return nil
}
Expand Down
23 changes: 23 additions & 0 deletions api/orchestration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package api

import (
"testing"

"github.com/microsoft/durabletask-go/internal/protos"
"github.com/stretchr/testify/require"
)

func Test_API_WithInstanceID_RejectsEntityFormat(t *testing.T) {
req := &protos.CreateInstanceRequest{}

err := WithInstanceID(InstanceID("@counter@key"))(req)
require.Error(t, err)
}

func Test_API_WithInstanceID_AllowsNormalValue(t *testing.T) {
req := &protos.CreateInstanceRequest{}

err := WithInstanceID(InstanceID("my-instance"))(req)
require.NoError(t, err)
require.Equal(t, "my-instance", req.InstanceId)
}
19 changes: 19 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,25 @@ type Backend interface {
PurgeOrchestrationState(context.Context, api.InstanceID) error
}

// EntityBackend is an optional interface that backends can implement to support
// entity-specific storage operations like querying and cleanup.
// If a backend does not implement this interface, entity queries and cleanup
// operations will not be available through the in-process client.
type EntityBackend interface {
Backend

// GetEntityMetadata retrieves metadata for a specific entity instance.
// Returns nil if the entity doesn't exist.
GetEntityMetadata(context.Context, api.EntityID, bool) (*api.EntityMetadata, error)

// QueryEntities queries entity instances matching the specified filter criteria.
QueryEntities(context.Context, api.EntityQuery) (*api.EntityQueryResults, error)

// CleanEntityStorage performs garbage collection on entity storage, removing
// empty entities and releasing orphaned locks.
CleanEntityStorage(context.Context, api.CleanEntityStorageRequest) (*api.CleanEntityStorageResult, error)
}

// MarshalHistoryEvent serializes the [HistoryEvent] into a protobuf byte array.
func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error) {
if bytes, err := proto.Marshal(e); err != nil {
Expand Down
130 changes: 130 additions & 0 deletions backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package backend

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/internal/helpers"
Expand All @@ -27,6 +30,17 @@ type TaskHubClient interface {
PurgeOrchestrationState(ctx context.Context, id api.InstanceID, opts ...api.PurgeOptions) error
}

// EntityTaskHubClient is an optional extension of [TaskHubClient] that adds entity-specific
// operations. Clients returned by [NewTaskHubClient] always implement this interface.
// The gRPC client ([TaskHubGrpcClient]) also implements this interface.
type EntityTaskHubClient interface {
TaskHubClient
SignalEntity(ctx context.Context, entityID api.EntityID, operationName string, opts ...api.SignalEntityOptions) error
FetchEntityMetadata(ctx context.Context, entityID api.EntityID, includeState bool) (*api.EntityMetadata, error)
QueryEntities(ctx context.Context, query api.EntityQuery) (*api.EntityQueryResults, error)
CleanEntityStorage(ctx context.Context, req api.CleanEntityStorageRequest) (*api.CleanEntityStorageResult, error)
}

type backendClient struct {
be Backend
}
Expand All @@ -52,6 +66,9 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat
}
req.InstanceId = u.String()
}
if err := helpers.ValidateOrchestrationInstanceID(req.InstanceId); err != nil {
return api.EmptyInstanceID, err
}

var span trace.Span
ctx, span = helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), req.InstanceId)
Expand Down Expand Up @@ -205,3 +222,116 @@ func (c *backendClient) PurgeOrchestrationState(ctx context.Context, id api.Inst
}
return nil
}

// SignalEntity sends a fire-and-forget signal to an entity, triggering the specified operation.
//
// If the entity doesn't exist, it will be created automatically when the signal is processed.
func (c *backendClient) SignalEntity(ctx context.Context, entityID api.EntityID, operationName string, opts ...api.SignalEntityOptions) error {
if err := helpers.ValidateEntityName(entityID.Name); err != nil {
return err
}

req := &protos.SignalEntityRequest{
InstanceId: entityID.String(),
Name: operationName,
}
for _, configure := range opts {
if err := configure(req); err != nil {
return fmt.Errorf("failed to configure signal entity request: %w", err)
}
}

// Ensure the entity orchestration instance exists. Create with IGNORE policy
// so it's a no-op if the instance already exists.
startEvent := helpers.NewExecutionStartedEvent(entityID.Name, req.InstanceId, nil, nil, nil, nil)
createErr := c.be.CreateOrchestrationInstance(ctx, startEvent, WithOrchestrationIdReusePolicy(&protos.OrchestrationIdReusePolicy{
Action: protos.CreateOrchestrationAction_IGNORE,
OperationStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING},
}))
if createErr != nil && !errors.Is(createErr, api.ErrDuplicateInstance) && !errors.Is(createErr, api.ErrIgnoreInstance) {
return fmt.Errorf("failed to create entity instance: %w", createErr)
}

// Build the .NET-compatible EntityRequestMessage payload with isSignal=true.
requestID := req.RequestId
if requestID == "" {
requestID = uuid.New().String()
}
reqMsg := helpers.EntityRequestMessage{
ID: requestID,
IsSignal: true,
Operation: req.Name,
}
if req.Input != nil {
reqMsg.Input = req.Input.GetValue()
}
payload, err := json.Marshal(reqMsg)
if err != nil {
return fmt.Errorf("failed to marshal signal request: %w", err)
}

e := helpers.NewEventRaisedEvent(helpers.EntityRequestEventName, wrapperspb.String(string(payload)))
if req.ScheduledTime != nil {
e.Timestamp = req.ScheduledTime
}
if err := c.be.AddNewOrchestrationEvent(ctx, api.InstanceID(req.InstanceId), e); err != nil {
return fmt.Errorf("failed to signal entity: %w", err)
}
return nil
}

// FetchEntityMetadata retrieves metadata about an entity instance.
//
// Returns nil if the entity doesn't exist.
// If the backend implements [EntityBackend], its native entity storage is used.
// Otherwise, falls back to orchestration metadata.
func (c *backendClient) FetchEntityMetadata(ctx context.Context, entityID api.EntityID, includeState bool) (*api.EntityMetadata, error) {
if err := helpers.ValidateEntityName(entityID.Name); err != nil {
return nil, err
}
if eb, ok := c.be.(EntityBackend); ok {
return eb.GetEntityMetadata(ctx, entityID, includeState)
}

// Fallback: entities are backed by orchestrations
iid := api.InstanceID(entityID.String())
metadata, err := c.be.GetOrchestrationMetadata(ctx, iid)
if err != nil {
if errors.Is(err, api.ErrInstanceNotFound) {
return nil, nil
}
return nil, fmt.Errorf("failed to get entity metadata: %w", err)
}
if metadata == nil {
return nil, nil
}

result := &api.EntityMetadata{
InstanceID: entityID,
LastModifiedTime: metadata.LastUpdatedAt,
}
if includeState {
result.SerializedState = metadata.SerializedCustomStatus
}
return result, nil
}

// QueryEntities queries entities matching the specified filter criteria.
//
// Requires the backend to implement [EntityBackend].
func (c *backendClient) QueryEntities(ctx context.Context, query api.EntityQuery) (*api.EntityQueryResults, error) {
if eb, ok := c.be.(EntityBackend); ok {
return eb.QueryEntities(ctx, query)
}
return nil, fmt.Errorf("QueryEntities requires the backend to implement EntityBackend")
}

// CleanEntityStorage performs garbage collection on entity storage.
//
// Requires the backend to implement [EntityBackend].
func (c *backendClient) CleanEntityStorage(ctx context.Context, req api.CleanEntityStorageRequest) (*api.CleanEntityStorageResult, error) {
if eb, ok := c.be.(EntityBackend); ok {
return eb.CleanEntityStorage(ctx, req)
}
return nil, fmt.Errorf("CleanEntityStorage requires the backend to implement EntityBackend")
}
Loading
Loading