Description
I generated the SF30000 dataset directly to ADLS Gen2. What I observed is that after each major snapshot is written, there is a significant delay caused by single-threaded renaming of the output files: job commit runs a simple, serialized loop, and all of the work happens on the driver (a simplified sketch of that loop follows the table). With remote cloud storage these delays are substantial, taking up roughly 67% of the total datagen time:
| stage | delay (HH:MM) after stage is "done" | # of Spark tasks |
|---|---|---|
| write Comment snapshot | 04:28 | 94499/94499 |
| write Person -[Likes]-> Comment snapshot | 03:31 | 79943/79943 |
| write Forum -[HasMember]-> Person snapshot | 03:10 | 74566/74566 |
| write Comment -[HasTag]-> Tag snapshot | 02:24 | 55907/55907 |
| write Person -[Likes]-> Post snapshot | 01:17 | 34600/34600 |
| write Post snapshot | 00:57 | 23379/23379 |
| write Post -[HasTag]-> Tag snapshot | 00:39 | 15006/15006 |
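To make the serialized behavior concrete, here is a simplified sketch (my own illustration, not the actual Hadoop source) of what the v1 `FileOutputCommitter` job commit in the stack trace further below effectively does on the driver; `commitJobSketch`, `pendingDir`, and `destDir` are made-up names:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Simplified illustration of FileOutputCommitter's job-commit rename phase
// (mergePaths): each committed file is moved into its final location one
// rename at a time, entirely on the driver. The real implementation recurses
// into directories, but the serial structure is the same.
def commitJobSketch(fs: FileSystem, pendingDir: Path, destDir: Path): Unit = {
  for (status <- fs.listStatus(pendingDir)) {
    val dest = new Path(destDir, status.getPath.getName)
    // On ABFS every rename is a blocking HTTPS round trip, so ~94k output
    // files mean ~94k sequential requests before the stage is "done".
    fs.rename(status.getPath, dest)
  }
}
```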
The subsequent writes (inserts and deletes) for each of these edges do not show such bad serialized behavior, since they seem to produce far fewer files than the initial snapshot. Is there any way to control the number of files produced by the initial snapshot write?
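For context, the generic Spark mechanism for capping output file counts is to reduce the partition count just before the write. A minimal sketch with placeholder names (`snapshot`, `outputPath`, and the target count are not datagen identifiers):

```scala
// Generic Spark illustration, not datagen code: coalesce merges partitions
// without a shuffle, so the snapshot is written as at most `targetFiles`
// files instead of one file per upstream task.
val targetFiles = 2000 // placeholder value
snapshot
  .coalesce(targetFiles)
  .write
  .mode("overwrite")
  .csv(outputPath)
```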
A sample call stack of the "slow behavior" for the snapshots is below.

```
"main" #1 prio=5 os_prio=0 tid=0x00007f8e6c05f800 nid=0x4690 runnable [0x00007f8e73805000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.wildfly.openssl.OpenSSLSocket.read(OpenSSLSocket.java:423)
at org.wildfly.openssl.OpenSSLInputStream.read(OpenSSLInputStream.java:41)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x00000000e2cead58> (a java.io.BufferedInputStream)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
- locked <0x00000000e2ce1328> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
- locked <0x00000000e2ce1328> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
at org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation.processResponse(AbfsHttpOperation.java:330)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:274)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:205)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:181)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation$$Lambda$257/1878700101.apply(Unknown Source)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:454)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:179)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.renamePath(AbfsClient.java:500)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.rename(AzureBlobFileSystemStore.java:787)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.rename(AzureBlobFileSystem.java:355)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:476)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:490)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:405)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
- locked <0x00000000a754ea00> (a org.apache.spark.sql.execution.command.DataWritingCommandExec)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$3184/1809758959.apply(Unknown Source)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$3185/128469338.apply(Unknown Source)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
- locked <0x00000000a75409c8> (a org.apache.spark.sql.execution.QueryExecution)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter$$Lambda$2963/729861519.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2968/904215674.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2964/495353859.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at ldbc.snb.datagen.io.dataframes$DataFrameWriter$.write(dataframes.scala:57)
at ldbc.snb.datagen.io.dataframes$DataFrameWriter$.write(dataframes.scala:49)
at ldbc.snb.datagen.io.Writer$WriterOps.write(Writer.scala:17)
at ldbc.snb.datagen.io.Writer$WriterOps.write$(Writer.scala:17)
at ldbc.snb.datagen.io.Writer$ops$$anon$1.write(Writer.scala:26)
at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$4(graphs.scala:118)
at ldbc.snb.datagen.io.graphs$BatchedGraphWriter$$Lambda$2947/1416238194.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at ldbc.snb.datagen.util.SparkUI$.job(SparkUI.scala:11)
at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$3(graphs.scala:120)
at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.$anonfun$write$3$adapted(graphs.scala:113)
at ldbc.snb.datagen.io.graphs$BatchedGraphWriter$$Lambda$2946/34405587.apply(Unknown Source)
at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:103)
at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:102)
at scala.collection.immutable.RedBlackTree$._foreach(RedBlackTree.scala:102)
at scala.collection.immutable.RedBlackTree$.foreach(RedBlackTree.scala:99)
at scala.collection.immutable.TreeMap.foreach(TreeMap.scala:205)
at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.write(graphs.scala:113)
at ldbc.snb.datagen.io.graphs$BatchedGraphWriter.write(graphs.scala:103)
at ldbc.snb.datagen.io.Writer$WriterOps.write(Writer.scala:17)
at ldbc.snb.datagen.io.Writer$WriterOps.write$(Writer.scala:17)
at ldbc.snb.datagen.io.Writer$ops$$anon$1.write(Writer.scala:26)
at ldbc.snb.datagen.transformation.TransformationStage$write$1$.$anonfun$caseBatched$1(TransformationStage.scala:36)
at ldbc.snb.datagen.transformation.TransformationStage$write$1$.$anonfun$caseBatched$1$adapted(TransformationStage.scala:35)
at ldbc.snb.datagen.transformation.TransformationStage$write$1$$$Lambda$2939/533945153.apply(Unknown Source)
at shapeless.Poly1$CaseBuilder$$anon$1.$anonfun$value$1(polyntraits.scala:36)
at shapeless.Poly1$CaseBuilder$$anon$1$$Lambda$2938/488509759.apply(Unknown Source)
at shapeless.PolyDefns$Case.apply(poly.scala:39)
at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:330)
at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:331)
at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:331)
at shapeless.ops.coproduct$Mapper$$anon$15.apply(coproduct.scala:327)
at shapeless.syntax.CoproductOps$.map$extension(coproduct.scala:160)
at ldbc.snb.datagen.transformation.TransformationStage$.run(TransformationStage.scala:57)
at ldbc.snb.datagen.spark.LdbcDatagen$.run(LdbcDatagen.scala:146)
at ldbc.snb.datagen.spark.LdbcDatagen$.main(LdbcDatagen.scala:106)
at ldbc.snb.datagen.spark.LdbcDatagen.main(LdbcDatagen.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
```
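For completeness, one generic Spark/Hadoop-level mitigation I'm aware of (not verified against datagen, and with its own caveats) is the v2 commit algorithm, which performs the renames during task commit on the executors instead of in the driver's job commit:

```scala
import org.apache.spark.sql.SparkSession

// Generic illustration: with algorithm version 2, FileOutputCommitter moves
// each task's files into the final destination as that task commits (on the
// executors, in parallel) rather than in one serial loop on the driver.
// Caveat: v2 job output is not atomic; a failed job can leave partial data.
val spark = SparkSession.builder()
  .appName("datagen-sketch") // placeholder app name
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```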