[Bug] Cannot process DAG and no error log is produced #1992

@xianzheTM

Description

Astronomer Cosmos Version

1.10.2

dbt-core version

1.8.9

Versions of dbt adapters

Athena 1.8.4

LoadMode

DBT_LS

ExecutionMode

LOCAL

InvocationMode

None

airflow version

2.11.0

Operating System

debian 12

If you think it's a UI issue, what browsers are you seeing the problem on?

No response

Deployment

Docker-Compose

Deployment details

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.11.0
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
#                                Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
x-airflow-common:
  &airflow-common
  # Use a custom image that bakes in dbt + cosmos.
  # Build context is repo root containing the Dockerfile.
  build: .
  image: airflow-cosmos-local:2.11.0
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    DBT_PROFILES_DIR: /opt/airflow/dbt
    PIP_INDEX_URL: https://pypi.tuna.tsinghua.edu.cn/simple
    PIP_TIMEOUT: '120'
    AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: '300'
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # Logging configuration
    AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
    AIRFLOW__LOGGING__FAB_LOGGING_LEVEL: WARN
    AIRFLOW__LOGGING__BASE_LOG_FOLDER: /opt/airflow/logs
    AIRFLOW__LOGGING__REMOTE_LOGGING: 'false'
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: ''
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: ''
    AIRFLOW__LOGGING__ENCRYPT_S3_LOGS: 'false'
    # WARNING: Use the _PIP_ADDITIONAL_REQUIREMENTS option ONLY for quick checks;
    # for other purposes (development, test and especially production usage) build/extend the Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # The following line can be used to set a custom config file, stored in the local config folder
    # If you want to use it, uncomment it and replace airflow.cfg with the name of your config file
    AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /home/leiyang/work/unified-data-all/unified-data-dbt:/opt/airflow/dbt
    - /home/leiyang/.aws:/home/airflow/.aws:ro
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    # Redis is limited to 7.2-bookworm due to licencing change
    # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding the "--profile flower" option, e.g. docker-compose --profile flower up,
  # or by explicitly targeting it on the command line, e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:

and the Dockerfile:

FROM apache/airflow:2.11.0

# Switch to root to adjust apt sources and install minimal tools
USER root
RUN set -ex \
    # Remove any extra sources to avoid slow/failing repos
    && rm -f /etc/apt/sources.list.d/* || true \
    # Write a fresh sources.list using Aliyun mirrors (Debian 12/bookworm)
    && echo 'deb https://mirrors.aliyun.com/debian bookworm main contrib non-free non-free-firmware' > /etc/apt/sources.list \
    && echo 'deb https://mirrors.aliyun.com/debian bookworm-updates main contrib non-free non-free-firmware' >> /etc/apt/sources.list \
    && echo 'deb https://mirrors.aliyun.com/debian-security bookworm-security main contrib non-free non-free-firmware' >> /etc/apt/sources.list \
    && apt-get update \
    && apt-get install -y --no-install-recommends \
       python3-venv \
       build-essential \
    && rm -rf /var/lib/apt/lists/* \
    && mkdir -p /opt/airflow/.venvs \
    && chown -R airflow:0 /opt/airflow/.venvs

# Switch to airflow user for Python package installation
USER airflow

# Install Cosmos for Local execution into the image's Python env (venv)
RUN pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple "astronomer-cosmos[local]"

# Create a dedicated venv for dbt + athena and install packages there
ENV DBT_VENV=/opt/airflow/.venvs/dbt-athena
RUN python3 -m venv "$DBT_VENV" \
    && "$DBT_VENV/bin/pip" install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip setuptools wheel \
    && "$DBT_VENV/bin/pip" install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple \
         "dbt-core>=1.8,<1.9" \
         "dbt-athena-community>=1.8,<1.9"

What happened?

The DAG file should be processed successfully and appear in the web UI, but DAG processing failed. It also appears to have entered a loop, continuously retrying.
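
Since nothing shows up in the UI, one place to look for a parse error is the per-file DAG processor log. The sketch below assumes the Airflow 2.x default layout (`<base_log_folder>/scheduler/<date>/<dag file>.log`) and the `base_log_folder` used in this setup. `find_processor_logs` and `tail` are hypothetical helper names, not Airflow APIs.

```python
from __future__ import annotations

from pathlib import Path


def find_processor_logs(base_log_folder: str, dag_filename: str) -> list[Path]:
    """Return per-file DAG processor logs for dag_filename, newest first.

    Assumption: logs live under <base_log_folder>/scheduler/<date>/...,
    named after the DAG file with a ".log" suffix.
    """
    root = Path(base_log_folder) / "scheduler"
    if not root.is_dir():
        return []
    # Resolve paths so a "latest" symlink does not produce duplicates.
    matches = {p.resolve() for p in root.rglob(f"{dag_filename}.log") if p.is_file()}
    return sorted(matches, key=lambda p: p.stat().st_mtime, reverse=True)


def tail(path: Path, lines: int = 40) -> str:
    """Return the last `lines` lines of a text file."""
    text = path.read_text(encoding="utf-8", errors="replace")
    return "\n".join(text.splitlines()[-lines:])
```

Inside the scheduler container this could be called as `find_processor_logs("/opt/airflow/logs", "<dag file name>.py")`, followed by `tail(...)` on the first result.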

This is airflow config:

[core]
executor = CeleryExecutor
fernet_key =
dags_are_paused_at_creation = True
load_examples = True
dagbag_import_timeout = 300

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres/airflow

[celery]
result_backend = db+postgresql://airflow:airflow@postgres/airflow
broker_url = redis://:@redis:6379/0

[api]
auth_backends = airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session

[scheduler]
enable_health_check = True

[logging]
logging_level = DEBUG
fab_logging_level = DEBUG
base_log_folder = /opt/airflow/logs
remote_logging = False
remote_base_log_folder =
remote_log_conn_id =
encrypt_s3_logs = False
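
Note that the compose file above sets `AIRFLOW__LOGGING__LOGGING_LEVEL: INFO` and `AIRFLOW__LOGGING__FAB_LOGGING_LEVEL: WARN`, while this airflow.cfg sets both to DEBUG. Airflow gives `AIRFLOW__<SECTION>__<KEY>` environment variables priority over airflow.cfg, so the effective level is probably INFO. A minimal sketch of that lookup order follows; `effective_option` is a hypothetical helper, and it ignores Airflow's `_CMD` and `_SECRET` variants.

```python
from __future__ import annotations

import configparser
import os
from typing import Mapping


def effective_option(cfg_text: str, section: str, key: str,
                     environ: Mapping[str, str] = os.environ) -> str | None:
    """Resolve one option: environment variable first, then airflow.cfg."""
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name]
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get(section, key, fallback=None)


# Values copied from the compose file and airflow.cfg in this report.
CFG = """
[logging]
logging_level = DEBUG
fab_logging_level = DEBUG
"""
ENV = {
    "AIRFLOW__LOGGING__LOGGING_LEVEL": "INFO",
    "AIRFLOW__LOGGING__FAB_LOGGING_LEVEL": "WARN",
}

print(effective_option(CFG, "logging", "logging_level", ENV))  # INFO
```

Inside a container, `airflow config get-value logging logging_level` should report the value Airflow actually uses.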

Relevant log output

How to reproduce

DAG file:

from __future__ import annotations

from datetime import datetime

from cosmos import DbtDag
from cosmos.config import (
    ProjectConfig,
    ProfileConfig,
    ExecutionConfig,
    RenderConfig,
    LoadMode,
    ExecutionMode,
)


# Cosmos Local mode: call a fixed dbt binary inside a dedicated venv
# baked into the Airflow image, to keep global env clean while avoiding
# per-task venv overhead.

DBT_PROJECT_DIR = "/opt/airflow/dbt"
DBT_PROFILES_DIR = DBT_PROJECT_DIR


dag = DbtDag(
    dag_id="ext_walmart_adgroup_aditem_report",
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"owner": "airflow"},
    project_config=ProjectConfig(
        dbt_project_path=DBT_PROJECT_DIR,
    ),
    profile_config=ProfileConfig(
        profile_name="athena",
        target_name="dev",
        profiles_yml_filepath=f"{DBT_PROFILES_DIR}/profiles.yml",
    ),
    render_config=RenderConfig(
        load_method=LoadMode.DBT_LS,
        select=["ext_walmart_aditem_report"],
    ),
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.LOCAL,
        dbt_executable_path="/opt/airflow/.venvs/dbt-athena/bin/dbt",
    ),
)
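
With `LoadMode.DBT_LS`, Cosmos is expected to run dbt while the DAG object is being built, so a slow or failing dbt call would surface at parse time. One way to see an exception or a long parse directly is to import the DAG file in a plain Python process inside the scheduler container. The sketch below is not how Airflow's DAG processor loads files; `import_dag_file` is a hypothetical helper that only imports a file by path and times it.

```python
from __future__ import annotations

import importlib.util
import time
from types import ModuleType


def import_dag_file(path: str) -> tuple[ModuleType, float]:
    """Import a Python file by path and return (module, seconds taken)."""
    spec = importlib.util.spec_from_file_location("dag_under_test", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    started = time.monotonic()
    # Any exception raised while the file executes propagates with a full traceback.
    spec.loader.exec_module(module)
    return module, time.monotonic() - started
```

Calling `import_dag_file("/opt/airflow/dags/<dag file name>.py")` and comparing the elapsed time with `dagbag_import_timeout` (300 here) would show whether parsing itself is the slow step.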

It's a very simple model. Running these two commands directly inside the container is very fast:

/opt/airflow/.venvs/dbt-athena/bin/dbt deps --project-dir /tmp/tmpd96z2aj5 --profiles-dir /opt/airflow/dbt --profile athena --target dev
/opt/airflow/.venvs/dbt-athena/bin/dbt ls --output json --project-dir /tmp/tmpd96z2aj5 --profiles-dir /opt/airflow/dbt --profile athena --target dev --select ext_walmart_aditem_report
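
To compare timings under the same user and environment as the scheduler, the two commands can also be run from Python with captured output and a timeout. This is a minimal sketch: `build_dbt_commands` and `run_timed` are hypothetical helpers, and the argument order simply mirrors the two commands above.

```python
from __future__ import annotations

import subprocess
import time


def build_dbt_commands(dbt_bin: str, project_dir: str, profiles_dir: str,
                       profile: str, target: str, select: str) -> list[list[str]]:
    """Build the `dbt deps` and `dbt ls` argument lists shown above."""
    common = ["--project-dir", project_dir, "--profiles-dir", profiles_dir,
              "--profile", profile, "--target", target]
    return [
        [dbt_bin, "deps", *common],
        [dbt_bin, "ls", "--output", "json", *common, "--select", select],
    ]


def run_timed(cmd: list[str], timeout: float) -> tuple[int, float, str]:
    """Run cmd and return (exit code, seconds taken, combined stdout and stderr).

    subprocess.TimeoutExpired is raised if the command exceeds `timeout` seconds.
    """
    started = time.monotonic()
    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    return proc.returncode, time.monotonic() - started, proc.stdout + proc.stderr
```

For example, each command from `build_dbt_commands("/opt/airflow/.venvs/dbt-athena/bin/dbt", "/opt/airflow/dbt", "/opt/airflow/dbt", "athena", "dev", "ext_walmart_aditem_report")` could be passed to `run_timed(cmd, timeout=300)`. The project directory here is the mounted project rather than the temporary copy used in the commands above.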

ext_walmart_aditem_report.sql:

  {{ config(materialized='table') }}

  select 1 as test_column

Anything else :)?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Contact Details

No response

Metadata

Assignees

No one assigned

Labels

bug (Something isn't working), triage-needed (Items need to be reviewed / assigned to milestone)
