Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<artifactId>apache5-client</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently nifi-aws-service-api-nar doesn't bring apache5-client as a dependency. We should either add it to that nar or remove it from this list.

Before adding it into nifi-aws-service-api I was getting ClassNotFoundException for Apache 5 http client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D'oh! I added it to the api nar but it looks like i didn't include that in the commit 🤦 Will have that up shortly.

<version>${software.amazon.awssdk.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
53 changes: 43 additions & 10 deletions nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-migration-utils</artifactId>
Expand All @@ -50,11 +46,44 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache5-client</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<!-- TEST DEPENDENCIES -->
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>3.4.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
Expand All @@ -78,12 +107,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>

<!-- TEST DEPENDENCIES -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-processors</artifactId>
Expand Down Expand Up @@ -116,6 +139,16 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-localstack</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;

import java.util.List;
import java.util.Map;

/**
* Shared DynamoDB table lifecycle operations for checkpoint tables. Used by both
* {@link KinesisShardManager} for runtime table management and
* {@link LegacyCheckpointMigrator} for migration and rename operations.
*/
final class CheckpointTableUtils {

static final String ATTR_STREAM_NAME = "streamName";
static final String ATTR_SHARD_ID = "shardId";
static final String NODE_HEARTBEAT_PREFIX = "__node__#";
static final String MIGRATION_MARKER_SHARD_ID = "__migration__";

private static final long TABLE_POLL_MILLIS = 1_000;
private static final int TABLE_POLL_MAX_ATTEMPTS = 60;

private CheckpointTableUtils() { }

enum TableSchema {
NEW,
LEGACY,
UNKNOWN,
NOT_FOUND
}

static TableSchema getTableSchema(final DynamoDbClient client, final String tableName) {
try {
final DescribeTableResponse describe = client.describeTable(DescribeTableRequest.builder().tableName(tableName).build());
final List<KeySchemaElement> keySchema = describe.table().keySchema();
if (keySchema.size() == 2
&& hasKey(keySchema, ATTR_STREAM_NAME, KeyType.HASH)
&& hasKey(keySchema, ATTR_SHARD_ID, KeyType.RANGE)) {
return TableSchema.NEW;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamName and sharedId appear to be used multiple times in multiple methods, which seem look good candidates for private static final Strings.

}

if (keySchema.size() == 1 && hasKey(keySchema, "leaseKey", KeyType.HASH)) {
return TableSchema.LEGACY;
}

return TableSchema.UNKNOWN;
} catch (final ResourceNotFoundException notFound) {
return TableSchema.NOT_FOUND;
}
}

static void createNewSchemaTable(final DynamoDbClient client, final ComponentLog logger, final String tableName) {
final TableSchema tableSchema = getTableSchema(client, tableName);
if (tableSchema == TableSchema.NEW) {
logger.info("DynamoDB checkpoint table [{}] already exists", tableName);
return;
}
if (tableSchema == TableSchema.LEGACY || tableSchema == TableSchema.UNKNOWN) {
throw new ProcessException("Checkpoint table [%s] exists but does not match expected schema".formatted(tableName));
}

logger.info("Creating DynamoDB checkpoint table [{}]", tableName);
try {
final CreateTableRequest request = CreateTableRequest.builder()
.tableName(tableName)
.keySchema(
KeySchemaElement.builder().attributeName(ATTR_STREAM_NAME).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(ATTR_SHARD_ID).keyType(KeyType.RANGE).build())
.attributeDefinitions(
AttributeDefinition.builder().attributeName(ATTR_STREAM_NAME).attributeType(ScalarAttributeType.S).build(),
AttributeDefinition.builder().attributeName(ATTR_SHARD_ID).attributeType(ScalarAttributeType.S).build())
.billingMode(BillingMode.PAY_PER_REQUEST)
.build();

client.createTable(request);
} catch (final ResourceInUseException alreadyCreating) {
logger.info("DynamoDB checkpoint table [{}] is already being created by another node", tableName);
}
}

static void waitForTableActive(final DynamoDbClient client, final ComponentLog logger, final String tableName) {
final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
for (int i = 0; i < TABLE_POLL_MAX_ATTEMPTS; i++) {
final TableStatus status = client.describeTable(request).table().tableStatus();
if (status == TableStatus.ACTIVE) {
logger.info("DynamoDB checkpoint table [{}] is now ACTIVE", tableName);
return;
}

try {
Thread.sleep(TABLE_POLL_MILLIS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new ProcessException("Interrupted while waiting for DynamoDB table [%s] to become ACTIVE".formatted(tableName), e);
}
}

throw new ProcessException("DynamoDB checkpoint table [%s] did not become ACTIVE within %d seconds".formatted(tableName, TABLE_POLL_MAX_ATTEMPTS));
}

static void deleteTable(final DynamoDbClient client, final ComponentLog logger, final String tableName) {
try {
client.deleteTable(DeleteTableRequest.builder().tableName(tableName).build());
logger.info("Initiated deletion of DynamoDB table [{}]", tableName);
} catch (final ResourceNotFoundException e) {
logger.debug("Table [{}] already deleted", tableName);
}
}

static void waitForTableDeleted(final DynamoDbClient client, final ComponentLog logger, final String tableName) {
final DescribeTableRequest request = DescribeTableRequest.builder().tableName(tableName).build();
for (int i = 0; i < TABLE_POLL_MAX_ATTEMPTS; i++) {
try {
client.describeTable(request);
} catch (final ResourceNotFoundException e) {
logger.info("DynamoDB table [{}] has been deleted", tableName);
return;
}

try {
Thread.sleep(TABLE_POLL_MILLIS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new ProcessException("Interrupted while waiting for DynamoDB table [%s] deletion".formatted(tableName), e);
}
}

throw new ProcessException("DynamoDB table [%s] was not deleted within %d seconds".formatted(tableName, TABLE_POLL_MAX_ATTEMPTS));
}

static void copyCheckpointItems(final DynamoDbClient client, final ComponentLog logger,
final String sourceTableName, final String destTableName) {
logger.info("Copying checkpoint items from [{}] to [{}]", sourceTableName, destTableName);

Map<String, AttributeValue> exclusiveStartKey = null;
int copied = 0;
do {
final ScanRequest scanRequest = exclusiveStartKey == null
? ScanRequest.builder().tableName(sourceTableName).build()
: ScanRequest.builder().tableName(sourceTableName).exclusiveStartKey(exclusiveStartKey).build();
final ScanResponse scanResponse = client.scan(scanRequest);

for (final Map<String, AttributeValue> item : scanResponse.items()) {
final AttributeValue shardIdAttr = item.get(ATTR_SHARD_ID);
if (shardIdAttr != null) {
final String shardId = shardIdAttr.s();
if (shardId.startsWith(NODE_HEARTBEAT_PREFIX)
|| MIGRATION_MARKER_SHARD_ID.equals(shardId)) {
continue;
}
}

client.putItem(PutItemRequest.builder()
.tableName(destTableName)
.item(item)
.build());
copied++;
}

exclusiveStartKey = scanResponse.lastEvaluatedKey();
} while (exclusiveStartKey != null && !exclusiveStartKey.isEmpty());

logger.info("Copied {} checkpoint item(s) from [{}] to [{}]", copied, sourceTableName, destTableName);
}

private static boolean hasKey(final List<KeySchemaElement> keySchema, final String keyName, final KeyType keyType) {
for (final KeySchemaElement element : keySchema) {
if (keyName.equals(element.attributeName()) && keyType == element.keyType()) {
return true;
}
}
return false;
}
}
Loading
Loading