Skip to content

Commit 9d6b2b1

Browse files
razvan and maltesander authored
backport: lake demo, wait for resources (#268)
* Improve resource/topic wait script (#261) Co-authored-by: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> * update broker url --------- Co-authored-by: Malte Sander <malte.sander.it@gmail.com>
1 parent 7f7b383 commit 9d6b2b1

File tree

1 file changed

+97
-2
lines changed

1 file changed

+97
-2
lines changed

demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,85 @@ spec:
1111
spec:
1212
serviceAccountName: demo-serviceaccount
1313
initContainers:
14-
- name: wait-for-kafka
14+
- name: wait-for-resources
1515
image: oci.stackable.tech/sdp/tools:1.0.0-stackable25.3.0
16-
command: ["bash", "-c", "echo 'Waiting for all kafka brokers to be ready' && kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/name=kafka -l app.kubernetes.io/instance=kafka"]
16+
command:
17+
- bash
18+
- -euo
19+
- pipefail
20+
- -c
21+
- |
22+
# Block until every pod matching a label selector reports the Ready condition
# (gives up after 30 minutes).
#   $1 - description inserted into the progress message
#   $2 - comma-separated label selector passed to "kubectl wait -l"
wait_until_ready() {
  echo "Waiting for all $1 to be ready"
  kubectl wait --for=condition=ready --timeout=30m pod -l "$2"
}

# Waits run one after another: minio, then kafka, then nifi.
wait_until_ready 'minio instances' 'app=minio,release=minio,stackable.tech/vendor=Stackable'
wait_until_ready 'kafka brokers' 'app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka'
wait_until_ready 'nifi instances' 'app.kubernetes.io/name=nifi,app.kubernetes.io/instance=nifi'
28+
- name: wait-for-kafka-topics
29+
image: oci.stackable.tech/sdp/kafka:3.9.0-stackable25.3.0
30+
command:
31+
- bash
32+
- -euo
33+
- pipefail
34+
- -c
35+
- |
36+
#!/usr/bin/bash
37+
#
38+
# Wait for Kafka topics created by the Nifi workflows to be ready.
39+
# Also wait for all topic partitions to have a leader.
40+
# This is required for the Spark streaming job to be able to read from the topics.
41+
# Without this check, the Spark job might hang indefinitely without processing any events.
42+
43+
# Kafka bootstrap address handed to kcat via -b. An existing, non-empty BROKER
# environment variable wins; otherwise kafka-broker-default:9093 is used.
BROKER="${BROKER:-kafka-broker-default:9093}"
44+
45+
# Print a log line of the form "[LEVEL] message" on stdout.
#   $1   - severity label, e.g. INFO, DEBUG or ERROR
#   $2.. - message; all remaining arguments are joined via "$*"
log() {
  # Local, so that calling log never overwrites a "level" variable that the
  # caller may be using.
  local level="$1"
  shift
  echo "[$level] $*"
}
50+
51+
# Check whether a Kafka topic is ready to be read from.
#
# Fetches the metadata listing of one topic from $BROKER with kcat (over SSL,
# using the key, certificate and CA under /stackable/tls-kcat) and scans it
# for signs that the topic or one of its partitions has no leader yet.
#
# Arguments:
#   $1 - name of the topic to check
# Outputs:
#   Log lines and the raw metadata listing on stdout.
# Returns:
#   0 if metadata was retrieved and no missing leader was reported
#   1 if no metadata could be retrieved or the topic is not ready yet
check_leaders() {
  local topic=$1
  # Declared separately from the assignment and kept local so the listing does
  # not leak into the caller's scope.
  local metadata

  log INFO "Starting leader check on Kafka broker: $BROKER for topic: $topic"

  # "|| true": a failing kcat must not abort the script when "set -e" is in
  # effect; the empty-output check below reports the failure instead.
  # kcat's own diagnostics on stderr are discarded.
  metadata=$(kcat -b "$BROKER" \
    -X security.protocol=SSL \
    -X ssl.key.location=/stackable/tls-kcat/tls.key \
    -X ssl.certificate.location=/stackable/tls-kcat/tls.crt \
    -X ssl.ca.location=/stackable/tls-kcat/ca.crt \
    -L -t "$topic" 2>/dev/null) || true

  if [[ -z "$metadata" ]]; then
    log ERROR "Failed to retrieve metadata for topic: $topic"
    return 1
  fi

  log DEBUG "Metadata for $topic:"
  echo "$metadata"

  # The checks below feed grep through a here-string rather than
  # "echo ... | grep -q": with "pipefail" set, grep -q may exit on the first
  # match while echo is still writing, the pipeline then counts as failed and
  # the match is silently missed.

  # Accept both "leader: -1" and "leader -1". NOTE(review): kcat's plain-text
  # listing appears to print partitions as "partition N, leader -1, ..."
  # (no colon) - confirm against the kcat version in the image.
  if grep -Eq 'leader:? -1' <<<"$metadata"; then
    log ERROR "Found 'leader: -1' in topic '$topic'; topic not ready yet!"
    return 1
  fi

  if grep -q 'Broker: Leader not available' <<<"$metadata"; then
    log ERROR "Topic '$topic' not available yet"
    return 1
  fi

  log INFO "Check topic '$topic' was successful"
  return 0
}
79+
80+
# Check every topic that the Spark streaming job reads from; stop at the
# first one that is not ready.
for topic in "shared_bikes_bike_status" "shared_bikes_station_status" "shared_bikes_station_information" "water_levels_measurements" "water_levels_stations"
do
  # Test the function's exit status directly. Capturing its stdout with
  # result=$(...) and comparing that text to "1" never matched, and under
  # "set -e" the failing assignment aborted the script before the captured
  # log output could be printed. Called this way, check_leaders writes its
  # log lines straight to stdout, for both passing and failing topics.
  if ! check_leaders "$topic"
  then
    exit 1
  fi
done
exit 0
90+
volumeMounts:
91+
- name: tls-kcat
92+
mountPath: /stackable/tls-kcat
1793
containers:
1894
- name: create-spark-ingestion-job
1995
image: oci.stackable.tech/sdp/tools:1.0.0-stackable25.3.0
@@ -25,7 +101,26 @@ spec:
25101
- name: manifest
26102
configMap:
27103
name: create-spark-ingestion-job-manifest
104+
- name: tls-kcat
105+
ephemeral:
106+
volumeClaimTemplate:
107+
metadata:
108+
annotations:
109+
secrets.stackable.tech/backend.autotls.cert.lifetime: "1d"
110+
secrets.stackable.tech/class: "tls"
111+
secrets.stackable.tech/format: "tls-pem"
112+
secrets.stackable.tech/scope: "pod"
113+
spec:
114+
accessModes:
115+
- ReadWriteOnce
116+
resources:
117+
requests:
118+
storage: "1"
119+
storageClassName: secrets.stackable.tech
120+
volumeMode: Filesystem
28121
restartPolicy: OnFailure
122+
securityContext:
123+
fsGroup: 1000
29124
backoffLimit: 50
30125
---
31126
apiVersion: v1

0 commit comments

Comments
 (0)