
[RFC] Implement Watermarking, Keying, Windowing, and Joining #9

@jecsand838


Support for Watermarking, Keying, Windowing, and Joining

In order to call pipelines v1.0.0 complete, we need to implement several features essential to processing streaming data. These features are listed below in the recommended order of development:

  1. Watermarking, KeyBy, and KeyedDataStream: Essential for partitioning stream data by key. Grouping data by a key (e.g. a user ID) enables per-key computations such as aggregations and counts. Watermarking should be implemented as part of this effort as well. Careful design, and possibly a small refactor, is required to avoid duplicating logic between KeyedDataStream and the existing DataStream.
  2. Windowing: Allows batch or incremental computations (e.g. sum, average, count) over keyed or non-keyed streams. Processing data in discrete, incremental segments is essential for real-time analytics. We'll want both sliding and tumbling windows for pipelines.
  3. Join: Correlates data from different streams that share the same key, merging two streams into a single output stream. The Join function will need to use Windowing.

These enhancements will bring pipelines closer to feature parity with established streaming frameworks such as Flink or Spark Streaming while maintaining a clean, dependency-free, and Go-oriented design.

After this work is done, a follow-up issue will be created for adding new methods to Pipeline that build on top of these enhancements.


1. KeyBy and KeyedDataStream + Watermarking

Proposed New Types & Function Signatures

TimestampExtractor

Defines how to extract an event-time timestamp from an item T.

type TimestampExtractor[T any] func(T) time.Time

WatermarkGenerator

WatermarkGenerator is invoked for each event to produce an updated watermark.

type WatermarkGenerator[T any] interface {
    // OnEvent is called whenever a new event arrives.
    OnEvent(event T, eventTime time.Time)

    // GetWatermark returns the current watermark.
    GetWatermark() time.Time
}

KeyBy

Takes a DataStream[T] and a key function, and returns a KeyedDataStream[T, K].

func KeyBy[T any, K comparable](
    ds DataStream[T],
    keyFunc func(T) K,
    params ...Params, // a trailing comma is required before a newline in Go
) KeyedDataStream[T, K] {
    // ...
}

KeyFunc

A user-defined function that extracts a key K from an item T.

type KeyFunc[T any, K comparable] func(T) K

KeyedDataStream

Represents a partitioned stream keyed by an attribute of type K.
It stores the original DataStream together with the key extraction function and the timestamp and watermark functions, as shown below.

type KeyedDataStream[T any, K comparable] struct {
    DataStream[T]
    keyFunc KeyFunc[T, K]
    timestampExtractor TimestampExtractor[T]
    watermarkGenerator WatermarkGenerator[T]
}

Note: We need to ensure our implementation of KeyedDataStream is clean and doesn't duplicate DataStream logic.

Proposed KeyBy Usage Example

  1. KeyBy
ds := datastreams.FromArray([]Event{{GroupID: "a"}, {GroupID: "a"}})
// Create a KeyedDataStream keyed by the Event.GroupID field.
ks := KeyBy(ds, func(e Event) string { return e.GroupID })

2. Windowing and Join Functions

Proposed New Types & Function Signatures

Window

func Window[T any, K comparable, R any](
    ds KeyedDataStream[T, K],
    spec WindowSpecer,
    windowFunc WindowFunc[T, R],
    params ...Params,
) KeyedDataStream[R, K] {
    // ...
}

Join

func Join[T any, U any, K comparable, R any](
    left KeyedDataStream[T, K],
    right KeyedDataStream[U, K],
    spec WindowSpecer,
    joinFunc JoinFunc[T, U, R],
    params ...Params,
) KeyedDataStream[R, K] {
    // ...
}

WindowFunc

Processes the items within a window and produces a single output.

type WindowFunc[T any, R any] func([]T) (R, error)

JoinFunc

Merges two items from different streams into a single output item.

type JoinFunc[T any, U any, R any] func(T, U) (R, error)

WindowSpecer

A unified interface for window definitions.

type WindowSpecer interface {
    Type() WindowType
    Unit() WindowUnit

    // Spec returns all configuration fields, e.g. time/count sizes.
    Spec() WindowConfig
}

WindowType

Enumerates the window type: tumbling, sliding, or session (a possible future expansion).

type WindowType int

const (
    Tumbling WindowType = iota
    Sliding
)

WindowUnit

Enumerates the potential units for a window's specified size and duration

type WindowUnit int

const (
    UnitTime WindowUnit = iota
    UnitCount
)

WindowConfig

WindowConfig contains spec values that are independent of the window's unit or type.

type WindowConfig struct {
    Size     int64
    Duration int64
}

// Use the building blocks above to define structs implementing WindowSpecer for:
//    TumblingWindowSpec
//         and
//    SlidingWindowSpec, etc.
// along with constructors, e.g.
//    func TumblingWindowOf(duration time.Duration) WindowSpecer
//        and
//    func SlidingWindowOfCount(size int, slide int) WindowSpecer

Proposed Usage Examples

  1. Window
ds := datastreams.FromArray([]Event{{GroupID: "a"}, {GroupID: "a"}})
ks := KeyBy(ds, func(e Event) string { return e.GroupID })

// Time-based tumbling window
tumbleAgg := Window(ks,
    TumblingWindowOf(5 * time.Second),
    func(batch []Event) (float64, error) {
        var sum float64
        for _, e := range batch {
            sum += e.Value
        }
        return sum, nil
    },
)

// Count-based sliding window
slidingAgg := Window(ks,
    SlidingWindowOfCount(100, 50),
    func(batch []Event) (int, error) {
        return len(batch), nil
    },
)
  2. Join
ksProfiles := KeyBy(dsProfiles, func(p ProfileEvent) string { return p.UserID })
ksEvents := KeyBy(dsEvents, func(e UserEvent) string { return e.UserID })

joined := Join(ksProfiles, ksEvents,
    TumblingWindowOf(10 * time.Second),
    func(p ProfileEvent, e UserEvent) (JoinedRecord, error) {
        return JoinedRecord{
            UserID: p.UserID,
            ProfileData: p.Data,
            EventData: e.Data,
        }, nil
    },
    datastreams.Params{
        SegmentName: "profile-join",
    },
)

Next Steps:

  • Review & Discuss above proposals
  • KeyBy Function & Watermark Development
  • Tumbling Window Function Development
  • Sliding Window Function Development
  • Join Function Development

Metadata


Assignees: No one assigned

Labels: RFC (RFC issues), enhancement (New feature or request)

Projects: No projects

Relationships: None yet

Development: No branches or pull requests
