Skip to content

Commit 4875146

Browse files
committed
improve: use onList() for host resource handling
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent e7de43f commit 4875146

9 files changed

Lines changed: 43 additions & 156 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
2828
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
2929
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
30-
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache;
3130

3231
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
3332
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
@@ -143,11 +142,9 @@
143142
boolean comparableResourceVersions() default DEFAULT_COMPARABLE_RESOURCE_VERSION;
144143

145144
/**
146-
* For read-cache-after-write consistency there are some corner cases where we need to check the
147-
* caches see {@link TemporaryResourceCache} periodically. This is the period in milliseconds.
148-
* Applicable only if {@link #comparableResourceVersions()} is true.
149-
*
150-
* @since 5.3.0
145+
* @deprecated Ghost resource checking is now triggered by the informer's onList callback. This
146+
* setting is no longer used.
151147
*/
148+
@Deprecated(forRemoval = true)
152149
long ghostResourceCacheCheckInterval() default DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS;
153150
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public class InformerConfiguration<R extends HasMetadata> {
5555
private Long informerListLimit;
5656
private FieldSelector fieldSelector;
5757
private Boolean comparableResourceVersions;
58-
private Duration ghostResourceCacheCheckInterval;
5958

6059
protected InformerConfiguration(
6160
Class<R> resourceClass,
@@ -70,8 +69,7 @@ protected InformerConfiguration(
7069
ItemStore<R> itemStore,
7170
Long informerListLimit,
7271
FieldSelector fieldSelector,
73-
Boolean comparableResourceVersions,
74-
Duration ghostResourceCacheCheckInterval) {
72+
Boolean comparableResourceVersions) {
7573
this(resourceClass);
7674
this.name = name;
7775
this.namespaces = namespaces;
@@ -85,7 +83,6 @@ protected InformerConfiguration(
8583
this.informerListLimit = informerListLimit;
8684
this.fieldSelector = fieldSelector;
8785
this.comparableResourceVersions = comparableResourceVersions;
88-
this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
8986
}
9087

9188
private InformerConfiguration(Class<R> resourceClass) {
@@ -121,8 +118,7 @@ public static <R extends HasMetadata> InformerConfiguration<R>.Builder builder(
121118
original.itemStore,
122119
original.informerListLimit,
123120
original.fieldSelector,
124-
original.comparableResourceVersions,
125-
original.ghostResourceCacheCheckInterval)
121+
original.comparableResourceVersions)
126122
.builder;
127123
}
128124

@@ -301,10 +297,6 @@ public boolean isComparableResourceVersions() {
301297
return comparableResourceVersions;
302298
}
303299

304-
public Duration getGhostResourceCacheCheckInterval() {
305-
return ghostResourceCacheCheckInterval;
306-
}
307-
308300
@SuppressWarnings("UnusedReturnValue")
309301
public class Builder {
310302

@@ -323,9 +315,6 @@ public InformerConfiguration<R> buildForController() {
323315
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
324316
}
325317

326-
if (ghostResourceCacheCheckInterval == null) {
327-
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
328-
}
329318
return InformerConfiguration.this;
330319
}
331320

@@ -341,10 +330,6 @@ public InformerConfiguration<R> build() {
341330
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
342331
}
343332

344-
if (ghostResourceCacheCheckInterval == null) {
345-
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
346-
}
347-
348333
return InformerConfiguration.this;
349334
}
350335

@@ -392,8 +377,6 @@ public InformerConfiguration<R>.Builder initFromAnnotation(
392377
.map(f -> new FieldSelector.Field(f.path(), f.value(), f.negated()))
393378
.toList()));
394379
withComparableResourceVersions(informerConfig.comparableResourceVersions());
395-
withGhostResourceCacheCheckInterval(
396-
Duration.ofMillis(informerConfig.ghostResourceCacheCheckInterval()));
397380
}
398381
return this;
399382
}
@@ -500,8 +483,8 @@ public Builder withComparableResourceVersions(boolean comparableResourceVersions
500483
return this;
501484
}
502485

486+
@Deprecated(forRemoval = true)
503487
public Builder withGhostResourceCacheCheckInterval(Duration ghostResourceCacheCheckInterval) {
504-
InformerConfiguration.this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
505488
return this;
506489
}
507490
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,9 @@ public Builder<R> withComparableResourceVersion(boolean comparableResourceVersio
291291
return this;
292292
}
293293

294+
@Deprecated(forRemoval = true)
294295
public Builder<R> withGhostResourceCacheCheckInterval(
295296
Duration ghostResourceCacheCheckInterval) {
296-
config.withGhostResourceCacheCheckInterval(ghostResourceCacheCheckInterval);
297297
return this;
298298
}
299299

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ public final class Constants {
4444
public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true;
4545
public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSION = true;
4646

47+
@Deprecated(forRemoval = true)
4748
public static final long DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS = 3 * 60 * 1000;
49+
50+
@Deprecated(forRemoval = true)
4851
public static final Duration DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL =
4952
Duration.ofMillis(DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS);
5053

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,7 @@ public synchronized void start() {
153153
if (isRunning()) {
154154
return;
155155
}
156-
temporaryResourceCache =
157-
new TemporaryResourceCache<>(
158-
comparableResourceVersions,
159-
configuration.getInformerConfig().getGhostResourceCacheCheckInterval().toMillis(),
160-
controllerConfiguration
161-
.getConfigurationService()
162-
.getExecutorServiceManager()
163-
.scheduledExecutorService(),
164-
this);
156+
temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions, this);
165157
this.cache = new InformerManager<>(client, configuration, this);
166158
cache.setControllerConfiguration(controllerConfiguration);
167159
cache.addIndexers(indexers);
@@ -178,6 +170,11 @@ public synchronized void stop() {
178170
manager().stop();
179171
}
180172

173+
@Override
174+
public void onList(String resourceVersion, boolean remainedEmpty) {
175+
temporaryResourceCache.checkGhostResources();
176+
}
177+
181178
@Override
182179
public void handleRecentResourceUpdate(
183180
ResourceID resourceID, R resource, R previousVersionOfResource) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import java.util.Map;
2020
import java.util.Optional;
2121
import java.util.concurrent.ConcurrentHashMap;
22-
import java.util.concurrent.ScheduledExecutorService;
23-
import java.util.concurrent.TimeUnit;
2422

2523
import org.slf4j.Logger;
2624
import org.slf4j.LoggerFactory;
@@ -71,18 +69,9 @@ public enum EventHandling {
7169

7270
public TemporaryResourceCache(
7371
boolean comparableResourceVersions,
74-
long ghostResourceCheckInterval,
75-
ScheduledExecutorService ghostCheckExecutor,
7672
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
7773
this.comparableResourceVersions = comparableResourceVersions;
7874
this.managedInformerEventSource = managedInformerEventSource;
79-
if (comparableResourceVersions) {
80-
ghostCheckExecutor.scheduleWithFixedDelay(
81-
this::checkGhostResources,
82-
ghostResourceCheckInterval,
83-
ghostResourceCheckInterval,
84-
TimeUnit.MILLISECONDS);
85-
}
8675
}
8776

8877
public synchronized void startEventFilteringModify(ResourceID resourceID) {
@@ -238,9 +227,10 @@ private String getLastSyncResourceVersion(String namespace) {
238227
* resources: when we create a resource that is deleted right after by third party and the related
239228
* informer have a disconnected watch and this watch needs to do a re-list when connected again.
240229
* In this case neither the ADD nor DELETE event will be propagated to the informer, but we
241-
* explicitly add resources to this cache. Those are cleaned up by this check.
230+
* explicitly add resources to this cache. Those are cleaned up by this check, which is triggered
231+
* by the informer's onList callback.
242232
*/
243-
private void checkGhostResources() {
233+
public void checkGhostResources() {
244234
log.debug("Checking for ghost resources.");
245235
var iterator = cache.entrySet().iterator();
246236
while (iterator.hasNext()) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
3131
import io.javaoperatorsdk.operator.api.config.ResolvedControllerConfiguration;
3232
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
33-
import io.javaoperatorsdk.operator.api.reconciler.Constants;
3433
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
3534
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
3635
import io.javaoperatorsdk.operator.processing.Controller;
@@ -64,8 +63,6 @@ public void setup() {
6463
when(controllerConfig.getConfigurationService()).thenReturn(new BaseConfigurationService());
6564
var ic = mock(InformerConfiguration.class);
6665
when(controllerConfig.getInformerConfig()).thenReturn(ic);
67-
when(ic.getGhostResourceCacheCheckInterval())
68-
.thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL);
6966

7067
setUpSource(new ControllerEventSource<>(testController), true, controllerConfig);
7168
}
@@ -329,7 +326,6 @@ public TestConfiguration(
329326
.withOnUpdateFilter(onUpdateFilter)
330327
.withGenericFilter(genericFilter)
331328
.withComparableResourceVersions(true)
332-
.withGhostResourceCacheCheckInterval(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL)
333329
.buildForController(),
334330
false);
335331
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import java.util.Optional;
2020
import java.util.Set;
2121
import java.util.concurrent.CountDownLatch;
22-
import java.util.concurrent.Executors;
23-
import java.util.concurrent.ScheduledExecutorService;
2422

2523
import org.junit.jupiter.api.BeforeEach;
2624
import org.junit.jupiter.api.Test;
@@ -38,7 +36,6 @@
3836
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
3937
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
4038
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
41-
import io.javaoperatorsdk.operator.api.reconciler.Constants;
4239
import io.javaoperatorsdk.operator.processing.event.EventHandler;
4340
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4441
import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils;
@@ -89,8 +86,6 @@ void setup() {
8986
when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig);
9087
when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET);
9188
when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class);
92-
when(informerConfig.getGhostResourceCacheCheckInterval())
93-
.thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL);
9489
informerEventSource =
9590
spy(
9691
new InformerEventSource<>(informerEventSourceConfiguration, clientMock) {
@@ -349,8 +344,7 @@ void ghostCheckRemovesCachedResourceDuringFilteringUpdate() {
349344
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
350345
when(mim.get(any())).thenReturn(Optional.empty());
351346

352-
var ghostCheckExecutor = Executors.newScheduledThreadPool(1);
353-
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes));
347+
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
354348
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
355349

356350
// put resource in cache and start a filtering update
@@ -363,15 +357,12 @@ void ghostCheckRemovesCachedResourceDuringFilteringUpdate() {
363357
when(mim.lastSyncResourceVersion(any())).thenReturn("3");
364358

365359
// ghost check should remove the cached resource
366-
await()
367-
.untilAsserted(
368-
() -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty());
360+
temporaryResourceCache.checkGhostResources();
361+
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
369362

370363
// complete the filtering update - the resource should not reappear
371364
temporaryResourceCache.doneEventFilterModify(resourceId, "2");
372365
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
373-
374-
ghostCheckExecutor.shutdownNow();
375366
}
376367

377368
@Test
@@ -383,8 +374,7 @@ void ghostCheckRunsConcurrentlyWithPutResource() {
383374
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
384375
when(mim.get(any())).thenReturn(Optional.empty());
385376

386-
var ghostCheckExecutor = Executors.newScheduledThreadPool(1);
387-
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes));
377+
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
388378
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
389379

390380
// put a resource that will become a ghost
@@ -394,22 +384,16 @@ void ghostCheckRunsConcurrentlyWithPutResource() {
394384
// advance sync version so ghost check removes it
395385
when(mim.lastSyncResourceVersion(any())).thenReturn("3");
396386

397-
await()
398-
.untilAsserted(
399-
() ->
400-
assertThat(
401-
temporaryResourceCache.getResourceFromCache(
402-
ResourceID.fromResource(deployment)))
403-
.isEmpty());
387+
temporaryResourceCache.checkGhostResources();
388+
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(deployment)))
389+
.isEmpty();
404390

405391
// now put a newer resource - should succeed even after ghost removal
406392
var newerDeployment = deploymentWithResourceVersion(4);
407393
temporaryResourceCache.putResource(newerDeployment);
408394
assertThat(
409395
temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(newerDeployment)))
410396
.isPresent();
411-
412-
ghostCheckExecutor.shutdownNow();
413397
}
414398

415399
@Test
@@ -421,8 +405,7 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
421405
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
422406
when(mim.get(any())).thenReturn(Optional.empty());
423407

424-
var ghostCheckExecutor = Executors.newScheduledThreadPool(1);
425-
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes));
408+
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
426409
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
427410

428411
// start filtering update and put resource
@@ -434,9 +417,8 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
434417
// namespace becomes unwatched - ghost check should clean up
435418
when(mim.isWatchingNamespace(any())).thenReturn(false);
436419

437-
await()
438-
.untilAsserted(
439-
() -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty());
420+
temporaryResourceCache.checkGhostResources();
421+
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
440422

441423
// complete the filtering update
442424
var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId, "2");
@@ -446,8 +428,6 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
446428
// put should be rejected since namespace is no longer watched
447429
temporaryResourceCache.putResource(deploymentWithResourceVersion(3));
448430
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
449-
450-
ghostCheckExecutor.shutdownNow();
451431
}
452432

453433
private void assertNoEventProduced() {
@@ -496,8 +476,7 @@ private void withRealTemporaryResourceCache() {
496476
when(mes.manager()).thenReturn(mim);
497477
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
498478

499-
temporaryResourceCache =
500-
spy(new TemporaryResourceCache<>(true, 100, mock(ScheduledExecutorService.class), mes));
479+
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
501480
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
502481
}
503482

0 commit comments

Comments
 (0)