  * limitations under the License.
  */
 
-package org.apache.mnemonic.spark.rdd;
+package org.apache.mnemonic.spark.rdd
 
 import java.io.File
-import scala.util._
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.{ Partition, TaskContext, SparkContext }
-import org.apache.spark.internal.Logging
+import org.apache.spark._
 import org.apache.commons.io.FileUtils
-import scala.reflect.{ classTag, ClassTag }
-import scala.collection.mutable.HashMap
+import scala.reflect.{ ClassTag }
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import org.apache.mnemonic.ConfigurationException
 import org.apache.mnemonic.DurableType
 import org.apache.mnemonic.EntityFactoryProxy
 import org.apache.mnemonic.NonVolatileMemAllocator
-import org.apache.mnemonic.sessions.DurableInputSession
-import org.apache.mnemonic.sessions.SessionIterator
 import org.apache.mnemonic.sessions.ObjectCreator
 import org.apache.mnemonic.spark.MneDurableInputSession
 import org.apache.mnemonic.spark.MneDurableOutputSession
 import org.apache.mnemonic.spark.DurableException
 
 private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
-    private var rdd: RDD[T],
+    @transient private var _sc: SparkContext,
+    @transient private var deps: Seq[Dependency[_]],
     serviceName: String, durableTypes: Array[DurableType],
     entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
     partitionPoolSize: Long, durableDirectory: String,
     f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
     preservesPartitioning: Boolean = false)
-    extends RDD[D](rdd) {
+    extends RDD[D](_sc, deps) {
 
-  val durdddir = DurableRDD.getRddDirName(durableDirectory, id)
+  private val isInputOnly = null == deps
+
+  private val durdddir = DurableRDD.getRddDirName(durableDirectory, id)
   DurableRDD.resetRddDir(durdddir)
 
   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -80,7 +77,7 @@ private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
     val memplist = mempListOpt match {
       case None => {
         val mplst = prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
-        logInfo(s"Done transformed RDD #${rdd.id} to durableRDD #${id} on ${durdddir.toString}")
+        logInfo(s"Done transformed RDD #${firstParent[T].id} to durableRDD #${id} on ${durdddir.toString}")
         mplst
       }
       case Some(mplst) => mplst
@@ -92,12 +89,15 @@ private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
 
   override def clearDependencies {
     super.clearDependencies()
-    rdd = null
   }
 
   def reset {
     DurableRDD.resetRddDir(durdddir)
   }
+
+  def destroy {
+    DurableRDD.deleteRddDir(durdddir)
+  }
 }
 
 object DurableRDD {
@@ -141,15 +141,24 @@ object DurableRDD {
   }
 
   def resetRddDir(rddDirName: String) {
+    deleteRddDir(rddDirName)
+    createRddDir(rddDirName)
+  }
+
+  def createRddDir(rddDirName: String) {
     val durdddir = new File(rddDirName)
-    if (durdddir.exists) {
-      FileUtils.deleteDirectory(durdddir)
-    }
     if (!durdddir.mkdir) {
       throw new DurableException(s"Durable RDD directory ${durdddir.toString} cannot be created")
     }
   }
 
+  def deleteRddDir(rddDirName: String) {
+    val durdddir = new File(rddDirName)
+    if (durdddir.exists) {
+      FileUtils.deleteDirectory(durdddir)
+    }
+  }
+
   def genDurableFileName(splitId: Int)(mempidx: Long): String = {
     durableFileNameTemplate.format(splitId, mempidx)
   }
@@ -182,14 +191,24 @@ object DurableRDD {
       partitionPoolSize: Long,
       f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
       preservesPartitioning: Boolean = false) = {
-    val sc: SparkContext = rdd.context
-    val ret = new DurableRDD[D, T](rdd,
+    // val sc: SparkContext = rdd.context
+    val ret = new DurableRDD[D, T](rdd.context, List(new OneToOneDependency(rdd)),
       serviceName, durableTypes, entityFactoryProxies, slotKeyId,
-      partitionPoolSize, getDurableDir(sc).get, f, preservesPartitioning)
+      partitionPoolSize, getDurableDir(rdd.context).get, f, preservesPartitioning)
     // sc.cleaner.foreach(_.registerRDDForCleanup(ret))
     ret
   }
 
+  def apply[D: ClassTag] (
+      sc: SparkContext, pathname: String,
+      serviceName: String, durableTypes: Array[DurableType],
+      entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long) = {
+    val ret = new DurableRDD[D, Unit](sc, null,
+      serviceName, durableTypes, entityFactoryProxies, slotKeyId,
+      1024 * 1024 * 1024L, pathname, null)
+    ret
+  }
+
   def cleanupForApp(sc: SparkContext) {
     FileUtils.deleteDirectory(new File(getDurableDir(sc).get))
   }
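
Usage sketch (not part of the patch): assuming the truncated factory at the top of the last hunk is the companion object's apply for the transformation path, the two entry points after this change could be exercised as below. The service name "pmalloc", the empty proxy array, the slot key, the pool size, the directory path, and the identity-style transform function are all hypothetical placeholders, not values taken from the commit. The sketch is declared in the same package because the DurableRDD class itself is private[spark].

package org.apache.mnemonic.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.mnemonic.{DurableType, EntityFactoryProxy, NonVolatileMemAllocator}
import org.apache.mnemonic.sessions.ObjectCreator

object DurableRDDUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("durable-rdd-sketch").setMaster("local[*]"))

    // Placeholder descriptors; a real job would pass the entity factory
    // proxies generated for its durable classes.
    val types: Array[DurableType] = Array(DurableType.STRING)
    val proxies: Array[EntityFactoryProxy] = Array.empty
    val slotKeyId = 0L

    // Transformation form: after this change the upstream RDD is threaded
    // through as rdd.context plus a OneToOneDependency rather than being
    // held directly by the DurableRDD.
    val src = sc.parallelize(Seq("a", "b", "c"))
    val durable = DurableRDD[String, String](src,
      "pmalloc", types, proxies, slotKeyId,
      1024 * 1024 * 1024L,
      (v: String, oc: ObjectCreator[String, NonVolatileMemAllocator]) => Some(v))
    durable.count()

    // Input-only form added by the patch: deps is null, so isInputOnly is
    // true and the RDD is reconstructed from an existing durable directory.
    val recovered = DurableRDD[String](sc, "/tmp/durable_rdd_dir",
      "pmalloc", types, proxies, slotKeyId)

    sc.stop()
  }
}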