Skip to content

Commit a018a22

Browse files
committed
fix: convert chunks to text lines before parsing them
1 parent 8c2b706 commit a018a22

File tree

2 files changed

+61
-13
lines changed

2 files changed

+61
-13
lines changed

lib/ContainerLogger.js

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ const Q = require('q');
33
const promiseRetry = require('promise-retry');
44
const logger = require('cf-logs').Logger('codefresh:containerLogger');
55
const CFError = require('cf-errors');
6-
const { Transform, PassThrough } = require('stream');
6+
const { Transform } = require('stream');
77

88
const _ = require('lodash');
99
const { LoggerStrategy } = require('./enums');
1010

1111
// eslint-disable-next-line import/no-unresolved
12-
const deprecatedImagesCollector = require('./metric/deprecated-images/deprecated-images.collector').default;
12+
const { DeprecatedImagesInterceptorStream } = require('./metric/deprecated-images/deprecated-images-interceptor.stream');
1313

1414
const CONTAINER_START_RETRY_TIMEOUT_SECONDS = 1;
1515
const CONTAINER_START_RETRY_LIMIT = 10;
@@ -78,9 +78,10 @@ class ContainerLogger extends EventEmitter {
7878

7979
this._registerToTtyStreams(stdout, stderr);
8080
} else {
81-
// TODO: parse deprecated images here
81+
stdout = stdout.pipe(new DeprecatedImagesInterceptorStream());
8282
this._handleNonTtyStream(stdout, false);
8383
if (stderr) {
84+
stderr = stderr.pipe(new DeprecatedImagesInterceptorStream());
8485
this._handleNonTtyStream(stderr, true);
8586
}
8687
}
@@ -136,7 +137,7 @@ class ContainerLogger extends EventEmitter {
136137
// { end = false } on the stepLoggerWritableStream because there is only one instance of it for all the steps.
137138
this.handledStreams++;
138139
let stdoutStream = stdout
139-
.pipe(this._interceptDeprecatedImagesStream())
140+
.pipe(new DeprecatedImagesInterceptorStream())
140141
.pipe(this._logSizeLimitStream())
141142
.pipe(this.stepLogger.createMaskingStream());
142143

@@ -154,7 +155,7 @@ class ContainerLogger extends EventEmitter {
154155

155156
this.handledStreams++;
156157
let stderrStream = stderr
157-
.pipe(this._interceptDeprecatedImagesStream())
158+
.pipe(new DeprecatedImagesInterceptorStream())
158159
.pipe(this._logSizeLimitStream())
159160
.pipe(this._errorTransformerStream())
160161
.pipe(this.stepLogger.createMaskingStream());
@@ -174,9 +175,11 @@ class ContainerLogger extends EventEmitter {
174175
}
175176

176177
_registerToTtyStreams(stdout, stderr) {
178+
stdout = stdout.pipe(new DeprecatedImagesInterceptorStream());
177179
this._handleTtyStream(stdout, false);
178180

179181
if (stderr) {
182+
stderr = stderr.pipe(new DeprecatedImagesInterceptorStream());
180183
stderr.once('end', () => {
181184
this.stepFinished = true;
182185
logger.info(`stderr end event was fired for container: ${this.containerId}`);
@@ -189,9 +192,7 @@ class ContainerLogger extends EventEmitter {
189192
this.handledStreams++;
190193
stream.on('end', this._handleFinished.bind(this));
191194
stream.on('data', (chunk) => {
192-
const message = Buffer.from(chunk).toString('utf-8');
193-
deprecatedImagesCollector.catchDeprecatedImage(message);
194-
this._logMessage(message, isError);
195+
this._logMessage(Buffer.from(chunk).toString('utf-8'), isError);
195196
});
196197
logger.info(`Listening on stream 'data' event for container: ${this.containerId}`);
197198
}
@@ -242,11 +243,6 @@ class ContainerLogger extends EventEmitter {
242243
this.emit('message.logged', curLogSize);
243244
}
244245

245-
_interceptDeprecatedImagesStream() {
246-
return new PassThrough()
247-
.on('data', (chunk) => deprecatedImagesCollector.catchDeprecatedImage(chunk.toString('utf8')));
248-
}
249-
250246
_errorTransformerStream() {
251247
return new Transform({
252248
transform: (data, encoding, done) => {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { Transform, TransformCallback } from 'stream';
2+
3+
// eslint-disable-next-line import/no-unresolved
4+
import deprecatedImagesCollector from './deprecated-images.collector';
5+
6+
export class DeprecatedImagesInterceptorStream extends Transform {
7+
private _lastChunk: Buffer;
8+
9+
constructor() {
10+
super();
11+
this._lastChunk = Buffer.alloc(0);
12+
}
13+
14+
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
15+
try {
16+
17+
const text = Buffer
18+
.concat([this._lastChunk, chunk])
19+
.toString('utf8');
20+
21+
const lines = text.split('\n');
22+
23+
// the final element in 'lines' may be an incomplete line
24+
// save it in _lastChunk for next time
25+
this._lastChunk = Buffer.from(lines.pop() ?? '', 'utf8');
26+
27+
for (const line of lines) {
28+
deprecatedImagesCollector.catchDeprecatedImage(line.trim());
29+
}
30+
31+
this.push(chunk);
32+
callback();
33+
} catch (error) {
34+
callback(error as any);
35+
}
36+
}
37+
38+
/**
39+
* _flush() is called when there is no more incoming data.
40+
* If we still have leftover data that didn't end with a newline,
41+
* treat it as a final line to be processed.
42+
*/
43+
_flush(callback: TransformCallback): void {
44+
try {
45+
const finalLine = this._lastChunk.toString('utf8');
46+
deprecatedImagesCollector.catchDeprecatedImage(finalLine.trim());
47+
callback();
48+
} catch (error) {
49+
callback(error as any);
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)