Commit cba520b

MLE-17095 Can now stream when exporting an archive
Was fairly simple, since `ContentWriter` already supported streaming normal documents.
1 parent e5aa9f6 commit cba520b
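
For context, this is roughly what a streaming archive export looks like from the Spark side with this change (a minimal sketch mirroring the new test in WriteArchiveTest below; the "marklogic" format identifier, connection string, and output path are placeholder assumptions, while the `Options` constants are the ones this repo uses):

import com.marklogic.spark.Options;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class StreamingArchiveExportSketch {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();

        // Read only document URIs; with STREAM_FILES enabled, content (and metadata) is
        // deferred to the writer phase so large documents are never held in memory.
        Dataset<Row> dataset = session.read()
            .format("marklogic") // placeholder format identifier
            .option(Options.CLIENT_URI, "user:password@localhost:8000") // placeholder connection
            .option(Options.READ_DOCUMENTS_COLLECTIONS, "collection1")
            .option(Options.STREAM_FILES, true)
            .load();

        // Write a zip archive; each document is streamed into its zip entry, and the
        // requested metadata categories produce a "<uri>.metadata" entry per document.
        dataset.write()
            .format("marklogic")
            .option(Options.WRITE_FILES_COMPRESSION, "zip")
            .option(Options.READ_DOCUMENTS_CATEGORIES, "content,collections")
            .option(Options.CLIENT_URI, "user:password@localhost:8000")
            .option(Options.STREAM_FILES, true)
            .mode(SaveMode.Append)
            .save("/tmp/archive"); // placeholder output directory
    }
}

With `STREAM_FILES` set on both the read and the write, the reader emits only document URIs and the writer streams each document's content (and, per this commit, its metadata) directly into the zip entries.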

8 files changed: +110 −36 lines changed

src/main/java/com/marklogic/spark/Util.java

Lines changed: 18 additions & 0 deletions
@@ -3,6 +3,7 @@
  */
 package com.marklogic.spark;
 
+import com.marklogic.client.document.DocumentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,4 +73,21 @@ static String getOptionNameForErrorMessage(String option) {
         String optionName = bundle.getString(option);
         return optionName != null && optionName.trim().length() > 0 ? optionName.trim() : option;
     }
+
+    static Set<DocumentManager.Metadata> getRequestedMetadata(ContextSupport context) {
+        Set<DocumentManager.Metadata> set = new HashSet<>();
+        if (context.hasOption(Options.READ_DOCUMENTS_CATEGORIES)) {
+            for (String category : context.getStringOption(Options.READ_DOCUMENTS_CATEGORIES).split(",")) {
+                if ("content".equalsIgnoreCase(category)) {
+                    continue;
+                }
+                if ("metadata".equalsIgnoreCase(category)) {
+                    set.add(DocumentManager.Metadata.ALL);
+                } else {
+                    set.add(DocumentManager.Metadata.valueOf(category.toUpperCase()));
+                }
+            }
+        }
+        return set;
+    }
 }
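
As a quick illustration of the category mapping that `getRequestedMetadata` performs, here is the same logic as a standalone sketch (the `parseCategories` helper and sample values are hypothetical; the real method reads the option from `ContextSupport`):

import com.marklogic.client.document.DocumentManager;
import java.util.HashSet;
import java.util.Set;

public class CategoryMappingSketch {

    // Hypothetical standalone equivalent of the mapping done by Util.getRequestedMetadata.
    static Set<DocumentManager.Metadata> parseCategories(String categories) {
        Set<DocumentManager.Metadata> set = new HashSet<>();
        for (String category : categories.split(",")) {
            if ("content".equalsIgnoreCase(category)) {
                continue; // "content" is not a metadata category, so it is ignored here.
            }
            if ("metadata".equalsIgnoreCase(category)) {
                set.add(DocumentManager.Metadata.ALL); // "metadata" means every metadata category.
            } else {
                set.add(DocumentManager.Metadata.valueOf(category.toUpperCase()));
            }
        }
        return set;
    }

    public static void main(String[] args) {
        System.out.println(parseCategories("content,collections")); // [COLLECTIONS]
        System.out.println(parseCategories("metadata"));            // [ALL]
    }
}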

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 0 additions & 20 deletions
@@ -4,16 +4,13 @@
 package com.marklogic.spark.reader.document;
 
 import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.document.DocumentManager;
 import com.marklogic.client.query.SearchQueryDefinition;
 import com.marklogic.spark.ContextSupport;
 import com.marklogic.spark.Options;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 class DocumentContext extends ContextSupport {
 
@@ -25,23 +22,6 @@ class DocumentContext extends ContextSupport {
         this.schema = schema;
     }
 
-    Set<DocumentManager.Metadata> getRequestedMetadata() {
-        Set<DocumentManager.Metadata> set = new HashSet<>();
-        if (hasOption(Options.READ_DOCUMENTS_CATEGORIES)) {
-            for (String category : getStringOption(Options.READ_DOCUMENTS_CATEGORIES).split(",")) {
-                if ("content".equalsIgnoreCase(category)) {
-                    continue;
-                }
-                if ("metadata".equalsIgnoreCase(category)) {
-                    set.add(DocumentManager.Metadata.ALL);
-                } else {
-                    set.add(DocumentManager.Metadata.valueOf(category.toUpperCase()));
-                }
-            }
-        }
-        return set;
-    }
-
     boolean contentWasRequested() {
         if (isStreamingFiles()) {
             return false;

src/main/java/com/marklogic/spark/reader/document/DocumentScanBuilder.java

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
  */
 package com.marklogic.spark.reader.document;
 
-import com.marklogic.spark.Options;
 import com.marklogic.spark.Util;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 2 additions & 1 deletion
@@ -16,6 +16,7 @@
 import com.marklogic.client.query.StructuredQueryBuilder;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.ReadProgressLogger;
+import com.marklogic.spark.Util;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.slf4j.Logger;
@@ -74,7 +75,7 @@ class ForestReader implements PartitionReader<InternalRow> {
         this.documentManager = client.newDocumentManager();
         this.documentManager.setReadTransform(query.getResponseTransform());
         this.contentWasRequested = context.contentWasRequested();
-        this.requestedMetadata = context.getRequestedMetadata();
+        this.requestedMetadata = Util.getRequestedMetadata(context);
         this.documentManager.setMetadataCategories(this.requestedMetadata);
         this.queryBuilder = client.newQueryManager().newStructuredQueryBuilder();
     }

src/main/java/com/marklogic/spark/reader/file/GenericFileReader.java

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
 package com.marklogic.spark.reader.file;
 
 import com.marklogic.spark.ConnectorException;
-import com.marklogic.spark.Options;
 import com.marklogic.spark.Util;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

src/main/java/com/marklogic/spark/writer/file/ContentWriter.java

Lines changed: 26 additions & 2 deletions
@@ -6,10 +6,12 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.marklogic.client.document.GenericDocumentManager;
+import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.InputStreamHandle;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.ContextSupport;
 import com.marklogic.spark.Options;
+import com.marklogic.spark.Util;
 import com.marklogic.spark.reader.document.DocumentRowSchema;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -38,7 +40,7 @@ class ContentWriter {
     private final Charset encoding;
 
     private final boolean isStreamingFiles;
-    // Only set when streaming.
+    // Only used when streaming.
     private final GenericDocumentManager documentManager;
 
     ContentWriter(Map<String, String> properties) {
@@ -54,7 +56,14 @@ class ContentWriter {
         }
 
         this.isStreamingFiles = context.isStreamingFiles();
-        this.documentManager = this.isStreamingFiles ? context.connectToMarkLogic().newDocumentManager() : null;
+        if (this.isStreamingFiles) {
+            this.documentManager = context.connectToMarkLogic().newDocumentManager();
+            if (context.hasOption(Options.READ_DOCUMENTS_CATEGORIES)) {
+                this.documentManager.setMetadataCategories(Util.getRequestedMetadata(context));
+            }
+        } else {
+            this.documentManager = null;
+        }
     }
 
     void writeContent(InternalRow row, OutputStream outputStream) throws IOException {
@@ -73,6 +82,21 @@ void writeContent(InternalRow row, OutputStream outputStream) throws IOException
 
     void writeMetadata(InternalRow row, OutputStream outputStream) throws IOException {
         String metadataXml = DocumentRowSchema.makeDocumentMetadata(row).toString();
+        writeMetadata(metadataXml, outputStream);
+    }
+
+    /**
+     * When streaming documents to an archive, the metadata unfortunately has to be retrieved in a separate request
+     * per document. This is due to the Java Client hardcoding "content" as a category in a POST to v1/search. A
+     * future fix to the Java Client to not hardcode this will allow for the metadata to be retrieved during the
+     * reader phase.
+     */
+    void writeMetadataWhileStreaming(String documentUri, OutputStream outputStream) throws IOException {
+        DocumentMetadataHandle metadata = this.documentManager.readMetadata(documentUri, new DocumentMetadataHandle());
+        writeMetadata(metadata.toString(), outputStream);
+    }
+
+    private void writeMetadata(String metadataXml, OutputStream outputStream) throws IOException {
         // Must honor the encoding here as well, as a user could easily have values that require encoding in metadata
         // values or in a properties fragment.
         if (this.encoding != null) {
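
For readers less familiar with the Java Client, the per-document fetch performed by `writeMetadataWhileStreaming` amounts to the following (a minimal sketch; the connection details and document URI are placeholders, and the connector reuses its already-configured client rather than creating a new one):

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.document.DocumentManager;
import com.marklogic.client.document.GenericDocumentManager;
import com.marklogic.client.io.DocumentMetadataHandle;

public class ReadMetadataSketch {
    public static void main(String[] args) {
        // Placeholder connection; ContentWriter builds its client from the connector options.
        DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
            new DatabaseClientFactory.DigestAuthContext("user", "password"));
        try {
            GenericDocumentManager documentManager = client.newDocumentManager();

            // Limit what the server returns, mirroring setMetadataCategories(...) in the constructor above.
            documentManager.setMetadataCategories(DocumentManager.Metadata.COLLECTIONS);

            // One extra request per document while streaming; toString() yields the rapi:metadata XML
            // that ends up in the "<uri>.metadata" zip entry.
            DocumentMetadataHandle metadata =
                documentManager.readMetadata("/example/doc1.json", new DocumentMetadataHandle());
            System.out.println(metadata.toString());
        } finally {
            client.release();
        }
    }
}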

src/main/java/com/marklogic/spark/writer/file/ZipFileWriter.java

Lines changed: 19 additions & 9 deletions
@@ -4,6 +4,8 @@
 package com.marklogic.spark.writer.file;
 
 import com.marklogic.spark.ConnectorException;
+import com.marklogic.spark.ContextSupport;
+import com.marklogic.spark.Options;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,7 +28,7 @@ public class ZipFileWriter implements DataWriter<InternalRow> {
 
     private static final Logger logger = LoggerFactory.getLogger(ZipFileWriter.class);
 
-    private final Map<String, String> properties;
+    private final ContextSupport context;
     private final SerializableConfiguration hadoopConfiguration;
 
     private final String zipPath;
@@ -44,7 +46,7 @@ public class ZipFileWriter implements DataWriter<InternalRow> {
     public ZipFileWriter(String path, Map<String, String> properties, SerializableConfiguration hadoopConfiguration,
                          int partitionId, boolean createZipFileImmediately) {
         this.zipPath = makeFilePath(path, partitionId);
-        this.properties = properties;
+        this.context = new ContextSupport(properties);
         this.hadoopConfiguration = hadoopConfiguration;
         if (createZipFileImmediately) {
             createZipFileAndContentWriter();
@@ -56,15 +58,11 @@ public void write(InternalRow row) throws IOException {
         if (contentWriter == null) {
             createZipFileAndContentWriter();
         }
+
         final String uri = row.getString(0);
         final String entryName = FileUtil.makePathFromDocumentURI(uri);
 
-        if (hasMetadata(row)) {
-            zipOutputStream.putNextEntry(new ZipEntry(entryName + ".metadata"));
-            this.contentWriter.writeMetadata(row, zipOutputStream);
-            zipEntryCounter++;
-        }
-
+        writeMetadataEntryIfNecessary(row, uri, entryName);
         zipOutputStream.putNextEntry(new ZipEntry(entryName));
         this.contentWriter.writeContent(row, zipOutputStream);
         zipEntryCounter++;
@@ -90,7 +88,7 @@ private void createZipFileAndContentWriter() {
         if (logger.isDebugEnabled()) {
             logger.debug("Will write to: {}", filePath);
         }
-        this.contentWriter = new ContentWriter(properties);
+        this.contentWriter = new ContentWriter(context.getProperties());
         try {
             FileSystem fileSystem = filePath.getFileSystem(hadoopConfiguration.value());
             fileSystem.setWriteChecksum(false);
@@ -100,6 +98,18 @@ private void createZipFileAndContentWriter() {
         }
     }
 
+    private void writeMetadataEntryIfNecessary(InternalRow row, String uri, String entryName) throws IOException {
+        if (this.context.isStreamingFiles() && context.hasOption(Options.READ_DOCUMENTS_CATEGORIES)) {
+            zipOutputStream.putNextEntry(new ZipEntry(entryName + ".metadata"));
+            this.contentWriter.writeMetadataWhileStreaming(uri, zipOutputStream);
+            zipEntryCounter++;
+        } else if (hasMetadata(row)) {
+            zipOutputStream.putNextEntry(new ZipEntry(entryName + ".metadata"));
+            this.contentWriter.writeMetadata(row, zipOutputStream);
+            zipEntryCounter++;
+        }
+    }
+
     private boolean hasMetadata(InternalRow row) {
         return !row.isNullAt(3) || !row.isNullAt(4) || !row.isNullAt(5) || !row.isNullAt(6) || !row.isNullAt(7);
     }
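
When metadata is requested, the net effect on the archive layout is a "<uri>.metadata" entry written immediately before each content entry. A throwaway sketch for inspecting a produced zip (the path and file name are placeholders; ZipFileWriter chooses its own file names):

import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class InspectArchiveSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder path to a zip produced by ZipFileWriter.
        try (ZipFile zip = new ZipFile("/tmp/archive/part-0.zip")) {
            // Expected pattern per document: "<uri>.metadata" followed by "<uri>".
            zip.stream().map(ZipEntry::getName).forEach(System.out::println);
        }
    }
}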

src/test/java/com/marklogic/spark/writer/file/WriteArchiveTest.java

Lines changed: 45 additions & 2 deletions
@@ -7,19 +7,21 @@
 import com.marklogic.spark.AbstractIntegrationTest;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.TestUtil;
+import com.marklogic.spark.reader.document.DocumentRowSchema;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.jdom2.Namespace;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.nio.file.Path;
 import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 
 class WriteArchiveTest extends AbstractIntegrationTest {
 
@@ -55,6 +57,38 @@ void writeAllMetadata(String metadata, @TempDir Path tempDir) {
         verifyMetadataFiles(tempDir, metadata);
     }
 
+    @Test
+    void streaming(@TempDir Path tempDir) {
+        Dataset<Row> dataset = newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "collection1")
+            .option(Options.STREAM_FILES, true)
+            .load();
+
+        dataset.collectAsList().forEach(row -> {
+            assertNotNull(row.getString(0), "The URI column should have the URI of the document to retrieve during the writer phase.");
+            for (int i = 1; i < DocumentRowSchema.SCHEMA.size(); i++) {
+                assertTrue(row.isNullAt(i), "Every other column in the row should be null. We don't want the content, " +
+                    "as that will be retrieved by the writer. And we unfortunately can't get the metadata without " +
+                    "getting the content as well via a POST to v1/documents. So the writer will get the metadata " +
+                    "as well.");
+            }
+        });
+
+        dataset.repartition(1)
+            .write()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.WRITE_FILES_COMPRESSION, "zip")
+            .option(Options.READ_DOCUMENTS_CATEGORIES, "content,collections")
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.STREAM_FILES, true)
+            .mode(SaveMode.Append)
+            .save(tempDir.toFile().getAbsolutePath());
+
+        verifyMetadataFiles(tempDir, "collections");
+    }
+
     private void verifyMetadataFiles(Path tempDir, String metadataValue) {
         final List<Row> rows = newSparkSession().read()
             .format(CONNECTOR_IDENTIFIER)
@@ -96,18 +130,22 @@ private void verifyMetadata(Row row, String metadataValue) {
         switch (metadataValue) {
             case "collections":
                 verifyCollections(metadata);
+                verifyPermissionsMissing(metadata);
                 break;
             case "permissions":
                 verifyPermissions(metadata);
                 break;
             case "quality":
                 verifyQuality(metadata);
+                verifyPermissionsMissing(metadata);
                 break;
             case "properties":
                 verifyProperties(metadata);
+                verifyPermissionsMissing(metadata);
                 break;
             case "metadatavalues":
                 verifyMetadataValues(metadata);
+                verifyPermissionsMissing(metadata);
                 break;
             case "metadata":
                 verifyCollections(metadata);
@@ -131,6 +169,11 @@ private void verifyPermissions(XmlNode metadata) {
         metadata.assertElementExists(path + "[rapi:role-name = 'qconsole-user' and rapi:capability='read']");
     }
 
+    private void verifyPermissionsMissing(XmlNode metadata) {
+        metadata.assertElementMissing("Permissions should not exist since they were not in the set of " +
+            "metadata categories.", "/rapi:metadata/rapi:permissions");
+    }
+
     private void verifyQuality(XmlNode metadata) {
         metadata.assertElementValue("/rapi:metadata/rapi:quality", "10");
     }
