
Commit 7512637

MLE-12332 Can now read RDF/XML files
Main things of note:
1. New `TripleRowSchema` instead of using the document row schema, as we'll need to capture the graph for quads.
2. `TripleSerializer` is mostly copy/pasted (and cleaned up a bunch) from MLCP.
1 parent 0c4b1aa commit 7512637
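
For orientation, here is a minimal sketch of how the new RDF support is used from Spark. It mirrors the new ReadRdfFilesTest further down; the "marklogic" format name and the local SparkSession setup are assumptions for the sketch, not part of this commit (the tests reference the format via a CONNECTOR_IDENTIFIER constant).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadRdfExample {
    public static void main(String[] args) {
        // Assumes the connector jar is on the classpath and that "marklogic" is its short name.
        SparkSession session = SparkSession.builder()
            .master("local[*]")
            .getOrCreate();

        Dataset<Row> triples = session.read()
            .format("marklogic")
            // New option introduced by this commit; "rdf" routes file reading to RdfFileReader.
            .option("spark.marklogic.read.files.type", "rdf")
            .load("src/test/resources/rdf/mini-taxonomy.xml");

        // Each row follows TripleRowSchema: subject, predicate, object, datatype, lang, graph.
        triples.show(10, false);
    }
}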

12 files changed (+307, -9 lines)

build.gradle

Lines changed: 7 additions & 3 deletions
@@ -13,8 +13,10 @@ group 'com.marklogic'
 version '2.2-SNAPSHOT'
 
 java {
-    sourceCompatibility = 1.8
-    targetCompatibility = 1.8
+    // To support reading RDF files, Apache Jena is used - but that requires Java 11. If we want to do a 2.2.0 release
+    // without requiring Java 11, we'll remove the support for reading RDF files along with the Jena dependency.
+    sourceCompatibility = 11
+    targetCompatibility = 11
 }
 
 repositories {
@@ -40,6 +42,8 @@ dependencies {
         exclude module: "scala-library"
     }
 
+    implementation "org.apache.jena:jena-arq:4.10.0"
+
     testImplementation 'org.apache.spark:spark-sql_2.12:' + sparkVersion
 
     // The exclusions in these two modules ensure that we use the Jackson libraries from spark-sql when running the tests.
@@ -56,7 +60,7 @@ dependencies {
         exclude module: 'jackson-dataformat-csv'
     }
 
-    testImplementation "ch.qos.logback:logback-classic:1.3.5"
+    testImplementation "ch.qos.logback:logback-classic:1.3.14"
     testImplementation "org.slf4j:jcl-over-slf4j:1.7.36"
     testImplementation "org.skyscreamer:jsonassert:1.5.1"
 }

src/main/java/com/marklogic/spark/DefaultSource.java

Lines changed: 3 additions & 3 deletions
@@ -21,6 +21,7 @@
 import com.marklogic.spark.reader.document.DocumentRowSchema;
 import com.marklogic.spark.reader.document.DocumentTable;
 import com.marklogic.spark.reader.file.FileRowSchema;
+import com.marklogic.spark.reader.file.TripleRowSchema;
 import com.marklogic.spark.reader.optic.SchemaInferrer;
 import com.marklogic.spark.writer.WriteContext;
 import org.apache.spark.sql.SparkSession;
@@ -63,7 +64,7 @@ public String shortName() {
     public StructType inferSchema(CaseInsensitiveStringMap options) {
         final Map<String, String> properties = options.asCaseSensitiveMap();
         if (isFileOperation(properties)) {
-            return FileRowSchema.SCHEMA;
+            return "rdf".equals(properties.get(Options.READ_FILES_TYPE)) ? TripleRowSchema.SCHEMA : FileRowSchema.SCHEMA;
         }
         if (isReadDocumentsOperation(properties)) {
             return DocumentRowSchema.SCHEMA;
@@ -85,8 +86,7 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
 
         if (isReadDocumentsOperation(properties)) {
             return new DocumentTable();
-        }
-        else if (isReadOperation(properties)) {
+        } else if (isReadOperation(properties)) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Creating new table for reading");
             }

src/main/java/com/marklogic/spark/Options.java

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ public abstract class Options {
     public static final String READ_DOCUMENTS_TRANSFORM_PARAMS_DELIMITER = "spark.marklogic.read.documents.transformParamsDelimiter";
     public static final String READ_DOCUMENTS_PARTITIONS_PER_FOREST = "spark.marklogic.read.documents.partitionsPerForest";
 
+    public static final String READ_FILES_TYPE = "spark.marklogic.read.files.type";
     public static final String READ_FILES_COMPRESSION = "spark.marklogic.read.files.compression";
 
     // "Aggregate" = an XML document containing N child elements, each of which should become a row / document.

src/main/java/com/marklogic/spark/reader/document/DocumentRowSchema.java

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
 public abstract class DocumentRowSchema {
 
     public static final StructType SCHEMA = new StructType()
-        .add("URI", DataTypes.StringType)
+        .add("URI", DataTypes.StringType, false)
         .add("content", DataTypes.BinaryType)
         .add("format", DataTypes.StringType)
         .add("collections", DataTypes.createArrayType(DataTypes.StringType))

src/main/java/com/marklogic/spark/reader/file/FilePartitionReaderFactory.java

Lines changed: 5 additions & 0 deletions
@@ -25,10 +25,15 @@ class FilePartitionReaderFactory implements PartitionReaderFactory {
     @Override
     public PartitionReader<InternalRow> createReader(InputPartition partition) {
         FilePartition filePartition = (FilePartition) partition;
+
         String compression = this.properties.get(Options.READ_FILES_COMPRESSION);
         final boolean isZip = "zip".equalsIgnoreCase(compression);
         final boolean isGzip = "gzip".equalsIgnoreCase(compression);
 
+        if ("rdf".equalsIgnoreCase(this.properties.get(Options.READ_FILES_TYPE))) {
+            return new RdfFileReader(filePartition, hadoopConfiguration);
+        }
+
         String aggregateXmlElement = this.properties.get(Options.READ_AGGREGATES_XML_ELEMENT);
         if (aggregateXmlElement != null && !aggregateXmlElement.trim().isEmpty()) {
             if (isZip) {

src/main/java/com/marklogic/spark/reader/file/FileRowSchema.java

Lines changed: 2 additions & 2 deletions
@@ -8,10 +8,10 @@ public abstract class FileRowSchema {
     // Same as Spark's binaryType.
     // See https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html .
     public static final StructType SCHEMA = new StructType()
-        .add("path", DataTypes.StringType)
+        .add("path", DataTypes.StringType, false)
         .add("modificationTime", DataTypes.TimestampType)
         .add("length", DataTypes.LongType)
-        .add("content", DataTypes.BinaryType);
+        .add("content", DataTypes.BinaryType, false);
 
     private FileRowSchema() {
     }
src/main/java/com/marklogic/spark/reader/file/RdfFileReader.java

Lines changed: 61 additions & 0 deletions

package com.marklogic.spark.reader.file;

import com.marklogic.spark.ConnectorException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.apache.jena.graph.Triple;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParserBuilder;
import org.apache.jena.riot.system.AsyncParser;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

class RdfFileReader implements PartitionReader<InternalRow> {

    private static final Logger logger = LoggerFactory.getLogger(RdfFileReader.class);

    private final InputStream inputStream;
    private final Iterator<Triple> tripleStream;

    private final TripleSerializer tripleSerializer = new TripleSerializer();

    RdfFileReader(FilePartition partition, SerializableConfiguration hadoopConfiguration) {
        if (logger.isDebugEnabled()) {
            logger.debug("Reading RDF file {}", partition.getPath());
        }
        Path path = new Path(partition.getPath());
        try {
            this.inputStream = path.getFileSystem(hadoopConfiguration.value()).open(path);
            this.tripleStream = AsyncParser.of(RDFParserBuilder.create()
                .source(this.inputStream)
                .lang(Lang.RDFXML)
                .base(partition.getPath())
            ).streamTriples().iterator();
        } catch (Exception e) {
            throw new ConnectorException(String.format("Unable to read RDF file at %s; cause: %s", path, e.getMessage()), e);
        }
    }

    @Override
    public boolean next() throws IOException {
        return this.tripleStream.hasNext();
    }

    @Override
    public InternalRow get() {
        Triple triple = this.tripleStream.next();
        return tripleSerializer.serialize(triple);
    }

    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(this.inputStream);
    }
}
src/main/java/com/marklogic/spark/reader/file/TripleRowSchema.java

Lines changed: 22 additions & 0 deletions

package com.marklogic.spark.reader.file;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 * Represents a triple as read from an RDF file and serialized into the 3 XML elements comprising
 * a MarkLogic triple.
 */
public abstract class TripleRowSchema {

    public static final StructType SCHEMA = new StructType()
        .add("subject", DataTypes.StringType, false)
        .add("predicate", DataTypes.StringType, false)
        .add("object", DataTypes.StringType, false)
        .add("datatype", DataTypes.StringType)
        .add("lang", DataTypes.StringType)
        .add("graph", DataTypes.StringType);

    private TripleRowSchema() {
    }
}
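
As a concrete illustration of this schema, the language-tagged triple verified in ReadRdfFilesTest further down maps onto the columns as follows. The values come from that test, not from running the connector here; "triples" refers to the dataset in the usage sketch near the top of this page.

Row prefLabel = triples.collectAsList().get(4);
prefLabel.getString(0); // "http://vocabulary.worldbank.org/taxonomy/451"  -> subject
prefLabel.getString(1); // "http://www.w3.org/2004/02/skos/core#prefLabel" -> predicate
prefLabel.getString(2); // "Debt Management"                               -> object
prefLabel.get(3);       // null -> datatype (null because the literal carries a language tag)
prefLabel.getString(4); // "en" -> lang
prefLabel.get(5);       // null -> graph (reserved for quad support; always null in this commit)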
src/main/java/com/marklogic/spark/reader/file/TripleSerializer.java

Lines changed: 100 additions & 0 deletions

package com.marklogic.spark.reader.file;

import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.Random;

/**
 * Captures the logic from Content Pump for serializing a Jena Triple into a string representation. Note that this does
 * not contain the "escape XML" logic in the MLCP code as we don't care about an XML representation of the triples yet.
 * We just want to return the raw values so they can be added to a Spark row.
 */
class TripleSerializer {

    // These are both used in the MLCP-specific code below for generating a "blank" value.
    private final static long HASH64_STEP = 15485863L;
    private final Random random = new Random();

    public InternalRow serialize(Triple triple) {
        String[] objectValues = serializeObject(triple);
        return new GenericInternalRow(new Object[]{
            UTF8String.fromString(serialize(triple.getSubject())),
            UTF8String.fromString(serialize(triple.getPredicate())),
            UTF8String.fromString(objectValues[0]),
            objectValues[1] != null ? UTF8String.fromString(objectValues[1]) : null,
            objectValues[2] != null ? UTF8String.fromString(objectValues[2]) : null,
            null
        });
    }

    private String serialize(Node node) {
        return node.isBlank() ? generateBlankValue(node) : node.toString();
    }

    /**
     * @param triple
     * @return an array containing a string serialization of the object; an optional datatype; and an optional "lang" value.
     */
    private String[] serializeObject(Triple triple) {
        Node node = triple.getObject();
        if (node.isLiteral()) {
            String type = node.getLiteralDatatypeURI();
            String lang = node.getLiteralLanguage();
            if ("".equals(lang)) {
                lang = null;
            }
            if ("".equals(lang) || lang == null) {
                if (type == null) {
                    type = "http://www.w3.org/2001/XMLSchema#string";
                }
            } else {
                type = null;
            }
            return new String[]{node.getLiteralLexicalForm(), type, lang};
        } else if (node.isBlank()) {
            return new String[]{generateBlankValue(node), null, null};
        } else {
            return new String[]{node.toString(), null, null};
        }
    }

    /**
     * Reuses copy/pasted code from the MLCP codebase for generating a blank value for a "blank node" - see
     * https://en.wikipedia.org/wiki/Blank_node for more details. It is not known why a UUID isn't used.
     *
     * @return
     */
    private String generateBlankValue(Node blankNode) {
        String value = Long.toHexString(
            hash64(
                fuse(scramble(System.currentTimeMillis()), random.nextLong()),
                blankNode.getBlankNodeLabel()
            )
        );
        return "http://marklogic.com/semantics/blank/" + value;
    }

    private long hash64(long value, String str) {
        char[] arr = str.toCharArray();
        for (int i = 0; i < str.length(); i++) {
            value = (value + Character.getNumericValue(arr[i])) * HASH64_STEP;
        }
        return value;
    }

    private long fuse(long a, long b) {
        return rotl(a, 8) ^ b;
    }

    private long scramble(long x) {
        return x ^ rotl(x, 20) ^ rotl(x, 40);
    }

    private long rotl(long x, long y) {
        return (x << y) ^ (x >> (64 - y));
    }
}
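
To make the datatype/lang rules in serializeObject concrete, here is a rough sketch of how a few object node types would come out. This is hypothetical test-style code, not part of the commit; it would have to live in the com.marklogic.spark.reader.file package since TripleSerializer is package-private, and the expected values in the comments follow the rules in serializeObject above.

package com.marklogic.spark.reader.file;

import org.apache.jena.datatypes.xsd.XSDDatatype;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.NodeFactory;
import org.apache.jena.graph.Triple;
import org.apache.spark.sql.catalyst.InternalRow;

class TripleSerializerSketch {

    public static void main(String[] args) {
        TripleSerializer serializer = new TripleSerializer();
        Node subject = NodeFactory.createURI("http://example.org/s");
        Node predicate = NodeFactory.createURI("http://example.org/p");

        // Plain literal: object "hello", datatype xsd:string, lang null.
        InternalRow plain = serializer.serialize(
            Triple.create(subject, predicate, NodeFactory.createLiteral("hello")));

        // Language-tagged literal: object "bonjour", datatype null, lang "fr".
        InternalRow tagged = serializer.serialize(
            Triple.create(subject, predicate, NodeFactory.createLiteral("bonjour", "fr")));

        // Typed literal with no language: object "42", datatype xsd:integer, lang null.
        InternalRow typed = serializer.serialize(
            Triple.create(subject, predicate, NodeFactory.createLiteral("42", XSDDatatype.XSDinteger)));

        // Blank-node subjects or objects get a generated http://marklogic.com/semantics/blank/...
        // value; the graph column is always null in this commit.
    }
}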
src/test/java/com/marklogic/spark/reader/file/ReadRdfFilesTest.java

Lines changed: 76 additions & 0 deletions

package com.marklogic.spark.reader.file;

import com.marklogic.spark.AbstractIntegrationTest;
import com.marklogic.spark.Options;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

class ReadRdfFilesTest extends AbstractIntegrationTest {

    @Test
    void rdfXml() {
        Dataset<Row> dataset = newSparkSession()
            .read()
            .format(CONNECTOR_IDENTIFIER)
            .option(Options.READ_FILES_TYPE, "rdf")
            .load("src/test/resources/rdf/mini-taxonomy.xml");

        List<Row> rows = dataset.collectAsList();
        assertEquals(8, rows.size(), "Expecting 8 triples, as there are 8 child elements in the " +
            "single rdf:Description element in the test file.");

        // Verify a few triples to make sure things look good.
        final String subject = "http://vocabulary.worldbank.org/taxonomy/451";
        verifyRow(rows.get(0), subject, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", "http://www.w3.org/2004/02/skos/core#Concept");
        verifyRow(rows.get(1), subject, "http://purl.org/dc/terms/creator", "wb", "http://www.w3.org/2001/XMLSchema#string", null);
        verifyRow(rows.get(4), subject, "http://www.w3.org/2004/02/skos/core#prefLabel", "Debt Management", null, "en");
    }

    /**
     * Verifies that blank nodes are generated in the same manner as with MLCP.
     */
    @Test
    void blankNodes() {
        Dataset<Row> dataset = newSparkSession()
            .read()
            .format(CONNECTOR_IDENTIFIER)
            .option(Options.READ_FILES_TYPE, "rdf")
            .load("src/test/resources/rdf/blank-nodes.xml");

        dataset.show(10, 0, true);
        List<Row> rows = dataset.collectAsList();
        assertEquals(4, rows.size());

        verifyRow(rows.get(0), "http://example.org/web-data", "http://example.org/data#title", "Web Data",
            "http://www.w3.org/2001/XMLSchema#string", null);

        assertBlankValue(rows.get(1).getString(2));
        assertBlankValue(rows.get(2).getString(0));
        assertBlankValue(rows.get(3).getString(0));
    }

    private void verifyRow(Row row, String subject, String predicate, String object) {
        verifyRow(row, subject, predicate, object, null, null);
    }

    private void verifyRow(Row row, String subject, String predicate, String object, String datatype, String lang) {
        assertEquals(subject, row.getString(0));
        assertEquals(predicate, row.getString(1));
        assertEquals(object, row.getString(2));
        assertEquals(datatype, row.get(3));
        assertEquals(lang, row.getString(4));
        assertNull(row.get(5), "The graph is expected to be null since these are triples and not quads.");
    }

    private void assertBlankValue(String value) {
        assertTrue(value.startsWith("http://marklogic.com/semantics/blank/"),
            "We are reusing copy/pasted code from MLCP for generating a 'blank' value, which is expected to end with " +
                "a random hex value. It is not known why this isn't just a Java-generated UUID; we're simply reusing " +
                "the code because it's what MLCP does. Actual value: " + value);
    }
}
