Skip to content

Commit 1653fd7

Browse files
committed
fix: support streams without pipes
1 parent 90caf09 commit 1653fd7

File tree

2 files changed

+24
-14
lines changed

2 files changed

+24
-14
lines changed

lib/ContainerLogger.js

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,8 @@ class ContainerLogger extends EventEmitter {
7878

7979
this._registerToTtyStreams(stdout, stderr);
8080
} else {
81-
stdout = stdout.pipe(new DeprecatedImagesInterceptorStream());
8281
this._handleNonTtyStream(stdout, false);
8382
if (stderr) {
84-
stderr = stderr.pipe(new DeprecatedImagesInterceptorStream());
8583
this._handleNonTtyStream(stderr, true);
8684
}
8785
}
@@ -175,11 +173,9 @@ class ContainerLogger extends EventEmitter {
175173
}
176174

177175
_registerToTtyStreams(stdout, stderr) {
178-
stdout = stdout.pipe(new DeprecatedImagesInterceptorStream());
179176
this._handleTtyStream(stdout, false);
180177

181178
if (stderr) {
182-
stderr = stderr.pipe(new DeprecatedImagesInterceptorStream());
183179
stderr.once('end', () => {
184180
this.stepFinished = true;
185181
logger.info(`stderr end event was fired for container: ${this.containerId}`);
@@ -190,27 +186,38 @@ class ContainerLogger extends EventEmitter {
190186

191187
_handleTtyStream(stream, isError) {
192188
this.handledStreams++;
193-
stream.on('end', this._handleFinished.bind(this));
189+
const deprecatedImagesInterceptor = new DeprecatedImagesInterceptorStream(true);
190+
stream.on('end', () => {
191+
this._handleFinished();
192+
deprecatedImagesInterceptor.end();
193+
});
194194
stream.on('data', (chunk) => {
195+
deprecatedImagesInterceptor.write(chunk);
195196
this._logMessage(Buffer.from(chunk).toString('utf-8'), isError);
196197
});
197198
logger.info(`Listening on stream 'data' event for container: ${this.containerId}`);
198199
}
199200

200201
_handleNonTtyStream(stream, isError) {
201202
this.handledStreams++;
203+
const deprecatedImagesInterceptor = new DeprecatedImagesInterceptorStream(true);
202204
stream.on('readable', () => {
203205
let header = stream.read(8);
204206
while (header !== null) {
207+
deprecatedImagesInterceptor.write(header);
205208
const payload = stream.read(header.readUInt32BE(4));
206209
if (payload === null) {
207210
break;
208211
}
212+
deprecatedImagesInterceptor.write(payload);
209213
this._logMessage(Buffer.from(payload).toString('utf8'), isError);
210214
header = stream.read(8);
211215
}
212216
});
213-
stream.on('end', this._handleFinished.bind(this));
217+
stream.on('end', () => {
218+
this._handleFinished();
219+
deprecatedImagesInterceptor.end();
220+
});
214221
logger.info(`Listening on stream 'readable' event for container: ${this.containerId}`);
215222
}
216223

lib/metric/deprecated-images/deprecated-images-interceptor.stream.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,34 @@ import { Transform, TransformCallback } from 'stream';
44
import deprecatedImagesCollector from './deprecated-images.collector';
55

66
export class DeprecatedImagesInterceptorStream extends Transform {
7-
private _lastChunk: Buffer;
7+
private lastChunk: Buffer;
88

9-
constructor() {
9+
constructor(private readonly noPush = false) {
1010
super();
11-
this._lastChunk = Buffer.alloc(0);
11+
this.lastChunk = Buffer.alloc(0);
1212
}
1313

1414
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
1515
try {
1616

1717
const text = Buffer
18-
.concat([this._lastChunk, chunk])
18+
.concat([this.lastChunk, chunk])
1919
.toString('utf8');
2020

2121
const lines = text.split('\n');
2222

2323
// the final element in 'lines' may be an incomplete line
24-
// save it in _lastChunk for next time
25-
this._lastChunk = Buffer.from(lines.pop() ?? '', 'utf8');
24+
// save it in lastChunk for next time
25+
this.lastChunk = Buffer.from(lines.pop() ?? '', 'utf8');
2626

2727
for (const line of lines) {
2828
deprecatedImagesCollector.catchDeprecatedImage(line.trim());
2929
}
3030

31-
this.push(chunk);
31+
if (!this.noPush) {
32+
this.push(chunk);
33+
}
34+
3235
callback();
3336
} catch (error) {
3437
callback(error as any);
@@ -42,7 +45,7 @@ export class DeprecatedImagesInterceptorStream extends Transform {
4245
*/
4346
_flush(callback: TransformCallback): void {
4447
try {
45-
const finalLine = this._lastChunk.toString('utf8');
48+
const finalLine = this.lastChunk.toString('utf8');
4649
deprecatedImagesCollector.catchDeprecatedImage(finalLine.trim());
4750
callback();
4851
} catch (error) {

0 commit comments

Comments
 (0)