Commit 1148799

Iceberg sink docs (#586)
1 parent e73e380 commit 1148799

File tree: 4 files changed, +81 −3 lines

4 files changed

+81
-3
lines changed

docs/connectors/sinks/README.md

Lines changed: 1 addition & 0 deletions
@@ -78,6 +78,7 @@ sdf = sdf.apply() # continue different operations with another branch...
  Currently, Quix Streams provides these sinks out of the box:

+ - [Apache Iceberg Sink](apache-iceberg-sink.md) - a sink to write data in Apache Iceberg format.
  - [CSV Sink](csv-sink.md) - a simple CSV sink that writes data to a single CSV file.
  - [InfluxDB 3 Sink](influxdb3-sink.md) - a sink to write data to InfluxDB 3.

docs/connectors/sinks/apache-iceberg-sink.md

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
# Apache Iceberg Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about the differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

This sink writes batches of data to an Apache Iceberg table.
By default, the data will include the Kafka message key, value, and timestamp.

Currently, it supports Apache Iceberg hosted in:

- AWS

Supported data catalogs:

- AWS Glue

## How the Iceberg Sink Works

`IcebergSink` is a batching sink.

It batches processed records in memory per topic partition, serializes incoming data batches into Parquet format, and appends them to the Iceberg table, updating the table schema as necessary.
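The per-partition batching described above can be sketched in plain Python. This is an illustrative toy, not the actual `IcebergSink` internals; the Parquet serialization and Iceberg append steps are represented by a simple list of flushed batches.

```python
from collections import defaultdict


class BatchingBuffer:
    """Toy per-(topic, partition) batching buffer (illustrative only)."""

    def __init__(self, flush_size=3):
        self.flush_size = flush_size
        self._batches = defaultdict(list)  # (topic, partition) -> records
        self.flushed = []  # stands in for "append batch to the Iceberg table"

    def add(self, topic, partition, record):
        key = (topic, partition)
        self._batches[key].append(record)
        if len(self._batches[key]) >= self.flush_size:
            # The real sink would serialize this batch to Parquet and
            # append it to the table; here we just record the flush.
            self.flushed.append((topic, partition, self._batches.pop(key)))
```

Keying batches by `(topic, partition)` keeps each partition's records ordered independently, which is what lets the sink flush partitions as they fill without blocking each other.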
## How To Use Iceberg Sink

Create an instance of `IcebergSink` and pass
it to the `StreamingDataFrame.sink()` method.

For the full description of expected parameters, see the [Iceberg Sink API](../../api-reference/sinks.md#icebergsink) page.
```python
from quixstreams import Application
from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig

# Configure S3 bucket credentials
iceberg_config = AWSIcebergConfig(
    aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
)

# Configure the sink to write data to S3 with the AWS Glue catalog spec
iceberg_sink = IcebergSink(
    table_name="glue.sink-test",
    config=iceberg_config,
    data_catalog_spec="aws_glue",
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')

# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)

# Sink results to the IcebergSink
sdf.sink(iceberg_sink)


if __name__ == "__main__":
    # Start the application
    app.run()
```
## Retrying Failures

`IcebergSink` will retry failed commits automatically with a random delay of up to 5 seconds.
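The retry-with-random-delay behavior can be sketched as follows. The helper name and signature are hypothetical, purely to illustrate the pattern; the real sink handles `CommitFailedException` internally.

```python
import random
import time


def commit_with_retries(commit, max_attempts=3, max_delay=5.0):
    """Hypothetical helper: retry a commit callable with a random delay."""
    for attempt in range(1, max_attempts + 1):
        try:
            return commit()
        except Exception:
            if attempt == max_attempts:
                raise
            # Sleep a random interval, up to `max_delay` seconds
            time.sleep(random.uniform(0, max_delay))
```

A randomized delay (rather than a fixed one) helps avoid multiple writers retrying their table commits in lockstep.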
## Delivery Guarantees

`IcebergSink` provides at-least-once guarantees, and the results may contain duplicated rows of data if there were errors during processing.
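Because at-least-once delivery can replay rows, downstream consumers may want to deduplicate. A minimal sketch, deduplicating on the message key and timestamp that the sink writes by default; the `"key"` and `"timestamp"` field names here are assumptions for illustration, not columns guaranteed by `IcebergSink`:

```python
def drop_duplicates(rows):
    """Keep only the first occurrence of each (key, timestamp) pair."""
    seen = set()
    unique = []
    for row in rows:
        identity = (row["key"], row["timestamp"])
        if identity not in seen:
            seen.add(identity)
            unique.append(row)
    return unique
```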

mkdocs.yml

Lines changed: 1 addition & 0 deletions
@@ -49,6 +49,7 @@ nav:
  - 'Connectors [beta]':
      - Sinks:
          - 'connectors/sinks/README.md'
+         - Apache Iceberg Sink: connectors/sinks/apache-iceberg-sink.md
          - CSV Sink: connectors/sinks/csv-sink.md
          - InfluxDB v3 Sink: connectors/sinks/influxdb3-sink.md
          - Creating a Custom Sink: connectors/sinks/custom-sinks.md

quixstreams/sinks/community/iceberg.py

Lines changed: 12 additions & 3 deletions
@@ -17,7 +17,7 @@
  from pyiceberg.partitioning import PartitionSpec, PartitionField
  from pyiceberg.schema import Schema, NestedField
  from pyiceberg.types import StringType, TimestampType
- from pyiceberg.exceptions import CommitFailedException  # Import the exception
+ from pyiceberg.exceptions import CommitFailedException
  except ImportError as exc:
      raise ImportError(
          f"Package {exc.name} is missing: "
@@ -101,12 +101,15 @@ class IcebergSink(BatchingSink):
  Example setup using an AWS-hosted Iceberg with AWS Glue:

  ```
- from quixstreams.sinks.community.iceberg import IcebergSink, IcebergAWSConfig
+ from quixstreams import Application
+ from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig

- iceberg_config = IcebergAWSConfig(
+ # Configure S3 bucket credentials
+ iceberg_config = AWSIcebergConfig(
      aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
  )

+ # Configure the sink to write data to S3 with the AWS Glue catalog spec
  iceberg_sink = IcebergSink(
      table_name="glue.sink-test",
      config=iceberg_config,
@@ -115,10 +118,16 @@ class IcebergSink(BatchingSink):
  app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
  topic = app.topic('sink_topic')
+
+ # Do some processing here
  sdf = app.dataframe(topic=topic).print(metadata=True)
+
+ # Sink results to the IcebergSink
  sdf.sink(iceberg_sink)
+
  if __name__ == "__main__":
+     # Start the application
      app.run()
  ```
  """
