|
6 | 6 | package io.openlineage.spark.agent.facets; |
7 | 7 |
|
8 | 8 | import com.fasterxml.jackson.annotation.JsonProperty; |
| 9 | +import com.fasterxml.jackson.core.JsonProcessingException; |
9 | 10 | import io.openlineage.client.OpenLineage; |
10 | 11 | import io.openlineage.spark.agent.Versions; |
11 | 12 |
|
12 | | -import java.util.NoSuchElementException; |
13 | | -import java.util.Properties; |
| 13 | +import java.util.*; |
| 14 | + |
14 | 15 | import lombok.Getter; |
15 | 16 | import lombok.NonNull; |
16 | 17 | import io.openlineage.spark.api.OpenLineageContext; |
17 | 18 | import lombok.extern.slf4j.Slf4j; |
18 | 19 | import org.apache.spark.sql.SparkSession; |
19 | 20 |
|
| 21 | +import static io.openlineage.spark.agent.util.NuFacetsUtils.getConfigValue; |
| 22 | +import static io.openlineage.spark.agent.util.NuFacetsUtils.parseJsonToMap; |
| 23 | + |
20 | 24 | /** Captures information related to the Apache Spark job. */ |
21 | 25 | @Getter |
22 | 26 | @Slf4j |
23 | 27 | public class NuFacet extends OpenLineage.DefaultRunFacet { |
24 | | - // @JsonProperty("jobId") |
25 | | - // @NonNull |
26 | | - // private Integer jobId; |
27 | | - |
28 | | - // @JsonProperty("jobDescription") |
29 | | - // private String jobDescription; |
30 | 28 |
|
31 | 29 | @JsonProperty("jobNurn") |
32 | 30 | private String jobNurn; |
33 | 31 |
|
34 | | - private String fetchJobNurn(OpenLineageContext olContext) { |
35 | | - if (olContext.getSparkSession().isPresent()) { |
36 | | - SparkSession sparkSession = olContext.getSparkSession().get(); |
37 | | - try { |
38 | | - return sparkSession.conf().get("spark.job.name"); |
39 | | - } catch (NoSuchElementException e) { |
40 | | - log.warn("spark.job.name property not found in the context"); |
41 | | - return null; |
42 | | - } |
43 | | - } |
| 32 | + /** |
| 33 | + * Resolved inputs for the job. |
| 34 | + * Map of input dataset NURNs by their location path |
| 35 | + */ |
| 36 | + @JsonProperty("resolvedInputs") |
| 37 | + private Map<String, String> resolvedInputs; |
44 | 38 |
|
45 | | - log.warn("spark.job.name property not found because the SparkContext could not be retrieved from OpenLineageContext"); |
46 | | - return null; |
| 39 | + private String getJobNurn(SparkSession sparkSession) { |
| 40 | + return getConfigValue("spark.job.name", sparkSession); |
| 41 | + } |
| 42 | + |
| 43 | + private Map<String, String> getResolvedInputs(SparkSession sparkSession) { |
| 44 | + String resolvedInputsJson = getConfigValue("spark.job.resolvedInputsMap", sparkSession); |
| 45 | + try { |
| 46 | + return parseJsonToMap(resolvedInputsJson); |
| 47 | + } catch (JsonProcessingException e) { |
| 48 | + log.warn("Error parsing resolvedInputsJson JSON", e); |
| 49 | + return null; |
| 50 | + } |
47 | 51 | } |
48 | 52 |
|
49 | 53 | public NuFacet(@NonNull OpenLineageContext olContext) { |
50 | 54 | super(Versions.OPEN_LINEAGE_PRODUCER_URI); |
51 | | - this.jobNurn = fetchJobNurn(olContext); |
| 55 | + if (olContext.getSparkSession().isPresent()) { |
| 56 | + SparkSession sparkSession = olContext.getSparkSession().get(); |
| 57 | + this.jobNurn = getJobNurn(sparkSession); |
| 58 | + this.resolvedInputs = getResolvedInputs(sparkSession); |
| 59 | + } |
52 | 60 | } |
53 | 61 | } |
0 commit comments