Long delay after writing edge snapshots due to single-threaded renaming of output files by the Spark driver #324

@arvindshmicrosoft

Description

I generated the SF30000 dataset directly to ADLS Gen 2. What I observed is that after the major snapshots are written, there are significant delays caused by single-threaded renaming of the output files: the renames happen in a simple serialized loop, and all of the work runs on the driver. With remote cloud storage, the resulting delays are substantial, accounting for roughly 67% of the total datagen time:

| stage | delay after stage is "done" (HH:MM) | # of Spark tasks |
|---|---|---|
| write Comment snapshot | 04:28 | 94499/94499 |
| write Person -[Likes]-> Comment snapshot | 03:31 | 79943/79943 |
| write Forum -[HasMember]-> Person snapshot | 03:10 | 74566/74566 |
| write Comment -[HasTag]-> Tag snapshot | 02:24 | 55907/55907 |
| write Person -[Likes]-> Post snapshot | 01:17 | 34600/34600 |
| write Post snapshot | 00:57 | 23379/23379 |
| write Post -[HasTag]-> Tag snapshot | 00:39 | 15006/15006 |

The subsequent writes (inserts and deletes) for each of these edges do not exhibit such pronounced serialized behavior, since they seem to produce far fewer files than the initial snapshot. Is there any way to control the number of files produced by the initial snapshot write? One possible approach is sketched below.
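
For illustration, a minimal sketch of one way to cap the file count, assuming the snapshot DataFrame can be repartitioned just before the write; `writeSnapshot`, `snapshotDf`, `path`, and `maxFiles` are hypothetical names, not part of the Datagen code base. The number of output files equals the number of partitions at write time, so coalescing first bounds both the file count and the number of renames the committer has to perform:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper, not Datagen's actual API: cap the number of output
// files by reducing the partition count just before the write.
def writeSnapshot(snapshotDf: DataFrame, path: String, maxFiles: Int): Unit = {
  snapshotDf
    .coalesce(maxFiles) // fewer partitions => fewer files to rename at commit time
    .write
    .mode(SaveMode.Overwrite)
    .option("header", "true")
    .csv(path)
}
```

The trade-off is reduced parallelism in the final write stage, so `maxFiles` would need tuning per scale factor.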

A sample driver call stack captured during the slow snapshot phase is below.

"main" #1 prio=5 os_prio=0 tid=0x00007f8e6c05f800 nid=0x4690 runnable [0x00007f8e73805000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at org.wildfly.openssl.OpenSSLSocket.read(OpenSSLSocket.java:423)
        at org.wildfly.openssl.OpenSSLInputStream.read(OpenSSLInputStream.java:41)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        - locked <0x00000000e2cead58> (a java.io.BufferedInputStream)
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
        - locked <0x00000000e2ce1328> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
        - locked <0x00000000e2ce1328> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
        at org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation.processResponse(AbfsHttpOperation.java:330)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:274)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:205)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:181)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation$$Lambda$257/1878700101.apply(Unknown Source)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:454)
        at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:179)
        at org.apache.hadoop.fs.azurebfs.services.AbfsClient.renamePath(AbfsClient.java:500)
        at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.rename(AzureBlobFileSystemStore.java:787)
        at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.rename(AzureBlobFileSystem.java:355)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:476)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:490)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:405)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
        - locked <0x00000000a754ea00> (a org.apache.spark.sql.execution.command.DataWritingCommandExec)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan$$Lambda$3184/1809758959.apply(Unknown Source)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.sql.execution.SparkPlan$$Lambda$3185/128469338.apply(Unknown Source)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
        - locked <0x00000000a75409c8> (a org.apache.spark.sql.execution.QueryExecution)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter$$Lambda$2963/729861519.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2968/904215674.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2964/495353859.apply(Unknown Source)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
        at ldbc.snb.datagen.io.dataframes$DataFrameWriter$.write(dataframes.scala:57)
        at ldbc.snb.datagen.io.dataframes$DataFrameWriter$.write(dataframes.scala:49)
        at ldbc.snb.datagen.io.Writer$WriterOps.write(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$WriterOps.write$(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$ops$$anon$1.write(Writer.scala:26)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$4(graphs.scala:118)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter$$Lambda$2947/1416238194.apply$mcV$sp(Unknown Source)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at ldbc.snb.datagen.util.SparkUI$.job(SparkUI.scala:11)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$3(graphs.scala:120)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$3$adapted(graphs.scala:113)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter$$Lambda$2946/34405587.apply(Unknown Source)
        at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:103)
        at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:102)
        at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:102)
        at scala.collection.immutable.RedBlackTree$.foreach(RedBlackTree.scala:99)
        at scala.collection.immutable.TreeMap.foreach(TreeMap.scala:205)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.write(graphs.scala:113)
        at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.write(graphs.scala:103)
        at ldbc.snb.datagen.io.Writer$WriterOps.write(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$WriterOps.write$(Writer.scala:17)
        at ldbc.snb.datagen.io.Writer$ops$$anon$1.write(Writer.scala:26)
        at ldbc.snb.datagen.transformation.TransformationStage$write$1$.$anonfun$caseBatched$1(TransformationStage.scala:36)
        at ldbc.snb.datagen.transformation.TransformationStage$write$1$.$anonfun$caseBatched$1$adapted(TransformationStage.scala:35)
        at ldbc.snb.datagen.transformation.TransformationStage$write$1$$$Lambda$2939/533945153.apply(Unknown Source)
        at shapeless.Poly1$CaseBuilder$$anon$1.$anonfun$value$1(polyntraits.scala:36)
        at shapeless.Poly1$CaseBuilder$$anon$1$$Lambda$2938/488509759.apply(Unknown Source)
        at shapeless.PolyDefns$Case.apply(poly.scala:39)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:330)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:331)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:331)
        at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
        at shapeless.syntax.CoproductOps$.map$extension(coproduct.scala:160)
        at ldbc.snb.datagen.transformation.TransformationStage$.run(TransformationStage.scala:57)
        at ldbc.snb.datagen.spark.LdbcDatagen$.run(LdbcDatagen.scala:146)
        at ldbc.snb.datagen.spark.LdbcDatagen$.main(LdbcDatagen.scala:106)
        at ldbc.snb.datagen.spark.LdbcDatagen.main(LdbcDatagen.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
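
The stack shows the job-commit path: `FileOutputCommitter.commitJobInternal` walks the committed task outputs and `mergePaths` renames them into place one at a time, each rename being a blocking ABFS REST call issued from the driver. As a hedged sketch (not something Datagen exposes as an option today), switching to FileOutputCommitter algorithm v2 moves those renames into task commit on the executors, which parallelizes them and leaves the driver's `commitJob` with almost nothing to do:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: opt into commit algorithm v2, which renames task output into
// the destination during task commit (on executors) rather than in a single
// serial pass during job commit (on the driver). Caveat: v2's job commit is
// not atomic, so partially written output can be visible after a failure.
val spark = SparkSession.builder()
  .appName("ldbc-snb-datagen")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```

The same setting can also be passed at launch time via `--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2`. Newer Hadoop releases additionally ship a manifest committer aimed specifically at ABFS, which may be worth evaluating here.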
