Skip to content

Commit 4a250ed

Browse files
committed
Fixing logging for writing zip files
1 parent 228340a commit 4a250ed

File tree

4 files changed

+24
-14
lines changed

4 files changed

+24
-14
lines changed

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ private synchronized void writeFailedDocumentToArchive(DocumentWriteOperation fa
249249
} catch (Exception e) {
250250
ConnectorException ex = new ConnectorException(String.format(
251251
"Unable to write failed documents to archive file at %s; URI of failed document: %s; cause: %s",
252-
archiveWriter.getZipPath(), failedDoc.getUri(), e.getMessage()
252+
archiveWriter.getZipFilePath(), failedDoc.getUri(), e.getMessage()
253253
), e);
254254
this.writeFailure.compareAndSet(null, ex);
255255
throw ex;
@@ -266,7 +266,7 @@ private ZipFileWriter createArchiveWriter(SerializableConfiguration hadoopConfig
266266
private void closeArchiveWriter() {
267267
if (archiveWriter != null) {
268268
if (failedItemCount.get() > 0) {
269-
Util.MAIN_LOGGER.info("Wrote failed documents to archive file at {}.", archiveWriter.getZipPath());
269+
Util.MAIN_LOGGER.info("Wrote failed documents to archive file at {}.", archiveWriter.getZipFilePath());
270270
}
271271
archiveWriter.close();
272272
}

src/main/java/com/marklogic/spark/writer/file/DocumentFileBatch.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,26 @@ public void commit(WriterCommitMessage[] messages) {
4040
int zipFileCount = 0;
4141
int zipEntryCount = 0;
4242
String path = null;
43+
String zipFilePath = null;
4344
for (WriterCommitMessage message : messages) {
4445
if (message instanceof FileCommitMessage) {
45-
path = ((FileCommitMessage)message).getPath();
46+
path = ((FileCommitMessage) message).getPath();
4647
fileCount += ((FileCommitMessage) message).getFileCount();
4748
} else if (message instanceof ZipCommitMessage) {
4849
path = ((ZipCommitMessage)message).getPath();
50+
zipFilePath = ((ZipCommitMessage) message).getZipFilePath();
4951
zipFileCount++;
5052
zipEntryCount += ((ZipCommitMessage) message).getZipEntryCount();
5153
}
5254
}
5355
if (fileCount == 1) {
54-
Util.MAIN_LOGGER.info("Wrote 1 file to {}.", path);
56+
Util.MAIN_LOGGER.info("Wrote 1 file to: {}.", path);
5557
} else if (fileCount > 1) {
56-
Util.MAIN_LOGGER.info("Wrote {} files to {}.", fileCount, path);
58+
Util.MAIN_LOGGER.info("Wrote {} files to: {}.", fileCount, path);
5759
} else if (zipFileCount == 1) {
58-
Util.MAIN_LOGGER.info("Wrote 1 zip file containing {} entries to {}.", zipEntryCount, path);
60+
Util.MAIN_LOGGER.info("Wrote 1 zip file containing {} entries to: {}.", zipEntryCount, zipFilePath);
5961
} else if (zipFileCount > 1) {
60-
Util.MAIN_LOGGER.info("Wrote {} zip files containing a total of {} entries to {}.", zipFileCount, zipEntryCount, path);
62+
Util.MAIN_LOGGER.info("Wrote {} zip files containing a total of {} entries to: {}.", zipFileCount, zipEntryCount, path);
6163
}
6264
}
6365
}

src/main/java/com/marklogic/spark/writer/file/ZipCommitMessage.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,23 @@
88
class ZipCommitMessage implements WriterCommitMessage {
99

1010
private final String path;
11+
private final String zipFilePath;
1112
private final int zipEntryCount;
1213

13-
ZipCommitMessage(String path, int zipEntryCount) {
14+
ZipCommitMessage(String path, String zipFilePath, int zipEntryCount) {
1415
this.path = path;
16+
this.zipFilePath = zipFilePath;
1517
this.zipEntryCount = zipEntryCount;
1618
}
1719

1820
String getPath() {
1921
return path;
2022
}
2123

24+
String getZipFilePath() {
25+
return zipFilePath;
26+
}
27+
2228
int getZipEntryCount() {
2329
return zipEntryCount;
2430
}

src/main/java/com/marklogic/spark/writer/file/ZipFileWriter.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ public class ZipFileWriter implements DataWriter<InternalRow> {
3131
private final ContextSupport context;
3232
private final SerializableConfiguration hadoopConfiguration;
3333

34-
private final String zipPath;
34+
private final String path;
35+
private final String zipFilePath;
3536

3637
// These can be instantiated lazily depending on which constructor is used.
3738
private ContentWriter contentWriter;
@@ -45,7 +46,8 @@ public class ZipFileWriter implements DataWriter<InternalRow> {
4546

4647
public ZipFileWriter(String path, Map<String, String> properties, SerializableConfiguration hadoopConfiguration,
4748
int partitionId, boolean createZipFileImmediately) {
48-
this.zipPath = makeFilePath(path, partitionId);
49+
this.path = path;
50+
this.zipFilePath = makeFilePath(path, partitionId);
4951
this.context = new ContextSupport(properties);
5052
this.hadoopConfiguration = hadoopConfiguration;
5153
if (createZipFileImmediately) {
@@ -75,7 +77,7 @@ public void close() {
7577

7678
@Override
7779
public WriterCommitMessage commit() {
78-
return new ZipCommitMessage(zipPath, zipEntryCounter);
80+
return new ZipCommitMessage(path, zipFilePath, zipEntryCounter);
7981
}
8082

8183
@Override
@@ -84,7 +86,7 @@ public void abort() {
8486
}
8587

8688
private void createZipFileAndContentWriter() {
87-
Path filePath = new Path(zipPath);
89+
Path filePath = new Path(zipFilePath);
8890
if (logger.isDebugEnabled()) {
8991
logger.debug("Will write to: {}", filePath);
9092
}
@@ -131,7 +133,7 @@ private String makeFilePath(String path, int partitionId) {
131133
return String.format("%s%s%s-%d.zip", path, File.separator, timestamp, partitionId);
132134
}
133135

134-
public String getZipPath() {
135-
return zipPath;
136+
public String getZipFilePath() {
137+
return zipFilePath;
136138
}
137139
}

0 commit comments

Comments
 (0)