Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,33 @@ func init() {

func ReadCommand(ch *Helper) *cobra.Command {
readCmd := &cobra.Command{
Use: "read",
Short: "Converts rows from a PlanetScale database into AirbyteRecordMessages",
Run: func(cmd *cobra.Command, args []string) {
Use: "read",
Short: "Converts rows from a PlanetScale database into AirbyteRecordMessages",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

ch.Logger = internal.NewLogger(cmd.OutOrStdout())
if readSourceConfigFilePath == "" {
fmt.Fprintf(cmd.ErrOrStderr(), "Please pass path to a valid source config file via the [%v] argument", "config")
os.Exit(1)
return fmt.Errorf("missing config file path")
}

if readSourceCatalogPath == "" {
fmt.Fprintf(cmd.OutOrStdout(), "Please pass path to a valid source catalog file via the [%v] argument", "config")
os.Exit(1)
return fmt.Errorf("missing catalog file path")
}

psc, err := parseSource(ch.FileReader, readSourceConfigFilePath)
if err != nil {
fmt.Fprintln(cmd.OutOrStdout(), "Please provide path to a valid configuration file")
return
return err
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Ensure database")
if err := ch.EnsureDB(psc); err != nil {
fmt.Fprintln(cmd.OutOrStdout(), "Unable to connect to PlanetScale Database")
return
return err
}

defer func() {
Expand All @@ -60,19 +61,19 @@ func ReadCommand(ch *Helper) *cobra.Command {
cs, err := checkConnectionStatus(ctx, ch.Database, psc)
if err != nil {
ch.Logger.ConnectionStatus(cs)
return
return err
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading catalog")
catalog, err := readCatalog(readSourceCatalogPath)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read catalog: %+v", err))
os.Exit(1)
return fmt.Errorf("unable to read catalog: %w", err)
}

if len(catalog.Streams) == 0 {
ch.Logger.Log(internal.LOGLEVEL_ERROR, "Catalog has no streams")
return
return nil
}

state := ""
Expand All @@ -81,7 +82,7 @@ func ReadCommand(ch *Helper) *cobra.Command {
b, err := os.ReadFile(stateFilePath)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err))
os.Exit(1)
return fmt.Errorf("unable to read state: %w", err)
}
state = string(b)
}
Expand All @@ -90,16 +91,17 @@ func ReadCommand(ch *Helper) *cobra.Command {
shards, err := ch.Database.ListShards(ctx, psc)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to list shards : %v", err))
os.Exit(1)
return fmt.Errorf("unable to list shards: %w", err)
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading state")
syncState, err := readState(state, psc, catalog.Streams, shards, ch.Logger)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err))
os.Exit(1)
return fmt.Errorf("unable to read state: %w", err)
}

var readErr error
for _, configuredStream := range catalog.Streams {
keyspaceOrDatabase := configuredStream.Stream.Namespace
if keyspaceOrDatabase == "" {
Expand All @@ -109,33 +111,49 @@ func ReadCommand(ch *Helper) *cobra.Command {
streamState, ok := syncState.Streams[streamStateKey]
if !ok {
ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey))
os.Exit(1)
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
return fmt.Errorf("unable to read state for stream %v", streamStateKey)
}

ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_STARTED)

streamFailed := false
for shardName, shardState := range streamState.Shards {
var tc *psdbconnectv1alpha1.TableCursor

tc, err = shardState.SerializedCursorToTableCursor(configuredStream)
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Using serialized cursor for stream %s", streamStateKey))
if err != nil {
ch.Logger.Error(fmt.Sprintf("Invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
streamFailed = true
break
}

sc, err := ch.Database.Read(ctx, cmd.OutOrStdout(), psc, configuredStream, tc)
if err != nil {
ch.Logger.Error(err.Error())
os.Exit(1)
streamFailed = true
break

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one shard errors and breaks here, we skip all other shards. We should continue here instead so we can process the other shards.

Somewhat related to this, when an error occurs, ch.Database.Read returns both the error and the "progress-so-far" cursor sc. Since we check the error first, we break before updating the stream's state with sc, which might have advanced from previous iterations before a later one times out. We should move the if sc != nil { branch below to before the error check.

}

if sc != nil {
// if we get any new state, we assign it here.
// otherwise, the older state is round-tripped back to Airbyte.
syncState.Streams[streamStateKey].Shards[shardName] = sc
}
ch.Logger.State(syncState)
}

// Always emit state to checkpoint whatever progress was made,
// including partial progress when only some shards succeeded.
ch.Logger.StreamState(keyspaceOrDatabase, configuredStream.Stream.Name, syncState.Streams[streamStateKey])

if streamFailed {
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
readErr = fmt.Errorf("read failed for stream %v", streamStateKey)
} else {
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_COMPLETE)
}
}

return readErr
},
}
readCmd.Flags().StringVar(&readSourceCatalogPath, "catalog", "", "Path to the PlanetScale catalog configuration")
Expand All @@ -153,9 +171,26 @@ func readState(state string, psc internal.PlanetScaleSource, streams []internal.
Streams: map[string]internal.ShardStates{},
}
if state != "" {
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
// Try parsing as Airbyte v2 per-stream state array first
var perStreamStates []internal.AirbyteState
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && len(perStreamStates) > 0 && perStreamStates[0].Type == internal.STATE_TYPE_STREAM {
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Parsing Airbyte v2 per-stream state (%d streams)", len(perStreamStates)))
for _, s := range perStreamStates {
if s.Stream != nil && s.Stream.StreamState != nil {
ns := s.Stream.StreamDescriptor.Namespace
if ns == "" {
ns = psc.Database
}
key := ns + ":" + s.Stream.StreamDescriptor.Name

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: there's a few areas where this key logic is duplicated. It would be nice to extract it to a shared helper to reduce the duplication and minimize the risk of drift between them.

syncState.Streams[key] = *s.Stream.StreamState
}
}
} else {
// Fall back to legacy global state format
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
}
}
}

Expand Down
Loading