8
8
"sync"
9
9
"time"
10
10
11
- "github.com/Azure/blobporter/util"
12
-
13
11
"github.com/Azure/blobporter/pipeline"
14
12
)
15
13
@@ -116,13 +114,16 @@ func (h *fileHandleManager) getHandle(path string) (*os.File, error) {
116
114
117
115
if ! isInit {
118
116
fh , err = h .initFile (path )
119
- if err != nil {
120
- return nil , err
121
- }
122
117
}
123
118
h .initTracker .Store (path , true )
124
119
h .Unlock ()
125
120
121
+ if err != nil {
122
+ return nil , err
123
+ }
124
+
125
+
126
+
126
127
if ! h .cacheEnabled {
127
128
//we don't have handle from the initialization, so open a new one.
128
129
if fh != nil {
@@ -135,42 +136,48 @@ func (h *fileHandleManager) getHandle(path string) (*os.File, error) {
135
136
136
137
//try to get it from the cache
137
138
var fhQ chan * os.File
138
- var ok bool
139
+ var incache bool
139
140
var val interface {}
140
- val , ok = h .fileHandlesQMap .Load (path )
141
- if ok {
141
+
142
+ val , incache = h .fileHandlesQMap .Load (path )
143
+ if incache {
142
144
fhQ = val .(chan * os.File )
143
145
fh := <- fhQ
146
+
144
147
return fh , nil
145
148
}
146
149
147
- //at this point we need to create a new handle and create the pool handles if not full if it hasn't be created.
148
- if fh , err = os .OpenFile (path , os .O_WRONLY , os .ModeAppend ); err != nil {
149
- return nil , err
150
+ //if init, the handle from the creation is not available (e.i. fh == nil)
151
+ if isInit {
152
+ if fh , err = os .OpenFile (path , os .O_WRONLY , os .ModeAppend ); err != nil {
153
+ return nil , err
154
+ }
155
+
150
156
}
151
157
152
- //the pool is full return the handle
158
+ h .Lock ()
159
+ defer h .Unlock ()
160
+ //ok not in the cache so we need to check if the cache is full
153
161
if h .cachedFileHandles + h .numOfHandlesPerFile > maxFileHandlesInCache {
154
- util .PrintfIfDebug ("getHandle -> Cache is full: %v\n " , h .cachedFileHandles + h .numOfHandlesPerFile )
155
162
return fh , nil
156
163
}
157
164
158
- //prime the pool with additional handles
159
- if ! isInit {
160
- fhQ = make (chan * os.File , h .numOfHandlesPerFile )
161
- h .cachedFileHandles = h .cachedFileHandles + 1
162
- for i := 1 ; i < h .numOfHandlesPerFile ; i ++ {
163
- var fhi * os.File
164
- if fhi , err = os .OpenFile (path , os .O_WRONLY , os .ModeAppend ); err != nil {
165
- return nil , err
166
- }
167
- fhQ <- fhi
168
- h .cachedFileHandles = h .cachedFileHandles + 1
169
- }
165
+ //add items to the cache
166
+ fhQ = make (chan * os.File , h .numOfHandlesPerFile )
167
+ h .cachedFileHandles = h .cachedFileHandles + 1
168
+ for i := 1 ; i < h .numOfHandlesPerFile ; i ++ {
169
+ var fhi * os.File
170
170
171
- h .fileHandlesQMap .Store (path , fhQ )
171
+ if fhi , err = os .OpenFile (path , os .O_WRONLY , os .ModeAppend ); err != nil {
172
+ return nil , err
173
+ }
174
+
175
+ fhQ <- fhi
176
+ h .cachedFileHandles = h .cachedFileHandles + 1
172
177
}
173
178
179
+ h .fileHandlesQMap .Store (path , fhQ )
180
+
174
181
return fh , nil
175
182
176
183
}
@@ -187,14 +194,18 @@ func (h *fileHandleManager) returnHandle(path string, fileHandle *os.File) error
187
194
var ok bool
188
195
189
196
val , ok = h .fileHandlesQMap .Load (path )
197
+
190
198
if ok {
191
199
fhq = val .(chan * os.File )
200
+ //fmt.Printf("return to dequeue %v\n", path)
192
201
fhq <- fileHandle
202
+ //fmt.Printf("after to dequeue %v\n", path)
193
203
h .fileHandlesQMap .Store (path , fhq )
194
204
return nil
195
205
}
196
206
197
207
//not found in the map, which is the case when the cache is at capacity
208
+ //fmt.Printf("close (not in the map) %v", path)
198
209
return fileHandle .Close ()
199
210
}
200
211
0 commit comments