
FlinkKubernetesSensor._log_driver() namespace for pod retrieval is hardcoded to "default" #48369

@kmacdonald76

Description


Apache Airflow Provider(s)

apache-flink

Versions of Apache Airflow Providers

apache-airflow-providers-apache-flink==1.6.0

Apache Airflow version

2.10.5

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Official Apache Airflow Helm Chart

Deployment details

Deployed on local minikube instance
Executor: KubernetesExecutor
Flink Operator Repo: https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

What happened

The FlinkKubernetesSensor fails to find the Flink TaskManager pods:

  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py", line 127, in poke
    self._log_driver(application_state, response)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/flink/sensors/flink_kubernetes.py", line 95, in _log_driver
    for task_manager in all_pods.items:
                        ^^^^^^^^^^^^^^
AttributeError: 'HTTPResponse' object has no attribute 'items'

What you think should happen instead

Idea 1)

The call that obtains the list of pods in FlinkKubernetesSensor._log_driver() could use a different Kubernetes API call that lists pods across all namespaces:

https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md#list_pod_for_all_namespaces

Currently, the namespace argument of get_namespaced_pod_list() is hardcoded to "default":

  all_pods = self.hook.get_namespaced_pod_list(
      namespace="default", watch=False, label_selector=task_manager_labels
  )

This hook method calls the list_namespaced_pod() API function, and because of the hardcoded namespace it only looks for Flink TaskManager pods in the default namespace.
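
A minimal sketch of that change inside _log_driver(), assuming the sensor's KubernetesHook exposes the underlying CoreV1Api client as core_v1_client (that attribute name is my assumption, not confirmed against the provider code):

  # Sketch only: list TaskManager pods across all namespaces instead of
  # only "default". Assumes self.hook.core_v1_client is a
  # kubernetes.client.CoreV1Api instance.
  all_pods = self.hook.core_v1_client.list_pod_for_all_namespaces(
      watch=False, label_selector=task_manager_labels
  )
  for task_manager in all_pods.items:  # V1PodList, so .items resolves
      ...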

Idea 2)

Use the existing namespace argument of FlinkKubernetesSensor when looking for TaskManager pods.
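
A sketch of this option, assuming the sensor stores its namespace constructor argument as self.namespace:

  # Sketch only: reuse the sensor's own namespace instead of the
  # hardcoded "default" (self.namespace is assumed here).
  all_pods = self.hook.get_namespaced_pod_list(
      namespace=self.namespace, watch=False, label_selector=task_manager_labels
  )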

Idea 3)

Provide an additional namespace argument specifically for TaskManager pods. Perhaps users want the flexibility of deploying TaskManager pods in a different namespace than the JobManager pods? A sketch of what that could look like from the DAG author's side follows.
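
For illustration only, task_manager_namespace below is a hypothetical new argument (not part of the current provider API, name invented here), which would default to the sensor's own namespace when omitted:

    t2 = FlinkKubernetesSensor(
        task_id="monitor_flink_job",
        application_name="my-flink-deployment",   # placeholder name
        namespace="flink",                         # namespace of the FlinkDeployment
        task_manager_namespace="flink-workers",    # hypothetical: where TaskManager pods run
        api_group="flink.apache.org",
        api_version="v1beta1",
        attach_log=True
    )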

How to reproduce

Deploy a DAG that contains two tasks:


    deployment_yaml = ...

    t1 = FlinkKubernetesOperator(
        task_id="submit_flink_job",
        application_file=deployment_yaml,
        namespace="flink",
        api_group="flink.apache.org",
        api_version="v1beta1"
    )

    t2 = FlinkKubernetesSensor(
        task_id="monitor_flink_job",
        application_name=deployment_yaml['metadata']['name'],
        namespace="flink",
        api_group="flink.apache.org",
        api_version="v1beta1",
        attach_log=True
    )
    t1 >> t2

  • The first task successfully deploys the application: the JobManager launches in the Kubernetes "flink" namespace, and the TaskManager pod also launches in the "flink" namespace.
  • The FlinkKubernetesSensor task successfully detects the job state changes of the JobManager.
  • The sensor then fails to find the TaskManager pod, resulting in the failure shown above.

Anything else

Perhaps I'm not following some best practice for Kubernetes Flink deployments where TaskManager pods should always be in the "default" namespace - I'm still fairly new to Apache Flink.

The current workaround is to disable the attach_log feature:

    t2 = FlinkKubernetesSensor(
        ...
        attach_log=False
    )

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
