-
This is a feature rather than a bug, and it is next to impossible to fix, because in the general case each DAG file is parsed separately from the others. We are unlikely to fix it: because of the dynamic nature of dag_id generation and the continuous, asynchronous parsing and re-parsing of multiple DAG files, we never know when the "set" of our DAGs is complete; it may change at any moment. We never process DAGs as one "complete list". They are parsed in batches, and there is no consistent "snapshot" of the whole directory available.

For example, if a DAG moves from one file to another, then even when we see the second DAG, either of the two may already have been removed and will disappear the next time we get there. With many DAGs the parsing is continuous and we are never sure what the "complete snapshot" is. Parsing DAGs takes time, and it cannot be done "instantly" for all DAGs to keep a consistent snapshot.

The current approach is a compromise: such clashes are possible, but it also means that with huge folders you do not pay the performance penalty of snapshotting the whole directory of DAG files and making sure that snapshot is consistent. It is simply a price to pay for speed. But if you have an idea how to implement it, feel free to continue discussing it; maybe you can find a good solution. Converting this into a discussion.
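Just to illustrate what such a clash looks like in practice (the file names and dag_id below are made up, for illustration only): each of the two files is perfectly valid on its own, so per-file parsing never sees any conflict, and whichever file happens to be parsed last silently overwrites the other one's entry.

```python
# dags/team_a_report.py -- hypothetical file, for illustration only
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="daily_report",
         start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
         schedule_interval="@daily") as dag:
    EmptyOperator(task_id="build_report")


# dags/team_b_report.py -- second hypothetical file accidentally reusing the same dag_id
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="daily_report",
         start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
         schedule_interval="@daily") as dag:
    EmptyOperator(task_id="build_other_report")
```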
-
Understood. Assuming this can't be checked by the scheduler's normal import mechanisms: if we instead check this externally in our CI before a DAG is allowed into the repo, is there anything we can reuse from the DagBag code as a dry-run import to make that easier? Below is a basic prototype that works quite well; it takes less than a second to run on ~30 files producing ~50 DAGs. We can live with that, especially if we only run it on commits to the DAG directory. :)

```python
from airflow.models import DagBag
from airflow.utils.file import list_py_file_paths

# dag_path is the repository's DAG folder (a pathlib.Path in our case).
found_dags = {}
errors = []
# store_serialized_dags is deprecated in Airflow 2.x; read_dags_from_db=False covers it.
dag_bag = DagBag(str(dag_path), read_dags_from_db=False)

for filepath in list_py_file_paths(dag_path, include_examples=False):
    dags_in_file = dag_bag.process_file(str(filepath), only_if_updated=False, safe_mode=False)
    if len(dags_in_file) == 0:
        errors.append(
            f"File {filepath} doesn't export any DAGs; did you forget to call the "
            f"@dag-decorated function and assign the result to a variable?"
        )
    for dag in dags_in_file:
        if (prev_dag := found_dags.get(dag.dag_id)) is not None:
            errors.append(
                f"Duplicate DAG found: {dag.dag_id} from file {filepath} "
                f"was previously defined in file {prev_dag}"
            )
        found_dags[dag.dag_id] = filepath

print("Dupe errors", errors)
print("Import errors", dag_bag.import_errors)
```

I'm curious whether this can have any side effects and insert DAGs into the database, or whether DagBag is safe for dry runs (apart from other import side effects, of course). We actually use Airflow itself as our CI runner, so we wouldn't want the checker to accidentally start replacing the merged DAGs in the database with open changes. I also noticed that DagBag partially has such a check already, raising
-
As an idea, maybe it would be nice to have a warning (similar to the existing errors shown in the Airflow UI) if there is ping-pong in DAG executions. That is: I execute a DAG that was created from "source X" (the source can be a database, a file, or "unknown" if there is no way to determine it, which would count as yet another source), then within a window of 2 minutes it runs from a different source with the same DAG name, then within another 2 minutes it flips again... It would not be a 100% reliable mechanism (to keep the cache local), but at the end of the day it would help people debug this issue, which is hard to debug even when you are the one who caused it.
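A rough sketch of the kind of purely local cache I mean (the function name, the warning hook, and the 2-minute window below are all made up, not an actual Airflow API; the real thing would live wherever the scheduler resolves a DAG's source before a run):

```python
import time
import warnings

PING_PONG_WINDOW = 120  # seconds, i.e. the 2-minute window mentioned above

# dag_id -> (last seen source, timestamp); kept only in process memory
_last_source = {}


def note_dag_source(dag_id, source, now=None):
    """Record where a DAG run's definition came from; warn if the source
    flipped to a different file/DB within the window (likely a duplicate dag_id)."""
    now = time.time() if now is None else now
    previous = _last_source.get(dag_id)
    if previous is not None:
        prev_source, prev_ts = previous
        if prev_source != source and now - prev_ts <= PING_PONG_WINDOW:
            warnings.warn(
                f"DAG {dag_id!r} was just loaded from {source!r}, but shortly before "
                f"it came from {prev_source!r}; possible duplicate dag_id."
            )
    _last_source[dag_id] = (source, now)
```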
-
Apache Airflow version
2.3.2 (latest released)
What happened
I accidentally created two DAGs with the same dag_id.
No errors were given.
The Code tab in the web UI showed one of the versions.
Running the task actually executed the second version.
What you think should happen instead
One or both DAGs should be rejected and produce a DAG import error in the UI.
Starting the DAG should not be possible.
How to reproduce
Create two DAGs with the same dag_id.
Start the DAG.
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.0.2
apache-airflow-providers-docker==3.0.0
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-postgres==5.0.0
apache-airflow-providers-sqlite==2.1.3
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct