Implement Join for KeyedDataStreams
Description
Once windowing is in place, we can enable stream correlation via Join. This allows merging data from two keyed streams that share the same key. Typically, the join is scoped to a window, ensuring that only items close in time (or within a certain count-based window) are joined.
With a well-defined join function, we can produce a merged output item of a new type. This feature is critical for combining related data, for instance joining user profile updates with user actions in real time.
Proposed New Types & Function Signatures
Join
Performs a windowed join on two keyed streams of matching key type:
func Join[T any, U any, K comparable, R any](
	left KeyedDataStream[T, K],
	right KeyedDataStream[U, K],
	spec WindowSpec,
	joinFunc JoinFunc[T, U, R],
	params ...Params,
) KeyedDataStream[R, K] {
	// ...
}
left / right: Two KeyedDataStream instances keyed by the same type K.
spec: A WindowSpec that defines the window boundaries for matching items.
joinFunc: A function that combines items of type T and U into a single result of type R.
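To make the intended semantics concrete, here is a minimal in-memory sketch of a tumbling-window keyed join. It is not the proposed implementation: the `item` and `bucket` types, `windowIndex`, and `joinTumbling` are hypothetical stand-ins for KeyedDataStream and WindowSpec, it works on slices rather than live streams, and it assumes event timestamps at or after the Unix epoch. Stopping at the first joinFunc error is also an assumption, since the proposal does not say how errors are handled.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// item is a hypothetical stand-in for one element of a keyed stream:
// a value together with its key and event timestamp.
type item[T any, K comparable] struct {
	Key   K
	At    time.Time
	Value T
}

// bucket identifies one tumbling window for one key.
type bucket[K comparable] struct {
	Key    K
	Window int64
}

// windowIndex maps a timestamp to the index of the tumbling window that
// contains it. Assumes timestamps at or after the Unix epoch.
func windowIndex(at time.Time, size time.Duration) int64 {
	return at.UnixNano() / int64(size)
}

// joinTumbling pairs every left item with every right item that has the
// same key and falls into the same tumbling window.
func joinTumbling[T any, U any, K comparable, R any](
	left []item[T, K],
	right []item[U, K],
	size time.Duration,
	joinFunc func(T, U) (R, error),
) ([]R, error) {
	if size <= 0 {
		return nil, errors.New("window size must be positive")
	}
	// Index the right side by (key, window) so each left item needs one lookup.
	rightByBucket := make(map[bucket[K]][]U)
	for _, r := range right {
		b := bucket[K]{Key: r.Key, Window: windowIndex(r.At, size)}
		rightByBucket[b] = append(rightByBucket[b], r.Value)
	}
	var out []R
	for _, l := range left {
		b := bucket[K]{Key: l.Key, Window: windowIndex(l.At, size)}
		for _, u := range rightByBucket[b] {
			joined, err := joinFunc(l.Value, u)
			if err != nil {
				return nil, err // assumption: the first error aborts the join
			}
			out = append(out, joined)
		}
	}
	return out, nil
}

// exampleJoin joins two small keyed inputs over 10-second windows.
func exampleJoin() ([]string, error) {
	at := func(sec int64) time.Time { return time.Unix(sec, 0) }
	profiles := []item[string, string]{
		{Key: "u1", At: at(1), Value: "p1"},
		{Key: "u2", At: at(12), Value: "p2"},
	}
	events := []item[string, string]{
		{Key: "u1", At: at(4), Value: "click"},
		{Key: "u1", At: at(11), Value: "scroll"},
		{Key: "u2", At: at(15), Value: "view"},
	}
	return joinTumbling(profiles, events, 10*time.Second,
		func(p, e string) (string, error) { return p + "+" + e, nil })
}

func main() {
	out, err := exampleJoin()
	fmt.Println(out, err)
}
```

In this sketch the "scroll" event is dropped: it shares key u1 with a profile item but falls into the next 10-second window. A real streaming implementation would also have to decide when a window closes and how late items are treated, which this sketch does not address.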
JoinFunc
User-defined function that merges two items into a single output:
type JoinFunc[T any, U any, R any] func(T, U) (R, error)
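Because JoinFunc returns an error, a join function can reject a pair instead of always producing a result. The sketch below shows one such function under stated assumptions: `ErrUserMismatch` and `mergeProfileAndEvent` are hypothetical names, the struct types mirror those in the usage example, and how Join itself would react to a returned error is not specified in this proposal.

```go
package main

import (
	"errors"
	"fmt"
)

// JoinFunc is copied from the proposal above.
type JoinFunc[T any, U any, R any] func(T, U) (R, error)

type ProfileEvent struct {
	UserID string
	Data   string
}

type UserEvent struct {
	UserID string
	Data   string
}

type JoinedRecord struct {
	UserID      string
	ProfileData string
	EventData   string
}

// ErrUserMismatch is a hypothetical sentinel error for this sketch.
var ErrUserMismatch = errors.New("join: user IDs differ")

// mergeProfileAndEvent merges one profile item and one event item, and
// returns an error if they do not belong to the same user.
func mergeProfileAndEvent(p ProfileEvent, e UserEvent) (JoinedRecord, error) {
	if p.UserID != e.UserID {
		return JoinedRecord{}, fmt.Errorf("%w: %q vs %q", ErrUserMismatch, p.UserID, e.UserID)
	}
	return JoinedRecord{
		UserID:      p.UserID,
		ProfileData: p.Data,
		EventData:   e.Data,
	}, nil
}

// Compile-time check that the function satisfies the JoinFunc signature.
var _ JoinFunc[ProfileEvent, UserEvent, JoinedRecord] = mergeProfileAndEvent

func main() {
	rec, err := mergeProfileAndEvent(
		ProfileEvent{UserID: "u1", Data: "premium"},
		UserEvent{UserID: "u1", Data: "click"},
	)
	fmt.Println(rec, err)

	_, err = mergeProfileAndEvent(ProfileEvent{UserID: "u1"}, UserEvent{UserID: "u2"})
	fmt.Println(errors.Is(err, ErrUserMismatch))
}
```

Wrapping a sentinel with `%w` lets callers test for the failure with `errors.Is` while still seeing the offending IDs in the message.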
Proposed Usage Example
type ProfileEvent struct {
	UserID string
	Data   string
}

type UserEvent struct {
	UserID string
	Data   string
}

type JoinedRecord struct {
	UserID      string
	ProfileData string
	EventData   string
}

dsProfiles := datastreams.FromArray([]ProfileEvent{ /* ... */ })
dsEvents := datastreams.FromArray([]UserEvent{ /* ... */ })

ksProfiles := KeyBy(dsProfiles, func(p ProfileEvent) string { return p.UserID })
ksEvents := KeyBy(dsEvents, func(e UserEvent) string { return e.UserID })

joined := Join(
	ksProfiles,
	ksEvents,
	TumblingWindowOf(10*time.Second), // example window
	func(p ProfileEvent, e UserEvent) (JoinedRecord, error) {
		return JoinedRecord{
			UserID:      p.UserID,
			ProfileData: p.Data,
			EventData:   e.Data,
		}, nil
	},
	// optional params
)
In this example, events from the ProfileEvent and UserEvent streams are keyed by UserID and joined within a 10-second tumbling window. The resulting KeyedDataStream[JoinedRecord, string] can be consumed for downstream processing.
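One consequence of tumbling windows is that "close in time" means "in the same fixed window", not "within 10 seconds of each other". The sketch below illustrates this, assuming windows are aligned to multiples of the window size as `time.Time.Truncate` computes them. The helpers `tumblingWindow`, `sameWindow`, and `sameTenSecondWindow` are hypothetical, and the actual alignment used by TumblingWindowOf is not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// tumblingWindow returns the half-open interval [start, end) of the
// tumbling window of the given size that contains t.
func tumblingWindow(t time.Time, size time.Duration) (start, end time.Time) {
	start = t.Truncate(size)
	return start, start.Add(size)
}

// sameWindow reports whether a and b fall into the same tumbling window.
func sameWindow(a, b time.Time, size time.Duration) bool {
	startA, _ := tumblingWindow(a, size)
	startB, _ := tumblingWindow(b, size)
	return startA.Equal(startB)
}

// sameTenSecondWindow compares two timestamps given as seconds past
// 12:00:00 UTC on an arbitrary day, using a 10-second window.
func sameTenSecondWindow(secA, secB int) bool {
	at := func(sec int) time.Time {
		return time.Date(2024, 1, 1, 12, 0, sec, 0, time.UTC)
	}
	return sameWindow(at(secA), at(secB), 10*time.Second)
}

func main() {
	// 12:00:03 and 12:00:09 share the window [12:00:00, 12:00:10).
	fmt.Println(sameTenSecondWindow(3, 9)) // true
	// 12:00:09 and 12:00:11 are 2 seconds apart but in different windows.
	fmt.Println(sameTenSecondWindow(9, 11)) // false
}
```

If pairs that straddle a window boundary must still be joined, a sliding or interval-based WindowSpec would be needed instead of a tumbling one.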
Tasks
Implement Join for KeyedDataStream[T, K] and KeyedDataStream[U, K]:
Use a WindowSpec to define the join window boundaries.
Ensure both streams are keyed by the same type K.