```
// Rows that repeat a primary key, tagged with their physical location.
val duplicates = df
  .select(<pk cols>)
  .withColumn("__file_path", col("_metadata.file_path"))
  .withColumn("__row_index", col("_metadata.row_index"))
  .withColumn(
    "rank",
    row_number().over(
      Window
        .partitionBy(<pk cols>)
        .orderBy(<pk cols>)))
  .filter("rank > 1") // keep every copy except the first one per key
  .drop("rank")
```

And then:

```
// `merge` is a DeltaTable method, so `df` here stands for the DeltaTable handle.
df.alias("old")
  .merge(
    duplicates.alias("new"),
    "old.<pk1> = new.<pk1> AND ... AND old.<pkn> = new.<pkn>" +
      " AND old._metadata.file_path = new.__file_path" +
      " AND old._metadata.row_index = new.__row_index")
  .whenMatched()
  .delete()
  .execute()
```