Parallel Execution of GX in Airflow randomly fails. In serial execution always passes #138

@apexcoder7

Description

Running GX provider tasks (GreatExpectationsOperator) in parallel in Airflow fails randomly. The same tasks always pass when executed serially.

Here is the log:

***   * /mnt/airdrive/airflow/logs/dag_id=xxxx_pipeline_dag_v4/run_id=scheduled__2024-04-07T00:00:00+00:00/task_id=gx_validate_xxx_col_std_dev/attempt=1.log
[2024-04-08, 09:51:48 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [queued]>
[2024-04-08, 09:51:49 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 09:51:49 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_std_dev> on 2024-04-07 00:00:00+00:00
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:60} INFO - Started process 1111645 to run task
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxxx_pipeline_dag_v4', 'gx_validate_xxx_col_std_dev', 'scheduled__2024-04-07T00:00:00+00:00', '--job-id', '622', '--raw', '--subdir', 'DAGS_FOLDER/xxxxxxxx/xxxxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpn6gnlugt']
[2024-04-08, 09:51:49 IST] {standard_task_runner.py:88} INFO - Job 622: Subtask gx_validate_xxx_col_std_dev
[2024-04-08, 09:51:49 IST] {task_command.py:423} INFO - Running <TaskInstance: xxxx_pipeline_dag_v4.gx_validate_xxx_col_std_dev scheduled__2024-04-07T00:00:00+00:00 [running]> on host xxxx-datapipeline-***-xxxxx.xxxxdatapipelin.xxxxxxxx.com
[2024-04-08, 09:51:49 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxxx_exa_xxxx_col_std_dev' AIRFLOW_CTX_EXECUTION_DATE='2024-04-07T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-07T00:00:00+00:00'
[2024-04-08, 09:51:49 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 09:51:49 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 09:51:49 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 64, in __init__
    self._scaffold_project()
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 91, in _scaffold_project
    if self.is_project_scaffolded(self._context_root_directory):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 513, in is_project_scaffolded
    and cls.config_variables_yml_exist(ge_dir)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/serializable_data_context.py", line 261, in config_variables_yml_exist
    config_var_path = config.get("config_variables_file_path")
                      ^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'get'

Another random failure log:

:
:
[2024-04-08, 22:21:00 IST] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-04-08, 22:21:00 IST] {taskinstance.py:2214} INFO - Executing <Task(GreatExpectationsOperator): gx_validate_xxx_col_not_null> on 2024-04-08 16:50:34.740105+00:00
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:60} INFO - Started process 1539585 to run task
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'xxx_pipeline_dag_v4', 'gx_validate_xxx_col_not_null', 'manual__2024-04-08T16:50:34.740105+00:00', '--job-id', '633', '--raw', '--subdir', 'DAGS_FOLDER/xxx_dags/xxx_pipeline_dag_v4.py', '--cfg-path', '/tmp/tmpdgy8dl6u']
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:88} INFO - Job 633: Subtask gx_validate_xxx_col_not_null
[2024-04-08, 22:21:00 IST] {task_command.py:423} INFO - Running <TaskInstance: xxx_pipeline_dag_v4.gx_validate_xxx_col_not_null manual__2024-04-08T16:50:34.740105+00:00 [running]> on host xxx-datapipeline-***-private.sub03080733021.xxx.com
[2024-04-08, 22:21:00 IST] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='xxx_pipeline_dag_v4' AIRFLOW_CTX_TASK_ID='gx_validate_xxx_col_not_null' AIRFLOW_CTX_EXECUTION_DATE='2024-04-08T16:50:34.740105+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-08T16:50:34.740105+00:00'
[2024-04-08, 22:21:00 IST] {great_expectations.py:580} INFO - Running validation with Great Expectations...
[2024-04-08, 22:21:00 IST] {great_expectations.py:582} INFO - Instantiating Data Context...
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - /home/xxx/workspace/***/gx/great_expectations.yml
[2024-04-08, 22:21:00 IST] {logging_mixin.py:188} INFO - ordereddict([('config_version', 3.0), ('datasources', ordereddict([('xxx_CLEANSED_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))])), ('dynamic_pandas_asset_runtime_datasource', ordereddict([('class_name', 'Datasource'), ('module_name', 'great_expectations.datasource'), ('execution_engine', ordereddict([('class_name', 'PandasExecutionEngine'), ('module_name', 'great_expectations.execution_engine')])), ('data_connectors', ordereddict([('default_runtime_connector', ordereddict([('name', 'default_runtime_connector'), ('class_name', 'RuntimeDataConnector'), ('module_name', 'great_expectations.datasource.data_connector'), ('batch_identifiers', ['***_run_id'])]))]))]))])), ('config_variables_file_path', 'uncommitted/config_variables.yml'), ('plugins_directory', 'plugins/'), ('stores', ordereddict([('expectations_store', ordereddict([('class_name', 'ExpectationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'expectations/')]))])), ('validations_store', ordereddict([('class_name', 'ValidationsStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/validations/')]))])), ('evaluation_parameter_store', ordereddict([('class_name', 'EvaluationParameterStore')])), ('checkpoint_store', ordereddict([('class_name', 'CheckpointStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), 
('base_directory', 'checkpoints/')]))])), ('profiler_store', ordereddict([('class_name', 'ProfilerStore'), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('suppress_store_backend_id', True), ('base_directory', 'profilers/')]))]))])), ('expectations_store_name', 'expectations_store'), ('validations_store_name', 'validations_store'), ('evaluation_parameter_store_name', 'evaluation_parameter_store'), ('checkpoint_store_name', 'checkpoint_store'), ('data_docs_sites', ordereddict([('local_site', ordereddict([('class_name', 'SiteBuilder'), ('show_how_to_buttons', True), ('store_backend', ordereddict([('class_name', 'TupleFilesystemStoreBackend'), ('base_directory', 'uncommitted/data_docs/local_site/')])), ('site_index_builder', ordereddict([('class_name', 'DefaultSiteIndexBuilder')]))]))])), ('anonymous_usage_statistics', ordereddict([('data_context_id', 'dba4d0fa-ce75-444b-94e5-623ad64aecd1'), ('enabled', True)])), ('fluent_datasources', ordereddict([('filesystem_source_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('xxx_CLEANSED', ordereddict([('type', 'csv'), ('filepath_or_buffer', 'data/xxx_CLEANSED.csv')]))]))])), ('dynamic_pandas', ordereddict([('type', 'pandas'), ('assets', ordereddict([('dynamic_pandas_asset', ordereddict([('type', 'dataframe'), ('batch_metadata', ordereddict())]))]))]))])), ('notebooks', None), ('include_rendered_content', ordereddict([('globally', False), ('expectation_suite', False), ('expectation_validation_result', False)]))])
[2024-04-08, 22:21:00 IST] {base.py:1716} ERROR - Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {base.py:145} ERROR - Encountered errors during loading config.  See ValidationError for more details.
[2024-04-08, 22:21:00 IST] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations_provider/operators/great_expectations.py", line 586, in execute
    self.data_context = ge.data_context.FileDataContext(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 66, in __init__
    self._project_config = self._init_project_config(project_config)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 111, in _init_project_config
    project_config = FileDataContext._load_file_backed_project_config(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/data_context/file_data_context.py", line 213, in _load_file_backed_project_config
    return DataContextConfig.from_commented_map(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 139, in from_commented_map
    config: Union[dict, BYC] = schema_instance.load(commented_map)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 723, in load
    return self._do_load(
           ^^^^^^^^^^^^^^
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/marshmallow/schema.py", line 909, in _do_load
    self.handle_error(exc, data, many=many, partial=partial)
  File "/home/xxx/miniconda3/envs/airflow/lib/python3.11/site-packages/great_expectations/data_context/types/base.py", line 1717, in handle_error
    raise gx_exceptions.InvalidDataContextConfigError(
great_expectations.exceptions.exceptions.InvalidDataContextConfigError: Error while processing DataContextConfig: _schema
[2024-04-08, 22:21:00 IST] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=xxx_pipeline_dag_v4, task_id=gx_validate_xxx_col_not_null, execution_date=20240408T165034, start_date=20240408T165100, end_date=20240408T165100
[2024-04-08, 22:21:00 IST] {standard_task_runner.py:107} ERROR - Failed to execute job 633 for task gx_validate_xxx_col_not_null (Error while processing DataContextConfig: _schema; 1539585)
[2024-04-08, 22:21:00 IST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-04-08, 22:21:01 IST] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check
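
Both tracebacks fail while `FileDataContext` is being instantiated, and only when tasks run in parallel. One possible workaround, sketched below, is to serialize that step across task processes with an advisory file lock. This assumes the failures come from several tasks initializing the same gx project directory at once, which the logs do not confirm. The helper names `exclusive_file_lock` and `build_context_serialized` and the lock file path are hypothetical and not part of Airflow or Great Expectations. `fcntl` is available on Unix-like systems only, and the lock only helps when all tasks run on the same host.

```python
import fcntl
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def exclusive_file_lock(lock_path):
    """Hold an exclusive advisory lock on lock_path for the duration of the block."""
    # Open (or create) the lock file; its contents are irrelevant.
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        # Blocks until no other process holds the lock.
        fcntl.flock(fd, fcntl.LOCK_EX)
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)


def build_context_serialized(build_context, lock_path):
    """Call build_context() under the lock so only one process runs it at a time."""
    with exclusive_file_lock(lock_path):
        return build_context()


if __name__ == "__main__":
    # Hypothetical lock location shared by all tasks on the same worker host.
    lock_file = os.path.join(tempfile.gettempdir(), "gx_context.lock")
    # The lambda is a stand-in for whatever function builds the data context.
    print(build_context_serialized(lambda: "context ready", lock_file))
```

In a DAG, the callable passed to `build_context_serialized` would be whatever creates the data context for the task. If the failures persist with context creation serialized, the cause is likely something other than concurrent initialization.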
