CASSANALYTICS-126: Flush event consumer before persisting CDC state to prevent data loss on failure #178

jyothsnakonisa wants to merge 1 commit into apache:trunk
Conversation
```java
    return;
}

asyncExecutor.cancelTimer(this.timerId);
```
**Reviewer:**
We should switch the order here, I think. If we throw an exception during the flush() call below, the timerId will already have been set to -1 at this point, allowing subsequent start() calls even though the flush failed. My thinking is we should have a flow of:
- flush
- If that succeeds, THEN we stop the timer

Since this is in a synchronized block we're not at risk of any kernel scheduling or races there, but it would be in line with the spirit of what we're trying to accomplish on this PR.
**Author:**
CdcPublisher.stop() eventually calls this stop method in the state persister, and CdcPublisher.stop() gets called in the following scenarios:
- Token ranges change (so that the sidecar starts consumers for the new token ranges)
- CDC & Kafka config changes

In both scenarios, a subsequent start is called after stop.
Now the question is: if flush fails, do we not allow restart of the iterators to handle the above changes? If we don't allow restart, the persister will remain in the broken state forever.
With the reversed ordering, if flush() throws, cancelTimer() would never be called, leaving the periodic timer still firing, and timerId would not be reset, blocking any subsequent start() calls. So I think we should keep this order so that a restart remains possible.
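A minimal sketch of the ordering being argued for here (class and field names are assumed for illustration; `timerCancelled`/`flushShouldFail` are test stand-ins, not the actual patch): cancel the timer and reset `timerId` *before* flushing, so that even when `flush()` throws, a later `start()` is still permitted and the consumer can recover.

```java
// Hypothetical sketch of the stop() ordering discussed above.
class StatePersisterSketch
{
    long timerId = 42;             // -1 means "not running"
    boolean timerCancelled = false;
    boolean flushShouldFail = false;

    synchronized void stop()
    {
        if (timerId == -1)
        {
            return;                // already stopped
        }
        timerCancelled = true;     // stand-in for asyncExecutor.cancelTimer(timerId)
        timerId = -1;              // reset first: restart stays possible even if flush fails
        flush();                   // may throw, but the timer is already stopped
    }

    void flush()
    {
        if (flushShouldFail)
        {
            throw new RuntimeException("flush failed");
        }
    }
}
```

With the reversed ordering (flush first), a throwing `flush()` would leave `timerId` set and the timer firing, which is exactly the stuck state the author is trying to avoid.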
```java
    .collect(Collectors.toSet());
}

protected void run() throws NotEnoughReplicasException
```
**Reviewer:**
I was tracing the code here, and it looks like the caller of run() will basically absorb all Throwables that bubble up, log them, and then continue executing. That feels like a hugely problematic gap in our flow. The code I'm looking at:
```java
protected void runSafe(CompletableFuture<Void> future)
{
    try
    {
        run();
        completeActiveFuture(future);
        scheduleNextRun();
    }
    catch (NotEnoughReplicasException e)
    {
        // NotEnoughReplicasException can occur when too many replicas are down
        // OR if there are no new commit logs to read if writes are idle on the cluster
        completeActiveFuture(future);
        scheduleRun(cdcOptions.sleepWhenInsufficientReplicas().toMillis());
    }
    catch (Throwable t)
    {
        completeActiveFuture(future);
        if (handleError(t))
        {
            LOGGER.warn("CdcConsumer epoch failed with recoverable error, scheduling next run jobId={} partition={} epoch={}",
                        jobId, partitionId, currentState.epoch, t);
            scheduleNextRun();
        }
        else
        {
            LOGGER.error("CdcConsumer epoch failed with unrecoverable error jobId={} partition={} epoch={}",
                         jobId, partitionId, currentState.epoch, t);
            stop();
        }
    }
}
```
handleError just logs the message and keeps on trucking. OutOfMemory? We keep going. Disk failures? Keep going.
We have the JVMStabilityInspector in Cassandra that I ended up writing for just this type of problem - wrapping intelligence and logic in a central place around "is this a recoverable error or not? And should users be able to configure it as such?"
So that said - I don't think we need to go THAT far (certainly not in this patch), but I'm wondering if there's something we should do here, augmenting the handleError path or the exception handling in runSafe, that's less "absorb and allow all errors" and has a little more intelligence around our exception handling, to prevent the situation that gave rise to this ticket: data loss from exception cases.
**Author:**
I think the intention behind silently logging and continuing on failures is to not block retrying. However, I agree with your point that for irrecoverable errors maybe we should stop().
For example: if there is an OutOfMemoryError during the run method, do we stop consumers completely? What if memory usage later goes back down; how does the sidecar start consumers again in that case?
We have to think through those scenarios before changing the behavior to fail on irrecoverable errors.
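One possible middle ground, sketched below with hypothetical names (this is not JVMStabilityInspector's actual API, nor code from this patch): classify throwables instead of absorbing everything, so that JVM-level failures stop the consumer while likely-transient errors keep retrying.

```java
// Hypothetical error classifier for the handleError path discussed above.
final class ErrorClassifier
{
    // Returns true when retrying is reasonable; false for errors where
    // continuing risks data loss or undefined behavior.
    static boolean isRecoverable(Throwable t)
    {
        if (t instanceof OutOfMemoryError
            || t instanceof StackOverflowError
            || t instanceof LinkageError)
        {
            return false;          // JVM-level failures: stop rather than loop
        }
        if (t instanceof java.io.IOException)
        {
            return true;           // likely transient (network, commit log read)
        }
        return true;               // default: retry, matching current behavior
    }
}
```

A runSafe could then call `stop()` when `isRecoverable` returns false, leaving open the restart question the author raises (who restarts the consumer once memory pressure subsides?).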
```java
public void close()
{
    this.stop();
    statePersister.stop();
}
```
**Reviewer:**
I think we also need to stop the statePersister in the Cdc#stop method. Right now we have:
```java
public void stop(boolean blocking) throws ExecutionException, InterruptedException
{
    if (isRunning.get() && isRunning.compareAndSet(true, false))
    {
        LOGGER.info("Stopping CDC Consumer jobId={} partitionId={}", jobId, partitionId);
        CompletableFuture<Void> activeFuture = active.get();
        if (activeFuture != null && blocking)
        {
            // block until active future completes
            long timeout = cdcOptions.stopTimeout().toMillis();
            try
            {
                activeFuture.get(timeout, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e)
            {
                LOGGER.warn("Failed to cleanly shutdown active future after {} millis", timeout);
                stats.cdcConsumerStopTimeout();
            }
        }
        LOGGER.info("Stopped CDC Consumer jobId={} partitionId={}", jobId, partitionId);
    }
}
```
Tracing the paths that lead here: the exception path from runSafe is one we'll never hit, because handleError eats everything. So I guess the question is: if someone calls stop() on Cdc, do they also expect the StatePersister to flush and stop as well? Or should those be able to happen separately? I think it's the former, and if so, we should wire that child stop call to the parent (i.e. if Cdc stops, StatePersister stops).
Then a call to close() would call Cdc#stop, which would then also stop the children.
**Author:**
Yes, you are right: statePersister.close() should be called when closing CDC. If you look closely, there is another method which does that. That said, having a public stop method in Cdc is confusing. I will make the stop method in Cdc private, so that only close() is available, which does both Cdc.stop() and statePersister.stop():
```java
public void close()
{
    this.stop();
    statePersister.stop();
}
```