Commit d157d90

Author: Wang, Gang (Gary)

MNEMONIC-259: Add test cases to verify the functionality of the DurableRDD direct IO

1 parent: 636532e

5 files changed: +119 −10 lines

build-tools/test.conf (2 additions, 0 deletions)

@@ -69,3 +69,5 @@ mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDPersonDataSpec test -pl mne
 
 mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDBufferDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
 
+mvn -Dsuites=org.apache.mnemonic.spark.DurableBufferIODataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+
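All entries in this file follow the same pattern: run a single ScalaTest suite in a given module with tests enabled. The general shape (placeholders, not literal values):

    mvn -Dsuites=<fully.qualified.SuiteName> test -pl <module/path> -DskipTests=false

The added line simply applies this pattern to the DurableBufferIODataSpec introduced by this commit.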
mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableSparkFunctions.scala (1 addition, 1 deletion)

@@ -37,5 +37,5 @@ class DurableSparkFunctions(sc: SparkContext) extends Serializable {
 }
 
 object DurableSparkFunctions {
-  implicit def addDurableFunctions(sc: SparkContext) = new DurableSparkFunctions(sc)
+  implicit def addDurableSparkFunctions(sc: SparkContext) = new DurableSparkFunctions(sc)
 }
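The rename is more than cosmetic: DurableRDDFunctions (below) carried an implicit with the identical name addDurableFunctions, so importing both objects with wildcards, as the new DurableBufferIODataSpec does, would have made the name ambiguous and left neither conversion usable. A minimal usage sketch, with arguments taken from the new spec and sc assumed to be a live SparkContext:

    import org.apache.mnemonic.{DurableBuffer, DurableType}
    import org.apache.mnemonic.spark.DurableSparkFunctions._

    // With the implicit in scope, a plain SparkContext gains `mnemonic`,
    // which reconstructs a DurableRDD from pool files under the given path.
    val indurdd = sc.mnemonic[DurableBuffer[_]]("./target/mne_io_data",
      "pmalloc", Array(DurableType.BUFFER), Array(), 2L)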

mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala (9 additions, 7 deletions)

@@ -44,7 +44,7 @@ private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
     preservesPartitioning: Boolean = false)
   extends RDD[D](_sc, deps) {
 
-  private val isInputOnly = null == deps
+  private val isInputOnly = Nil == deps
 
   private var durdddir:String = _
   if (isInputOnly) {
@@ -64,7 +64,9 @@
     if (isInputOnly) {
       val ret = DurableRDD.collectMemPoolPartitionList(durdddir).getOrElse(Array[Partition]())
       if (ret.isEmpty) {
-        logInfo(s"Not found any partitions in the directory ${durdddir}")
+        logWarning(s"Not found any partitions in the directory ${durdddir}")
+      } else {
+        logInfo(s"Found ${ret.length} partitions in the directory ${durdddir}")
       }
       ret
     } else {
@@ -78,7 +80,7 @@
     val memplist = mempListOpt match {
       case None => {
         if (isInputOnly) {
-          logInfo(s"Not found any mem pool files related to the partition #${context.partitionId}")
+          logWarning(s"Not found any mem pool files related to the partition #${context.partitionId}")
           Array[File]()
         } else {
           val mplst = DurableRDD.prepareDurablePartition[D, T](durdddir,
@@ -112,7 +114,7 @@ object DurableRDD {
 
   val durableSubDirNameTemplate = "durable-rdd-%010d"
   val durableFileNameTemplate = "mem_%010d_%010d.mne"
-  val durableFileNamePartitionRegex = raw"mem_(\d{10})_0000000000.mne".r
+  val durableFileNamePartitionRegex = raw".*mem_(\d{10})_0000000000.mne".r
 
   private var durableDir: Option[String] = None
 
@@ -176,7 +178,7 @@
     val paridset = new mutable.TreeSet[Int]
     val dir = new File(path)
     if (dir.isDirectory) {
-      val flst = dir.listFiles.filter(_.isDirectory)
+      val flst = dir.listFiles.filter(!_.isDirectory)
       for (file <- flst) {
         file.toString match {
           case durableFileNamePartitionRegex(paridx) => {
@@ -186,7 +188,7 @@
         }
       }
     }
-    Option(paridset.toArray.map(x => new Partition { val index = x }))
+    Option(paridset.toArray.map(x => new Partition with Serializable { override def index:Int = x }))
   }
 
   def collectMemPoolFileList(durddir: String, memFileNameGen: (Long)=>String): Option[Array[File]] = {
@@ -253,7 +255,7 @@
       sc: SparkContext, path: String,
       serviceName: String, durableTypes: Array[DurableType],
       entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long) = {
-    val ret = new DurableRDD[D, Unit](sc, null,
+    val ret = new DurableRDD[D, Unit](sc, Nil,
       serviceName, durableTypes, entityFactoryProxies, slotKeyId,
       1024*1024*1024L, path, null)
     ret
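Three of the changes above fix genuine defects rather than style. First, deps is a Seq of dependencies, so the empty list Nil, not null, is the proper sentinel for an input-only RDD with no parents. Second, collectMemPoolPartitionList must keep regular files rather than directories, hence !_.isDirectory; the anonymous Partition is also rebuilt with an explicit Serializable marker and a def-based index override, since partition objects are serialized out to executors. Third, file.toString yields a full path, while Scala anchors a Regex pattern match to the entire string, so the old pattern could never match a listed file; the .* prefix fixes that. A small demonstration of the regex fix (the sample path is hypothetical):

    import scala.util.matching.Regex

    val oldRegex: Regex = raw"mem_(\d{10})_0000000000.mne".r
    val newRegex: Regex = raw".*mem_(\d{10})_0000000000.mne".r

    // File.toString produces a full path, e.g.:
    val path = "./target/mne_io_data/mem_0000000003_0000000000.mne"

    oldRegex.unapplySeq(path) // None: the whole string must match, and the prefix does not
    newRegex.unapplySeq(path) // Some(List("0000000003")): partition index captured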

mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala (6 additions, 2 deletions)

@@ -25,6 +25,7 @@ import org.apache.mnemonic.NonVolatileMemAllocator
 import org.apache.mnemonic.DurableType
 import org.apache.mnemonic.EntityFactoryProxy
 import org.apache.mnemonic.sessions.ObjectCreator
+import org.apache.commons.io.FileUtils
 
 class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
 
@@ -41,14 +42,17 @@ class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
       partitionPoolSize, f, preservesPartitioning)
   }
 
-  def saveAsMnemonic[D: ClassTag] (path: String,
+  def saveAsMnemonic[D: ClassTag] (path: String, cleanPath: Boolean,
       serviceName: String,
       durableTypes: Array[DurableType],
       entityFactoryProxies: Array[EntityFactoryProxy],
       slotKeyId: Long,
       partitionPoolSize: Long,
       f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D]) {
     val dir = new File(path)
+    if (cleanPath && dir.exists) {
+      FileUtils.deleteDirectory(dir)
+    }
     if (!dir.exists) {
       dir.mkdir
     }
@@ -61,5 +65,5 @@ class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
 }
 
 object DurableRDDFunctions {
-  implicit def addDurableFunctions[T: ClassTag](rdd: RDD[T]) = new DurableRDDFunctions[T](rdd)
+  implicit def addDurableRDDFunctions[T: ClassTag](rdd: RDD[T]) = new DurableRDDFunctions[T](rdd)
 }
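The new cleanPath flag makes the save idempotent: when set, pool files left behind by a previous run are deleted before writing, which is what lets the new spec rerun against the same ./target directory. A sketch of the call shape, modeled on the new spec; data is assumed to be an RDD[Int] of record sizes:

    import org.apache.mnemonic.{DurableBuffer, DurableType, NonVolatileMemAllocator}
    import org.apache.mnemonic.sessions.ObjectCreator
    import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._

    // cleanPath = true wipes the target directory if it already exists,
    // so repeated runs never read stale pool files.
    data.saveAsMnemonic[DurableBuffer[_]](
      "./target/mne_io_data", true,
      "pmalloc",
      Array(DurableType.BUFFER), Array(),
      2L, 1024 * 1024 * 1024L,
      (v: Int, oc: ObjectCreator[DurableBuffer[_], NonVolatileMemAllocator]) =>
        Option(oc.newDurableObjectRecord(v)))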
mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/DurableBufferIODataSpec.scala (new file, 101 additions, 0 deletions)

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark
+
+import java.util.zip.CRC32
+
+import org.apache.mnemonic.{DurableBuffer, DurableType}
+import org.apache.mnemonic.sessions.ObjectCreator
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.spark.DurableSparkFunctions._
+
+import org.apache.spark._
+import org.apache.spark.rdd._
+
+import scala.language.existentials
+import scala.util._
+
+class DurableBufferIODataSpec extends TestSpec {
+
+  val defaultServiceName = "pmalloc"
+  val defaultSlotKeyId = 2L
+  val defaultPartitionSize = 1024 * 1024 * 1024L
+  val defaultBaseDirectory = "."
+  val defaultNumOfPartitions = 8
+  val defaultNumOfRecordsPerPartition = 200
+  val mneIODirPath = "./target/mne_io_data"
+
+  behavior of "A SparkContext with DurableRDD"
+
+  it should "support durable buffers for direct IO operations" in {
+    val dataOffset = 8
+    val conf = new SparkConf()
+      .setMaster("local[*]")
+      .setAppName("Test")
+    val sc = new SparkContext(conf)
+    val seed: RDD[Int] = sc.parallelize(
+      Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
+    val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt(1024 * 1024) + 1024 * 1024)) cache
+
+    val durdd = data.saveAsMnemonic[DurableBuffer[_]](
+      mneIODirPath, true,
+      defaultServiceName,
+      Array(DurableType.BUFFER), Array(),
+      defaultSlotKeyId, defaultPartitionSize,
+      (v: Int, oc: ObjectCreator[DurableBuffer[_], _]) => {
+        val cs = new CRC32
+        cs.reset
+        val buffer = oc.newDurableObjectRecord(v)
+        val bary = new Array[Byte](v - dataOffset)
+        if (null != buffer) {
+          buffer.clear
+          Random.nextBytes(bary)
+          cs.update(bary, 0, bary.length)
+          buffer.get.putLong(cs.getValue)
+          buffer.get.put(bary)
+        }
+        Option(buffer)
+      })
+
+    val indurdd = sc.mnemonic[DurableBuffer[_]](mneIODirPath, defaultServiceName,
+      Array(DurableType.BUFFER), Array(), defaultSlotKeyId)
+    val durtsz = indurdd map (_.getSize.toInt) sum
+    val derrcount = indurdd map (
+      buffer => {
+        var chksum: Long = -1L
+        val cs = new CRC32
+        cs.reset
+        if (null != buffer) {
+          buffer.clear
+          chksum = buffer.get.getLong
+          val bary = new Array[Byte](buffer.get.remaining)
+          buffer.get.get(bary)
+          cs.update(bary)
+        }
+        if (chksum != cs.getValue) 1 else 0
+      }) sum
+
+    val (rerrcnt: Long, rsz: Long) = (0L, data.sum.toLong)
+    val (derrcnt: Long, dsz: Long) = (derrcount.toLong, durtsz.toLong)
+
+    assertResult((rerrcnt, rsz)) {
+      (derrcnt, dsz)
+    }
+  }
+}
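The spec's round-trip check hinges on a simple record layout: the first 8 bytes of each durable buffer hold the CRC32 of the random payload that follows, so the read side can validate every record without any side channel. A self-contained sketch of that framing, with a plain java.nio.ByteBuffer standing in for the DurableBuffer (frame and verify are illustrative helpers, not Mnemonic API):

    import java.nio.ByteBuffer
    import java.util.zip.CRC32

    // Layout: [8-byte CRC32 of payload][payload bytes]
    def frame(payload: Array[Byte]): ByteBuffer = {
      val cs = new CRC32
      cs.update(payload, 0, payload.length)
      val buf = ByteBuffer.allocate(8 + payload.length)
      buf.putLong(cs.getValue)
      buf.put(payload)
      buf.flip()
      buf
    }

    // True when the stored checksum matches the payload behind it.
    def verify(buf: ByteBuffer): Boolean = {
      val stored = buf.getLong
      val payload = new Array[Byte](buf.remaining)
      buf.get(payload)
      val cs = new CRC32
      cs.update(payload, 0, payload.length)
      cs.getValue == stored
    }

    // Round trip: a freshly framed record always verifies.
    assert(verify(frame(Array.fill(16)(42.toByte))))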
