Support for Watermarking, Keying, Windowing, and Joining
In order to call pipelines v1.0.0 complete, we need to implement several features essential to processing streaming data. These features are listed below in the recommended order of development:
- Watermarking, KeyBy, and KeyedDataStream: Essential for partitioning stream data by key. KeyBy groups items by a key (e.g., a user ID) to support per-key computations such as aggregations and counts. Watermarking, which tracks event-time progress, should be implemented as part of this effort. Careful design, and possibly a small refactor, is required to avoid duplicating logic between KeyedDataStream and the existing DataStream.
- Windowing: Allows batch or incremental computation (e.g., sums, averages, counts) over keyed or non-keyed streams. Processing data in discrete, incremental segments is essential for real-time analytics. We'll want both tumbling and sliding windows for pipelines.
- Join: Correlates items from two streams that share the same key, merging them into a single output stream. The Join function will need to use Windowing.
These enhancements will bring pipelines closer to feature parity with established streaming frameworks such as Flink or Spark Streaming while maintaining a clean, dependency-free, and Go-oriented design.
After this work is done, a follow-up issue will be created for adding new methods to Pipeline that build on top of these enhancements.
1. KeyBy and KeyedDataStream + Watermarking
Proposed New Types & Function Signatures
TimestampExtractor
Defines how to extract an event-time timestamp from an item T.
type TimestampExtractor[T any] func(T) time.Time
WatermarkGenerator
WatermarkGenerator is invoked for each event and produces an updated watermark.
type WatermarkGenerator[T any] interface {
	// OnEvent is called whenever a new event arrives.
	OnEvent(event T, eventTime time.Time)
	// GetWatermark returns the current watermark.
	GetWatermark() time.Time
}
KeyBy
Takes a DataStream[T] and a key function, and returns a KeyedDataStream[T, K].
func KeyBy[T any, K comparable](
	ds DataStream[T],
	keyFunc func(T) K,
	params ...Params,
) KeyedDataStream[T, K] {
	// ...
}
KeyFunc
A user-defined function that extracts a key K from an item T.
type KeyFunc[T any, K comparable] func(T) K
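Semantically, KeyBy with a key function assigns every item to a partition for its key. The following is a minimal slice-based sketch of that grouping. The partitionByKey helper and the stand-in Event type are hypothetical, and the real KeyBy operates on a DataStream rather than a slice.

```go
package main

// partitionByKey is a hypothetical helper illustrating the grouping KeyBy
// implies. Every item lands in the bucket for its key, and arrival order is
// preserved within each bucket. keyFunc has the same shape as KeyFunc[T, K].
func partitionByKey[T any, K comparable](items []T, keyFunc func(T) K) map[K][]T {
	parts := make(map[K][]T)
	for _, item := range items {
		k := keyFunc(item)
		parts[k] = append(parts[k], item)
	}
	return parts
}

// Event is a stand-in for the Event type used in the usage examples.
type Event struct {
	GroupID string
	Value   float64
}
```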
KeyedDataStream
Represents a stream partitioned by a key of type K.
It embeds the original DataStream and stores the key extraction, timestamp extraction, and watermark generation functions alongside it, as shown below.
type KeyedDataStream[T any, K comparable] struct {
	DataStream[T]
	keyFunc            KeyFunc[T, K]
	timestampExtractor TimestampExtractor[T]
	watermarkGenerator WatermarkGenerator[T]
}
Note: We need to ensure our implementation of KeyedDataStream is clean and doesn't duplicate DataStream logic.
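Go struct embedding is one way to meet this requirement, because KeyedDataStream gets every DataStream method through promotion. A promoted method, however, returns a plain DataStream[T] and silently drops the key. Transformations that must stay keyed therefore need thin wrappers. The sketch below uses a toy, slice-backed DataStream with hypothetical Filter and Len methods. It is not the real datastreams type.

```go
package main

// DataStream is a toy, slice-backed stand-in for the real type (hypothetical).
type DataStream[T any] struct {
	items []T
}

// Len is defined once on DataStream and promoted to KeyedDataStream unchanged.
func (ds DataStream[T]) Len() int { return len(ds.items) }

// Filter is defined once on DataStream and returns an unkeyed stream.
func (ds DataStream[T]) Filter(keep func(T) bool) DataStream[T] {
	out := DataStream[T]{}
	for _, it := range ds.items {
		if keep(it) {
			out.items = append(out.items, it)
		}
	}
	return out
}

// KeyedDataStream embeds DataStream, so its methods are promoted without duplication.
type KeyedDataStream[T any, K comparable] struct {
	DataStream[T]
	keyFunc func(T) K
}

// Filter reuses the embedded implementation and re-attaches the key function
// so that the result stays keyed.
func (ks KeyedDataStream[T, K]) Filter(keep func(T) bool) KeyedDataStream[T, K] {
	return KeyedDataStream[T, K]{DataStream: ks.DataStream.Filter(keep), keyFunc: ks.keyFunc}
}
```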
Proposed KeyBy Usage Example
KeyBy
ds := datastreams.FromArray([]Event{{GroupID: "a"}, {GroupID: "a"}})
// Create a KeyedDataStream keyed by the Event.GroupID field.
ks := KeyBy(ds, func(e Event) string { return e.GroupID })
2. Windowing and Join Functions
Proposed New Types & Function Signatures
Window
func Window[T any, K comparable, R any](
	ds KeyedDataStream[T, K],
	spec WindowSpecer,
	windowFunc WindowFunc[T, R],
	params ...Params,
) KeyedDataStream[R, K] {
	// ...
}
Join
func Join[T any, U any, K comparable, R any](
	left KeyedDataStream[T, K],
	right KeyedDataStream[U, K],
	spec WindowSpecer,
	joinFunc JoinFunc[T, U, R],
	params ...Params,
) KeyedDataStream[R, K] {
	// ...
}
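One plausible semantics for Join is an inner join within each window: every (left, right) pair that shares a key in the same window produces one output, and unmatched items produce none. This is an assumption, not something the proposal specifies. The following is a minimal hash-join sketch over the contents of a single window, using a hypothetical joinWindowContents helper.

```go
package main

// JoinFunc mirrors the proposed type.
type JoinFunc[T any, U any, R any] func(T, U) (R, error)

// joinWindowContents is a hypothetical helper. It inner-joins the items of one
// window, emitting one output for every (left, right) pair that shares a key.
func joinWindowContents[T any, U any, K comparable, R any](
	left []T, right []U,
	leftKey func(T) K, rightKey func(U) K,
	fn JoinFunc[T, U, R],
) ([]R, error) {
	// Build a hash index over the right side once.
	index := make(map[K][]U)
	for _, u := range right {
		k := rightKey(u)
		index[k] = append(index[k], u)
	}
	// Probe with each left item, preserving left arrival order.
	var out []R
	for _, t := range left {
		for _, u := range index[leftKey(t)] {
			r, err := fn(t, u)
			if err != nil {
				return nil, err
			}
			out = append(out, r)
		}
	}
	return out, nil
}
```

In a full implementation, both sides would first be buffered per key and window. The helper only covers the final matching step.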
WindowFunc
Processes the items in a window and produces a single output.
type WindowFunc[T any, R any] func([]T) (R, error)
JoinFunc
Merges a matching pair of items, one from each stream, into a single output item.
type JoinFunc[T any, U any, R any] func(T, U) (R, error)
WindowSpecer
A unified interface for window definitions.
type WindowSpecer interface {
	Type() WindowType
	Unit() WindowUnit
	// Spec returns all configuration values, e.g., time or count sizes.
	Spec() WindowConfig
}
WindowType
Enumerates the window types: tumbling and sliding (session windows are a possible future expansion).
type WindowType int
const (
	Tumbling WindowType = iota
	Sliding
)
WindowUnit
Enumerates the units (time or item count) in which a window's size and duration are expressed.
type WindowUnit int
const (
	UnitTime WindowUnit = iota
	UnitCount
)
WindowConfig
WindowConfig holds a window's numeric parameters independently of its unit or window type.
type WindowConfig struct {
	Size     int64
	Duration int64
}
// Use the building blocks above to define structs implementing WindowSpecer,
// e.g., TumblingWindowSpec and SlidingWindowSpec, along with constructors such as:
//   func TumblingWindowOf(duration time.Duration) WindowSpecer
//   func SlidingWindowOfCount(size, slide int) WindowSpecer
Proposed Usage Examples
window
ds := datastreams.FromArray([]Event{{GroupID: "a"}, {GroupID: "a"}})
ks := KeyBy(ds, func(e Event) string { return e.GroupID })
// Time-based tumbling window
tumbleAgg := Window(ks,
	TumblingWindowOf(5 * time.Second),
	func(batch []Event) (float64, error) {
		var sum float64
		for _, e := range batch {
			sum += e.Value
		}
		return sum, nil
	},
)
// Count-based sliding window
slidingAgg := Window(ks,
	SlidingWindowOfCount(100, 50),
	func(batch []Event) (int, error) {
		return len(batch), nil
	},
)
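The count-based example leaves the exact sliding behaviour open. If SlidingWindowOfCount(100, 50) means "the last 100 items per key, emitted every 50 items", a minimal per-key sketch could look like the following. The names countSlider and newCountSlider are hypothetical. Emitting partial windows while a key is still warming up is a design choice of this sketch.

```go
package main

// countSlider is a hypothetical per-key count-based sliding window. After
// every `slide` items for a key, it applies fn to that key's most recent
// `size` items, or to fewer items while the key is still warming up.
type countSlider[T any, K comparable, R any] struct {
	size, slide int
	keyFunc     func(T) K
	fn          func([]T) (R, error)
	recent      map[K][]T // last `size` items per key
	sinceFire   map[K]int // items seen per key since the last emission
}

func newCountSlider[T any, K comparable, R any](size, slide int, keyFunc func(T) K,
	fn func([]T) (R, error)) *countSlider[T, K, R] {
	return &countSlider[T, K, R]{size: size, slide: slide, keyFunc: keyFunc, fn: fn,
		recent: make(map[K][]T), sinceFire: make(map[K]int)}
}

// Add ingests one item and reports whether a window fired, along with its result.
func (c *countSlider[T, K, R]) Add(item T) (R, bool, error) {
	k := c.keyFunc(item)
	buf := append(c.recent[k], item)
	if len(buf) > c.size {
		buf = buf[len(buf)-c.size:] // keep only the newest `size` items
	}
	c.recent[k] = buf
	c.sinceFire[k]++
	var zero R
	if c.sinceFire[k] < c.slide {
		return zero, false, nil
	}
	c.sinceFire[k] = 0
	// Copy so the window function cannot alias the internal buffer.
	window := append([]T(nil), buf...)
	r, err := c.fn(window)
	if err != nil {
		return zero, false, err
	}
	return r, true, nil
}
```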
join
ksProfiles := KeyBy(dsProfiles, func(p ProfileEvent) string { return p.UserID })
ksEvents := KeyBy(dsEvents, func(e UserEvent) string { return e.UserID })
joined := Join(ksProfiles, ksEvents,
	TumblingWindowOf(10 * time.Second),
	func(p ProfileEvent, e UserEvent) (JoinedRecord, error) {
		return JoinedRecord{
			UserID:      p.UserID,
			ProfileData: p.Data,
			EventData:   e.Data,
		}, nil
	},
	datastreams.Params{
		SegmentName: "profile-join",
	},
)
Next Steps:
- KeyBy Function & Watermark Development
- Window Function Development
- Join Function Development