[Bug] Error combining DbtTaskGroup with TaskFlow components #1984

@jaime-spencer-oxb

Description

Astronomer Cosmos Version

1.10.0

dbt-core version

1.10.11

Versions of dbt adapters

dbt-bigquery  1.1.0.2

LoadMode

DBT_LS_MANIFEST

ExecutionMode

GCP_CLOUD_RUN_JOB

InvocationMode

DBT_RUNNER

airflow version

2.10.5

Operating System

Linux

If you think it's a UI issue, what browsers are you seeing the problem on?

No response

Deployment

Google Cloud Composer

Deployment details

No response

What happened?

I am trying to create an Airflow pipeline that runs N dbt pipelines based on the rows available in a BigQuery table. To achieve this, I've been trying to combine DbtTaskGroup with TaskFlow components, i.e. the @task and @task_group decorators. I was then hoping to use TaskFlow's .expand() to trigger the N pipelines with different parameters.

However, when deploying a simplified version of this pipeline, I get an Airflow exception (details in the log below). I have scanned through previous issues and found #1218, which suggests this was a problem prior to 1.8.0.

I'm not sure if my problem is related to that (or something similar), or if there is an alternative way of implementing my requirements.

Relevant log output

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskmixin.py", line 271, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskmixin.py", line 229, in _set_relatives
    raise AirflowException(f"Tried to set relationships between tasks in more than one DAG: {dags}")
airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {<DAG: DAG_NAME>, <DAG: DAG_NAME>}

How to reproduce

A sample DAG that reproduces the error:

import json
from pathlib import Path

import airflow
from airflow.decorators import task
from cosmos import DbtTaskGroup, ProjectConfig, ExecutionConfig
from cosmos.constants import ExecutionMode, TestIndirectSelection

DBT_PROJECT = "my_project"
DBT_ROOT = Path(__file__).parent / "dbt" / DBT_PROJECT

@task.python()
def get_data() -> dict:
    # Stand-in for reading rows from the BigQuery table.
    return {
        "my_var": "hello world",
    }

@task.python()
def prepare_dbt_vars(data: dict) -> str:
    # Build the dbt --vars JSON from the upstream task's output.
    dbt_vars = json.dumps({
        "run_id": "{{ run_id }}",
        "my_var": data['my_var'].replace(' ', '_'),
    })
    return dbt_vars

with airflow.DAG(
    dag_id=DBT_PROJECT,
    schedule=None,
    default_args={"retries": 0},
) as dag:
    data = get_data()
    dbt_vars = prepare_dbt_vars(data)

    # Parsing this DAG raises the AirflowException shown in the log output.
    run_dbt = DbtTaskGroup(
        project_config=ProjectConfig(
            dbt_project_path=DBT_ROOT,
            manifest_path=DBT_ROOT / "target" / "manifest.json",
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB,
            test_indirect_selection=TestIndirectSelection.BUILDABLE,
        ),
        operator_args={
            "project_id": "my-gcp-project",
            "region": "my-gcp-region",
            "job_name": DBT_PROJECT,
            "vars": dbt_vars
        },
    )

Anything else :)?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Contact Details

No response
