Skip to content

Commit 78946e4

Browse files
authored
Merge pull request #11 from nubank/events-emission-enhancements
DGD-4258 DGD-4311 Events emission enhancements
2 parents 37c8280 + 975f831 commit 78946e4

File tree

6 files changed

+159
-46
lines changed

6 files changed

+159
-46
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package io.openlineage.spark.agent;
2+
3+
import io.openlineage.client.OpenLineage;
4+
import lombok.extern.slf4j.Slf4j;
5+
6+
import java.lang.reflect.Field;
7+
import java.util.Set;
8+
import java.util.stream.Collectors;
9+
import java.util.stream.Stream;
10+
11+
import static io.openlineage.client.OpenLineage.RunEvent.EventType;
12+
import static io.openlineage.client.OpenLineage.RunEvent;
13+
import static io.openlineage.client.OpenLineage.RunEvent.EventType.*;
14+
import static java.util.Objects.isNull;
15+
16+
@Slf4j
17+
public class NuEventEmitter {
18+
19+
private static final Set<String> WANTED_JOB_TYPES = Set.of(
20+
"SQL_JOB" // as defined in SparkSQLExecutionContext.SPARK_JOB_TYPE
21+
);
22+
23+
private static final Set<String> WANTED_EVENT_NAME_SUBSTRINGS = Set.of(
24+
".execute_insert_into_hadoop_fs_relation_command.",
25+
".adaptive_spark_plan."
26+
);
27+
28+
private static Boolean isPermittedJobType(RunEvent event) {
29+
String jobType = event.getJob().getFacets().getJobType().getJobType();
30+
if (WANTED_JOB_TYPES.stream().noneMatch(jobType::equals)) {
31+
log.debug("OpenLineage event with job type {} has no lineage value and should not be emitted", jobType);
32+
return false;
33+
}
34+
return true;
35+
}
36+
37+
private static Boolean isPermitedEventType(RunEvent event) {
38+
if (RUNNING.equals(event.getEventType())) {
39+
log.debug("OpenLineage event is {} and should not be emitted", RUNNING);
40+
return false;
41+
}
42+
return true;
43+
}
44+
45+
private static Boolean isPermittedJobName(RunEvent event) {
46+
String jobName = event.getJob().getName();
47+
if (isNull(jobName)) {
48+
log.debug("OpenLineage event has no job name and should not be emitted");
49+
return false;
50+
}
51+
if (WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) {
52+
log.debug("OpenLineage event job name has no permitted substring and should not be emitted");
53+
return false;
54+
}
55+
return true;
56+
}
57+
58+
private static Boolean shouldEmit(RunEvent event) {
59+
return Stream.of(
60+
isPermittedJobType(event),
61+
isPermitedEventType(event),
62+
isPermittedJobName(event)
63+
).noneMatch(Boolean.FALSE::equals);
64+
}
65+
66+
private static Boolean shouldDiscardColumnLineageFacet(EventType eventType) {
67+
return !COMPLETE.equals(eventType);
68+
}
69+
70+
private static void discardColumnLineageFacet(RunEvent event) {
71+
try {
72+
Field columnLineageFacetField = OpenLineage.DatasetFacets.class.getDeclaredField("columnLineage");
73+
columnLineageFacetField.setAccessible(true);
74+
Stream
75+
.concat(event.getInputs().stream(), event.getOutputs().stream())
76+
.collect(Collectors.toList())
77+
.forEach(dataset -> {
78+
try {
79+
log.debug("Discarding column lineage facet for dataset {} {} {}",
80+
dataset.getClass().getSimpleName(), dataset.getNamespace(), dataset.getName());
81+
columnLineageFacetField.set(dataset.getFacets(), null);
82+
} catch (IllegalAccessException e) {
83+
log.error("Failed to discard column lineage facet", e);
84+
}
85+
});
86+
} catch (NoSuchFieldException e) {
87+
log.error("Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
88+
}
89+
}
90+
91+
public static void emit(RunEvent event, EventEmitter eventEmitter) {
92+
if (!shouldEmit(event)) {
93+
return;
94+
}
95+
96+
if (shouldDiscardColumnLineageFacet(event.getEventType())) {
97+
discardColumnLineageFacet(event);
98+
}
99+
100+
eventEmitter.emit(event);
101+
}
102+
}

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.openlineage.client.utils.DatasetIdentifier;
1212
import io.openlineage.client.utils.UUIDUtils;
1313
import io.openlineage.spark.agent.EventEmitter;
14+
import io.openlineage.spark.agent.NuEventEmitter;
1415
import io.openlineage.spark.agent.OpenLineageSparkListener;
1516
import io.openlineage.spark.agent.facets.ErrorFacet;
1617
import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder;
@@ -224,8 +225,9 @@ public void start(SparkListenerJobStart jobStart) {
224225
.build())
225226
.job(buildJob(jobStart.jobId()))
226227
.build();
228+
227229
log.debug("Posting event for start {}: {}", jobStart, event);
228-
eventEmitter.emit(event);
230+
NuEventEmitter.emit(event, eventEmitter);
229231
}
230232

231233
@Override
@@ -257,8 +259,9 @@ public void end(SparkListenerJobEnd jobEnd) {
257259
.build())
258260
.job(buildJob(jobEnd.jobId()))
259261
.build();
262+
260263
log.debug("Posting event for end {}: {}", jobEnd, event);
261-
eventEmitter.emit(event);
264+
NuEventEmitter.emit(event, eventEmitter);
262265
}
263266

264267
protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, SparkListenerEvent event) {

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.openlineage.client.OpenLineage;
1313
import io.openlineage.client.OpenLineage.RunEvent;
1414
import io.openlineage.spark.agent.EventEmitter;
15+
import io.openlineage.spark.agent.NuEventEmitter;
1516
import io.openlineage.spark.agent.filters.EventFilterUtils;
1617
import io.openlineage.spark.api.OpenLineageContext;
1718
import io.openlineage.spark.api.naming.JobNameBuilder;
@@ -95,7 +96,7 @@ public void start(SparkListenerApplicationStart applicationStart) {
9596
.build());
9697

9798
log.debug("Posting event for applicationId {} start: {}", applicationId, event);
98-
eventEmitter.emit(event);
99+
NuEventEmitter.emit(event, eventEmitter);
99100
}
100101

101102
@Override
@@ -126,7 +127,7 @@ public void end(SparkListenerApplicationEnd applicationEnd) {
126127
.build());
127128

128129
log.debug("Posting event for applicationId {} end: {}", applicationId, event);
129-
eventEmitter.emit(event);
130+
NuEventEmitter.emit(event, eventEmitter);
130131
}
131132

132133
private OpenLineage.ParentRunFacet buildApplicationParentFacet() {

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java

Lines changed: 28 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,20 @@
1616
import io.openlineage.client.OpenLineage.RunEvent.EventType;
1717
import io.openlineage.client.OpenLineageClientUtils;
1818
import io.openlineage.spark.agent.EventEmitter;
19+
import io.openlineage.spark.agent.NuEventEmitter;
1920
import io.openlineage.spark.agent.filters.EventFilterUtils;
2021
import io.openlineage.spark.agent.util.PlanUtils;
2122
import io.openlineage.spark.agent.util.ScalaConversionUtils;
2223
import io.openlineage.spark.api.OpenLineageContext;
2324
import io.openlineage.spark.api.naming.JobNameBuilder;
25+
2426
import java.time.ZoneOffset;
2527
import java.time.ZonedDateTime;
2628
import java.util.List;
2729
import java.util.Optional;
2830
import java.util.Stack;
2931
import java.util.concurrent.atomic.AtomicBoolean;
32+
3033
import lombok.extern.slf4j.Slf4j;
3134
import org.apache.spark.scheduler.ActiveJob;
3235
import org.apache.spark.scheduler.JobFailed;
@@ -50,7 +53,6 @@ class SparkSQLExecutionContext implements ExecutionContext {
5053
private static final String SPARK_PROCESSING_TYPE_BATCH = "BATCH";
5154
private static final String SPARK_PROCESSING_TYPE_STREAMING = "STREAMING";
5255
private final long executionId;
53-
private String jobName;
5456
private final OpenLineageContext olContext;
5557
private final EventEmitter eventEmitter;
5658
private final OpenLineageRunEventBuilder runEventBuilder;
@@ -88,30 +90,28 @@ public void start(SparkListenerSQLExecutionStart startEvent) {
8890
"OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart");
8991
// return;
9092
}
91-
9293
olContext.setActiveJobId(activeJobId);
93-
// We shall skip this START event, focusing on the first SparkListenerJobStart event to be the START, because of the presence of the job nurn
9494
// only one START event is expected, in case it was already sent with jobStart, we send running
95-
// EventType eventType = emittedOnJobStart ? RUNNING : START;
96-
// emittedOnSqlExecutionStart = true;
97-
98-
// RunEvent event =
99-
// runEventBuilder.buildRun(
100-
// OpenLineageRunEventContext.builder()
101-
// .applicationParentRunFacet(buildApplicationParentFacet())
102-
// .event(startEvent)
103-
// .runEventBuilder(
104-
// olContext
105-
// .getOpenLineage()
106-
// .newRunEventBuilder()
107-
// .eventTime(toZonedTime(startEvent.time()))
108-
// .eventType(eventType))
109-
// .jobBuilder(buildJob())
110-
// .jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
111-
// .build());
112-
113-
// log.debug("Posting event for start {}: {}", executionId, event);
114-
// eventEmitter.emit(event);
95+
EventType eventType = emittedOnJobStart ? RUNNING : START;
96+
emittedOnSqlExecutionStart = true;
97+
98+
RunEvent event =
99+
runEventBuilder.buildRun(
100+
OpenLineageRunEventContext.builder()
101+
.applicationParentRunFacet(buildApplicationParentFacet())
102+
.event(startEvent)
103+
.runEventBuilder(
104+
olContext
105+
.getOpenLineage()
106+
.newRunEventBuilder()
107+
.eventTime(toZonedTime(startEvent.time()))
108+
.eventType(eventType))
109+
.jobBuilder(buildJob())
110+
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
111+
.build());
112+
113+
log.debug("Posting event for start {}: {}", executionId, event);
114+
NuEventEmitter.emit(event, eventEmitter);
115115
}
116116

117117
@Override
@@ -160,7 +160,7 @@ public void end(SparkListenerSQLExecutionEnd endEvent) {
160160
if (log.isDebugEnabled()) {
161161
log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event));
162162
}
163-
eventEmitter.emit(event);
163+
NuEventEmitter.emit(event, eventEmitter);
164164
}
165165

166166
// TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed
@@ -191,7 +191,7 @@ public void start(SparkListenerStageSubmitted stageSubmitted) {
191191
.build());
192192

193193
log.debug("Posting event for stage submitted {}: {}", executionId, event);
194-
eventEmitter.emit(event);
194+
NuEventEmitter.emit(event, eventEmitter);
195195
}
196196

197197
// TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed
@@ -221,7 +221,7 @@ public void end(SparkListenerStageCompleted stageCompleted) {
221221
.build());
222222

223223
log.debug("Posting event for stage completed {}: {}", executionId, event);
224-
eventEmitter.emit(event);
224+
NuEventEmitter.emit(event, eventEmitter);
225225
}
226226

227227
@Override
@@ -244,12 +244,6 @@ public void setActiveJob(ActiveJob activeJob) {
244244
@Override
245245
public void start(SparkListenerJobStart jobStart) {
246246
log.debug("SparkListenerJobStart - executionId: {}", executionId);
247-
try {
248-
jobName = jobStart.properties().getProperty("spark.job.name");
249-
} catch (RuntimeException e) {
250-
log.info("spark.job.name property not found in the context");
251-
}
252-
olContext.setJobNurn(jobName);
253247
if (!olContext.getQueryExecution().isPresent()) {
254248
log.info(NO_EXECUTION_INFO, olContext);
255249
return;
@@ -262,7 +256,6 @@ public void start(SparkListenerJobStart jobStart) {
262256
// only one START event is expected, in case it was already sent with sqlExecutionStart, we send
263257
// running
264258
EventType eventType = emittedOnSqlExecutionStart ? RUNNING : START;
265-
emittedOnSqlExecutionStart = true;
266259
emittedOnJobStart = true;
267260

268261
RunEvent event =
@@ -281,7 +274,7 @@ public void start(SparkListenerJobStart jobStart) {
281274
.build());
282275

283276
log.debug("Posting event for start {}: {}", executionId, event);
284-
eventEmitter.emit(event);
277+
NuEventEmitter.emit(event, eventEmitter);
285278
}
286279

287280
@Override
@@ -330,7 +323,7 @@ public void end(SparkListenerJobEnd jobEnd) {
330323
.build());
331324

332325
log.debug("Posting event for end {}: {}", executionId, event);
333-
eventEmitter.emit(event);
326+
NuEventEmitter.emit(event, eventEmitter);
334327
}
335328

336329
@Override

integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@
88
import com.fasterxml.jackson.annotation.JsonProperty;
99
import io.openlineage.client.OpenLineage;
1010
import io.openlineage.spark.agent.Versions;
11+
12+
import java.util.NoSuchElementException;
1113
import java.util.Properties;
1214
import lombok.Getter;
1315
import lombok.NonNull;
1416
import io.openlineage.spark.api.OpenLineageContext;
17+
import lombok.extern.slf4j.Slf4j;
18+
import org.apache.spark.sql.SparkSession;
1519

1620
/** Captures information related to the Apache Spark job. */
1721
@Getter
22+
@Slf4j
1823
public class NuFacet extends OpenLineage.DefaultRunFacet {
1924
// @JsonProperty("jobId")
2025
// @NonNull
@@ -26,8 +31,23 @@ public class NuFacet extends OpenLineage.DefaultRunFacet {
2631
@JsonProperty("jobNurn")
2732
private String jobNurn;
2833

34+
private String fetchJobNurn(OpenLineageContext olContext) {
35+
if (olContext.getSparkSession().isPresent()) {
36+
SparkSession sparkSession = olContext.getSparkSession().get();
37+
try {
38+
return sparkSession.conf().get("spark.job.name");
39+
} catch (NoSuchElementException e) {
40+
log.warn("spark.job.name property not found in the context");
41+
return null;
42+
}
43+
}
44+
45+
log.warn("spark.job.name property not found because the SparkContext could not be retrieved from OpenLineageContext");
46+
return null;
47+
}
48+
2949
public NuFacet(@NonNull OpenLineageContext olContext) {
3050
super(Versions.OPEN_LINEAGE_PRODUCER_URI);
31-
this.jobNurn = olContext.getJobNurn();
51+
this.jobNurn = fetchJobNurn(olContext);
3252
}
3353
}

integration/spark/shared/src/main/java/io/openlineage/spark/api/OpenLineageContext.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,6 @@ public String getSparkVersion() {
132132
*/
133133
@Getter @Setter String jobName;
134134

135-
/**
136-
* Job nurn is collected during the Spark runs, and stored for creating a custom facet within
137-
* the Run facets. It should help us to enhance the events further in the lineage pipeline.
138-
*/
139-
@Getter @Setter String jobNurn;
140-
141135
@Setter Integer activeJobId;
142136

143137
public Optional<Integer> getActiveJobId() {

0 commit comments

Comments
 (0)