Description
Apache Airflow Provider(s)
apache-flink
Versions of Apache Airflow Providers
apache-airflow-providers-apache-flink==1.6.0
Apache Airflow version
2.10.5
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Official Apache Airflow Helm Chart
Deployment details
Deployed on local minikube instance
Executor: KubernetesExecutor
Flink Operator Repo: https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
What happened
The FlinkKubernetesSensor fails to find the Flink TaskManager pods:

```
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py", line 127, in poke
  self._log_driver(application_state, response)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py", line 95, in _log_driver
  for task_manager in all_pods.items:
                      ^^^^^^^^^^^^^^
AttributeError: 'HTTPResponse' object has no attribute 'items'
```
What you think should happen instead
Idea 1)
Switch the call in FlinkKubernetesSensor._log_driver() to a Kubernetes API call that lists pods across all namespaces (e.g. list_pod_for_all_namespaces()). Currently, the namespace argument of get_namespaced_pod_list() is hardcoded to "default":

```python
all_pods = self.hook.get_namespaced_pod_list(
    namespace="default", watch=False, label_selector=task_manager_labels
)
```

This hook calls the list_namespaced_pod() API function, so with the hardcoded argument it will only ever look in the "default" namespace for Flink TaskManager pods.
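To illustrate the effect of the hardcoding, here is a toy stand-in for the hook (not the provider's real class; all names are illustrative): pods living in the "flink" namespace are invisible when the query always goes to "default".

```python
class FakeHook:
    """Toy stand-in for the Kubernetes hook: returns pods per namespace."""

    def __init__(self, pods_by_namespace):
        self.pods_by_namespace = pods_by_namespace

    def get_namespaced_pod_list(self, namespace, watch=False, label_selector=""):
        # Mimics list_namespaced_pod(): only searches the given namespace.
        return self.pods_by_namespace.get(namespace, [])


# TaskManager pods actually live in the "flink" namespace...
hook = FakeHook({"flink": ["flink-taskmanager-1"]})

# ...but the sensor always queries "default", so it finds nothing:
missed = hook.get_namespaced_pod_list(namespace="default", watch=False,
                                      label_selector="component=taskmanager")
found = hook.get_namespaced_pod_list(namespace="flink", watch=False,
                                     label_selector="component=taskmanager")
print(missed, found)  # [] ['flink-taskmanager-1']
```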
Idea 2)
Use the existing namespace argument of FlinkKubernetesSensor when searching for TaskManager pods.
Idea 3)
Provide an additional namespace argument specifically for TaskManager pods. Perhaps users want the flexibility of deploying TaskManager pods in a different namespace than the JobManager pods?
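Ideas 2 and 3 could be combined with a simple fallback rule; a minimal sketch, assuming a hypothetical taskmanager_namespace parameter (not an existing provider option):

```python
def resolve_taskmanager_namespace(sensor_namespace=None, taskmanager_namespace=None):
    """Pick the namespace to search for TaskManager pods.

    Idea 3: an explicit TaskManager namespace wins if given;
    Idea 2: otherwise fall back to the sensor's own ``namespace`` argument;
    finally, keep today's "default" behaviour as the last resort.
    """
    return taskmanager_namespace or sensor_namespace or "default"


print(resolve_taskmanager_namespace("flink"))           # flink
print(resolve_taskmanager_namespace("flink", "tm-ns"))  # tm-ns
print(resolve_taskmanager_namespace())                  # default
```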
How to reproduce
Deploy a DAG that contains two tasks:

```python
deployment_yaml = ...

t1 = FlinkKubernetesOperator(
    task_id="submit_flink_job",
    application_file=deployment_yaml,
    namespace="flink",
    api_group="flink.apache.org",
    api_version="v1beta1",
)
t2 = FlinkKubernetesSensor(
    task_id="monitor_flink_job",
    application_name=deployment_yaml['metadata']['name'],
    namespace="flink",
    api_group="flink.apache.org",
    api_version="v1beta1",
    attach_log=True,
)
t1 >> t2
```
- The first task successfully deploys the Kubernetes resources: the JobManager launches in the k8s "flink" namespace, and the TaskManager pod also launches in the "flink" namespace.
- The FlinkKubernetesSensor task successfully detects the job state change of the JobManager.
- The FlinkKubernetesSensor then fails to find the TaskManager pod, resulting in the failure shown above.
Anything else
Perhaps I'm not following some best practice for Kubernetes Flink deployments in which TaskManager pods should always be in the "default" namespace - I'm still fairly new to Apache Flink.
The current workaround is to disable the attach_log feature:

```python
t2 = FlinkKubernetesSensor(
    ...
    attach_log=False
)
```
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct