[FEATURE] Build row-level results dataframe before executing VerificationSuite #618

@marcantony

Is your feature request related to a problem? Please describe.
We want to run checks and then do additional analysis on the row-level results dataframe. For example, we might want to count the number of rows that failed a check from the results dataframe since Deequ doesn't provide this as part of the check result. Building the row-level results dataframe requires a VerificationResult, so it can't be done until a verification suite has been run.

def checkResultsAsDataFrame(
    sparkSession: SparkSession,
    verificationResult: VerificationResult,
    forChecks: Seq[Check] = Seq.empty)
  : DataFrame = {

Running a verification suite executes a Spark job, so analyzing the results dataframe takes at least two Spark jobs: one for the suite and one for the follow-up analysis. That is slower than doing everything in a single job.

Doing such analysis might look something like this today:

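import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.functions.{col, sum, when}
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

val df = Seq(1, 2, 3).toDF("num")
val check = Check(CheckLevel.Error, "num_is_complete").isComplete("num")

// First Spark job: run the suite to obtain a VerificationResult
val result = VerificationSuite()
    .onData(df)
    .addCheck(check)
    .run()
val rowLevelResults = VerificationResult.rowLevelResultsAsDataFrame(df.sparkSession, result, df)

// Second Spark job: count the rows whose check column is false
val checkFailedRows = rowLevelResults
    .select(sum(when(col(check.description) === false, 1).otherwise(0)).as("check_failed_count"))
    .head()
    .getAs[Long]("check_failed_count")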

Describe the solution you'd like
If it were possible to build the row-level results dataframe before running a verification suite, then the results dataframe could be fed into the verification suite instead of the "original" dataframe and we could add required analyzers to analyze what we want in a single Spark job.

E.g.

import com.amazon.deequ.analyzers.Size // analyzer used to count the failing rows

val df = Seq(1, 2, 3).toDF("num")
val check = Check(CheckLevel.Error, "num_is_complete").isComplete("num")
val checkFailedRowsAnalyzer = Size(where = Some(s"${check.description} = false"))

// TODO: proposed, does not exist yet: a method that builds the results DF
// from checks (or from a verification suite, or something similar)
val rowLevelResults = buildRowLevelResults(df, Seq(check))

val result = VerificationSuite()
    .onData(rowLevelResults)
    .addCheck(check)
    .addRequiredAnalyzer(checkFailedRowsAnalyzer) // New compared to above
    .run()

val checkFailedRows = result.metrics(checkFailedRowsAnalyzer).value.get

Describe alternatives you've considered
The first approach of running two Spark jobs works; it's just slower. I haven't thought of any other alternatives.

Additional context
The current implementation requires a VerificationResult because it uses that to get checks and their associated check results, but ultimately it only needs the checks and constraints:

private def constraintResultToColumn(constraintResult: ConstraintResult): Option[Column] = {
  val constraint = constraintResult.constraint
  constraint match {
    case asserted: RowLevelAssertedConstraint =>
      constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false)))
    case _: RowLevelConstraint =>
      constraintResult.metric.flatMap(metricToColumn).orElse(Some(lit(false)))
    case _: RowLevelGroupedConstraint =>
      constraintResult.metric.flatMap(metricToColumn)
    case _ => None
  }
}

Checks already include their list of constraints, so there should already be enough information from a list of checks to build the results dataframe before running a suite:

case class Check(
    level: CheckLevel.Value,
    description: String,
    private[deequ] val constraints: Seq[Constraint] = Seq.empty) {

I'm not sure why the constraints on Check are private to deequ, or whether there's anything to be aware of there, but visibility shouldn't have to change to make this work from a syntax point of view.

Edit: I just realized that the Metric[_] required to generate the column exists only on the ConstraintResult, not on the Constraint, so I was mistaken about the above and this won't be as trivial a change 🙁.
