Implement Join for KeyedDataStreams #12

@jecsand838

Description

Once windowing is in place, we can enable stream correlation via Join. This allows merging data from two keyed streams that share the same key. Typically, the join is scoped to a window, ensuring that only items close in time (or within a certain count-based window) are joined.

With a well-defined join function, we can produce a merged output item of a new type. This feature is critical for combining related data, for instance joining user profile updates with user actions in real time.


Proposed New Types & Function Signatures

Join

Performs a windowed join on two keyed streams of matching key type:

func Join[T any, U any, K comparable, R any](
    left  KeyedDataStream[T, K],
    right KeyedDataStream[U, K],
    spec  WindowSpec,
    joinFunc JoinFunc[T, U, R],
    params ...Params,
) KeyedDataStream[R, K] {
    // ...
}
  • left / right: Two KeyedDataStream instances keyed by the same type K.
  • spec: A WindowSpec that defines the window boundaries for matching items.
  • joinFunc: A function that combines items of type T and U into a single result of type R.
  • params: Optional, variadic Params values.

JoinFunc

User-defined function that merges two items into a single output:

type JoinFunc[T any, U any, R any] func(T, U) (R, error)
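As a small illustration of the error return, the sketch below defines a function that satisfies `JoinFunc` and rejects a mismatched pair. The `errKeyMismatch` sentinel and the `mergeProfileAndEvent` name are hypothetical. How `Join` itself would react to a returned error (skip the pair, fail the stream, or something else) is not specified in this issue.

```go
package main

import (
	"errors"
	"fmt"
)

// JoinFunc mirrors the type proposed in this issue.
type JoinFunc[T any, U any, R any] func(T, U) (R, error)

type ProfileEvent struct{ UserID, Data string }
type UserEvent struct{ UserID, Data string }
type JoinedRecord struct{ UserID, ProfileData, EventData string }

// errKeyMismatch is a hypothetical sentinel error for this sketch.
var errKeyMismatch = errors.New("join: user IDs differ")

// mergeProfileAndEvent combines one item from each side, returning an error
// instead of a record when the two items do not belong to the same user.
func mergeProfileAndEvent(p ProfileEvent, e UserEvent) (JoinedRecord, error) {
	if p.UserID != e.UserID {
		return JoinedRecord{}, errKeyMismatch
	}
	return JoinedRecord{UserID: p.UserID, ProfileData: p.Data, EventData: e.Data}, nil
}

// Compile-time check that the function has the JoinFunc shape.
var _ JoinFunc[ProfileEvent, UserEvent, JoinedRecord] = mergeProfileAndEvent

func main() {
	rec, err := mergeProfileAndEvent(ProfileEvent{"u1", "premium"}, UserEvent{"u1", "login"})
	fmt.Println(rec, err)

	_, err = mergeProfileAndEvent(ProfileEvent{"u1", "premium"}, UserEvent{"u2", "login"})
	fmt.Println(errors.Is(err, errKeyMismatch))
}
```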

Proposed Usage Example

type ProfileEvent struct {
    UserID string
    Data   string
}

type UserEvent struct {
    UserID string
    Data   string
}

type JoinedRecord struct {
    UserID       string
    ProfileData  string
    EventData    string
}

dsProfiles := datastreams.FromArray([]ProfileEvent{ /* ... */ })
dsEvents   := datastreams.FromArray([]UserEvent{ /* ... */ })

ksProfiles := KeyBy(dsProfiles, func(p ProfileEvent) string { return p.UserID })
ksEvents   := KeyBy(dsEvents,   func(e UserEvent)   string { return e.UserID  })

joined := Join(
    ksProfiles,
    ksEvents,
    TumblingWindowOf(10*time.Second), // example window
    func(p ProfileEvent, e UserEvent) (JoinedRecord, error) {
        return JoinedRecord{
            UserID:      p.UserID,
            ProfileData: p.Data,
            EventData:   e.Data,
        }, nil
    },
    // optional params
)

In this example, events from the ProfileEvent and UserEvent streams are keyed by UserID and joined within a 10-second tumbling window. The resulting KeyedDataStream[JoinedRecord, string] can be consumed for downstream processing.


Tasks

  • Implement Join for KeyedDataStream[T, K] and KeyedDataStream[U, K]:
    • Accept a WindowSpec to define the join window boundaries.
    • Implement matching logic based on key K.
  • Design a buffer or state mechanism to hold items until the join window closes (based on event-time and watermarks).
  • Validate correctness with both time-based and count-based windows.
  • Test various scenarios (matching keys, missing keys, late or out-of-order events).

Metadata

    Labels

    enhancement: New feature or request