Skip to content

Commit 9c51bb0

Browse files
committed
Factor out code for waiting for writers to close pool backed files
1 parent 3228b18 commit 9c51bb0

File tree

1 file changed

+41
-35
lines changed

1 file changed

+41
-35
lines changed

pkg/filesystem/virtual/pool_backed_file_allocator.go

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (f *fileBackedFile) lockMutatingData() {
128128
}
129129
}
130130

131-
func (f *fileBackedFile) acquireFrozenDescriptorLocked() (filesystem.FileReader, bool) {
131+
func (f *fileBackedFile) openReadFrozen() (filesystem.FileReader, bool) {
132132
if f.referenceCount == 0 {
133133
return nil, false
134134
}
@@ -139,6 +139,43 @@ func (f *fileBackedFile) acquireFrozenDescriptorLocked() (filesystem.FileReader,
139139
}, true
140140
}
141141

142+
func (f *fileBackedFile) waitAndOpenReadFrozen(writableFileDelay <-chan struct{}) (filesystem.FileReader, bool) {
143+
f.lock.Lock()
144+
defer f.lock.Unlock()
145+
146+
if f.writableDescriptorsCount > 0 {
147+
// Process table cleaning should have cleaned up any
148+
// file descriptors belonging to files in the input
149+
// root. Yet we are still seeing the file being opened
150+
// for writing. This is bad, as it means that data may
151+
// still be present in the kernel's page cache.
152+
start := time.Now()
153+
deadlineExceeded := false
154+
for f.writableDescriptorsCount > 0 && !deadlineExceeded {
155+
if f.noMoreWritersWakeup == nil {
156+
f.noMoreWritersWakeup = make(chan struct{})
157+
}
158+
c := f.noMoreWritersWakeup
159+
160+
f.lock.Unlock()
161+
select {
162+
case <-writableFileDelay:
163+
deadlineExceeded = true
164+
case <-c:
165+
}
166+
f.lock.Lock()
167+
}
168+
169+
if f.writableDescriptorsCount > 0 {
170+
poolBackedFileAllocatorWritableFileUploadDelayTimeouts.Inc()
171+
} else {
172+
poolBackedFileAllocatorWritableFileUploadDelaySeconds.Observe(time.Now().Sub(start).Seconds())
173+
}
174+
}
175+
176+
return f.openReadFrozen()
177+
}
178+
142179
func (f *fileBackedFile) acquireShareAccessLocked(shareAccess ShareMask) {
143180
f.referenceCount += shareAccess.Count()
144181
if shareAccess&ShareMaskWrite != 0 {
@@ -184,7 +221,7 @@ func (f *fileBackedFile) getCachedDigest() digest.Digest {
184221
// updateCachedDigest returns the digest of the file. It either returns
185222
// a cached value, or computes the digest and caches it. It is only safe
186223
// to call this function while the file is frozen (i.e., calling
187-
// f.acquireFrozenDescriptorLocked()).
224+
// f.openReadFrozen()).
188225
func (f *fileBackedFile) updateCachedDigest(digestFunction digest.Function, frozenFile filesystem.FileReader) (digest.Digest, error) {
189226
// Check whether the cached digest we have is still valid.
190227
if cachedDigest := f.getCachedDigest(); cachedDigest != digest.BadDigest && cachedDigest.UsesDigestFunction(digestFunction) {
@@ -206,38 +243,7 @@ func (f *fileBackedFile) updateCachedDigest(digestFunction digest.Function, froz
206243
}
207244

208245
func (f *fileBackedFile) uploadFile(ctx context.Context, contentAddressableStorage blobstore.BlobAccess, digestFunction digest.Function, writableFileUploadDelay <-chan struct{}) (digest.Digest, error) {
209-
f.lock.Lock()
210-
if f.writableDescriptorsCount > 0 {
211-
// Process table cleaning should have cleaned up any
212-
// file descriptors belonging to files in the input
213-
// root. Yet we are still seeing the file being opened
214-
// for writing. This is bad, as it means that data may
215-
// still be present in the kernel's page cache.
216-
start := time.Now()
217-
deadlineExceeded := false
218-
for f.writableDescriptorsCount > 0 && !deadlineExceeded {
219-
if f.noMoreWritersWakeup == nil {
220-
f.noMoreWritersWakeup = make(chan struct{})
221-
}
222-
c := f.noMoreWritersWakeup
223-
224-
f.lock.Unlock()
225-
select {
226-
case <-writableFileUploadDelay:
227-
deadlineExceeded = true
228-
case <-c:
229-
}
230-
f.lock.Lock()
231-
}
232-
233-
if f.writableDescriptorsCount > 0 {
234-
poolBackedFileAllocatorWritableFileUploadDelayTimeouts.Inc()
235-
} else {
236-
poolBackedFileAllocatorWritableFileUploadDelaySeconds.Observe(time.Now().Sub(start).Seconds())
237-
}
238-
}
239-
frozenFile, success := f.acquireFrozenDescriptorLocked()
240-
f.lock.Unlock()
246+
frozenFile, success := f.waitAndOpenReadFrozen(writableFileUploadDelay)
241247
if !success {
242248
return digest.BadDigest, status.Error(codes.NotFound, "File was unlinked before uploading could start")
243249
}
@@ -261,7 +267,7 @@ func (f *fileBackedFile) getBazelOutputServiceStat(digestFunction *digest.Functi
261267
var locator *anypb.Any
262268
f.lock.Lock()
263269
if f.writableDescriptorsCount == 0 {
264-
frozenFile, success := f.acquireFrozenDescriptorLocked()
270+
frozenFile, success := f.openReadFrozen()
265271
f.lock.Unlock()
266272
if !success {
267273
return nil, status.Error(codes.NotFound, "File was unlinked before digest computation could start")

0 commit comments

Comments (0)