
Commit 8812a1f

Merge pull request #160 from marklogic/feature/logging-enhancement
MLE-12294 Improving logging for sake of ETL tool
2 parents 8379ebe + 3bdc828 commit 8812a1f

15 files changed: 81 additions & 44 deletions

src/main/java/com/marklogic/spark/DefaultSource.java

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@
  */
 public class DefaultSource implements TableProvider, DataSourceRegister {
 
-    private static final Logger logger = LoggerFactory.getLogger(DefaultSource.class);
+    private static final Logger logger = LoggerFactory.getLogger("com.marklogic.spark");
 
     @Override
     public String shortName() {
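
Because the logger is now registered under the name "com.marklogic.spark" rather than the class, a downstream application or ETL tool only needs to configure that one logger name to control the connector's output. A minimal sketch of what that could look like, assuming a Log4j 2 backend (which recent Spark 3.x distributions use); the helper class and chosen level below are illustrative, not part of this commit:

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class ConnectorLoggingSetup {

    // Hypothetical helper: adjusts every connector message by configuring the
    // single "com.marklogic.spark" logger introduced in this commit.
    public static void setConnectorLogLevel(Level level) {
        Configurator.setLevel("com.marklogic.spark", level);
    }

    public static void main(String[] args) {
        // Only show warnings and errors from the connector, regardless of which class emits them.
        setConnectorLogLevel(Level.WARN);
    }
}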

src/main/java/com/marklogic/spark/Util.java

Lines changed: 8 additions & 0 deletions
@@ -16,6 +16,8 @@
 package com.marklogic.spark;
 
 import org.apache.spark.sql.catalyst.json.JSONOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.collection.immutable.HashMap;
 
 import java.util.ArrayList;
@@ -25,6 +27,12 @@
 
 public interface Util {
 
+    /**
+     * Intended for all non-debug logging where the class name doesn't matter and only adds complexity to the log
+     * messages.
+     */
+    Logger MAIN_LOGGER = LoggerFactory.getLogger("com.marklogic.spark");
+
     JSONOptions DEFAULT_JSON_OPTIONS = new JSONOptions(
         new HashMap<>(),
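
The shared MAIN_LOGGER constant gives the rest of the connector one place to send user-facing messages, while per-class loggers remain available for debug-level detail; the remaining diffs in this commit follow that split. A minimal sketch of the resulting convention (the class and method below are illustrative, not from the repo):

import com.marklogic.spark.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleReader {

    // Per-class logger: debug detail where the originating class is useful context.
    private static final Logger logger = LoggerFactory.getLogger(ExampleReader.class);

    void finishBatch(int docCount, long durationMs) {
        // Progress messages meant for end users go to the shared "com.marklogic.spark" logger.
        Util.MAIN_LOGGER.info("Read {} documents in {}ms", docCount, durationMs);

        // Internal detail stays on the class-specific logger at debug level.
        if (logger.isDebugEnabled()) {
            logger.debug("Finished batch in reader {}", this);
        }
    }
}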

src/main/java/com/marklogic/spark/reader/customcode/CustomCodeMicroBatchStream.java

Lines changed: 2 additions & 1 deletion
@@ -1,5 +1,6 @@
 package com.marklogic.spark.reader.customcode;
 
+import com.marklogic.spark.Util;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
@@ -74,6 +75,6 @@ public void commit(Offset end) {
 
     @Override
     public void stop() {
-        logger.info("Stopping");
+        Util.MAIN_LOGGER.info("Stopping");
     }
 }

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 3 additions & 2 deletions
@@ -11,6 +11,7 @@
 import com.marklogic.client.query.QueryDefinition;
 import com.marklogic.client.query.SearchQueryDefinition;
 import com.marklogic.client.query.StructuredQueryBuilder;
+import com.marklogic.spark.Util;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -89,9 +90,9 @@ public boolean next() {
         List<String> uris = getNextBatchOfUris();
         if (uris.isEmpty()) {
             // TBD on whether this should be info/debug.
-            if (logger.isInfoEnabled()) {
+            if (Util.MAIN_LOGGER.isInfoEnabled()) {
                 long duration = System.currentTimeMillis() - startTime;
-                logger.info("Read {} documents from partition {} in {}ms", docCount, forestPartition, duration);
+                Util.MAIN_LOGGER.info("Read {} documents from partition {} in {}ms", docCount, forestPartition, duration);
             }
             return false;
         }

src/main/java/com/marklogic/spark/reader/optic/OpticMicroBatchStream.java

Lines changed: 2 additions & 1 deletion
@@ -15,6 +15,7 @@
  */
 package com.marklogic.spark.reader.optic;
 
+import com.marklogic.spark.Util;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
@@ -98,6 +99,6 @@ public void commit(Offset end) {
 
     @Override
     public void stop() {
-        logger.info("Stopping");
+        Util.MAIN_LOGGER.info("Stopping");
     }
 }

src/main/java/com/marklogic/spark/reader/optic/OpticPartitionReaderFactory.java

Lines changed: 3 additions & 1 deletion
@@ -35,7 +35,9 @@ class OpticPartitionReaderFactory implements PartitionReaderFactory {
 
     @Override
     public PartitionReader<InternalRow> createReader(InputPartition partition) {
-        logger.info("Creating reader for partition: {}", partition);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Creating reader for partition: {}", partition);
+        }
         return new OpticPartitionReader(this.readContext, (PlanAnalysis.Partition) partition);
     }
 }

src/main/java/com/marklogic/spark/reader/optic/OpticScanBuilder.java

Lines changed: 14 additions & 13 deletions
@@ -16,6 +16,7 @@
 package com.marklogic.spark.reader.optic;
 
 import com.marklogic.spark.Options;
+import com.marklogic.spark.Util;
 import com.marklogic.spark.reader.filter.FilterFactory;
 import com.marklogic.spark.reader.filter.OpticFilter;
 import org.apache.spark.sql.connector.expressions.SortOrder;
@@ -80,8 +81,8 @@ public Filter[] pushFilters(Filter[] filters) {
         for (Filter filter : filters) {
             OpticFilter opticFilter = FilterFactory.toPlanFilter(filter);
             if (opticFilter != null) {
-                if (logger.isInfoEnabled()) {
-                    logger.info("Pushing down filter: {}", filter);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Pushing down filter: {}", filter);
                 }
                 opticFilters.add(opticFilter);
                 this.pushedFilters.add(filter);
@@ -110,8 +111,8 @@ public boolean pushLimit(int limit) {
         if (readContext.planAnalysisFoundNoRows()) {
             return false;
         }
-        if (logger.isInfoEnabled()) {
-            logger.info("Pushing down limit: {}", limit);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Pushing down limit: {}", limit);
         }
         readContext.pushDownLimit(limit);
         return true;
@@ -125,8 +126,8 @@ public boolean pushTopN(SortOrder[] orders, int limit) {
         // This will be invoked when the user calls both orderBy and limit in their Spark program. If the user only
         // calls limit, then only pushLimit is called and this will not be called. If the user only calls orderBy and
         // not limit, then neither this nor pushLimit will be called.
-        if (logger.isInfoEnabled()) {
-            logger.info("Pushing down topN: {}; limit: {}", Arrays.asList(orders), limit);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Pushing down topN: {}; limit: {}", Arrays.asList(orders), limit);
         }
         readContext.pushDownTopN(orders, limit);
         return true;
@@ -156,16 +157,16 @@ public boolean supportCompletePushDown(Aggregation aggregation) {
         }
 
         if (hasUnsupportedAggregateFunction(aggregation)) {
-            if (logger.isInfoEnabled()) {
-                logger.info("Aggregation contains one or more unsupported functions, " +
+            if (Util.MAIN_LOGGER.isInfoEnabled()) {
+                Util.MAIN_LOGGER.info("Aggregation contains one or more unsupported functions, " +
                     "so not pushing aggregation to MarkLogic: {}", describeAggregation(aggregation));
             }
             return false;
         }
 
         if (readContext.getBucketCount() > 1) {
-            if (logger.isInfoEnabled()) {
-                logger.info("Multiple requests will be made to MarkLogic; aggregation will be applied by Spark as well: {}",
+            if (Util.MAIN_LOGGER.isInfoEnabled()) {
+                Util.MAIN_LOGGER.info("Multiple requests will be made to MarkLogic; aggregation will be applied by Spark as well: {}",
                     describeAggregation(aggregation));
             }
             return false;
@@ -183,12 +184,12 @@ public boolean pushAggregation(Aggregation aggregation) {
         }
 
         if (pushDownAggregatesIsDisabled()) {
-            logger.info("Push down of aggregates is disabled; Spark will handle all aggregations.");
+            Util.MAIN_LOGGER.info("Push down of aggregates is disabled; Spark will handle all aggregations.");
             return false;
         }
 
-        if (logger.isInfoEnabled()) {
-            logger.info("Pushing down aggregation: {}", describeAggregation(aggregation));
+        if (logger.isDebugEnabled()) {
+            logger.debug("Pushing down aggregation: {}", describeAggregation(aggregation));
         }
         readContext.pushDownAggregation(aggregation);
         return true;
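
Note that these hunks keep the isDebugEnabled()/isInfoEnabled() guards around the parameterized log calls. With SLF4J the message is only formatted when the level is enabled, but arguments such as describeAggregation(aggregation) or Arrays.asList(orders) are still evaluated at the call site, so the guard skips that work entirely when the level is off. A small self-contained illustration of the difference (names are illustrative, not from the repo):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedLoggingExample {

    private static final Logger logger = LoggerFactory.getLogger(GuardedLoggingExample.class);

    // Stand-in for something like describeAggregation(aggregation): cheap here,
    // but potentially expensive to build in real code.
    private static String describe() {
        return String.join(", ", "count(*)", "sum(price)");
    }

    public static void main(String[] args) {
        // Without a guard, describe() runs even when debug logging is disabled.
        logger.debug("Pushing down aggregation: {}", describe());

        // With the guard, the argument is only built when it will actually be logged.
        if (logger.isDebugEnabled()) {
            logger.debug("Pushing down aggregation: {}", describe());
        }
    }
}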

src/main/java/com/marklogic/spark/reader/optic/PlanUtil.java

Lines changed: 3 additions & 1 deletion
@@ -99,7 +99,9 @@ static ObjectNode buildGroupByAggregation(List<String> columnNames, Aggregation
                     aggregateArgs.addObject().put("values", "distinct");
                 }
             } else {
-                logger.info("Unsupported aggregate function, will not be pushed to Optic: {}", func);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Unsupported aggregate function, will not be pushed to Optic: {}", func);
+                }
             }
         }
     });

src/main/java/com/marklogic/spark/reader/optic/ReadContext.java

Lines changed: 8 additions & 5 deletions
@@ -29,6 +29,7 @@
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.ContextSupport;
 import com.marklogic.spark.Options;
+import com.marklogic.spark.Util;
 import com.marklogic.spark.reader.filter.OpticFilter;
 import org.apache.spark.sql.connector.expressions.Expression;
 import org.apache.spark.sql.connector.expressions.SortOrder;
@@ -92,8 +93,8 @@ public ReadContext(Map<String, String> properties, StructType schema, int defaul
         }
 
         if (this.planAnalysis != null) {
-            if (logger.isInfoEnabled()) {
-                logger.info("Partition count: {}; number of requests that will be made to MarkLogic: {}",
+            if (Util.MAIN_LOGGER.isInfoEnabled()) {
+                Util.MAIN_LOGGER.info("Partition count: {}; number of requests that will be made to MarkLogic: {}",
                     this.planAnalysis.getPartitions().size(), this.planAnalysis.getAllBuckets().size());
             }
             // Calling this to establish a server timestamp.
@@ -108,7 +109,7 @@ public ReadContext(Map<String, String> properties, StructType schema, int defaul
     private void handlePlanAnalysisError(String query, FailedRequestException ex) {
         final String indicatorOfNoRowsExisting = "$tableId as xs:string -- Invalid coercion: () as xs:string";
         if (ex.getMessage().contains(indicatorOfNoRowsExisting)) {
-            logger.info("No rows were found, so will not create any partitions.");
+            Util.MAIN_LOGGER.info("No rows were found, so will not create any partitions.");
         } else {
             throw new ConnectorException(String.format("Unable to run Optic DSL query %s; cause: %s", query, ex.getMessage()), ex);
         }
@@ -195,12 +196,14 @@ void pushDownAggregation(Aggregation aggregation) {
                 StructField field = findColumnInSchema(sum.column(), PlanUtil.expressionToColumnName(sum.column()));
                 newSchema = newSchema.add(func.toString(), field.dataType());
             } else {
-                logger.info("Unsupported aggregate function: {}", func);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Unsupported aggregate function: {}", func);
+                }
             }
         }
 
         if (!getProperties().containsKey(Options.READ_BATCH_SIZE)) {
-            logger.info("Batch size was not overridden, so modifying each partition to make a single request to improve " +
+            Util.MAIN_LOGGER.info("Batch size was not overridden, so modifying each partition to make a single request to improve " +
                 "performance of pushed down aggregation.");
             List<PlanAnalysis.Partition> mergedPartitions = planAnalysis.getPartitions().stream()
                 .map(p -> p.mergeBuckets())

src/main/java/com/marklogic/spark/reader/optic/SchemaInferrer.java

Lines changed: 2 additions & 4 deletions
@@ -19,18 +19,16 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.marklogic.spark.ConnectorException;
+import com.marklogic.spark.Util;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
 
 public abstract class SchemaInferrer {
 
-    private static final Logger logger = LoggerFactory.getLogger(SchemaInferrer.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
     // "Column info types" = the possible set of types returned by the columnInfo call to /v1/rows. Note that this is
@@ -105,7 +103,7 @@ private static DataType determineSparkType(JsonNode column) {
         if (COLUMN_INFO_TYPES_TO_SPARK_TYPES.containsKey(type)) {
             return COLUMN_INFO_TYPES_TO_SPARK_TYPES.get(type);
         }
-        logger.warn("Unrecognized column type: {}; will map to Spark StringType", column);
+        Util.MAIN_LOGGER.warn("Unrecognized column type: {}; will map to Spark StringType", column);
         return DataTypes.StringType;
     }
 
