Skip to content

Commit 6cde7cd

Browse files
committed
Flink Tour: Add Python examples
1 parent 59262ff commit 6cde7cd

File tree

4 files changed

+116
-0
lines changed

4 files changed

+116
-0
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Apache Flink and CrateDB with Python
2+
3+
Basic examples demonstrating how to read and write from/to
4+
CrateDB when using Apache Flink (PyFlink).
5+
6+
The examples use both the [CrateDB JDBC] and the [PostgreSQL JDBC]
7+
drivers. CrateDB JDBC is needed for catalog operations, which are
8+
required when reading from CrateDB using Flink.
9+
10+
```shell
11+
uvx crash -c 'CREATE TABLE person (name STRING, age INT);'
12+
```
13+
Flink >= 1.19 has problems with JDBC and PyFlink,
14+
but earlier Flink versions require a correspondingly older Python version, such as Python 3.10.
15+
```shell
16+
uv venv --python 3.10 --seed .venv310
17+
uv pip install -r requirements.txt
18+
```
19+
```shell
20+
python write.py
21+
```
22+
```shell
23+
python read.py
24+
```
25+
26+
27+
[CrateDB JDBC]: https://cratedb.com/docs/guide/connect/java/cratedb-jdbc.html
28+
[PostgreSQL JDBC]: https://cratedb.com/docs/guide/connect/java/postgresql-jdbc.html
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
def main():
    """Print a greeting to standard output."""
    greeting = "Hello World"
    print(greeting)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
apache-flink==1.18.1
2+
avro<1.13
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import dataclasses
2+
import logging
3+
4+
from pathlib import Path
5+
6+
from pyflink.common import Types
7+
from pyflink.datastream import StreamExecutionEnvironment
8+
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
9+
10+
11+
logger = logging.getLogger(__name__)
12+
13+
JARS_PATH = Path(__file__).parent / 'jars'
14+
15+
16+
@dataclasses.dataclass
class Person:
    """
    Simple record type with a name and an age.
    """

    # The person's name.
    name: str
    # The person's age.
    age: int
20+
21+
22+
def main():
    """
    Write a few example records into the `person` table using PyFlink.

    Builds a streaming pipeline that reads an in-memory collection of
    `Person` records and sends them to a JDBC sink, connecting with the
    PostgreSQL JDBC driver. All `*.jar` files found in `JARS_PATH` are
    registered with the execution environment beforehand.
    """

    env = StreamExecutionEnvironment.get_execution_environment()

    # Register JAR files as proper `file://` URIs. `Path.as_uri()` requires an
    # absolute path, hence `resolve()`. Sorting keeps the order deterministic.
    jars = [jar.resolve().as_uri() for jar in sorted(JARS_PATH.glob('*.jar'))]
    if not jars:
        logger.warning("No JAR files found in %s", JARS_PATH)
    env.add_jars(*jars)

    # Define source data.
    people = [
        Person("Fred", 35),
        Person("Wilma", 35),
        Person("Pebbles", 2),
    ]

    # The JDBC sink is built for rows of `row_type_info`, so the stream must
    # carry the same explicit row type. Without `type_info`, `from_collection`
    # emits pickled Python objects instead of rows.
    row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
    ds = env.from_collection(
        [dataclasses.astuple(person) for person in people],
        type_info=row_type_info,
    )

    # FIXME (Flink >= 1.19): java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder
    #
    # This is due to a bug in the python Flink library. In `flink-connector-jdbc` v3.1,
    # the `JdbcOutputFormat` was renamed to `RowJdbcOutputFormat`. This change has up till
    # now not been implemented in the python Flink library.
    # https://stackoverflow.com/questions/78960829/java-lang-nosuchmethodexception-in-python-flink-jdbc
    #
    # As you see, java `JdbcSink` connector class has different shape from Python `JdbcSink` connector.
    # In Java code, `jdbcSink` object is generated from `JdbcSinkBuilder` class, but in Python it is not.
    # I think these errors are due to API version mismatch. Any idea to solve these errors?
    # https://stackoverflow.com/questions/79604252/issue-with-pyflink-api-code-for-inserting-data-into-sql

    # Connection settings for the JDBC sink.
    connection_options = (
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url('jdbc:postgresql://localhost:5432/crate')
        .with_driver_name('org.postgresql.Driver')
        .with_user_name("crate")
        .with_password("crate")
        .build()
    )

    # Batching and retry settings for the JDBC sink.
    execution_options = (
        JdbcExecutionOptions.builder()
        .with_batch_interval_ms(1000)
        .with_batch_size(200)
        .with_max_retries(5)
        .build()
    )

    # Define CrateDB as data sink.
    ds.add_sink(
        JdbcSink.sink(
            "INSERT INTO person (name, age) VALUES (?, ?)",
            row_type_info,
            connection_options,
            execution_options,
        )
    )

    # Execute pipeline.
    env.execute()
70+
71+
72+
if __name__ == '__main__':
    # Configure DEBUG-level logging with timestamp, level, and source location
    # before running the pipeline.
    logging.basicConfig(
        level=logging.DEBUG,
        format='[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s',
    )

    logger.info("Start")
    main()
    logger.info("Ready")

0 commit comments

Comments
 (0)