Commit 5aadb16

Add example of data streaming with apache kafka + apache flink (#353)
* Add example of streaming with kafka + flink
1 parent 091edd1 commit 5aadb16

File tree: 10 files changed, +487 −0 lines changed
## .env (20 additions)

```shell
CRATEDB_HOST=crate
CRATEDB_PORT=4200
CRATEDB_PG_PORT=5432

WEATHER_PRODUCER_CITY=Vienna
WEATHER_PRODUCER_API_KEY=#GET THE API KEY - https://www.weatherapi.com/
WEATHER_PRODUCER_FETCH_EVERY_SECONDS=30
WEATHER_PRODUCER_KAFKA_TOPIC=weather_topic
WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER=kafka

FLINK_CONSUMER_KAFKA_TOPIC=weather_topic
FLINK_CONSUMER_BOOTSTRAP_SERVER=kafka
FLINK_CONSUMER_CRATEDB_PG_URI=jdbc:postgresql://crate:5432/crate
FLINK_CONSUMER_CRATEDB_USER=crate
FLINK_CONSUMER_CRATEDB_PASSWORD=empty

# Jar versions.
POSTGRESQL_JAR_VERSION=42.7.2
FLINK_CONNECTOR_JDBC_VERSION=3.1.2-1.18
FLINK_KAFKA_JAR_URL_VERSION=3.1.0-1.18
```
## Dockerfile (7 additions)

```dockerfile
FROM python:3.10

WORKDIR /app
COPY * /app

RUN pip install poetry
RUN poetry config virtualenvs.create false && poetry install
```
## README.md (142 additions)
# Streaming data with Apache Kafka, Apache Flink and CrateDB

## About

This example showcases what a data-streaming architecture leveraging Kafka and Flink could look like.

We use:

- Kafka (Confluent)
- Apache Flink
- CrateDB
- Python (>=3.7, <=3.11)
## Overview

An HTTP call is scheduled to run on `weather_producer` every `WEATHER_PRODUCER_FETCH_EVERY_SECONDS`
seconds (30 by default); the API returns a JSON document with the specified city's weather,
which is then sent through `Kafka`.

`flink_consumer` is a Flink application consuming the same Kafka topic;
upon receiving data, it sends the resulting datastream to the sink, which is CrateDB.

Both `flink_consumer` and `weather_producer` are written using their respective Python wrappers:

- [kafka-python](https://kafka-python.readthedocs.io/en/master/)
- [apache-flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/overview/)

Everything is customizable via environment variables: the API schedule, the topic, credentials, etc.
See `.env` for more details.
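As an illustration, here is a minimal, hypothetical sketch of that producer loop, assuming a
`fetch_weather_data` helper and the defaults from `.env` (the actual `weather_producer` module
is not shown in this excerpt, and reads these values from the environment rather than
hard-coding them):

```python
import json
import sched
import time

import requests
from kafka import KafkaProducer

CITY = "Vienna"          # WEATHER_PRODUCER_CITY
API_KEY = "..."          # WEATHER_PRODUCER_API_KEY
RUN_EVERY_SECONDS = 30   # WEATHER_PRODUCER_FETCH_EVERY_SECONDS

# The producer serializes each dict to a JSON string before sending it to Kafka.
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda m: json.dumps(m).encode("utf-8"),
)


def fetch_weather_data():
    # Fetch the current weather for the configured city and publish it to the topic.
    response = requests.get(
        "https://api.weatherapi.com/v1/current.json",
        params={"key": API_KEY, "q": CITY},
        timeout=10,
    )
    response.raise_for_status()
    producer.send("weather_topic", response.json())


def schedule_every(seconds, func, scheduler):
    # Run the task, then re-arm the scheduler so it repeats indefinitely.
    func()
    scheduler.enter(seconds, 1, schedule_every, (seconds, func, scheduler))


scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enter(RUN_EVERY_SECONDS, 1, schedule_every,
                (RUN_EVERY_SECONDS, fetch_weather_data, scheduler))
scheduler.run()
```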
## How to use

The Docker Compose configuration will get you started quickly.
You will need to put the API key from [Weather API](https://www.weatherapi.com/)
into your local `.env` file.

### Run the docker compose (and build the images)

```shell
docker compose up -d --build
```

### Stop the docker compose

```shell
docker compose down
```

### Install dependencies with Poetry

```shell
poetry install
```

### Install dependencies with pip

```shell
pip install -r requirements.txt
```
## Notes

### CrateDB initial settings

CrateDB stores the shard indexes on the file system by mapping a file into memory (mmap).
You might need to set `max_map_count` to something higher than the usual default, like `262144`.

You can do so by running `sysctl -w vm.max_map_count=262144`;
for more information, see the [bootstrap checks documentation](https://cratedb.com/docs/guide/admin/bootstrap-checks.html#linux).
### Mock API call

If you don't want to register for the weather API we use, you can use the
provided function `mock_fetch_weather_data`; call this instead in the scheduler call.

This is how it would look:

```python
scheduler.enter(
    RUN_EVERY_SECONDS,
    1,
    schedule_every,
    (RUN_EVERY_SECONDS, mock_fetch_weather_data, scheduler)
)
```

*After changing this, re-build the Docker images.*
### Initial Kafka topic

In this example the `Kafka` topic is only created when data is first sent to it; because of this,
the Flink job could fail if it exceeds the default timeout (60 seconds). This should only happen
if the API takes too long to respond *the very first time this project runs*.

To solve this, you should [configure](https://kafka.apache.org/quickstart#quickstart_createtopic) the
topics at boot time. This is recommended for production scenarios.
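The linked quickstart does this with the `kafka-topics` CLI. Alternatively, since `kafka-python`
is already a project dependency, a small bootstrap script could create the topic up front; this
is a sketch, assuming the broker and topic names from `.env`:

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Create the topic once, before the producer and the Flink job start.
admin = KafkaAdminClient(bootstrap_servers="kafka:9092")
admin.create_topics([
    NewTopic(name="weather_topic", num_partitions=1, replication_factor=1)
])
admin.close()
```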
If you are just testing things, you can solve this by re-running `docker compose up -d`; it
will only start `flink_job`, and assuming everything went well, the topic should already exist and
work as expected.

If it still fails, check whether any other container/service is down;
it could be a symptom of a wrong API token or an unresponsive Kafka server, for example.
## Data and schema

See `example.json` for the schema. As you can see in `weather_producer` and `flink_consumer`,
schema manipulation is minimal:
thanks to CrateDB's dynamic objects, we only need to map the `location` and `current` keys.

For more information on dynamic objects,
see [this blog post](https://cratedb.com/blog/handling-dynamic-objects-in-cratedb).
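For illustration, the sink table targeted by the Flink job could be declared with dynamic
`OBJECT` columns; this DDL is an assumption and is not part of this commit:

```sql
CREATE TABLE IF NOT EXISTS doc.weather_flink_sink (
    "location" OBJECT(DYNAMIC),
    "current"  OBJECT(DYNAMIC)
);
```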
In `weather_producer`, the `Kafka` producer directly serializes the JSON into a string:

```python
KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,
              value_serializer=lambda m: json.dumps(m).encode('utf-8'))
```
In `flink_consumer` we use a JSON deserialization schema and only specify the two main keys,
`location` and `current`:

```python
row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
```

If your format is not JSON, or if you want to specify the whole schema, adapt it as needed.
[Here](https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/connectors.html)
you can find examples of other formats like `csv` or `avro`.
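Once rows are flowing, the dynamic object columns can be queried with CrateDB's subscript
notation; the keys below are taken from `example.json`:

```sql
SELECT "location"['name']                 AS city,
       "current"['temp_c']                AS temp_c,
       "current"['condition']['text']     AS condition
FROM doc.weather_flink_sink
ORDER BY "current"['last_updated_epoch'] DESC
LIMIT 5;
```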
## Jars and versions

Jars are downloaded at build time to `/app/jars`; versions are pinned in `.env`.

There is a `JARS_PATH` in `flink_consumer`; change it if you have the jars somewhere else.
## docker-compose.yml (58 additions)

```yaml
services:
  weather_producer:
    env_file:
      - .env
    build:
      context: .
      dockerfile: Dockerfile
    command: python -m weather_producer
    depends_on:
      - kafka

  flink_job:
    env_file:
      - .env
    build:
      context: .
      dockerfile: flink_job.Dockerfile
      args:
        - POSTGRESQL_JAR_URL=https://jdbc.postgresql.org/download/postgresql-${POSTGRESQL_JAR_VERSION}.jar
        - FLINK_SQL_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/${FLINK_CONNECTOR_JDBC_VERSION}/flink-connector-jdbc-${FLINK_CONNECTOR_JDBC_VERSION}.jar
        - FLINK_KAFKA_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_KAFKA_JAR_URL_VERSION}/flink-sql-connector-kafka-${FLINK_KAFKA_JAR_URL_VERSION}.jar
    command: python -m flink_consumer
    depends_on:
      - kafka

  crate:
    image: crate:latest
    ports:
      - "4200:4200"
    command: ["crate", "-Cdiscovery.type=single-node"]
    environment:
      - CRATE_HEAP_SIZE=2g

  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-server:6.2.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
## example.json (41 additions)

```json
{
  "location": {
    "localtime": "2024-03-07 18:20",
    "country": "France",
    "localtime_epoch": 1709832024,
    "name": "Nonette",
    "lon": 3.28,
    "region": "Auvergne",
    "lat": 45.48,
    "tz_id": "Europe/Paris"
  },
  "current": {
    "feelslike_c": 11,
    "uv": 3,
    "last_updated": "2024-03-07 18:15",
    "feelslike_f": 51.7,
    "wind_degree": 30,
    "last_updated_epoch": 1709831700,
    "is_day": 1,
    "precip_in": 0,
    "wind_dir": "NNE",
    "gust_mph": 12.1,
    "temp_c": 12,
    "pressure_in": 29.83,
    "gust_kph": 19.5,
    "temp_f": 53.6,
    "precip_mm": 0,
    "cloud": 0,
    "wind_kph": 6.8,
    "condition": {
      "code": 1000,
      "icon": "//cdn.weatherapi.com/weather/64x64/day/113.png",
      "text": "Sunny"
    },
    "wind_mph": 4.3,
    "vis_km": 10,
    "humidity": 50,
    "pressure_mb": 1010,
    "vis_miles": 6
  }
}
```
## flink_consumer.py (67 additions)

```python
import os
import logging

from pathlib import Path

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

logging.basicConfig(
    format='[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.DEBUG
)

JARS_PATH = Path(__file__).parent / 'jars'

KAFKA_BOOTSTRAP_SERVER = os.getenv('FLINK_CONSUMER_BOOTSTRAP_SERVER')
KAFKA_TOPIC = os.getenv('FLINK_CONSUMER_KAFKA_TOPIC')
CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI', 'jdbc:postgresql://localhost:5432/crate')
CRATEDB_USER = os.getenv('FLINK_CONSUMER_CRATEDB_USER')
CRATEDB_PASSWORD = os.getenv('FLINK_CONSUMER_CRATEDB_PASSWORD')


def kafka_to_cratedb(env: StreamExecutionEnvironment):
    row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
    json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

    # Consumes data from Kafka.
    kafka_consumer = FlinkKafkaConsumer(
        topics=KAFKA_TOPIC,
        deserialization_schema=json_format,
        properties={'bootstrap.servers': f'{KAFKA_BOOTSTRAP_SERVER}:9092'}
    )
    kafka_consumer.set_start_from_latest()

    ds = env.add_source(kafka_consumer, source_name='kafka')

    # Writes data to cratedb.
    ds.add_sink(
        JdbcSink.sink(
            "insert into doc.weather_flink_sink (location, current) values (?, ?)",
            row_type_info,
            JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url(CRATEDB_PG_URI)
            .with_driver_name('org.postgresql.Driver')
            .with_user_name(CRATEDB_USER)
            .with_password(CRATEDB_PASSWORD)
            .build(),
            JdbcExecutionOptions.builder()
            .with_batch_interval_ms(1000)
            .with_batch_size(200)
            .with_max_retries(5)
            .build()
        )
    )
    env.execute()


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    jars = list(map(lambda x: 'file://' + str(x), (JARS_PATH.glob('*.jar'))))
    env.add_jars(*jars)

    logging.info("Reading data from kafka")
    kafka_to_cratedb(env)
```
## flink_job.Dockerfile (18 additions)

```dockerfile
FROM python:3.10
# Python version is important, because as of today (2024-03-04) kafka-flink is only
# supported for python<=3.10

ARG POSTGRESQL_JAR_URL
ARG FLINK_SQL_JAR_URL
ARG FLINK_KAFKA_JAR_URL

WORKDIR /app
COPY * /app
RUN wget ${POSTGRESQL_JAR_URL} --directory-prefix=/app/jars
RUN wget ${FLINK_SQL_JAR_URL} --directory-prefix=/app/jars
RUN wget ${FLINK_KAFKA_JAR_URL} --directory-prefix=/app/jars

RUN apt update && apt install -y openjdk-11-jdk
RUN pip install poetry

RUN poetry config virtualenvs.create false && poetry install
```
## pyproject.toml (16 additions)

```toml
[tool.poetry]
name = "cratedb-weather-data"
version = "0.1.0"
description = ""
authors = ["ivan.sanchez <ivan.sanchez@crate.io>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.9"
requests = "^2.31.0"
kafka-python = "^2.0.2"
apache-flink = "^1.18.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
```
