-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworker.py
More file actions
20 lines (16 loc) · 972 Bytes
/
worker.py
File metadata and controls
20 lines (16 loc) · 972 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import faust
# Faust application registered under the id 'myapp', with its broker
# setting pointing at '0.0.0.0:9092'.
# NOTE(review): 0.0.0.0 is normally a bind address rather than a connect
# target, and no URL scheme is given -- confirm this resolves to the
# intended Kafka broker in the deployment environment.
# NOTE(review): auto_create_topics=True is presumably meant to let missing
# topics be created on demand -- verify the option name against the Faust
# settings reference.
app = faust.App(
    'myapp',
    broker='0.0.0.0:9092',
    auto_create_topics=True,
)
# Topic description for 'hello_world': values are declared as `str` and the
# 'raw' value serializer is requested (presumably no structured decoding is
# applied to the payload -- TODO confirm).
topic = app.topic(
    'hello_world',
    value_type=str,
    value_serializer='raw',
)
# Agent bound to the 'hello_world' topic defined above via @app.agent.
@app.agent(topic)
async def processor(stream):
    """Print every message received on the agent's stream.

    Args:
        stream: Asynchronous iterable supplied to the agent; each item it
            yields is treated as one message.
    """
    async for message in stream:
        # Build the report line first, then write it to stdout.
        received = f'Received {message}'
        print(received)