Skip to content

Commit afdbd43

Browse files
committed
Refactor Nu validations for event emission. NuEventEmitter was created to hold all event-emission validation logic. The RDD, Spark Application and Spark SQL execution contexts were changed to emit through it.
1 parent 948af9d commit afdbd43

File tree

4 files changed

+123
-115
lines changed

4 files changed

+123
-115
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package io.openlineage.spark.agent;
2+
3+
import io.openlineage.client.OpenLineage;
4+
import lombok.extern.slf4j.Slf4j;
5+
6+
import java.lang.reflect.Field;
7+
import java.util.Set;
8+
import java.util.stream.Collectors;
9+
import java.util.stream.Stream;
10+
11+
import static io.openlineage.client.OpenLineage.RunEvent.EventType;
12+
import static io.openlineage.client.OpenLineage.RunEvent;
13+
import static io.openlineage.client.OpenLineage.RunEvent.EventType.*;
14+
import static java.util.Objects.isNull;
15+
16+
@Slf4j
17+
public class NuEventEmitter {
18+
19+
private static final Set<String> WANTED_JOB_TYPES = Set.of(
20+
"SQL_JOB" // as defined in SparkSQLExecutionContext.SPARK_JOB_TYPE
21+
);
22+
23+
private static final Set<String> WANTED_EVENT_NAME_SUBSTRINGS = Set.of(
24+
".execute_insert_into_hadoop_fs_relation_command.",
25+
".adaptive_spark_plan."
26+
);
27+
28+
private static Boolean isPermittedJobType(RunEvent event) {
29+
String jobType = event.getJob().getFacets().getJobType().getJobType();
30+
if (WANTED_JOB_TYPES.stream().noneMatch(jobType::equals)) {
31+
log.info("OpenLineage event with job type {} has no lineage value and should not be emitted", jobType);
32+
return false;
33+
}
34+
return true;
35+
}
36+
37+
private static Boolean isPermitedEventType(RunEvent event) {
38+
if (RUNNING.equals(event.getEventType())) {
39+
log.info("OpenLineage event is {} and should not be emitted", RUNNING);
40+
return false;
41+
}
42+
return true;
43+
}
44+
45+
private static Boolean isPermittedJobName(RunEvent event) {
46+
String jobName = event.getJob().getName();
47+
if (isNull(jobName)) {
48+
log.info("OpenLineage event has no job name and should not be emitted");
49+
return false;
50+
}
51+
if (WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) {
52+
log.info("OpenLineage event job name has no permitted substring and should not be emitted");
53+
return false;
54+
}
55+
return true;
56+
}
57+
58+
private static Boolean shouldEmit(RunEvent event) {
59+
return Stream.of(
60+
isPermittedJobType(event),
61+
isPermitedEventType(event),
62+
isPermittedJobName(event)
63+
).noneMatch(Boolean.FALSE::equals);
64+
}
65+
66+
private static Boolean shouldDiscardColumnLineageFacet(EventType eventType) {
67+
return !COMPLETE.equals(eventType);
68+
}
69+
70+
private static void discardColumnLineageFacet(RunEvent event) {
71+
try {
72+
Field columnLineageFacetField = OpenLineage.DatasetFacets.class.getDeclaredField("columnLineage");
73+
columnLineageFacetField.setAccessible(true);
74+
Stream
75+
.concat(event.getInputs().stream(), event.getOutputs().stream())
76+
.collect(Collectors.toList())
77+
.forEach(dataset -> {
78+
try {
79+
log.info("Discarding column lineage facet for dataset {} {} {}",
80+
dataset.getClass().getName(), dataset.getNamespace(), dataset.getName());
81+
columnLineageFacetField.set(dataset.getFacets(), null);
82+
} catch (IllegalAccessException e) {
83+
log.warn("Failed to discard column lineage facet", e);
84+
}
85+
});
86+
} catch (NoSuchFieldException e) {
87+
log.error("Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
88+
}
89+
}
90+
91+
public static void emit(RunEvent event, EventEmitter eventEmitter) {
92+
if (!shouldEmit(event)) {
93+
return;
94+
}
95+
96+
if (shouldDiscardColumnLineageFacet(event.getEventType())) {
97+
discardColumnLineageFacet(event);
98+
}
99+
100+
eventEmitter.emit(event);
101+
}
102+
}

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.openlineage.client.utils.DatasetIdentifier;
1212
import io.openlineage.client.utils.UUIDUtils;
1313
import io.openlineage.spark.agent.EventEmitter;
14+
import io.openlineage.spark.agent.NuEventEmitter;
1415
import io.openlineage.spark.agent.OpenLineageSparkListener;
1516
import io.openlineage.spark.agent.facets.ErrorFacet;
1617
import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder;
@@ -225,10 +226,8 @@ public void start(SparkListenerJobStart jobStart) {
225226
.job(buildJob(jobStart.jobId()))
226227
.build();
227228

228-
log.info("OpenLineage {} event has no lineage value an will not be emmited", SPARK_JOB_TYPE);
229-
230-
// log.debug("Posting event for start {}: {}", jobStart, event);
231-
// eventEmitter.emit(event);
229+
log.debug("Posting event for start {}: {}", jobStart, event);
230+
NuEventEmitter.emit(event, eventEmitter);
232231
}
233232

234233
@Override
@@ -261,10 +260,8 @@ public void end(SparkListenerJobEnd jobEnd) {
261260
.job(buildJob(jobEnd.jobId()))
262261
.build();
263262

264-
log.info("OpenLineage {} event has no lineage value an will not be emmited", SPARK_JOB_TYPE);
265-
266-
// log.debug("Posting event for end {}: {}", jobEnd, event);
267-
// eventEmitter.emit(event);
263+
log.debug("Posting event for end {}: {}", jobEnd, event);
264+
NuEventEmitter.emit(event, eventEmitter);
268265
}
269266

270267
protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, SparkListenerEvent event) {

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.openlineage.client.OpenLineage;
1313
import io.openlineage.client.OpenLineage.RunEvent;
1414
import io.openlineage.spark.agent.EventEmitter;
15+
import io.openlineage.spark.agent.NuEventEmitter;
1516
import io.openlineage.spark.agent.filters.EventFilterUtils;
1617
import io.openlineage.spark.api.OpenLineageContext;
1718
import io.openlineage.spark.api.naming.JobNameBuilder;
@@ -94,11 +95,8 @@ public void start(SparkListenerApplicationStart applicationStart) {
9495
.event(applicationStart)
9596
.build());
9697

97-
98-
log.info("OpenLineage APPLICATION event has no lineage value an will not be emmited");
99-
100-
// log.debug("Posting event for applicationId {} start: {}", applicationId, event);
101-
// eventEmitter.emit(event);
98+
log.debug("Posting event for applicationId {} start: {}", applicationId, event);
99+
NuEventEmitter.emit(event, eventEmitter);
102100
}
103101

104102
@Override
@@ -128,10 +126,8 @@ public void end(SparkListenerApplicationEnd applicationEnd) {
128126
.event(applicationEnd)
129127
.build());
130128

131-
log.info("OpenLineage APPLICATION event has no lineage value an will not be emmited");
132-
133-
// log.debug("Posting event for applicationId {} end: {}", applicationId, event);
134-
// eventEmitter.emit(event);
129+
log.debug("Posting event for applicationId {} end: {}", applicationId, event);
130+
NuEventEmitter.emit(event, eventEmitter);
135131
}
136132

137133
private OpenLineage.ParentRunFacet buildApplicationParentFacet() {

integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java

Lines changed: 11 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,23 @@
1010
import static io.openlineage.client.OpenLineage.RunEvent.EventType.RUNNING;
1111
import static io.openlineage.client.OpenLineage.RunEvent.EventType.START;
1212
import static io.openlineage.spark.agent.util.TimeUtils.toZonedTime;
13-
import static java.util.Objects.isNull;
1413

1514
import io.openlineage.client.OpenLineage;
1615
import io.openlineage.client.OpenLineage.RunEvent;
1716
import io.openlineage.client.OpenLineage.RunEvent.EventType;
1817
import io.openlineage.client.OpenLineageClientUtils;
1918
import io.openlineage.spark.agent.EventEmitter;
19+
import io.openlineage.spark.agent.NuEventEmitter;
2020
import io.openlineage.spark.agent.filters.EventFilterUtils;
2121
import io.openlineage.spark.agent.util.PlanUtils;
2222
import io.openlineage.spark.agent.util.ScalaConversionUtils;
2323
import io.openlineage.spark.api.OpenLineageContext;
2424
import io.openlineage.spark.api.naming.JobNameBuilder;
2525

26-
import java.lang.reflect.Field;
2726
import java.time.ZoneOffset;
2827
import java.time.ZonedDateTime;
2928
import java.util.*;
3029
import java.util.concurrent.atomic.AtomicBoolean;
31-
import java.util.stream.Collectors;
32-
import java.util.stream.Stream;
3330

3431
import lombok.extern.slf4j.Slf4j;
3532
import org.apache.spark.scheduler.*;
@@ -60,10 +57,10 @@ class SparkSQLExecutionContext implements ExecutionContext {
6057

6158
private SparkSQLQueryParser sqlRecorder = new SparkSQLQueryParser();
6259

63-
private static final Set<String> NU_WANTED_EVENT_NAME_SUBSTRINGS = Set.of(
64-
".execute_insert_into_hadoop_fs_relation_command.",
65-
".adaptive_spark_plan."
66-
);
60+
// private static final Set<String> NU_WANTED_EVENT_NAME_SUBSTRINGS = Set.of(
61+
// ".execute_insert_into_hadoop_fs_relation_command.",
62+
// ".adaptive_spark_plan."
63+
// );
6764

6865
public SparkSQLExecutionContext(
6966
long executionId,
@@ -76,54 +73,6 @@ public SparkSQLExecutionContext(
7673
this.runEventBuilder = runEventBuilder;
7774
}
7875

79-
private static Boolean shouldEmit(RunEvent event){
80-
if (RUNNING.equals(event.getEventType())) {
81-
log.info("OpenLineage event is RUNNING and should not be emmited");
82-
return false;
83-
}
84-
85-
String jobName = event.getJob().getName();
86-
if (isNull(jobName)) {
87-
log.info("OpenLineage event has no job name should not be emitted");
88-
return false;
89-
}
90-
91-
if (NU_WANTED_EVENT_NAME_SUBSTRINGS.stream().noneMatch(jobName::contains)) {
92-
log.info("OpenLineage event has no lineage value and will not be emmited");
93-
return false;
94-
}
95-
96-
return true;
97-
}
98-
99-
private static Boolean shouldKeepColumnLineageFacet(EventType eventType) {
100-
return !(START.equals(eventType) || RUNNING.equals(eventType));
101-
}
102-
103-
private static void discardColumnLineage(RunEvent event) {
104-
if (shouldKeepColumnLineageFacet(event.getEventType())) { return; }
105-
106-
log.info("Discarding column lineage facet for event {}", event.getEventType());
107-
108-
try {
109-
Field columnLineageFacetField = OpenLineage.DatasetFacets.class.getDeclaredField("columnLineage");
110-
columnLineageFacetField.setAccessible(true);
111-
Stream
112-
.concat(event.getInputs().stream(), event.getOutputs().stream())
113-
.collect(Collectors.toList())
114-
.forEach(dataset -> {
115-
try {
116-
log.info("Discarding column lineage facet for dataset {} {} {}", dataset.getClass().getName(), dataset.getNamespace(), dataset.getName());
117-
columnLineageFacetField.set(dataset.getFacets(), null);
118-
} catch (IllegalAccessException e) {
119-
log.warn("Failed to discard column lineage facet", e);
120-
}
121-
});
122-
} catch (NoSuchFieldException e) {
123-
log.error("Failed to discard column lineage facet: columnLineage field not found at OpenLineage.DatasetFacets", e);
124-
}
125-
}
126-
12776
@Override
12877
public void start(SparkListenerSQLExecutionStart startEvent) {
12978
if (log.isDebugEnabled()) {
@@ -157,14 +106,8 @@ public void start(SparkListenerSQLExecutionStart startEvent) {
157106
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
158107
.build());
159108

160-
if (!shouldEmit(event)) {
161-
return;
162-
}
163-
164-
discardColumnLineage(event);
165-
166109
log.debug("Posting event for start {}: {}", executionId, event);
167-
eventEmitter.emit(event);
110+
NuEventEmitter.emit(event, eventEmitter);
168111
}
169112

170113
@Override
@@ -210,16 +153,10 @@ public void end(SparkListenerSQLExecutionEnd endEvent) {
210153
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
211154
.build());
212155

213-
if (!shouldEmit(event)) {
214-
return;
215-
}
216-
217-
discardColumnLineage(event);
218-
219156
if (log.isDebugEnabled()) {
220157
log.debug("Posting event for end {}: {}", executionId, OpenLineageClientUtils.toJson(event));
221158
}
222-
eventEmitter.emit(event);
159+
NuEventEmitter.emit(event, eventEmitter);
223160
}
224161

225162
// TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed
@@ -249,14 +186,8 @@ public void start(SparkListenerStageSubmitted stageSubmitted) {
249186
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
250187
.build());
251188

252-
if (!shouldEmit(event)) {
253-
return;
254-
}
255-
256-
discardColumnLineage(event);
257-
258189
log.debug("Posting event for stage submitted {}: {}", executionId, event);
259-
eventEmitter.emit(event);
190+
NuEventEmitter.emit(event, eventEmitter);
260191
}
261192

262193
// TODO: not invoked until https://github.com/OpenLineage/OpenLineage/issues/470 is completed
@@ -285,14 +216,8 @@ public void end(SparkListenerStageCompleted stageCompleted) {
285216
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
286217
.build());
287218

288-
if (!shouldEmit(event)) {
289-
return;
290-
}
291-
292-
discardColumnLineage(event);
293-
294219
log.debug("Posting event for stage completed {}: {}", executionId, event);
295-
eventEmitter.emit(event);
220+
NuEventEmitter.emit(event, eventEmitter);
296221
}
297222

298223
@Override
@@ -344,14 +269,8 @@ public void start(SparkListenerJobStart jobStart) {
344269
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
345270
.build());
346271

347-
if (!shouldEmit(event)) {
348-
return;
349-
}
350-
351-
discardColumnLineage(event);
352-
353272
log.debug("Posting event for start {}: {}", executionId, event);
354-
eventEmitter.emit(event);
273+
NuEventEmitter.emit(event, eventEmitter);
355274
}
356275

357276
@Override
@@ -399,14 +318,8 @@ public void end(SparkListenerJobEnd jobEnd) {
399318
.jobFacetsBuilder(getJobFacetsBuilder(olContext.getQueryExecution().get()))
400319
.build());
401320

402-
if (!shouldEmit(event)) {
403-
return;
404-
}
405-
406-
discardColumnLineage(event);
407-
408321
log.debug("Posting event for end {}: {}", executionId, event);
409-
eventEmitter.emit(event);
322+
NuEventEmitter.emit(event, eventEmitter);
410323
}
411324

412325
@Override

0 commit comments

Comments
 (0)