2020import java .util .HashMap ;
2121import java .util .Map ;
2222import java .util .Optional ;
23+ import java .util .Set ;
24+ import java .util .concurrent .ConcurrentHashMap ;
2325import java .util .concurrent .ExecutorService ;
26+ import java .util .concurrent .Future ;
27+ import java .util .concurrent .ScheduledExecutorService ;
28+ import java .util .concurrent .ScheduledFuture ;
29+ import java .util .concurrent .TimeUnit ;
2430
2531import org .slf4j .Logger ;
2632import org .slf4j .LoggerFactory ;
2733
2834import io .fabric8 .kubernetes .api .model .HasMetadata ;
2935import io .fabric8 .kubernetes .client .KubernetesClientException ;
3036import io .javaoperatorsdk .operator .OperatorException ;
37+ import io .javaoperatorsdk .operator .ReconciliationTimeoutException ;
3138import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
3239import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
3340import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
@@ -59,7 +66,12 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
5966 private final RateLimiter <? extends RateLimitState > rateLimiter ;
6067 private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
6168 private final Map <String , Object > metricsMetadata ;
69+ private final Duration reconciliationTimeout ;
70+ private final Map <ResourceID , Future <?>> runningFutures = new ConcurrentHashMap <>();
71+ private final Map <ResourceID , ScheduledFuture <?>> timeoutFutures = new ConcurrentHashMap <>();
72+ private final Set <ResourceID > timedOutResources = ConcurrentHashMap .newKeySet ();
6273 private ExecutorService executor ;
74+ private ScheduledExecutorService timeoutScheduler ;
6375
6476 public EventProcessor (
6577 EventSourceManager <P > eventSourceManager , ConfigurationService configurationService ) {
@@ -100,6 +112,8 @@ private EventProcessor(
100112 this .metrics = metrics != null ? metrics : Metrics .NOOP ;
101113 this .eventSourceManager = eventSourceManager ;
102114 this .rateLimiter = controllerConfiguration .getRateLimiter ();
115+ this .reconciliationTimeout =
116+ (Duration ) controllerConfiguration .reconciliationTimeout ().orElse (null );
103117
104118 metricsMetadata =
105119 Optional .ofNullable (eventSourceManager .getController ())
@@ -182,7 +196,16 @@ private void submitReconciliationExecution(ResourceState state) {
182196 state .unMarkEventReceived (triggerOnAllEvents ());
183197 metrics .reconciliationSubmitted (latest , state .getRetry (), metricsMetadata );
184198 log .debug ("Executing events for custom resource. Scope: {}" , executionScope );
185- executor .execute (new ReconcilerExecutor (resourceID , executionScope ));
199+ Future <?> future = executor .submit (new ReconcilerExecutor (resourceID , executionScope ));
200+ if (reconciliationTimeout != null ) {
201+ runningFutures .put (resourceID , future );
202+ var timeoutFuture =
203+ timeoutScheduler .schedule (
204+ () -> handleReconciliationTimeout (resourceID , future ),
205+ reconciliationTimeout .toMillis (),
206+ TimeUnit .MILLISECONDS );
207+ timeoutFutures .put (resourceID , timeoutFuture );
208+ }
186209 } else {
187210 log .debug (
188211 "Skipping executing controller. Controller in execution: {}. Latest"
@@ -450,17 +473,22 @@ private boolean isRetryConfigured() {
450473 @ Override
451474 public synchronized void stop () {
452475 this .running = false ;
476+ timeoutFutures .values ().forEach (f -> f .cancel (false ));
477+ timeoutFutures .clear ();
478+ timedOutResources .clear ();
479+ runningFutures .clear ();
453480 }
454481
455482 @ Override
456483 public synchronized void start () throws OperatorException {
457484 log .debug ("Starting event processor: {}" , this );
458485 // on restart new executor service is created and needs to be set here
459- executor =
460- controllerConfiguration
461- .getConfigurationService ()
462- .getExecutorServiceManager ()
463- .reconcileExecutorService ();
486+ var executorServiceManager =
487+ controllerConfiguration .getConfigurationService ().getExecutorServiceManager ();
488+ executor = executorServiceManager .reconcileExecutorService ();
489+ if (reconciliationTimeout != null ) {
490+ timeoutScheduler = executorServiceManager .scheduledExecutorService ();
491+ }
464492 this .running = true ;
465493 handleAlreadyMarkedEvents ();
466494 }
@@ -476,6 +504,29 @@ private void handleAlreadyMarkedEvents() {
476504 }
477505 }
478506
507+ private void handleReconciliationTimeout (ResourceID resourceID , Future <?> future ) {
508+ log .warn ("Reconciliation timed out for resource: {}" , resourceID );
509+ timedOutResources .add (resourceID );
510+ future .cancel (true );
511+ }
512+
513+ private void cancelTimeoutIfActive (ResourceID resourceID ) {
514+ if (reconciliationTimeout != null ) {
515+ runningFutures .remove (resourceID );
516+ var scheduledTimeout = timeoutFutures .remove (resourceID );
517+ if (scheduledTimeout != null ) {
518+ scheduledTimeout .cancel (false );
519+ }
520+ }
521+ }
522+
523+ private static boolean isInterruptedException (Throwable e ) {
524+ if (e instanceof InterruptedException ) {
525+ return true ;
526+ }
527+ return e .getCause () != null && e .getCause () instanceof InterruptedException ;
528+ }
529+
479530 private class ReconcilerExecutor implements Runnable {
480531 private final ExecutionScope <P > executionScope ;
481532 private final ResourceID resourceID ;
@@ -531,10 +582,33 @@ public void run() {
531582 MDCUtils .addResourceInfo (executionScope .getResource ());
532583 metrics .reconciliationStarted (executionScope .getResource (), metricsMetadata );
533584 thread .setName ("ReconcilerExecutor-" + controllerName () + "-" + thread .getId ());
534- PostExecutionControl <P > postExecutionControl =
535- reconciliationDispatcher .handleExecution (executionScope );
585+
586+ PostExecutionControl <P > postExecutionControl ;
587+ try {
588+ postExecutionControl = reconciliationDispatcher .handleExecution (executionScope );
589+ } catch (Exception e ) {
590+ if (timedOutResources .remove (resourceID ) || isInterruptedException (e )) {
591+ log .warn ("Reconciliation timed out for: {}" , executionScope );
592+ postExecutionControl =
593+ PostExecutionControl .exceptionDuringExecution (
594+ new ReconciliationTimeoutException (reconciliationTimeout ));
595+ } else {
596+ throw e ;
597+ }
598+ }
599+
600+ // Check if this reconciliation was timed out (e.g. timeout fired just as execution
601+ // finished)
602+ if (timedOutResources .remove (resourceID )) {
603+ log .warn ("Reconciliation timed out for: {}" , executionScope );
604+ postExecutionControl =
605+ PostExecutionControl .exceptionDuringExecution (
606+ new ReconciliationTimeoutException (reconciliationTimeout ));
607+ }
608+
536609 eventProcessingFinished (executionScope , postExecutionControl );
537610 } finally {
611+ cancelTimeoutIfActive (resourceID );
538612 metrics .reconciliationFinished (
539613 executionScope .getResource (), executionScope .getRetryInfo (), metricsMetadata );
540614 // restore original name
0 commit comments