@@ -11,6 +11,7 @@ import (
 
     "io"
 
+   "github.com/Azure/blobporter/internal"
    "github.com/Azure/blobporter/pipeline"
    "github.com/Azure/blobporter/util"
 )
@@ -21,12 +22,13 @@ import (
 
 // MultiFilePipeline Contructs blocks queue and implements data readers
 type MultiFilePipeline struct {
-   FilesInfo           map[string]FileInfo
-   TotalNumberOfBlocks int
-   TotalSize           uint64
-   BlockSize           uint64
-   NumOfPartitions     int
+   filesInfo           map[string]FileInfo
+   totalNumberOfBlocks int
+   totalSize           uint64
+   blockSize           uint64
+   numOfPartitions     int
    includeMD5          bool
+   handlePool          *internal.FileHandlePool
 }
 
 //FileInfo Contains the metadata associated with a file to be transferred
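Note on the new handlePool field: the FileHandlePool type lives in the internal package and its definition is not part of this diff. The sketch below is inferred purely from the call sites in this change (NewFileHandlePool, GetHandle, ReturnHandle, CloseHandles); the field names, the locking strategy, and the meaning of the constructor's third argument are assumptions, not the actual implementation.

    package internal

    import (
        "os"
        "sync"
    )

    // OpenMode stands in for the internal.Read constant referenced in the diff.
    type OpenMode int

    // Read is the only mode this change uses.
    const Read OpenMode = iota

    // FileHandlePool caches up to a fixed number of *os.File handles per path.
    type FileHandlePool struct {
        maxHandlesPerFile int
        mode              OpenMode
        mu                sync.Mutex
        handles           map[string][]*os.File
    }

    // NewFileHandlePool mirrors the constructor call in this diff; the third
    // argument's meaning is not visible here, so this sketch ignores it.
    func NewFileHandlePool(maxHandlesPerFile int, mode OpenMode, _ bool) *FileHandlePool {
        return &FileHandlePool{
            maxHandlesPerFile: maxHandlesPerFile,
            mode:              mode,
            handles:           make(map[string][]*os.File),
        }
    }

    // GetHandle returns a cached handle for path, opening a new one on a miss.
    func (p *FileHandlePool) GetHandle(path string) (*os.File, error) {
        p.mu.Lock()
        defer p.mu.Unlock()
        if hs := p.handles[path]; len(hs) > 0 {
            h := hs[len(hs)-1]
            p.handles[path] = hs[:len(hs)-1]
            return h, nil
        }
        return os.Open(path)
    }

    // ReturnHandle puts a handle back; past the per-file cap it is closed instead.
    func (p *FileHandlePool) ReturnHandle(path string, h *os.File) error {
        p.mu.Lock()
        defer p.mu.Unlock()
        if len(p.handles[path]) >= p.maxHandlesPerFile {
            return h.Close()
        }
        p.handles[path] = append(p.handles[path], h)
        return nil
    }

    // CloseHandles closes and forgets every cached handle for path.
    func (p *FileHandlePool) CloseHandles(path string) error {
        p.mu.Lock()
        defer p.mu.Unlock()
        var firstErr error
        for _, h := range p.handles[path] {
            if err := h.Close(); err != nil && firstErr == nil {
                firstErr = err
            }
        }
        delete(p.handles, path)
        return firstErr
    }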
@@ -100,6 +102,8 @@ func NewMultiFile(params *MultiFileParams) []pipeline.SourcePipeline {
    return pipelines
 }
 
+const maxNumOfHandlesPerFile int = 4
+
 func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint64, numOfPartitions int, md5 bool, keepDirStructure bool) pipeline.SourcePipeline {
    totalNumberOfBlocks := 0
    var totalSize uint64
@@ -142,19 +146,22 @@ func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint
        fileInfos[files[f]] = fileInfo
    }
 
-   return &MultiFilePipeline{FilesInfo: fileInfos,
-       TotalNumberOfBlocks: totalNumberOfBlocks,
-       BlockSize:           blockSize,
-       TotalSize:           totalSize,
-       NumOfPartitions:     numOfPartitions,
-       includeMD5:          md5}
+   handlePool := internal.NewFileHandlePool(maxNumOfHandlesPerFile, internal.Read, false)
+
+   return &MultiFilePipeline{filesInfo: fileInfos,
+       totalNumberOfBlocks: totalNumberOfBlocks,
+       blockSize:           blockSize,
+       totalSize:           totalSize,
+       numOfPartitions:     numOfPartitions,
+       includeMD5:          md5,
+       handlePool:          handlePool,
+   }
 }
 
 //ExecuteReader implements ExecuteReader from the pipeline.SourcePipeline Interface.
 //For each file the reader will maintain a open handle from which data will be read.
 // This implementation uses partitions (group of parts that can be read sequentially).
 func (f *MultiFilePipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, readPartsQ chan pipeline.Part, id int, wg *sync.WaitGroup) {
-   fileHandles := make(map[string]*os.File, len(f.FilesInfo))
    var err error
    var partition pipeline.PartsPartition
 
@@ -167,25 +174,30 @@ func (f *MultiFilePipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartiti
        partition, ok = <-partitionsQ
 
        if !ok {
-           for _, fh := range fileHandles {
-               fh.Close()
+           for _, finfo := range f.filesInfo {
+               err = f.handlePool.CloseHandles(finfo.SourceURI)
+               if err != nil {
+                   log.Fatal(fmt.Errorf("error closing handle for file:%v. Error:%v", finfo.SourceURI, err))
+               }
            }
            return // no more blocks of file data to be read
        }
 
+       //check if the partition is empty, as this may happen with small files
+       if len(partition.Parts) == 0 {
+           continue
+       }
+
        var part pipeline.Part
        for pip := 0; pip < len(partition.Parts); pip++ {
            part = partition.Parts[pip]
 
-           fileURI = f.FilesInfo[part.SourceURI].SourceURI
-           fileHandle = fileHandles[fileURI]
+           fileURI = f.filesInfo[part.SourceURI].SourceURI
 
            if fileHandle == nil {
-               if fileHandle, err = os.Open(fileURI); err != nil {
-                   fmt.Printf("Error while opening the file %v \n", err)
-                   log.Fatal(err)
+               if fileHandle, err = f.handlePool.GetHandle(fileURI); err != nil {
+                   log.Fatal(fmt.Errorf(" error while opening the file.\n Error:%v ", err))
                }
-               fileHandles[fileURI] = fileHandle
            }
 
            if pip == 0 {
@@ -195,8 +207,7 @@ func (f *MultiFilePipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartiti
            part.GetBuffer()
 
            if _, err = fileHandle.Read(part.Data); err != nil && err != io.EOF {
-               fmt.Printf("Error while reading the file %v \n", err)
-               log.Fatal(err)
+               log.Fatal(fmt.Errorf(" error while reading the file.\n Error:%v ", err))
            }
 
            util.PrintfIfDebug("ExecuteReader -> blockid:%v toread:%v name:%v read:%v ", part.BlockID, part.BytesToRead, part.TargetAlias, bytesRead)
@@ -207,16 +218,23 @@ func (f *MultiFilePipeline) ExecuteReader(partitionsQ chan pipeline.PartsPartiti
 
            readPartsQ <- part
        }
+
+       //return handle
+       if err = f.handlePool.ReturnHandle(fileURI, fileHandle); err != nil {
+           log.Fatal(fmt.Errorf(" error returning the handle to the pool.\n Path: %v error:%v ", fileURI, err))
+       }
+
+       fileHandle = nil
    }
 }
 
 //GetSourcesInfo implements GetSourcesInfo from the pipeline.SourcePipeline Interface.
 //Returns an an array of SourceInfo with the name, alias and size of the files to be transferred.
 func (f *MultiFilePipeline) GetSourcesInfo() []pipeline.SourceInfo {
 
-   sources := make([]pipeline.SourceInfo, len(f.FilesInfo))
+   sources := make([]pipeline.SourceInfo, len(f.filesInfo))
    var i = 0
-   for _, file := range f.FilesInfo {
+   for _, file := range f.filesInfo {
        sources[i] = pipeline.SourceInfo{SourceName: file.SourceURI, TargetAlias: file.TargetAlias, Size: uint64((*file.FileStats).Size())}
        i++
    }
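Taken together, the ExecuteReader edits replace the per-reader map of open files with a borrow/read/return cycle per partition: a handle is fetched from the pool at the first part that needs it, all parts of the partition are read sequentially on that handle, the handle is returned to the pool, and the local variable is reset to nil so the next partition borrows again. A condensed illustration of that lifecycle follows; readPartition and its arguments are invented for the example, only the pool methods come from the diff.

    package main

    import (
        "fmt"
        "io"

        "github.com/Azure/blobporter/internal"
    )

    // readPartition shows the borrow/read/return cycle for one partition.
    func readPartition(pool *internal.FileHandlePool, fileURI string, buf []byte) error {
        // Borrow (or lazily open) a handle once per partition rather than once per part.
        h, err := pool.GetHandle(fileURI)
        if err != nil {
            return fmt.Errorf("error while opening the file: %v", err)
        }
        // Parts within a partition are contiguous, so reads on the shared
        // handle stay sequential and need no extra seeks.
        if _, err := h.Read(buf); err != nil && err != io.EOF {
            return fmt.Errorf("error while reading the file: %v", err)
        }
        // Hand the handle back so another reader goroutine can reuse it.
        return pool.ReturnHandle(fileURI, h)
    }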
@@ -255,18 +273,18 @@ func createPartsFromSource(size uint64, sourceNumOfBlocks int, blockSize uint64,
 // this implementation uses partitions to group parts into a set that can be read sequentially.
 // This is to avoid Window's memory pressure when calling SetFilePointer numerous times on the same handle
 func (f *MultiFilePipeline) ConstructBlockInfoQueue(blockSize uint64) (partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, numOfBlocks int, size uint64) {
-   numOfBlocks = f.TotalNumberOfBlocks
-   size = f.TotalSize
-   allPartitions := make([][]pipeline.PartsPartition, len(f.FilesInfo))
+   numOfBlocks = f.totalNumberOfBlocks
+   size = f.totalSize
+   allPartitions := make([][]pipeline.PartsPartition, len(f.filesInfo))
    //size of the queue is equal to the number of partitions times the number of files to transfer.
    //a lower value will block as this method is called before readers start
-   partitionsQ = make(chan pipeline.PartsPartition, f.NumOfPartitions*len(f.FilesInfo))
+   partitionsQ = make(chan pipeline.PartsPartition, f.numOfPartitions*len(f.filesInfo))
    partsQ = nil
    bufferQ := pipeline.NewBytesBufferChan(uint64(blockSize))
    pindex := 0
    maxpartitionNumber := 0
-   for _, source := range f.FilesInfo {
-       partitions := pipeline.ConstructPartsPartition(f.NumOfPartitions, (*source.FileStats).Size(), int64(blockSize), source.SourceURI, source.TargetAlias, bufferQ)
+   for _, source := range f.filesInfo {
+       partitions := pipeline.ConstructPartsPartition(f.numOfPartitions, (*source.FileStats).Size(), int64(blockSize), source.SourceURI, source.TargetAlias, bufferQ)
        allPartitions[pindex] = partitions
        if len(partitions) > maxpartitionNumber {
            maxpartitionNumber = len(partitions)
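One detail in this last hunk deserves a concrete number: per the comment in the code, ConstructBlockInfoQueue enqueues every partition for every file before any reader goroutine starts receiving, so the channel capacity must be at least numOfPartitions * len(filesInfo) or the sends would block and startup would deadlock. A toy demonstration with invented values:

    package main

    import "fmt"

    func main() {
        const numOfPartitions, numOfFiles = 5, 3
        // All 15 partitions are queued before the first receive happens, so a
        // capacity below numOfPartitions*numOfFiles would block the producer.
        partitionsQ := make(chan int, numOfPartitions*numOfFiles)
        for i := 0; i < numOfPartitions*numOfFiles; i++ {
            partitionsQ <- i // never blocks at this capacity
        }
        fmt.Println("queued partitions:", len(partitionsQ))
    }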