Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions go-sdk/impl/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@
package impl

import (
"strings"
"sync"

"github.com/functionstream/function-stream/go-sdk/api"
"github.com/functionstream/function-stream/go-sdk/bindings/functionstream/core/collector"
"strings"
)

type runtimeContext struct {
mu sync.RWMutex
config map[string]string
stores map[string]*storeImpl
closed bool
Expand All @@ -35,9 +32,7 @@ func newRuntimeContext(config map[string]string) *runtimeContext {
}

func (c *runtimeContext) Emit(targetID uint32, data []byte) error {
c.mu.RLock()
closed := c.closed
c.mu.RUnlock()
if closed {
return api.NewError(api.ErrRuntimeClosed, "emit on closed context")
}
Expand All @@ -46,9 +41,7 @@ func (c *runtimeContext) Emit(targetID uint32, data []byte) error {
}

func (c *runtimeContext) EmitWatermark(targetID uint32, watermark uint64) error {
c.mu.RLock()
closed := c.closed
c.mu.RUnlock()
if closed {
return api.NewError(api.ErrRuntimeClosed, "emit watermark on closed context")
}
Expand All @@ -62,8 +55,6 @@ func (c *runtimeContext) GetOrCreateStore(name string) (api.Store, error) {
return nil, api.NewError(api.ErrStoreInvalidName, "store name must not be empty")
}

c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil, api.NewError(api.ErrRuntimeClosed, "store request on closed context")
}
Expand All @@ -77,21 +68,16 @@ func (c *runtimeContext) GetOrCreateStore(name string) (api.Store, error) {
}

func (c *runtimeContext) Config() map[string]string {
c.mu.RLock()
defer c.mu.RUnlock()
return cloneStringMap(c.config)
}

func (c *runtimeContext) Close() error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil
}
c.closed = true
stores := c.stores
c.stores = make(map[string]*storeImpl)
c.mu.Unlock()

var firstErr error
for _, store := range stores {
Expand Down
144 changes: 144 additions & 0 deletions go-sdk/impl/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package impl

import (
"github.com/functionstream/function-stream/go-sdk/api"
"github.com/functionstream/function-stream/go-sdk/state/codec"
"github.com/functionstream/function-stream/go-sdk/state/keyed"
"github.com/functionstream/function-stream/go-sdk/state/structures"
)

func getStoreFromContext(ctx api.Context, storeName string) (*storeImpl, error) {
store, err := ctx.GetOrCreateStore(storeName)
if err != nil {
return nil, err
}
s, ok := store.(*storeImpl)
if !ok {
return nil, api.NewError(api.ErrStoreInternal, "store %q is not the default implementation", storeName)
}
return s, nil
}

func NewValueState[T any](ctx api.Context, storeName string, valueCodec codec.Codec[T]) (*structures.ValueState[T], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return structures.NewValueState(s, valueCodec)
}

func NewListState[T any](ctx api.Context, storeName string, itemCodec codec.Codec[T]) (*structures.ListState[T], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return structures.NewListState(s, itemCodec)
}

func NewMapState[K any, V any](ctx api.Context, storeName string, keyCodec codec.Codec[K], valueCodec codec.Codec[V]) (*structures.MapState[K, V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return structures.NewMapState(s, keyCodec, valueCodec)
}

func NewMapStateAutoKeyCodec[K any, V any](ctx api.Context, storeName string, valueCodec codec.Codec[V]) (*structures.MapState[K, V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return structures.NewMapStateAutoKeyCodec[K, V](s, valueCodec)
}

func NewPriorityQueueState[T any](ctx api.Context, storeName string, itemCodec codec.Codec[T]) (*structures.PriorityQueueState[T], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return structures.NewPriorityQueueState(s, itemCodec)
}

func NewAggregatingState[T any, ACC any, R any](ctx api.Context, storeName string, accCodec codec.Codec[ACC], aggFunc structures.AggregateFunc[T, ACC, R]) (*structures.AggregatingState[T, ACC, R], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return structures.NewAggregatingState(s, accCodec, aggFunc)
}

func NewReducingState[V any](ctx api.Context, storeName string, valueCodec codec.Codec[V], reduceFunc structures.ReduceFunc[V]) (*structures.ReducingState[V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return structures.NewReducingState(s, valueCodec, reduceFunc)
}

func NewKeyedListStateFactory[V any](ctx api.Context, storeName, name string, keyGroup []byte, valueCodec codec.Codec[V]) (*keyed.KeyedListStateFactory[V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return keyed.NewKeyedListStateFactory(s, name, keyGroup, valueCodec)
}

func NewKeyedListStateFactoryAutoCodec[V any](ctx api.Context, storeName, name string, keyGroup []byte) (*keyed.KeyedListStateFactory[V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return keyed.NewKeyedListStateFactoryAutoCodec[V](s, name, keyGroup)
}

func NewKeyedValueStateFactory[V any](ctx api.Context, storeName string, keyGroup []byte, valueCodec codec.Codec[V]) (*keyed.KeyedValueStateFactory[V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return keyed.NewKeyedValueStateFactory(s, keyGroup, valueCodec)
}

func NewKeyedMapStateFactory[MK any, MV any](ctx api.Context, storeName string, keyGroup []byte, keyCodec codec.Codec[MK], valueCodec codec.Codec[MV]) (*keyed.KeyedMapStateFactory[MK, MV], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return keyed.NewKeyedMapStateFactory(s, keyGroup, keyCodec, valueCodec)
}

func NewKeyedPriorityQueueStateFactory[V any](ctx api.Context, storeName string, keyGroup []byte, itemCodec codec.Codec[V]) (*keyed.KeyedPriorityQueueStateFactory[V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return keyed.NewKeyedPriorityQueueStateFactory(s, keyGroup, itemCodec)
}

func NewKeyedAggregatingStateFactory[T any, ACC any, R any](ctx api.Context, storeName string, keyGroup []byte, accCodec codec.Codec[ACC], aggFunc keyed.AggregateFunc[T, ACC, R]) (*keyed.KeyedAggregatingStateFactory[T, ACC, R], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return keyed.NewKeyedAggregatingStateFactory(s, keyGroup, accCodec, aggFunc)
}

func NewKeyedReducingStateFactory[V any](ctx api.Context, storeName string, keyGroup []byte, valueCodec codec.Codec[V], reduceFunc keyed.ReduceFunc[V]) (*keyed.KeyedReducingStateFactory[V], error) {
s, err := getStoreFromContext(ctx, storeName)
if err != nil {
return nil, err
}
return keyed.NewKeyedReducingStateFactory(s, keyGroup, valueCodec, reduceFunc)
}
42 changes: 42 additions & 0 deletions go-sdk/state/codec/bool_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import "fmt"

type BoolCodec struct{}

func (c BoolCodec) Encode(value bool) ([]byte, error) {
if value {
return []byte{1}, nil
}
return []byte{0}, nil
}

func (c BoolCodec) Decode(data []byte) (bool, error) {
if len(data) != 1 {
return false, fmt.Errorf("invalid bool payload length: %d", len(data))
}
switch data[0] {
case 0:
return false, nil
case 1:
return true, nil
default:
return false, fmt.Errorf("invalid bool payload byte: %d", data[0])
}
}

func (c BoolCodec) EncodedSize() int { return 1 }

func (c BoolCodec) IsOrderedKeyCodec() bool { return true }
25 changes: 25 additions & 0 deletions go-sdk/state/codec/bytes_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import "github.com/functionstream/function-stream/go-sdk/state/common"

type BytesCodec struct{}

func (c BytesCodec) Encode(value []byte) ([]byte, error) { return common.DupBytes(value), nil }

func (c BytesCodec) Decode(data []byte) ([]byte, error) { return common.DupBytes(data), nil }

func (c BytesCodec) EncodedSize() int { return -1 }

func (c BytesCodec) IsOrderedKeyCodec() bool { return true }
51 changes: 51 additions & 0 deletions go-sdk/state/codec/default_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"fmt"
"reflect"
)

func DefaultCodecFor[V any]() (Codec[V], error) {
var zero V
t := reflect.TypeOf(zero)
if t == nil {
return nil, fmt.Errorf("default codec: type parameter V must not be interface without type constraint")
}
k := t.Kind()
switch k {
case reflect.Bool:
return any(BoolCodec{}).(Codec[V]), nil
case reflect.Int32:
return any(OrderedInt32Codec{}).(Codec[V]), nil
case reflect.Int64:
return any(OrderedInt64Codec{}).(Codec[V]), nil
case reflect.Uint32:
return any(OrderedUint32Codec{}).(Codec[V]), nil
case reflect.Uint64:
return any(OrderedUint64Codec{}).(Codec[V]), nil
case reflect.Float32:
return any(OrderedFloat32Codec{}).(Codec[V]), nil
case reflect.Float64:
return any(OrderedFloat64Codec{}).(Codec[V]), nil
case reflect.String:
return any(StringCodec{}).(Codec[V]), nil
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Uint, reflect.Uint8, reflect.Uint16:
return any(JSONCodec[V]{}).(Codec[V]), nil
case reflect.Struct, reflect.Map, reflect.Slice, reflect.Array, reflect.Interface:
return any(JSONCodec[V]{}).(Codec[V]), nil
default:
return any(JSONCodec[V]{}).(Codec[V]), nil
}
}
37 changes: 37 additions & 0 deletions go-sdk/state/codec/float32_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"encoding/binary"
"fmt"
"math"
)

type Float32Codec struct{}

func (c Float32Codec) Encode(value float32) ([]byte, error) {
out := make([]byte, 4)
binary.BigEndian.PutUint32(out, math.Float32bits(value))
return out, nil
}

func (c Float32Codec) Decode(data []byte) (float32, error) {
if len(data) != 4 {
return 0, fmt.Errorf("invalid float32 payload length: %d", len(data))
}
return math.Float32frombits(binary.BigEndian.Uint32(data)), nil
}

func (c Float32Codec) EncodedSize() int { return 4 }
func (c Float32Codec) IsOrderedKeyCodec() bool { return false }
Loading