       containers:
         - name: create-nifi-ingestion-job
           image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev
-          command: ["bash", "-c", "curl -O https://raw.githubusercontent.com/stackabletech/demos/main/demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.xml && python -u /tmp/script/script.py"]
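+          # Use bash strict mode: -e aborts on errors, -u errors on unset variables, -o pipefail fails a pipeline if any stage fails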
+          command:
+            - bash
+            - -euo
+            - pipefail
+            - -c
+            - python -u /tmp/script/script.py
           volumeMounts:
             - name: script
               mountPath: /tmp/script
@@ -41,9 +46,8 @@ metadata:
   name: create-nifi-ingestion-job-script
 data:
   script.py: |
-    from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller
+    from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller, update_controller
     from nipyapi.security import service_login
-    from nipyapi.templates import get_template, upload_template, deploy_template
     import nipyapi
     import os
     import urllib3
@@ -52,26 +56,94 @@ data:
     ENDPOINT = f"https://nifi-node-default-0.nifi-node-default.{os.environ['NAMESPACE']}.svc.cluster.local:8443"  # For local testing/development, replace this endpoint; change it back afterwards
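+    # NAMESPACE is expected to be injected into the Pod environment (e.g. via the Kubernetes downward API)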
     USERNAME = "admin"
     PASSWORD = open("/nifi-admin-credentials-secret/admin").read()
-    TEMPLATE_NAME = "LakehouseKafkaIngest"
-    TEMPLATE_FILE = f"{TEMPLATE_NAME}.xml"

     urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

     nipyapi.config.nifi_config.host = f"{ENDPOINT}/nifi-api"
     nipyapi.config.nifi_config.verify_ssl = False

-    print("Logging in")
+    print(f"Logging in as {USERNAME}")
     service_login(username=USERNAME, password=PASSWORD)
     print("Logged in")

-    pg_id = get_root_pg_id()
+    organization = "stackabletech"
+    repository = "demos"
+    branch = "main"
+    version = "main"
+    directory = "demos/data-lakehouse-iceberg-trino-spark"
+    flow_name = "LakehouseKafkaIngest"
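+    # Note: for a Git-backed registry, the flow "version" can be a branch or commit rather than an integer,
+    # which is why the ValueError workaround below is needed (nipyapi currently expects int versions)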
+
+    # Check if the GitHub flow registry client already exists
+    flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
+
+    github_client = None
+    for client in flow_registry_clients:
+        if client.component.name == "GitHubFlowRegistryClient":
+            github_client = client
+            print("Found existing GitHub flow registry client")
+            break

-    upload_template(pg_id, TEMPLATE_FILE)
+    if not github_client:
+        print("Creating new GitHub flow registry client")
+        github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
+            body={
+                "revision": {"version": 0},
+                "component": {
+                    "name": "GitHubFlowRegistryClient",
+                    "type": "org.apache.nifi.github.GitHubFlowRegistryClient",
+                    "properties": {
+                        "Repository Owner": organization,
+                        "Repository Name": repository,
+                    },
+                    "bundle": {
+                        "group": "org.apache.nifi",
+                        "artifact": "nifi-github-nar",
+                        "version": "2.2.0",
+                    },
+                },
+            }
+        )
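+    # Assumption: the nifi-github-nar bundle version should match the running NiFi release (2.2.0 here)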
+
+    pg_id = get_root_pg_id()

-    template_id = get_template(TEMPLATE_NAME).id
-    deploy_template(pg_id, template_id, 200, 0)
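+    # In the Git-based registry, the "bucket" corresponds to a repository directory and the flow id
+    # to the flow file name, hence bucketId=directory and flowId=flow_name below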
+    try:
+        # Create the process group from the flow file in the Git repo
+        nipyapi.nifi.ProcessGroupsApi().create_process_group(
+            id=pg_id,
+            body={
+                "revision": {"version": 0},
+                "component": {
+                    "position": {"x": 300, "y": 10},
+                    "versionControlInformation": {
+                        "registryId": github_client.component.id,
+                        "flowId": flow_name,
+                        "bucketId": directory,
+                        "branch": branch,
+                        "version": version,
+                    },
+                },
+            },
+        )
+    except ValueError as e:
+        # Ignore, because nipyapi can't handle non-int versions yet
+        if "invalid literal for int() with base 10" in str(e):
+            print("Ignoring ValueError")
+        else:
+            raise

-    for controller in list_all_controllers():
-        schedule_controller(controller, scheduled=True)
+    # Scheduling the `Kafka3ConnectionService` fails if it is started before
+    # `StandardRestrictedSSLContextService`, since it depends on that service.
+    # To work around this, we try to schedule the controllers multiple times:
+    # if `Kafka3ConnectionService` is scheduled first, it fails in the first
+    # iteration but should succeed in the second, since by then
+    # `StandardRestrictedSSLContextService` is enabled.
+    max_retries = 2
+    for _ in range(max_retries):
+        controllers = list_all_controllers(pg_id)
+        for controller in controllers:
+            if controller.component.state != "ENABLED":
+                try:
+                    schedule_controller(controller, scheduled=True)
+                    print(f"Scheduled controller: {controller.component.name}")
+                except Exception as e:
+                    print(f"Failed to schedule controller {controller.component.name}: {e}")
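+    # Controllers that are already ENABLED are skipped, so a second pass (or a re-run of the Job) is idempotent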

     schedule_process_group(pg_id, scheduled=True)