Description
Astronomer Cosmos Version
1.10.0
dbt-core version
1.10.11
Versions of dbt adapters
dbt-bigquery 1.1.0.2
LoadMode
DBT_LS_MANIFEST
ExecutionMode
GCP_CLOUD_RUN_JOB
InvocationMode
DBT_RUNNER
Airflow version
2.10.5
Operating System
Linux
If you think it's a UI issue, which browsers are you seeing the problem on?
No response
Deployment
Google Cloud Composer
Deployment details
No response
What happened?
I am trying to create an Airflow pipeline that runs N dbt pipelines based on the rows available in a BigQuery table. To achieve this, I've been trying to combine DbtTaskGroup with TaskFlow components, i.e. using the @task and @task_group decorators. I was then hoping to use the .expand() method provided by TaskFlow to trigger the N pipelines with different parameters (a sketch of this intended shape is included below).
However, when deploying a simplified version of this pipeline, I get an Airflow exception (details in the log). I have scanned through previous issues and found #1218, which seems to suggest that this was a problem prior to 1.8.0.
I'm not sure whether my problem is related to this (or something similar), or whether there are alternative ways of implementing my requirements.
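For context, the mapped variant I was hoping to write looks roughly like this. This is a hypothetical sketch, not working code: get_rows and run_dbt_pipeline are illustrative names, and the configs are trimmed down from the reproduction script further below.

from pathlib import Path

import airflow
from airflow.decorators import task, task_group

from cosmos import DbtTaskGroup, ProjectConfig, ExecutionConfig
from cosmos.constants import ExecutionMode

DBT_ROOT = Path(__file__).parent / "dbt" / "my_project"


@task
def get_rows() -> list[dict]:
    # Hypothetical stand-in for the BigQuery read; one dict per dbt run.
    return [{"my_var": "a"}, {"my_var": "b"}]


@task_group
def run_dbt_pipeline(dbt_vars: dict):
    # One DbtTaskGroup per mapped row; instantiating it inside a mapped
    # task group is the behaviour I am after.
    DbtTaskGroup(
        group_id="dbt_run",
        project_config=ProjectConfig(
            dbt_project_path=DBT_ROOT,
            manifest_path=DBT_ROOT / "target" / "manifest.json",
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB,
        ),
        operator_args={"vars": dbt_vars},
    )


with airflow.DAG(dag_id="mapped_dbt_example", schedule=None):
    run_dbt_pipeline.expand(dbt_vars=get_rows())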
Relevant log output
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskmixin.py", line 271, in set_upstream
self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskmixin.py", line 229, in _set_relatives
raise AirflowException(f"Tried to set relationships between tasks in more than one DAG: {dags}")
airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {<DAG: DAG_NAME>, <DAG: DAG_NAME>}
How to reproduce
A sample deployment script would be:
import json
from pathlib import Path

import airflow
from airflow.decorators import task

from cosmos import DbtTaskGroup, ProjectConfig, ExecutionConfig
from cosmos.constants import ExecutionMode, TestIndirectSelection

DBT_PROJECT = "my_project"
DBT_ROOT = Path(__file__).parent / "dbt" / DBT_PROJECT


@task.python()
def get_data() -> dict:
    return {
        "my_var": "hello world",
    }


@task.python()
def prepare_dbt_vars(data: dict) -> str:
    dbt_vars = json.dumps({
        "run_id": "{{ run_id }}",
        "my_var": data['my_var'].replace(' ', '_'),
    })
    return dbt_vars


with airflow.DAG(
    dag_id=DBT_PROJECT,
    schedule=None,
    default_args={"retries": 0},
) as dag:
    data = get_data()
    dbt_vars = prepare_dbt_vars(data)

    run_dbt = DbtTaskGroup(
        project_config=ProjectConfig(
            dbt_project_path=DBT_ROOT,
            manifest_path=DBT_ROOT / "target" / "manifest.json",
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB,
            test_indirect_selection=TestIndirectSelection.BUILDABLE,
        ),
        operator_args={
            "project_id": "my-gcp-project",
            "region": "my-gcp-region",
            "job_name": DBT_PROJECT,
            "vars": dbt_vars,
        },
    )
Anything else :)?
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Contact Details
No response