@@ -77,8 +77,7 @@ if (!(await file(tarFile).exists())) {
77
77
rej ( err ) ;
78
78
return ;
79
79
}
80
- } ) ;
81
- extract . once ( "drain" , ( ) => {
80
+
82
81
res ( ) ;
83
82
} ) ;
84
83
} ) ;
@@ -143,53 +142,55 @@ for (const layer of manifest.Layers) {
143
142
144
143
const inprogressPath = path . join ( cacheFolder , layerName + "-in-progress" ) ;
145
144
await rm ( inprogressPath , { recursive : true } ) ;
146
- const layerCacheGzip = file ( inprogressPath ) ;
147
- const cacheWriter = layerCacheGzip . writer ( ) ;
148
- const hasher = new Bun . CryptoHasher ( "sha256" ) ;
149
145
146
+ const hasher = new Bun . CryptoHasher ( "sha256" ) ;
147
+ const cacheWriter = file ( inprogressPath ) . writer ( ) ;
150
148
const gzipStream = zlib . createGzip ( { level : 9 } ) ;
151
- gzipStream . pipe (
152
- new stream . Writable ( {
153
- write ( value : Buffer , _ , callback ) {
154
- cacheWriter . write ( value . buffer ) ;
155
- hasher . update ( value . buffer ) ;
156
- callback ( ) ;
157
- } ,
158
- } ) ,
159
- ) ;
149
+ const writeStream = new stream . Writable ( {
150
+ write ( val : Buffer , _ , cb ) {
151
+ hasher . update ( val , "binary" ) ;
152
+ cacheWriter . write ( val ) ;
153
+ cb ( ) ;
154
+ } ,
155
+ } ) ;
160
156
157
+ gzipStream . pipe ( writeStream ) ;
161
158
await file ( layerPath )
162
159
. stream ( )
163
160
. pipeTo (
164
161
new WritableStream ( {
165
- write ( value ) {
166
- return new Promise ( ( res , rej ) => {
167
- gzipStream . write ( value , ( err ) => {
168
- if ( err ) {
169
- rej ( err ) ;
170
- return ;
171
- }
172
- res ( ) ;
173
- } ) ;
162
+ write ( value : Buffer ) {
163
+ return new Promise ( async ( res ) => {
164
+ const needsWriteBackoff = gzipStream . write ( value ) ;
165
+ // We need to back-off with the writes
166
+ if ( ! needsWriteBackoff ) {
167
+ const onDrain = ( ) => {
168
+ // Remove event listener when it finishes
169
+ gzipStream . off ( "drain" , onDrain ) ;
170
+ res ( ) ;
171
+ } ;
172
+
173
+ gzipStream . on ( "drain" , onDrain ) ;
174
+ return ;
175
+ }
176
+
177
+ res ( ) ;
174
178
} ) ;
175
179
} ,
176
180
close ( ) {
177
- return new Promise ( ( res ) => {
178
- gzipStream . end ( ( ) => {
179
- res ( ) ;
180
- } ) ;
181
+ return new Promise ( async ( res ) => {
182
+ // Flush before end
183
+ await new Promise ( ( resFlush ) => gzipStream . flush ( ( ) => resFlush ( true ) ) ) ;
184
+ // End the stream
185
+ gzipStream . end ( res ) ;
181
186
} ) ;
182
187
} ,
183
188
} ) ,
184
189
) ;
185
190
186
- await new Promise ( ( res ) =>
187
- gzipStream . flush ( ( ) => {
188
- res ( true ) ;
189
- } ) ,
190
- ) ;
191
+ // Wait until the gzipStream has finished all the piping
192
+ await new Promise ( ( res ) => gzipStream . on ( "end" , ( ) => res ( true ) ) ) ;
191
193
192
- await cacheWriter . end ( ) ;
193
194
const digest = hasher . digest ( "hex" ) ;
194
195
await rename ( inprogressPath , path . join ( cacheFolder , digest ) ) ;
195
196
await write ( layerCachePath , digest ) ;
0 commit comments