@@ -22,11 +22,10 @@ python -m pip install quixstreams

### Step 2. Producing data to Kafka

- In order to process events with Quix Streams, they first need to be in Kafka.
- Let's create the file `producer.py` to generate some test data into the Kafka topic:
+ In order to process events with Quix Streams, they first need to be in Kafka.
+ Let's create the file `producer.py` to write some test data into a Kafka topic.

```python
-
from quixstreams import Application

# Create an Application - the main configuration entry point
@@ -65,8 +64,8 @@ if __name__ == "__main__":

### Step 3. Consuming data from Kafka

- Let's create the file `consumer.py` with streaming processing code.
- It will start consuming messages from Kafka and applying transformations to them.
+ Let's create the file `consumer.py` to process the data in the topic.
+ It will start consuming messages from Kafka and apply transformations to them.

```python
from quixstreams import Application
@@ -86,7 +85,7 @@ messages_topic = app.topic(name="messages", value_deserializer="json")

sdf = app.dataframe(topic=messages_topic)

# Print the input data
- sdf = sdf.update(lambda message: print("Input: ", message))
+ sdf = sdf.update(lambda message: print(f"Input: {message}"))

# Define a transformation to split incoming sentences
# into words using a lambda function
@@ -99,17 +98,16 @@ sdf = sdf.apply(

sdf["length"] = sdf["text"].apply(lambda word: len(word))

# Print the output result
- sdf = sdf.update(lambda word: print(word))
+ sdf = sdf.update(lambda word: print(f"Output: {word}"))

# Run the streaming application
if __name__ == "__main__":
    app.run(sdf)
```
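Since the hunks above elide the body of `sdf.apply`, here is a rough plain-Python sketch of what the described steps do to a single message: fan the sentence out into one record per word, then add a `length` field. This mirrors the behaviour only — it is not the `StreamingDataFrame` API, and the message value is illustrative:

```python
# Plain-Python sketch of the consumer's transformation steps (no Kafka needed).
# Names and the sample message here are illustrative, not from the library.

def split_into_words(message: dict) -> list[dict]:
    # One input record fans out into several output records,
    # as the expanding apply step does in the real pipeline.
    return [{"text": word} for word in message["text"].split()]

def add_length(word_record: dict) -> dict:
    # Equivalent in spirit to assigning sdf["length"] from len(word).
    word_record["length"] = len(word_record["text"])
    return word_record

message = {"chat_id": "id1", "text": "Lorem ipsum dolor sit amet"}
records = [add_length(r) for r in split_into_words(message)]
print(records[0])  # {'text': 'Lorem', 'length': 5}
```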

-

### Step 4. Running the Producer

- Let's run the `producer.py` to fill the topic with data.
+ Let's run the `producer.py` in a terminal to fill the topic with data.
If the topic does not exist yet, Quix Streams will create it with the default number of partitions.

```commandline
@@ -127,10 +125,11 @@ Produce event with key="id3" value="b'{"chat_id":"id3","text":"Mollis nunc sed i
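The `value="b'{...}'"` bytes in the producer log are simply the message dict encoded as JSON, which is what the `"json"` serializer and deserializer pair implies. A minimal sketch of that round trip with the standard `json` module (the message value is illustrative, and exact byte formatting may differ from the library's serializer):

```python
import json

# The "json" value serializer turns a message dict into UTF-8 JSON bytes on
# produce; the matching deserializer reverses this on consume.
# Message value is illustrative.
message = {"chat_id": "id2", "text": "Consectetur adipiscing elit sed"}

value = json.dumps(message, separators=(",", ":")).encode("utf-8")
print(value)  # b'{"chat_id":"id2","text":"Consectetur adipiscing elit sed"}'

decoded = json.loads(value)
assert decoded == message  # round-trips back to the original dict
```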

### Step 5. Running the Consumer

Now that you have a topic with data, you may start consuming events and processing them.
- Let's run the `consumer.py` to see the results:
+ Let's run the `consumer.py` to see the results.

```commandline
python consumer.py
+
[2024-02-21 19:57:38,669] [INFO] : Initializing processing of StreamingDataFrame
[2024-02-21 19:57:38,669] [INFO] : Topics required for this application: "messages", "words"
[2024-02-21 19:57:38,699] [INFO] : Validating Kafka topics exist and are configured correctly...
@@ -147,7 +146,6 @@ Input: {'chat_id': 'id2', 'text': 'Consectetur adipiscing elit sed'}
...
```
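For the `Input:` line shown in the log above, the split-and-measure steps yield one record per word. A quick plain-Python check of the values you would expect in the `Output:` lines (again a sketch of the behaviour, not the library API):

```python
# Reproduce the per-word records the consumer prints for one logged input.
# The record shape mirrors the pipeline's output; it is not produced by the library here.
text = "Consectetur adipiscing elit sed"

records = [{"text": word, "length": len(word)} for word in text.split()]
for record in records:
    print(f"Output: {record}")
```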

-

## Next steps

Now that you have a simple Quix Streams application working, you can dive into more advanced features:
@@ -164,4 +162,4 @@ Or check out the tutorials for more in-depth examples:

## Getting help

- If you run into any problems, please create an [issue](https://github.com/quixio/quix-streams/issues) or ask in `#quix-help` in our **[Quix Community on Slack](https://quix.io/slack-invite)**.
+ If you run into any problems, please create an [issue](https://github.com/quixio/quix-streams/issues) or ask in `#quix-help` in **[Quix Community on Slack](https://quix.io/slack-invite)**.