Description
Is your feature request related to a problem? Please describe.
We want to run checks and then do additional analysis on the row-level results dataframe. For example, we might want to count the number of rows that failed a check from the results dataframe since Deequ doesn't provide this as part of the check result. Building the row-level results dataframe requires a VerificationResult, so it can't be done until a verification suite has been run.
deequ/src/main/scala/com/amazon/deequ/VerificationResult.scala
Lines 74 to 78 in fe256f5
def checkResultsAsDataFrame(
    sparkSession: SparkSession,
    verificationResult: VerificationResult,
    forChecks: Seq[Check] = Seq.empty)
  : DataFrame = {
Running a verification suite executes a Spark job, so analyzing the results dataframe takes at least two Spark jobs, which is slower than doing everything in a single job.
Doing such analysis might look something like this today:
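import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel}
import org.apache.spark.sql.functions.{col, sum, when}
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

val df = Seq(1, 2, 3).toDF("num")
val check = Check(CheckLevel.Error, "num_is_complete").isComplete("num")
// First Spark job: run the verification suite
val result = VerificationSuite()
  .onData(df)
  .addCheck(check)
  .run()
// Second Spark job: aggregate over the row-level results
val rowLevelResults = VerificationResult.rowLevelResultsAsDataFrame(df.sparkSession, result, df)
val checkFailedRows = rowLevelResults
  .select(sum(when(col(check.description) === false, 1).otherwise(0)).as("check_failed_count"))
  .head()
  .getAs[Long]("check_failed_count")
Describe the solution you'd like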
If it were possible to build the row-level results dataframe before running a verification suite, then the results dataframe could be fed into the verification suite instead of the "original" dataframe and we could add required analyzers to analyze what we want in a single Spark job.
E.g.
val df = Seq(1, 2, 3).toDF("num")
val check = Check(CheckLevel.Error, "num_is_complete").isComplete("num")
val checkFailedRowsAnalyzer = Size(where = Some(s"${check.description} = false"))
// TODO: create method to build results DF from checks or verification suite or something
val rowLevelResults = buildRowLevelResults(df, Seq(check))
val result = VerificationSuite()
  .onData(rowLevelResults)
  .addCheck(check)
  .addRequiredAnalyzer(checkFailedRowsAnalyzer) // New compared to above
  .run()
val checkFailedRows = result.metrics(checkFailedRowsAnalyzer).value.get
Describe alternatives you've considered
The first approach of running two Spark jobs works; it's just slower. I haven't thought of any other alternatives.
Additional context
The current implementation requires a VerificationResult because it uses that to get checks and their associated check results, but ultimately it only needs the checks and constraints:
deequ/src/main/scala/com/amazon/deequ/VerificationResult.scala
Lines 144 to 155 in fe256f5
private def constraintResultToColumn(constraintResult: ConstraintResult): Option[Column] = {
  val constraint = constraintResult.constraint
  constraint match {
    case asserted: RowLevelAssertedConstraint =>
      constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false)))
    case _: RowLevelConstraint =>
      constraintResult.metric.flatMap(metricToColumn).orElse(Some(lit(false)))
    case _: RowLevelGroupedConstraint =>
      constraintResult.metric.flatMap(metricToColumn)
    case _ => None
  }
}
Checks already include their list of constraints, so there should already be enough information from a list of checks to build the results dataframe before running a suite:
deequ/src/main/scala/com/amazon/deequ/checks/Check.scala
Lines 74 to 77 in fe256f5
case class Check(
  level: CheckLevel.Value,
  description: String,
  private[deequ] val constraints: Seq[Constraint] = Seq.empty) {
I'm not sure why the constraints on Check are private to deequ, or whether there's anything to be aware of there, but no visibility changes should be needed for this to compile.
Edit: I just realized the Metric[_] required to generate the column only exists on the ConstraintResult and not on the Constraint, so I was mistaken about the above and it won't be as trivial a change 🙁.
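To make the problem in the edit concrete, here is a rough sketch of the dependency using simplified stand-in types. Everything in it (`RowLevelSketch`, `Metric`, `Constraint`, `ConstraintResult`, `columnAfterRun`, `columnBeforeRun`) is hypothetical and only mirrors the shape of the snippet above, with a plain String standing in for a Spark Column. It is not Deequ's actual code.

```scala
object RowLevelSketch {
  // Hypothetical stand-ins for Deequ's types, reduced to what matters here.
  // A String plays the role of a Spark Column expression.
  final case class Metric(rowLevelColumn: Option[String])
  final case class Constraint(description: String)
  final case class ConstraintResult(constraint: Constraint, metric: Option[Metric])

  // After a run: the column is read off the metric attached to the result,
  // falling back to a constant "false" when no metric column is available.
  def columnAfterRun(result: ConstraintResult): Option[String] =
    result.metric.flatMap(_.rowLevelColumn).orElse(Some("false"))

  // Before a run: only the constraint exists and it carries no metric,
  // so there is no column expression to return.
  def columnBeforeRun(constraint: Constraint): Option[String] = None
}
```

In this model the only way to get a column from a bare constraint would be for the constraint (or its analyzer) to expose the row-level expression itself, which is roughly the change the feature would need.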