Skip to content

Commit 29eefe1

Browse files
authored
Merge pull request #327 from marklogic/feature/2.4.2-read-json-lines
MLE-17412 Added JSON Lines file reader
2 parents 67cebe8 + 66cc33c commit 29eefe1

File tree

6 files changed

+237
-5
lines changed

6 files changed

+237
-5
lines changed

src/main/java/com/marklogic/spark/reader/file/FileContext.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212
import org.apache.hadoop.fs.Path;
1313
import org.apache.spark.util.SerializableConfiguration;
1414

15-
import java.io.IOException;
16-
import java.io.InputStream;
17-
import java.io.Serializable;
18-
import java.io.UnsupportedEncodingException;
15+
import java.io.*;
1916
import java.net.URLDecoder;
2017
import java.nio.charset.Charset;
2118
import java.nio.charset.UnsupportedCharsetException;
@@ -49,11 +46,28 @@ public boolean isGzip() {
4946
}
5047

5148
public InputStream openFile(String filePath) {
49+
return openFile(filePath, false);
50+
}
51+
52+
public InputStream openFile(String filePath, boolean guessIfGzipped) {
5253
try {
5354
Path hadoopPath = new Path(filePath);
5455
FileSystem fileSystem = hadoopPath.getFileSystem(hadoopConfiguration.value());
5556
FSDataInputStream inputStream = fileSystem.open(hadoopPath);
56-
return this.isGzip() ? new GZIPInputStream(inputStream) : inputStream;
57+
return isFileGzipped(filePath, guessIfGzipped) ? new GZIPInputStream(inputStream) : inputStream;
58+
} catch (Exception e) {
59+
throw new ConnectorException(String.format(
60+
"Unable to read file at %s; cause: %s", filePath, e.getMessage()), e);
61+
}
62+
}
63+
64+
BufferedReader openFileReader(String filePath, boolean guessIfGzipped) {
65+
try {
66+
InputStream inputStream = openFile(filePath, guessIfGzipped);
67+
InputStreamReader inputStreamReader = this.encoding != null ?
68+
new InputStreamReader(inputStream, encoding) :
69+
new InputStreamReader(inputStream);
70+
return new BufferedReader(inputStreamReader);
5771
} catch (Exception e) {
5872
throw new ConnectorException(String.format(
5973
"Unable to read file at %s; cause: %s", filePath, e.getMessage()), e);
@@ -82,4 +96,11 @@ public String decodeFilePath(String path) {
8296
return path;
8397
}
8498
}
99+
100+
private boolean isFileGzipped(String filePath, boolean guessIfGzipped) {
101+
if (this.isGzip()) {
102+
return true;
103+
}
104+
return guessIfGzipped && filePath != null && (filePath.endsWith(".gz") || filePath.endsWith(".gzip"));
105+
}
85106
}

src/main/java/com/marklogic/spark/reader/file/FilePartitionReaderFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public PartitionReader<InternalRow> createReader(InputPartition partition) {
3535
return new MlcpArchiveFileReader(filePartition, fileContext);
3636
} else if ("archive".equalsIgnoreCase(fileType)) {
3737
return new ArchiveFileReader(filePartition, fileContext);
38+
} else if ("json_lines".equalsIgnoreCase(fileType)) {
39+
return new JsonLinesFileReader(filePartition, fileContext);
3840
} else if (fileContext.hasOption(Options.READ_AGGREGATES_XML_ELEMENT)) {
3941
return fileContext.isZip() ?
4042
new ZipAggregateXmlFileReader(filePartition, fileContext) :
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.reader.file;
5+
6+
import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
15+
16+
class JsonLinesFileReader implements PartitionReader<InternalRow> {
17+
18+
private final FilePartition filePartition;
19+
private final FileContext fileContext;
20+
21+
private BufferedReader bufferedReader;
22+
private Iterator<String> bufferedLines;
23+
24+
private InternalRow nextRowToReturn;
25+
private String currentFilePath;
26+
private int lineCounter;
27+
private int filePathIndex;
28+
29+
JsonLinesFileReader(FilePartition filePartition, FileContext fileContext) {
30+
this.filePartition = filePartition;
31+
this.fileContext = fileContext;
32+
}
33+
34+
@Override
35+
public boolean next() {
36+
if (bufferedLines != null && bufferedLines.hasNext()) {
37+
this.nextRowToReturn = createRowFromNextJsonLine();
38+
return true;
39+
}
40+
41+
if (bufferedReader != null) {
42+
IOUtils.closeQuietly(bufferedReader);
43+
}
44+
45+
if (filePathIndex >= filePartition.getPaths().size()) {
46+
return false;
47+
}
48+
49+
openNextFile();
50+
return next();
51+
}
52+
53+
@Override
54+
public InternalRow get() {
55+
return nextRowToReturn;
56+
}
57+
58+
@Override
59+
public void close() {
60+
IOUtils.closeQuietly(bufferedReader);
61+
}
62+
63+
private void openNextFile() {
64+
final String originalFilePath = filePartition.getPaths().get(filePathIndex);
65+
this.currentFilePath = fileContext.decodeFilePath(originalFilePath);
66+
this.lineCounter = 1;
67+
this.filePathIndex++;
68+
// To mimic the behavior of the Spark JSON data source, this will guess if the file is gzipped based on its
69+
// file extension. This allows for .gz/.gzip files to be supported without the user having to specify the
70+
// compression option, which is the same behavior as Spark JSON provides.
71+
this.bufferedReader = fileContext.openFileReader(currentFilePath, true);
72+
this.bufferedLines = bufferedReader.lines().iterator();
73+
}
74+
75+
private InternalRow createRowFromNextJsonLine() {
76+
String line = bufferedLines.next();
77+
String uri = String.format("%s-%d.json", UTF8String.fromString(currentFilePath), lineCounter);
78+
lineCounter++;
79+
return new GenericInternalRow(new Object[]{
80+
UTF8String.fromString(uri),
81+
ByteArray.concat(line.getBytes()),
82+
null, null, null, null, null, null
83+
});
84+
}
85+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.reader.file;
5+
6+
import com.fasterxml.jackson.databind.JsonNode;
7+
import com.marklogic.spark.AbstractIntegrationTest;
8+
import com.marklogic.spark.Options;
9+
import org.apache.spark.sql.Dataset;
10+
import org.apache.spark.sql.Row;
11+
import org.apache.spark.sql.SaveMode;
12+
import org.junit.jupiter.api.Test;
13+
14+
import static org.junit.jupiter.api.Assertions.assertEquals;
15+
import static org.junit.jupiter.api.Assertions.assertFalse;
16+
17+
/**
 * Integration tests for the "json_lines" file reader: each reads a JSON Lines file via the
 * connector and writes the resulting rows to MarkLogic, then verifies the written documents.
 */
class ReadJsonLinesFilesTest extends AbstractIntegrationTest {

    @Test
    void test() {
        Dataset<Row> dataset = newSparkSession().read().format(CONNECTOR_IDENTIFIER)
            .option(Options.READ_FILES_TYPE, "json_lines")
            .load("src/test/resources/json-lines/nested-objects.txt");

        assertEquals(2, dataset.count(), "Should have one row for each line in the file.");

        dataset.write().format(CONNECTOR_IDENTIFIER)
            .option(Options.CLIENT_URI, makeClientUri())
            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
            .option(Options.WRITE_COLLECTIONS, "json-lines")
            // Strip the local directory prefix so URIs are just "/<file>-<line>.json".
            .option(Options.WRITE_URI_REPLACE, ".*json-lines,''")
            .mode(SaveMode.Append)
            .save();

        assertCollectionSize("json-lines", 2);

        // URIs are expected to be "<file path>-<1-based line number>.json".
        JsonNode doc = readJsonDocument("/nested-objects.txt-1.json");
        assertEquals(1, doc.get("id").asInt());
        assertEquals("blue", doc.at("/data/color").asText());
        assertEquals("world", doc.get("hello").asText());

        doc = readJsonDocument("/nested-objects.txt-2.json");
        assertEquals(2, doc.get("id").asInt());
        assertFalse(doc.has("hello"));
    }

    @Test
    void withUriTemplate() {
        // Verifies that a URI template can draw values from each line's own JSON content.
        newSparkSession().read().format(CONNECTOR_IDENTIFIER)
            .option(Options.READ_FILES_TYPE, "json_lines")
            .load("src/test/resources/json-lines/nested-objects.txt")
            .write().format(CONNECTOR_IDENTIFIER)
            .option(Options.CLIENT_URI, makeClientUri())
            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
            .option(Options.WRITE_COLLECTIONS, "json-lines")
            .option(Options.WRITE_URI_TEMPLATE, "/a/{id}.json")
            .mode(SaveMode.Append)
            .save();

        assertCollectionSize("json-lines", 2);

        JsonNode doc = readJsonDocument("/a/1.json");
        assertEquals(1, doc.get("id").asInt());

        doc = readJsonDocument("/a/2.json");
        assertEquals(2, doc.get("id").asInt());
    }

    @Test
    void encoding() {
        // The fixture file is ISO-8859-1 encoded; the encoding option should decode it correctly.
        newSparkSession().read().format(CONNECTOR_IDENTIFIER)
            .option(Options.READ_FILES_TYPE, "json_lines")
            .option(Options.READ_FILES_ENCODING, "ISO-8859-1")
            .load("src/test/resources/json-lines/objects-iso-8859-1.txt")
            .write().format(CONNECTOR_IDENTIFIER)
            .option(Options.CLIENT_URI, makeClientUri())
            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
            .option(Options.WRITE_COLLECTIONS, "json-lines")
            .option(Options.WRITE_URI_REPLACE, ".*json-lines,''")
            .mode(SaveMode.Append)
            .save();

        assertCollectionSize("json-lines", 2);

        JsonNode doc = readJsonDocument("/objects-iso-8859-1.txt-1.json");
        assertEquals("Istituto di Anatomia e Istologia Patologica, Università di Ferrara, Italy",
            doc.get("text").asText(), "Verifying that the encoded text is correctly read and written to MarkLogic.");
    }

    @Test
    void gzip() {
        // Explicit gzip compression option.
        newSparkSession().read().format(CONNECTOR_IDENTIFIER)
            .option(Options.READ_FILES_TYPE, "json_lines")
            .option(Options.READ_FILES_COMPRESSION, "gzip")
            .load("src/test/resources/json-lines/nested-objects.txt.gz")
            .write().format(CONNECTOR_IDENTIFIER)
            .option(Options.CLIENT_URI, makeClientUri())
            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
            .option(Options.WRITE_COLLECTIONS, "json-lines")
            .option(Options.WRITE_URI_REPLACE, ".*json-lines,''")
            .mode(SaveMode.Append)
            .save();

        assertCollectionSize("json-lines", 2);
    }

    @Test
    void gzipWithoutCompressionOption() {
        // No compression option: the reader should guess gzip from the .gz file extension.
        newSparkSession().read().format(CONNECTOR_IDENTIFIER)
            .option(Options.READ_FILES_TYPE, "json_lines")
            .load("src/test/resources/json-lines/nested-objects.txt.gz")
            .write().format(CONNECTOR_IDENTIFIER)
            .option(Options.CLIENT_URI, makeClientUri())
            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
            .option(Options.WRITE_COLLECTIONS, "json-lines")
            .option(Options.WRITE_URI_REPLACE, ".*json-lines,''")
            .mode(SaveMode.Append)
            .save();

        assertCollectionSize("json-lines", 2);
    }
}
126 Bytes
Binary file not shown.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"text": "Istituto di Anatomia e Istologia Patologica, Universit� di Ferrara, Italy"}
2+
{"text": "This doesn't require encoding."}

0 commit comments

Comments
 (0)