@@ -134,6 +134,7 @@ public synchronized void start()
    /**
     * Stop the SidecarStatePersister gracefully, blocking to await for any pending flushes to complete.
     */
    @Override
    public void stop()
    {
        stop(true);
@@ -318,6 +318,18 @@ protected void run() throws NotEnoughReplicasException
        eventConsumer.accept(event);
    }

    // flush before persisting state; if delivery fails this throws,
    // skipping persist() so the micro-batch is retried on the next run
    try
    {
        eventConsumer.flush();
    }
    catch (InterruptedException e)
    {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted during event consumer flush", e);
    }

    // persist end state
    CdcState endState = it.endState();
    persist(endState, tokenRange);
@@ -442,5 +454,6 @@ protected void refreshSchema()
    public void close()
    {
        this.stop();
        statePersister.stop();

I think we also need to stop the statePersister in the Cdc#stop method. Right now we have:

    public void stop(boolean blocking) throws ExecutionException, InterruptedException
    {
        if (isRunning.get() && isRunning.compareAndSet(true, false))
        {
            LOGGER.info("Stopping CDC Consumer jobId={} partitionId={}", jobId, partitionId);
            CompletableFuture<Void> activeFuture = active.get();
            if (activeFuture != null && blocking)
            {
                // block until active future completes
                long timeout = cdcOptions.stopTimeout().toMillis();
                try
                {
                    activeFuture.get(timeout, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e)
                {
                    LOGGER.warn("Failed to cleanly shutdown active future after {} millis", timeout);
                    stats.cdcConsumerStopTimeout();
                }
            }
            LOGGER.info("Stopped CDC Consumer jobId={} partitionId={}", jobId, partitionId);
        }
    }

Tracing the paths that lead here, it's the exception path from runSafe, which we'll never hit because handleError swallows everything. So I guess the question is: if someone calls stop() on Cdc, do they also expect the StatePersister to flush and stop as well? Or should those be able to happen separately? I think it's the former and, if so, we should wire the child stop call to the parent (i.e. if Cdc stops, StatePersister stops).

Then a call to close would call Cdc#stop which would then also stop children.
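
A minimal sketch of the wiring suggested above, under stated assumptions: `SketchCdc` and `SketchStatePersister` are hypothetical stand-ins, not the classes in this PR, and the blocking wait on the active future is elided. It only illustrates the parent's stop cascading to the child, with close delegating to stop.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the StatePersister interface touched by this PR.
interface SketchStatePersister
{
    default void stop()
    {
        // no-op by default
    }
}

// Hypothetical stand-in for Cdc; only the stop/close wiring is shown.
class SketchCdc implements AutoCloseable
{
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final SketchStatePersister statePersister;

    SketchCdc(SketchStatePersister statePersister)
    {
        this.statePersister = statePersister;
    }

    public void stop()
    {
        // compareAndSet makes stop idempotent: only the first caller proceeds
        if (isRunning.compareAndSet(true, false))
        {
            // ... awaiting the active micro-batch would happen here ...
            statePersister.stop(); // the child stops whenever the parent stops
        }
    }

    @Override
    public void close()
    {
        stop(); // close delegates to stop; no separate persister call is needed
    }
}
```

With this shape, calling stop() and then close() stops the persister exactly once, because the second call fails the compareAndSet check.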

Contributor Author

Yes, you are right: statePersister.stop() should be called when closing Cdc. If you look closely, there is another method, close(), that already does that. That said, having a public stop method in Cdc is confusing. I will make the stop method in Cdc private, so that only the close() method is available, which does both Cdc.stop() and statePersister.stop():

    public void close()
    {
        this.stop();
        statePersister.stop();
    }

    }
}
@@ -25,4 +25,17 @@

public interface EventConsumer extends Consumer<CdcEvent>
{
    /**
     * Flush any pending events to the transport layer.
     * Called after each micro-batch and before CDC state is persisted, to ensure
     * all events have been durably delivered before the commit-log position advances.
     * If delivery fails, implementations must throw so that state is NOT persisted
     * and the micro-batch will be retried on the next run.
     *
     * @throws InterruptedException if the calling thread is interrupted while flushing
     */
    default void flush() throws InterruptedException
    {
        // no-op by default
    }
}
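
To illustrate the contract described in the Javadoc above, here is a rough sketch of a buffering consumer whose flush() throws on failed delivery, plus a runner that only "persists" after a successful flush. Everything here is an assumption for illustration: `SketchEventConsumer`, `BufferingConsumer`, `MicroBatchRunner`, and the `transportUp` flag are hypothetical names, `String` stands in for `CdcEvent`, and a counter stands in for `persist(endState, tokenRange)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for EventConsumer, with String in place of CdcEvent.
interface SketchEventConsumer extends Consumer<String>
{
    default void flush() throws InterruptedException
    {
        // no-op by default
    }
}

// Buffers events in accept() and hands them to the "transport" in flush().
class BufferingConsumer implements SketchEventConsumer
{
    private final List<String> pending = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();
    boolean transportUp = true; // toggled to simulate a delivery failure

    @Override
    public void accept(String event)
    {
        pending.add(event);
    }

    @Override
    public void flush()
    {
        if (!transportUp)
        {
            // drop the buffer: the replayed micro-batch will supply these events again
            pending.clear();
            // throw so the caller never reaches its persist step
            throw new IllegalStateException("delivery failed");
        }
        delivered.addAll(pending);
        pending.clear();
    }
}

// Mirrors the ordering in the run() hunk: accept, flush, then persist.
class MicroBatchRunner
{
    int persistedBatches = 0;

    void run(SketchEventConsumer consumer, List<String> batch)
    {
        batch.forEach(consumer);
        try
        {
            consumer.flush();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during event consumer flush", e);
        }
        persistedBatches++; // stands in for persisting the end state
    }
}
```

If flush() throws, the counter is never incremented, so a later run can replay the same batch. This gives at-least-once delivery; a real transport may still need to tolerate duplicates.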
@@ -76,6 +76,15 @@ default CdcState loadCanonicalState(String jobId, int partitionId, @Nullable Tok
                .orElse(CdcState.BLANK);
    }

    /**
     * Stop the StatePersister, flushing any buffered state to persistent storage before returning.
     * Implementations that buffer state asynchronously must override this to ensure no state is lost on shutdown.
     */
    default void stop()
    {
        // no-op by default
    }

    /**
     * Load last CDC state from persistent storage after a bounce, restart or configuration change.
     * <p>