@@ -22,17 +22,18 @@ import java.io.File
22
22
import org .apache .spark .rdd .RDD
23
23
import org .apache .spark ._
24
24
import org .apache .commons .io .FileUtils
25
- import scala .reflect .{ ClassTag }
25
+
26
+ import scala .reflect .ClassTag
26
27
import scala .collection .JavaConverters ._
27
28
import scala .collection .mutable .ArrayBuffer
28
- import org .apache .mnemonic .DurableType
29
- import org .apache .mnemonic .EntityFactoryProxy
30
- import org .apache .mnemonic .NonVolatileMemAllocator
29
+ import org .apache .mnemonic .{ConfigurationException , DurableType , EntityFactoryProxy , NonVolatileMemAllocator }
31
30
import org .apache .mnemonic .sessions .ObjectCreator
32
31
import org .apache .mnemonic .spark .MneDurableInputSession
33
32
import org .apache .mnemonic .spark .MneDurableOutputSession
34
33
import org .apache .mnemonic .spark .DurableException
35
34
35
+ import scala .collection .mutable
36
+
36
37
private [spark] class DurableRDD [D : ClassTag , T : ClassTag ] (
37
38
@ transient private var _sc : SparkContext ,
38
39
@ transient private var deps : Seq [Dependency [_]],
@@ -45,40 +46,47 @@ private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
45
46
46
47
private val isInputOnly = null == deps
47
48
48
- private val durdddir = DurableRDD .getRddDirName(durableDirectory, id)
49
- DurableRDD .resetRddDir(durdddir)
50
-
51
- override val partitioner = if (preservesPartitioning) firstParent[T ].partitioner else None
49
// Directory that holds this RDD's durable memory pool files.
// Input-only RDDs use the caller-supplied directory as-is (it must already exist);
// otherwise the per-RDD directory name is derived from `id` and reset.
private var durdddir: String = _
if (!isInputOnly) {
  durdddir = DurableRDD.getRddDirName(durableDirectory, id)
  DurableRDD.resetRddDir(durdddir)
} else {
  val inputDir = new File(durableDirectory)
  if (!inputDir.exists) {
    throw new ConfigurationException("Input directory does not exist")
  }
  durdddir = durableDirectory
}
52
60
53
- override protected def getPartitions : Array [ Partition ] = firstParent[T ].partitions
61
// An input-only RDD has no parent (deps is null), so it never inherits a partitioner.
override val partitioner =
  if (isInputOnly || !preservesPartitioning) None else firstParent[T].partitioner
54
62
55
- def prepareDurablePartition (split : Partition , context : TaskContext ,
56
- iterator : Iterator [T ]): Array [File ] = {
57
- val outsess = MneDurableOutputSession [D ](serviceName,
58
- durableTypes, entityFactoryProxies, slotKeyId,
59
- partitionPoolSize, durdddir.toString,
60
- DurableRDD .genDurableFileName(split.hashCode)_)
61
- try {
62
- for (item <- iterator) {
63
- f(item, outsess) match {
64
- case Some (res) => outsess.post(res)
65
- case None =>
66
- }
63
/**
 * Returns the partitions of this RDD.
 *
 * With a parent RDD the parent's partitions are reused. For an input-only RDD the
 * partitions come from `DurableRDD.collectMemPoolPartitionList(durdddir)`; an empty
 * result is logged and returned as an empty array.
 */
override protected def getPartitions: Array[Partition] =
  if (!isInputOnly) {
    firstParent[T].partitions
  } else {
    val discovered = DurableRDD.collectMemPoolPartitionList(durdddir) match {
      case Some(parts) => parts
      case None => Array[Partition]()
    }
    if (discovered.isEmpty) {
      logInfo(s"Not found any partitions in the directory ${durdddir}")
    }
    discovered
  }
73
74
74
75
override def compute (split : Partition , context : TaskContext ): Iterator [D ] = {
75
76
val mempListOpt : Option [Array [File ]] =
76
- DurableRDD .collectMemPoolFileList(durdddir.toString , DurableRDD .genDurableFileName(split.hashCode )_)
77
+ DurableRDD .collectMemPoolFileList(durdddir, DurableRDD .genDurableFileName(context.partitionId )_)
77
78
val memplist = mempListOpt match {
78
79
case None => {
79
- val mplst = prepareDurablePartition(split, context, firstParent[T ].iterator(split, context))
80
- logInfo(s " Done transformed RDD # ${firstParent[T ].id} to durableRDD # ${id} on ${durdddir.toString}" )
81
- mplst
80
+ if (isInputOnly) {
81
+ logInfo(s " Not found any mem pool files related to the partition # ${context.partitionId}" )
82
+ Array [File ]()
83
+ } else {
84
+ val mplst = DurableRDD .prepareDurablePartition[D , T ](durdddir,
85
+ serviceName, durableTypes, entityFactoryProxies, slotKeyId,
86
+ partitionPoolSize, f)(context, firstParent[T ].iterator(split, context))
87
+ logInfo(s " Done transformed RDD # ${firstParent[T ].id} to durableRDD # ${id} on ${durdddir}" )
88
+ mplst
89
+ }
82
90
}
83
91
case Some (mplst) => mplst
84
92
}
@@ -104,6 +112,7 @@ object DurableRDD {
104
112
105
113
val durableSubDirNameTemplate = " durable-rdd-%010d"
106
114
val durableFileNameTemplate = " mem_%010d_%010d.mne"
115
// Matches the file name of a partition's first memory pool (pool index 0000000000)
// and captures the 10-digit partition id. The dot before the extension is escaped
// so that only a literal ".mne" suffix is accepted.
val durableFileNamePartitionRegex = raw"mem_(\d{10})_0000000000\.mne".r
107
116
108
117
private var durableDir : Option [String ] = None
109
118
@@ -146,23 +155,40 @@ object DurableRDD {
146
155
}
147
156
148
157
/**
 * Creates the directory `rddDirName` with `File.mkdir`.
 *
 * @param rddDirName path of the directory to create
 * @throws DurableException when `mkdir` reports that the directory was not created
 */
def createRddDir(rddDirName: String): Unit = {
  val target = new File(rddDirName)
  val created = target.mkdir
  if (!created) {
    throw new DurableException(s"Durable RDD directory ${target.toString} cannot be created")
  }
}
154
163
155
164
/**
 * Deletes the directory `rddDirName` through `FileUtils.deleteDirectory` when it exists;
 * does nothing when the path is absent.
 *
 * @param rddDirName path of the directory to delete
 */
def deleteRddDir(rddDirName: String): Unit = {
  Some(new File(rddDirName))
    .filter(_.exists)
    .foreach(existing => FileUtils.deleteDirectory(existing))
}
161
170
162
- def genDurableFileName (splitId : Int )(mempidx : Long ): String = {
171
/**
 * Builds a memory pool file name by formatting `durableFileNameTemplate`.
 *
 * @param splitId id of the partition the pool belongs to
 * @param mempidx index of the memory pool within that partition
 * @return the formatted file name
 */
def genDurableFileName(splitId: Long)(mempidx: Long): String =
  durableFileNameTemplate.format(splitId, mempidx)
165
174
175
/**
 * Discovers the partitions stored in a durable RDD directory.
 *
 * Every regular file directly under `path` whose name matches
 * `durableFileNamePartitionRegex` contributes the partition id captured by that
 * regex. Ids are de-duplicated and returned in ascending order.
 *
 * @param path directory to scan
 * @return always a `Some`; the array is empty when `path` is not a directory,
 *         cannot be listed, or contains no matching files
 */
def collectMemPoolPartitionList(path: String): Option[Array[Partition]] = {
  val dir = new File(path)
  // File.listFiles returns null on an I/O error, so wrap it before use.
  val entries: Array[File] =
    if (dir.isDirectory) Option(dir.listFiles).getOrElse(Array.empty[File])
    else Array.empty[File]
  val partitionIds: Array[Int] = entries
    .filter(_.isFile) // pool files are regular files, not sub-directories
    .map(_.getName)   // the regex must match the whole string, so use the bare file name
    .flatMap {
      // A 10-digit id may exceed Int range; such a file cannot belong to a partition, skip it.
      case durableFileNamePartitionRegex(paridx) => scala.util.Try(paridx.toInt).toOption
      case _ => None
    }
    .distinct
    .sorted
  Option(partitionIds.map(x => new Partition { val index = x }))
}
191
+
166
192
def collectMemPoolFileList (durddir : String , memFileNameGen : (Long )=> String ): Option [Array [File ]] = {
167
193
val flist : ArrayBuffer [File ] = new ArrayBuffer [File ]
168
194
var idx : Long = 0L
@@ -184,28 +210,52 @@ object DurableRDD {
184
210
}
185
211
}
186
212
213
/**
 * Writes one partition's items into durable memory pools and returns the pool files.
 *
 * Opens a `MneDurableOutputSession` whose pool file names are generated from
 * `context.partitionId`, applies `func` to every item of `iterator`, posts each
 * `Some` result to the session, and always closes the session afterwards.
 *
 * @param path directory passed to the output session
 * @param func transformation applied to each item; `None` results are skipped
 * @param context task context supplying the partition id
 * @param iterator items of the partition to transform
 * @return the session's memory pool files as an array
 */
def prepareDurablePartition[D: ClassTag, T: ClassTag](path: String,
    serviceName: String, durableTypes: Array[DurableType],
    entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
    partitionPoolSize: Long,
    func: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D]
  )(context: TaskContext, iterator: Iterator[T]): Array[File] = {
  val session = MneDurableOutputSession[D](serviceName,
    durableTypes, entityFactoryProxies, slotKeyId,
    partitionPoolSize, path,
    genDurableFileName(context.partitionId)_)
  try {
    iterator.foreach { item =>
      func(item, session).foreach(res => session.post(res))
    }
  } finally {
    session.close()
  }
  session.memPools.toArray
}
235
+
187
236
/**
 * Creates a DurableRDD that depends one-to-one on `rdd` and transforms its items with `f`.
 *
 * The durable directory is taken from `getDurableDir(rdd.context)`; `.get` fails
 * when that returns `None`.
 */
def apply[D: ClassTag, T: ClassTag](
    rdd: RDD[T],
    serviceName: String, durableTypes: Array[DurableType],
    entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
    partitionPoolSize: Long,
    f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
    preservesPartitioning: Boolean = false) = {
  val ctx: SparkContext = rdd.context
  val parentDeps = List(new OneToOneDependency(rdd))
  // `f` is passed through unchanged; closure cleaning (sc.clean) and cleaner
  // registration (sc.cleaner ... registerRDDForCleanup) are not performed here.
  val durableDir = getDurableDir(ctx).get
  new DurableRDD[D, T](ctx, parentDeps,
    serviceName, durableTypes, entityFactoryProxies, slotKeyId,
    partitionPoolSize, durableDir, f, preservesPartitioning)
}
201
251
202
252
/**
 * Creates an input-only DurableRDD over the existing directory `path`.
 *
 * The RDD is built with `null` dependencies and a `null` transform function,
 * and a fixed pool size of 1024 * 1024 * 1024 bytes.
 */
def apply[D: ClassTag](
    sc: SparkContext, path: String,
    serviceName: String, durableTypes: Array[DurableType],
    entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long) = {
  val fixedPoolSize = 1024 * 1024 * 1024L
  new DurableRDD[D, Unit](sc, null,
    serviceName, durableTypes, entityFactoryProxies, slotKeyId,
    fixedPoolSize, path, null)
}
211
261
0 commit comments