Skip to content

Commit 62ca56b

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent fb13e83 commit 62ca56b

8 files changed

Lines changed: 220 additions & 157 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.concurrent.Executors;
2323
import java.util.function.Consumer;
2424

25+
import io.javaoperatorsdk.operator.processing.event.source.informer.pool.DefaultInformerPool;
26+
import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool;
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
2729

@@ -476,4 +478,8 @@ default boolean useSSAToPatchPrimaryResource() {
476478
default boolean cloneSecondaryResourcesWhenGettingFromCache() {
477479
return false;
478480
}
481+
482+
default InformerPool informerPool() {
483+
return new DefaultInformerPool(getKubernetesClient(),this);
484+
}
479485
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.stream.Collectors;
2323
import java.util.stream.Stream;
2424

25+
import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728

@@ -55,6 +56,7 @@ class InformerManager<R extends HasMetadata, C extends Informable<R>>
5556
private final ResourceEventHandler<R> eventHandler;
5657
private final Map<String, Function<R, List<String>>> indexers = new HashMap<>();
5758
private ControllerConfiguration<R> controllerConfiguration;
59+
private InformerPool informerPool;
5860

5961
InformerManager(
6062
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client,
@@ -67,6 +69,7 @@ class InformerManager<R extends HasMetadata, C extends Informable<R>>
6769

6870
void setControllerConfiguration(ControllerConfiguration<R> controllerConfiguration) {
6971
this.controllerConfiguration = controllerConfiguration;
72+
this.controllerConfiguration.getConfigurationService().informerPool();
7073
}
7174

7275
@Override
@@ -166,9 +169,7 @@ private InformerWrapper<R> createEventSource(
166169
.orElse(filteredBySelectorClient)
167170
.runnableInformer(0);
168171
Optional.ofNullable(informerConfig.getItemStore()).ifPresent(informer::itemStore);
169-
var source =
170-
new InformerWrapper<>(
171-
informer, controllerConfiguration.getConfigurationService(), namespaceIdentifier);
172+
var source = new InformerWrapper<>(informer, namespaceIdentifier);
172173
source.addEventHandler(eventHandler);
173174
sources.put(namespaceIdentifier, source);
174175
return source;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 4 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,18 @@
1818
import java.util.List;
1919
import java.util.Map;
2020
import java.util.Optional;
21-
import java.util.concurrent.ExecutionException;
22-
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.TimeoutException;
2421
import java.util.function.Function;
2522
import java.util.function.Predicate;
2623
import java.util.stream.Stream;
2724

2825
import org.slf4j.Logger;
2926
import org.slf4j.LoggerFactory;
3027

31-
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
3228
import io.fabric8.kubernetes.api.model.HasMetadata;
33-
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
3429
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
3530
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
3631
import io.fabric8.kubernetes.client.informers.cache.Cache;
3732
import io.javaoperatorsdk.operator.OperatorException;
38-
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
39-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
4033
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
4134
import io.javaoperatorsdk.operator.health.Status;
4235
import io.javaoperatorsdk.operator.processing.LifecycleAware;
@@ -51,99 +44,16 @@ class InformerWrapper<T extends HasMetadata>
5144
private final SharedIndexInformer<T> informer;
5245
private final Cache<T> cache;
5346
private final String namespaceIdentifier;
54-
private final ConfigurationService configurationService;
5547

56-
public InformerWrapper(
57-
SharedIndexInformer<T> informer,
58-
ConfigurationService configurationService,
59-
String namespaceIdentifier) {
48+
public InformerWrapper(SharedIndexInformer<T> informer, String namespaceIdentifier) {
6049
this.informer = informer;
6150
this.namespaceIdentifier = namespaceIdentifier;
6251
this.cache = (Cache<T>) informer.getStore();
63-
this.configurationService = configurationService;
6452
}
6553

6654
@Override
67-
public void start() throws OperatorException {
68-
try {
69-
70-
// register stopped handler if we have one defined
71-
configurationService
72-
.getInformerStoppedHandler()
73-
.ifPresent(
74-
ish -> {
75-
final var stopped = informer.stopped();
76-
if (stopped != null) {
77-
stopped.handle(
78-
(res, ex) -> {
79-
ish.onStop(informer, ex);
80-
return null;
81-
});
82-
} else {
83-
final var apiTypeClass = informer.getApiTypeClass();
84-
final var fullResourceName = HasMetadata.getFullResourceName(apiTypeClass);
85-
final var version = HasMetadata.getVersion(apiTypeClass);
86-
throw new IllegalStateException(
87-
"Cannot retrieve 'stopped' callback to listen to informer stopping for"
88-
+ " informer for "
89-
+ fullResourceName
90-
+ "/"
91-
+ version);
92-
}
93-
});
94-
if (!configurationService.stopOnInformerErrorDuringStartup()) {
95-
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
96-
}
97-
// change thread name for easier debugging
98-
final var thread = Thread.currentThread();
99-
final var name = thread.getName();
100-
try {
101-
thread.setName(informerInfo() + " " + thread.getId());
102-
final var resourceName = informer.getApiTypeClass().getSimpleName();
103-
log.debug(
104-
"Starting informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
105-
var start = informer.start();
106-
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
107-
// false, and there is a rbac issue the get never returns; therefore operator never really
108-
// starts
109-
log.trace(
110-
"Waiting informer to start namespace: {} resource: {}",
111-
namespaceIdentifier,
112-
resourceName);
113-
start
114-
.toCompletableFuture()
115-
.get(configurationService.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS);
116-
log.debug(
117-
"Started informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
118-
} catch (TimeoutException | ExecutionException e) {
119-
if (configurationService.stopOnInformerErrorDuringStartup()) {
120-
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
121-
throw new OperatorException(e);
122-
} else {
123-
log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e);
124-
}
125-
} catch (InterruptedException e) {
126-
thread.interrupt();
127-
throw new IllegalStateException(e);
128-
} finally {
129-
// restore original name
130-
thread.setName(name);
131-
}
132-
133-
} catch (Exception e) {
134-
ReconcilerUtilsInternal.handleKubernetesClientException(
135-
e, HasMetadata.getFullResourceName(informer.getApiTypeClass()));
136-
throw new OperatorException(
137-
"Couldn't start informer for " + versionedFullResourceName() + " resources", e);
138-
}
139-
}
140-
141-
private String versionedFullResourceName() {
142-
final var apiTypeClass = informer.getApiTypeClass();
143-
if (apiTypeClass.isAssignableFrom(GenericKubernetesResource.class)) {
144-
return GenericKubernetesResource.class.getSimpleName();
145-
}
146-
return ReconcilerUtilsInternal.getResourceTypeNameWithVersion(apiTypeClass);
55+
public void start() {
56+
// no-op: informer initialization is handled by InformerPool
14757
}
14858

14959
@Override
@@ -201,7 +111,7 @@ public String toString() {
201111
}
202112

203113
private String informerInfo() {
204-
return "InformerWrapper [" + versionedFullResourceName() + "]";
114+
return "InformerWrapper [" + informer.getApiTypeClass().getSimpleName() + "]";
205115
}
206116

207117
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.processing.event.source.informer.pool;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import io.fabric8.kubernetes.api.model.HasMetadata;
29+
import io.fabric8.kubernetes.client.KubernetesClient;
30+
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
31+
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
32+
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
33+
import io.javaoperatorsdk.operator.OperatorException;
34+
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
35+
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
36+
37+
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACES;
38+
39+
public class DefaultInformerPool implements InformerPool {
40+
41+
private static final Logger log = LoggerFactory.getLogger(DefaultInformerPool.class);
42+
43+
private final KubernetesClient client;
44+
private final ConfigurationService configurationService;
45+
46+
private final Map<InformerClassifier, SharedIndexInformer<?>> informers = new HashMap<>();
47+
private final Map<SharedIndexInformer<?>, AtomicInteger> counters = new HashMap<>();
48+
49+
public DefaultInformerPool(KubernetesClient client, ConfigurationService configurationService) {
50+
this.client = client;
51+
this.configurationService = configurationService;
52+
}
53+
54+
public synchronized SharedIndexInformer<?> getResource(InformerClassifier classifier) {
55+
var informer = informers.get(classifier);
56+
if (informer == null) {
57+
informer = createInformer(client, classifier);
58+
initInformer(informer, classifier.namespaceIdentifier());
59+
informers.put(classifier, informer);
60+
counters.put(informer, new AtomicInteger(1));
61+
} else {
62+
counters.get(informer).incrementAndGet();
63+
}
64+
return informer;
65+
}
66+
67+
public synchronized void releaseResource(SharedIndexInformer<?> informer) {
68+
var counter = counters.get(informer);
69+
if (counter != null && counter.decrementAndGet() <= 0) {
70+
informer.stop();
71+
counters.remove(informer);
72+
informers.values().remove(informer);
73+
} else {
74+
log.warn("No informer found in the pool.");
75+
}
76+
}
77+
78+
private void initInformer(SharedIndexInformer<?> informer, String namespaceIdentifier) {
79+
try {
80+
configurationService
81+
.getInformerStoppedHandler()
82+
.ifPresent(
83+
ish -> {
84+
final var stopped = informer.stopped();
85+
if (stopped != null) {
86+
stopped.handle(
87+
(res, ex) -> {
88+
ish.onStop(informer, ex);
89+
return null;
90+
});
91+
} else {
92+
final var apiTypeClass = informer.getApiTypeClass();
93+
final var fullResourceName = HasMetadata.getFullResourceName(apiTypeClass);
94+
final var version = HasMetadata.getVersion(apiTypeClass);
95+
throw new IllegalStateException(
96+
"Cannot retrieve 'stopped' callback to listen to informer stopping for"
97+
+ " informer for "
98+
+ fullResourceName
99+
+ "/"
100+
+ version);
101+
}
102+
});
103+
if (!configurationService.stopOnInformerErrorDuringStartup()) {
104+
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
105+
}
106+
// change thread name for easier debugging
107+
final var thread = Thread.currentThread();
108+
final var name = thread.getName();
109+
try {
110+
thread.setName(
111+
"InformerPool [" + versionedFullResourceName(informer) + "] " + thread.getId());
112+
final var resourceName = informer.getApiTypeClass().getSimpleName();
113+
log.debug(
114+
"Starting informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
115+
var start = informer.start();
116+
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
117+
// false, and there is a rbac issue the get never returns; therefore operator never really
118+
// starts
119+
log.trace(
120+
"Waiting informer to start namespace: {} resource: {}",
121+
namespaceIdentifier,
122+
resourceName);
123+
start
124+
.toCompletableFuture()
125+
.get(configurationService.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS);
126+
log.debug(
127+
"Started informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
128+
} catch (TimeoutException | ExecutionException e) {
129+
if (configurationService.stopOnInformerErrorDuringStartup()) {
130+
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
131+
throw new OperatorException(e);
132+
} else {
133+
log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e);
134+
}
135+
} catch (InterruptedException e) {
136+
thread.interrupt();
137+
throw new IllegalStateException(e);
138+
} finally {
139+
// restore original name
140+
thread.setName(name);
141+
}
142+
} catch (Exception e) {
143+
ReconcilerUtilsInternal.handleKubernetesClientException(
144+
e, HasMetadata.getFullResourceName(informer.getApiTypeClass()));
145+
throw new OperatorException(
146+
"Couldn't start informer for " + versionedFullResourceName(informer) + " resources", e);
147+
}
148+
}
149+
150+
@SuppressWarnings("unchecked")
151+
private static String versionedFullResourceName(SharedIndexInformer<?> informer) {
152+
return ReconcilerUtilsInternal.getResourceTypeNameWithVersion(
153+
(Class<? extends HasMetadata>) informer.getApiTypeClass());
154+
}
155+
156+
@SuppressWarnings("rawtypes")
157+
static SharedIndexInformer<?> createInformer(
158+
KubernetesClient client, InformerClassifier classifier) {
159+
FilterWatchListDeletable filteredClient;
160+
if (WATCH_ALL_NAMESPACES.equals(classifier.namespaceIdentifier())) {
161+
filteredClient =
162+
client
163+
.resources(classifier.resourceClass())
164+
.inAnyNamespace()
165+
.withLabelSelector(classifier.labelSelector());
166+
} else {
167+
filteredClient =
168+
client
169+
.resources(classifier.resourceClass())
170+
.inNamespace(classifier.namespaceIdentifier())
171+
.withLabelSelector(classifier.labelSelector());
172+
}
173+
174+
if (classifier.fieldSelector() != null && !classifier.fieldSelector().getFields().isEmpty()) {
175+
for (var f : classifier.fieldSelector().getFields()) {
176+
if (f.negated()) {
177+
filteredClient =
178+
(FilterWatchListDeletable) filteredClient.withoutField(f.path(), f.value());
179+
} else {
180+
filteredClient = (FilterWatchListDeletable) filteredClient.withField(f.path(), f.value());
181+
}
182+
}
183+
}
184+
return filteredClient.runnableInformer(0);
185+
}
186+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java renamed to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/InformerClassifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.javaoperatorsdk.operator.processing.event.source.pool;
16+
package io.javaoperatorsdk.operator.processing.event.source.informer.pool;
1717

1818
import io.fabric8.kubernetes.api.model.HasMetadata;
1919
import io.javaoperatorsdk.operator.api.config.informer.FieldSelector;

0 commit comments

Comments
 (0)