NIFI-15669: Refactored ConsumeKinesis to remove dependency on KCL. Th… #10964
markap14 wants to merge 6 commits into apache:main from
Conversation
exceptionfactory
left a comment
Thanks for the extensive work on redesigning this Processor @markap14!
I plan to do a more thorough review, for now, highlighting an integration test failure that may point to some unstable expectations.
Error: Tests run: 18, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 130.6 s <<< FAILURE! -- in org.apache.nifi.processors.aws.kinesis.ConsumeKinesisIT
Error: org.apache.nifi.processors.aws.kinesis.ConsumeKinesisIT.testKplMultipleAggregatedRecords -- Time elapsed: 4.039 s <<< FAILURE!
org.opentest4j.AssertionFailedError: 3 aggregated records x 5 sub-records each ==> expected: <15> but was: <0>
at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:158)
at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:139)
at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:201)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:152)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:590)
at org.apache.nifi.processors.aws.kinesis.ConsumeKinesisIT.testKplMultipleAggregatedRecords(ConsumeKinesisIT.java:618)
62fb403 to a7366a0
awelless
left a comment
Reviewed source files only so far.
Stylistic comments are marked with "nit".
final TableSchema destinationSchema) {
    return switch (destinationSchema) {
        case NEW -> item;
        case LEGACY -> convertToLegacyItem(item);
What's the intention in supporting lease table conversion to the old KCL format?
Yeah good catch. After I did some refactoring in how the migration works, this is no longer actually necessary. Will remove.
}
final AttributeValue sequenceNumber = item.get("sequenceNumber");
final String leaseKey = streamName.s() + ":" + shardIdValue;
This depends on whether KCL is configured in a single- or multi-stream mode. If it's single stream, only shard id is a part of the lease key.
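To illustrate the distinction, a minimal sketch (the helper and its mode flag are hypothetical stand-ins, not actual KCL API): in multi-stream mode the lease key is qualified by the stream, while in single-stream mode it is the shard id alone.

```java
class LeaseKeySketch {
    // Hypothetical helper: the real KCL derives lease keys inside its lease
    // management classes; the mode flag here only illustrates the point above.
    static String leaseKey(final boolean multiStreamMode, final String streamName, final String shardId) {
        return multiStreamMode ? streamName + ":" + shardId : shardId;
    }

    public static void main(final String[] args) {
        System.out.println(leaseKey(true, "my-stream", "shardId-000000000001"));
        System.out.println(leaseKey(false, "my-stream", "shardId-000000000001"));
    }
}
```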
The maximum size of the buffer is controlled by the 'Max Bytes to Buffer' property.
In addition, the processor may cache some amount of data for each shard when the processor's buffer is full.""")
ConsumeKinesis buffers Kinesis Records in memory until they can be processed. \
The maximum size of the buffer is controlled by the 'Max Batch Size' property.""")
Nit: Max Batch Size determines how much data we write in a single task execution.
It doesn't configure the buffer caches, which are 500 GetRecords results for polling and {number of active shards} for EFO.
private static final long QUEUE_POLL_TIMEOUT_MILLIS = 100;
private static final Duration API_CALL_TIMEOUT = Duration.ofSeconds(30);
private static final Duration API_CALL_ATTEMPT_TIMEOUT = Duration.ofSeconds(10);
private static final byte[] NEWLINE_DELIMITER = new byte[] {'\n'};
Nit: shall we use System.lineSeparator() instead?
No. The separator should not depend on the OS of the host.
Using a larger value may increase the throughput, but will do so at the expense of using more memory.
""")
static final PropertyDescriptor MAX_RECORDS_PER_REQUEST = new PropertyDescriptor.Builder()
This property is used only when CONSUMER_TYPE is SHARED_THROUGHPUT. We shouldn't display it for EFO consumers.
private volatile String lastQueuedSequenceNumber;
private volatile String lastOnNextMaxSequence;
private final AtomicReference<String> lastAcknowledgedSequenceNumber = new AtomicReference<>();
lastQueuedSequenceNumber and lastAcknowledgedSequenceNumber are used only to calculate a sequence number to start reading data from during a subscription restart.
It seems that lastQueuedSequenceNumber can always be used, since we don't purge queues in KinesisConsumerClient.
Also lastQueuedSequenceNumber is the same as lastOnNextMaxSequence.
}
shardConsumers.clear();

deregisterEfoConsumer();
When NiFi scales down, the processor is stopped on the node being decommissioned, right?
Meaning this node will deregister the consumer, while the other active nodes are still subscribed to it.
The EFO consumer is created in initialize only, so after decommissioning a node the processors will be stuck until restarted.
If the above is correct, then we shouldn't deregister the consumer if there are other nodes using it.
Good catch! I hadn't considered that case. Will update it to only deregister in @OnRemoved so that if the processor is deleted from the canvas it will deregister it.
    }
}

if (totalQueuedResults() >= MAX_QUEUED_RESULTS) {
This is not fair. There is a risk that the consumer of a particular shard will always be sleeping.
Shall we either:
- Not track totalQueuedResults and use a batch-per-shard approach as done in the EFO consumer?
- Use a fair semaphore to handle the cache limits instead of a simple sleep?
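A minimal sketch of the fair-semaphore alternative (class and method names are illustrative, not from the PR): a fair `Semaphore` grants permits in FIFO order, so the shard consumer that has waited longest is admitted first instead of all consumers racing after a fixed sleep.

```java
import java.util.concurrent.Semaphore;

class FairQueueLimitSketch {
    // One permit per queued result; fairness = true gives FIFO hand-off,
    // so no single shard's fetch loop can be starved indefinitely.
    private final Semaphore queueSlots;

    FairQueueLimitSketch(final int maxQueuedResults) {
        this.queueSlots = new Semaphore(maxQueuedResults, true);
    }

    // Called by a shard fetch loop before enqueuing a GetRecords result;
    // blocks in arrival order when the cache is full.
    void acquireSlot() throws InterruptedException {
        queueSlots.acquire();
    }

    // Called when a result has been drained from the queue.
    void releaseSlot() {
        queueSlots.release();
    }

    int availableSlots() {
        return queueSlots.availablePermits();
    }
}
```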
return null;
} catch (final SdkClientException e) {
    if (!state.isStopped()) {
        logger.warn("GetRecords timed out for shard {}; will retry with existing iterator", shardId);
Nit: this isn't necessarily a timeout error, right?
for (final ShardFetchResult result : results) {
    final PollingShardState state = pollingShardStates.get(result.shardId());
    if (state != null) {
        state.requestReset();
Should we roll back to the latest sequence number instead? The records are still kept in the queue in KinesisConsumeClient.
This might cause out-of-order delivery - test with repro.
We should drain the queue while still holding the shard lease, i.e. here, in rollbackResults.
Since the reset doesn't happen immediately, there is a window when the lease can be acquired and subsequent records polled from the queue before the reset happens. Repro.
But we need to make sure that while draining the queue we don't fetch a new batch of records. Otherwise we'd have to drain it as well.
shardManager.writeCheckpoints(batch.checkpoints());
consumerClient.acknowledgeResults(accepted);
If writeCheckpoints fails, we don't acknowledgeResults in this callback. It seems the EFO consumer will be stuck in that situation, as we request the next records in the acknowledgement. Shall we swap these operations? Or use a try {} finally {} ladder.
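A sketch of the try/finally ladder suggested above (the interfaces are stand-ins mirroring the names in the snippet, not the actual NiFi types): acknowledging in a finally block guarantees the subscription's demand is replenished even when checkpointing throws.

```java
import java.util.List;

class AckOrderingSketch {
    // Stand-in interfaces for the components referenced in the diff.
    interface ShardManager { void writeCheckpoints(List<String> checkpoints); }
    interface ConsumerClient { void acknowledgeResults(List<String> accepted); }

    // The checkpoint failure still propagates to the caller, but the
    // acknowledgement is always delivered so the EFO subscription can
    // request its next batch of records.
    static void commit(final ShardManager shardManager, final ConsumerClient consumerClient,
                       final List<String> checkpoints, final List<String> accepted) {
        try {
            shardManager.writeCheckpoints(checkpoints);
        } finally {
            consumerClient.acknowledgeResults(accepted);
        }
    }
}
```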
…is provides much faster startup times and drastically reduces heap utilization when using Enhanced Fan-Out (EFO) mode.
Thanks for the thorough review @awelless! I did several refactorings of this PR before pushing it, and it looks like I did a pretty poor job of cleaning up a couple of the approaches that I'd taken. Should be in much better shape now! And you caught a few interesting points that I'd not considered, as well! I pushed a new commit that I think addresses everything. Added some additional tests. Pushed 30,085,000 records to a Kinesis Stream and then consumed all using both EFO and Shared Throughput mode to ensure that all data was consumed in exactly the correct order without any duplicates and to ensure that performance was as expected. All looks good!
…use we always include all sub-records within a single ProcessSession so we don't need to checkpoint partial sequences
The comment about race condition is the one requiring attention. The rest is optional.
@Override
public void migrateProperties(final PropertyConfiguration config) {
    ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
    config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
Should we really move Max Bytes to Buffer to Max Batch Size? These are different properties.
The default value of 100 MB for buffer size might be too much for the batch size.
.build();

static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP, ProxySpec.HTTP_AUTH);
static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
I see. Then should we have separate endpoints for Kinesis and DynamoDB?
In Localstack that's the same endpoint for each service, but this might not be the case for production scenarios.
    return;
}

final Set<String> ownedShardIds = new HashSet<>();
Nit:
- final Set<String> ownedShardIds = new HashSet<>();
+ final Set<String> ownedShardIds = HashSet.newHashSet(ownedShards.size());
private void shutdownScheduler() {
if (kinesisScheduler.shutdownComplete()) {
@OnRemoved
public void onRemoved(final ProcessContext context) {
This approach to deregistering consumers looks good to me.
What should happen when the processor's CONSUMER_TYPE changes? Now we wait for the processor to be removed. Furthermore, if APPLICATION_NAME is changed, the old consumer will be orphaned.
Should we deregister the consumer immediately when CONSUMER_TYPE changes to SHARED_THROUGHPUT, or when CONSUMER_TYPE is EFO and APPLICATION_NAME changes?
It's possible, but I don't think it's worth the effort. If a user does that and wants to reclaim the slot, they can do so manually in the AWS console.
    resultsByShard.computeIfAbsent(result.shardId(), k -> new ArrayList<>()).add(result);
}
for (final List<ShardFetchResult> shardResults : resultsByShard.values()) {
    shardResults.sort(Comparator.comparing(ShardFetchResult::firstSequenceNumber));
I wonder if we really need to sort the results here.
Since we're consuming data from a queue in KinesisConsumerClient, we should have the data ordered by sequence numbers already, right? We operate on lists everywhere, so the order is preserved.
exceptionfactory
left a comment
Thanks for putting together this major refactor @markap14, the approach looks good in general. I'm planning on further review, but noted a handful of mostly minor recommendations thus far.
    Thread.sleep(TABLE_POLL_MILLIS);
} catch (final InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ProcessException("Interrupted while waiting for DynamoDB table to become ACTIVE", e);
- throw new ProcessException("Interrupted while waiting for DynamoDB table to become ACTIVE", e);
+ throw new ProcessException("Interrupted while waiting for DynamoDB table [%s] to become ACTIVE".formatted(tableName), e);
    Thread.sleep(TABLE_POLL_MILLIS);
} catch (final InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ProcessException("Interrupted while waiting for DynamoDB table deletion", e);
- throw new ProcessException("Interrupted while waiting for DynamoDB table deletion", e);
+ throw new ProcessException("Interrupted while waiting for DynamoDB table [%s] deletion".formatted(tableName), e);
if (keySchema.size() == 2
        && hasKey(keySchema, "streamName", KeyType.HASH)
        && hasKey(keySchema, "shardId", KeyType.RANGE)) {
    return TableSchema.NEW;
streamName and shardId appear to be used multiple times in multiple methods, which look like good candidates for private static final Strings.
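For instance, a minimal sketch of the suggested extraction (the class and constant names are illustrative, not from the PR):

```java
class LeaseTableAttributeNames {
    // Hypothetical constants replacing the repeated attribute-name literals
    // used in the key-schema checks and item lookups.
    static final String STREAM_NAME = "streamName";
    static final String SHARD_ID = "shardId";
}
```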
 * @param data the user payload bytes
 * @param approximateArrivalTimestamp approximate time the enclosing record arrived at Kinesis
 */
record DeaggregatedRecord(
Based on the description, what do you think about naming this UserRecord, DistinctRecord or DataRecord? The User prefix sounds a bit related to identity, but aligns with the description. Distinct or similar may be an option.
 * per shard via HTTP/2. Uses Reactive Streams demand-driven backpressure to control the
 * rate of event delivery.
 */
final class EfoKinesisClient extends KinesisConsumerClient {
What do you think about spelling out EnhancedFanOutKinesisClient for clarity, since Efo is not a common acronym?
    }
} else if (!existing.isExhausted() && !existing.isStopped() && !existing.isLoopRunning()
        && existing.tryStartLoop()) {
    logger.warn("Restarting dead fetch loop for shard {}", shardId);
It would be helpful to include the streamName.
    });
} catch (final RejectedExecutionException e) {
    state.markLoopStopped();
    logger.debug("Executor shut down; cannot start fetch loop for shard {}", shardId);
- logger.debug("Executor shut down; cannot start fetch loop for shard {}", shardId);
+ logger.debug("Executor shut down; cannot start fetch loop for stream [{}] shard [{}]", streamName, shardId);
    }
} catch (final Exception e) {
    if (!state.isStopped()) {
        logger.error("Unexpected error in fetch loop for shard {}; will retry", shardId, e);
Should this be logged as a warning if it is going to be retried?
"streamName", AttributeValue.builder().s("my-stream").build(),
"shardId", AttributeValue.builder().s("shardId-0001").build(),
"sequenceNumber", AttributeValue.builder().s("12345").build());
Recommend declaring static variables for the map keys and values that can be reused across methods.
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>apache-client</artifactId>
This library brings in Apache HTTP Client 4, which has limited updates. The url-connection-client does not have all the flexibility, but what do you think about using it instead?
It looks like the url-connection-client does not support proxies directly. And it doesn't support connection pooling with a max, which we're depending on here. Fortunately, though, we can upgrade to Apache HTTP Client 5, which I think makes a lot of sense.
Thanks @exceptionfactory I think all of your feedback makes sense. I pushed a new commit that incorporates all of it and switches to Apache HTTP Client 5 instead of version 4.
<dependency>
    <groupId>software.amazon.awssdk</groupId>
-   <artifactId>apache-client</artifactId>
+   <artifactId>apache5-client</artifactId>
Currently nifi-aws-service-api-nar doesn't bring apache5-client as a dependency. We should either add it to that nar or remove it from this list.
Before adding it into nifi-aws-service-api I was getting ClassNotFoundException for Apache 5 http client.
D'oh! I added it to the api nar but it looks like i didn't include that in the commit 🤦 Will have that up shortly.
…is provides much faster startup times and drastically reduces heap utilization when using Enhanced Fan Out (EFO) mode.
Summary

NIFI-00000

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

NIFI-00000
NIFI-00000
Verified status

Pull Request Formatting

main branch

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

./mvnw clean install -P contrib-check

Licensing

LICENSE and NOTICE files

Documentation