alinuxfan/aiven_services_example

aiven_services_example

Aiven Kafka Quickstart with Flink and Python

1. Install Dependencies

pip install -r requirements.txt

2. Create Aiven Kafka service in UI or CLI

UI
CLI
  1. Create Aiven Account (Use SSO)

  2. Create a token for CLI access at https://console.aiven.io/profile/tokens (see "How To Create Authentication Tokens") and save the token

  3. avn user login email@email.com --token #input token from step 2

  4. Set environment variables

KAFKA_INSTANCE_NAME=demokafka
CLOUD_PROVIDER=google-us-central1
AIVEN_PLAN_NAME=startup-2
DESTINATION_FOLDER_NAME=~/kafkacerts
  5. Use the CLI to create the Kafka cluster, or use the UI by following the Kafka getting started guide
avn service create \
  -t kafka $KAFKA_INSTANCE_NAME \
  --cloud $CLOUD_PROVIDER \
  -p $AIVEN_PLAN_NAME \
  -c kafka_rest=true
  6. Download credentials for the Python script
avn service user-creds-download $KAFKA_INSTANCE_NAME \
  -d $DESTINATION_FOLDER_NAME \
  --username avnadmin
  7. Return the service URI
avn service get $KAFKA_INSTANCE_NAME \
  --format '{service_uri}'
  8. Wait until the service is running
avn service wait $KAFKA_INSTANCE_NAME
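
If you prefer to drive the CLI steps from Python, a minimal sketch might look like the following. It only assembles the same avn invocations shown above as argument lists and prints them. The function name build_avn_commands is hypothetical, and the fallback values are the ones from the environment variables in this README.

```python
import os
import shlex


def build_avn_commands(name, cloud, plan, cert_dir):
    """Return the avn invocations from the CLI steps as argument lists."""
    return [
        # Create the Kafka service with the REST API enabled
        ["avn", "service", "create", "-t", "kafka", name,
         "--cloud", cloud, "-p", plan, "-c", "kafka_rest=true"],
        # Download the avnadmin credentials into cert_dir
        ["avn", "service", "user-creds-download", name,
         "-d", os.path.expanduser(cert_dir), "--username", "avnadmin"],
        # Print the service URI
        ["avn", "service", "get", name, "--format", "{service_uri}"],
        # Block until the service is running
        ["avn", "service", "wait", name],
    ]


if __name__ == "__main__":
    commands = build_avn_commands(
        os.environ.get("KAFKA_INSTANCE_NAME", "demokafka"),
        os.environ.get("CLOUD_PROVIDER", "google-us-central1"),
        os.environ.get("AIVEN_PLAN_NAME", "startup-2"),
        os.environ.get("DESTINATION_FOLDER_NAME", "~/kafkacerts"),
    )
    for argv in commands:
        # Printing only; pass argv to subprocess.run(argv, check=True) to execute
        print(shlex.join(argv))
```

Building argument lists rather than shell strings avoids quoting problems if a service name or path contains spaces.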

3. Create Topic for service in UI or CLI

4. Run the script, passing the certificate directory and the service host and port as arguments

python iot_faker.py \
  --cert-folder ~/kafkacerts/ \
  --host demokafka-username-x11x.aivencloud.com \
  --port 27721 \
  --topic-name iot
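
The contents of iot_faker.py are not reproduced in this README. As a rough sketch of what such a producer could look like, the example below assumes the kafka-python package and the file names ca.pem, service.cert, and service.key that avn service user-creds-download writes. The function names and the "Humidity" parameter are hypothetical. The four message fields match the Flink source table defined later in this README.

```python
import json
import os
import random
from datetime import datetime, timezone


def make_reading(device_id):
    """Build one fake reading with the fields the Flink source table expects."""
    return {
        "device": f"device-{device_id}",
        # "Temperature" is what the Flink job filters on; "Humidity" is made up
        "deviceParameter": random.choice(["Temperature", "Humidity"]),
        "deviceValue": random.randint(0, 100),
        # SQL-style timestamp, the default layout for Flink's JSON format
        "dateTime": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
    }


def ssl_settings(cert_folder):
    """Map the downloaded credential files to kafka-python SSL arguments."""
    folder = os.path.expanduser(cert_folder)
    return {
        "security_protocol": "SSL",
        "ssl_cafile": os.path.join(folder, "ca.pem"),
        "ssl_certfile": os.path.join(folder, "service.cert"),
        "ssl_keyfile": os.path.join(folder, "service.key"),
    }


def produce(host, port, topic, cert_folder, count=10):
    """Send `count` fake readings, keyed by device, to the given topic."""
    from kafka import KafkaProducer  # assumption: kafka-python is installed

    producer = KafkaProducer(
        bootstrap_servers=f"{host}:{port}",
        key_serializer=lambda k: json.dumps(k).encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        **ssl_settings(cert_folder),
    )
    for i in range(count):
        reading = make_reading(i % 3)
        # JSON key carrying the device field, matching 'key.fields' = 'device'
        producer.send(topic, key={"device": reading["device"]}, value=reading)
    producer.flush()
```

Calling produce("demokafka-username-x11x.aivencloud.com", 27721, "iot", "~/kafkacerts") would mirror the command line above, with the host and port replaced by your own service's values.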

5. Create a demo Flink environment with the UI or CLI. If using the UI, follow the Flink getting started guide

avn service create demoflink -t flink --plan business-4

6. Enable Flink Data Service Integration with Kafka

7. Flink - Create New Application in the Applications tab

8. Flink - Create New Source Table

CREATE TABLE iot (
    device VARCHAR,
    deviceParameter VARCHAR,
    deviceValue INT,
    dateTime TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'iot',
    'value.format' = 'json',
    'key.format' = 'json',
    'key.fields' = 'device'
)

9. Create Sink Table

CREATE TABLE temperatures (
    device VARCHAR,
    deviceValue INT,
    timed TIMESTAMP,
    jitter DOUBLE
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'temperatures',
    'value.format' = 'json',
    'key.format' = 'json',
    'key.fields' = 'device'
)

10. Create the SQL statement that transforms the data from the source stream.

INSERT INTO temperatures
SELECT device, deviceValue, dateTime, RAND() AS jitter
FROM iot
WHERE deviceParameter = 'Temperature'
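
To make the effect of the statement concrete, here is a plain-Python sketch of the same filter and projection applied to in-memory records. It is only an illustration of the logic, not how Flink executes the job. The function name transform and the sample records are hypothetical.

```python
import random


def transform(records):
    """Mimic the INSERT ... SELECT: keep Temperature rows and add jitter."""
    for record in records:
        if record["deviceParameter"] == "Temperature":  # WHERE clause
            yield {
                "device": record["device"],
                "deviceValue": record["deviceValue"],
                "timed": record["dateTime"],  # third column of the sink table
                "jitter": random.random(),    # like RAND(): a value in [0, 1)
            }


if __name__ == "__main__":
    sample = [
        {"device": "device-1", "deviceParameter": "Temperature",
         "deviceValue": 21, "dateTime": "2024-01-01 00:00:00"},
        {"device": "device-2", "deviceParameter": "Humidity",
         "deviceValue": 40, "dateTime": "2024-01-01 00:00:01"},
    ]
    for row in transform(sample):
        print(row)
```

Note that the SELECT columns map to the sink table by position, which is why dateTime from the source lands in the timed column.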

11. Create Deployment of Flink Application

12. Use UI to verify data in the Kafka sink Topic
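
As an alternative to the UI, the sink topic could also be checked from Python. The sketch below assumes the kafka-python package and the same credential files used by the producer script. The function names count_by_device and read_sink are hypothetical.

```python
import json
import os
from collections import Counter


def count_by_device(raw_values):
    """Decode JSON message values (bytes) and count records per device."""
    counts = Counter()
    for raw in raw_values:
        record = json.loads(raw.decode("utf-8"))
        counts[record["device"]] += 1
    return dict(counts)


def read_sink(host, port, cert_folder, topic="temperatures"):
    """Read the sink topic from the beginning and summarize what arrived."""
    from kafka import KafkaConsumer  # assumption: kafka-python is installed

    folder = os.path.expanduser(cert_folder)
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=f"{host}:{port}",
        security_protocol="SSL",
        ssl_cafile=os.path.join(folder, "ca.pem"),
        ssl_certfile=os.path.join(folder, "service.cert"),
        ssl_keyfile=os.path.join(folder, "service.key"),
        auto_offset_reset="earliest",  # start from the oldest message
        consumer_timeout_ms=10000,     # stop iterating after 10 s of silence
    )
    try:
        return count_by_device(message.value for message in consumer)
    finally:
        consumer.close()
```

A non-empty result from read_sink suggests the Flink deployment is writing to the topic; an empty one may simply mean no Temperature readings have been produced yet.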
