Skip to content

Commit 618782e

Browse files
authored
Support Lambda Streaming (#229)
* feat: first impl of streaming * feat: support usecase without awslambda.HttpResponseStream.from * fix: swap performance.now for Date.now * fix: there's another one * fix: only assign globalThis if it doesn't exist yet * add docstring
1 parent 48630c8 commit 618782e

File tree

5 files changed

+164
-0
lines changed

5 files changed

+164
-0
lines changed

src/lambdalocal.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import os = require('os');
1313
import { createServer, IncomingMessage, ServerResponse } from 'http';
1414
import utils = require('./lib/utils.js');
1515
import Context = require('./lib/context.js');
16+
require("./lib/streaming.js");
1617

1718
/*
1819
* Lambda local version

src/lib/streaming.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Implements Lambda Response Streaming by polyfilling
3+
* `awslambda.streamifyResponse` and `awslambda.HttpResponseStream.from`.
4+
*
5+
* If they're used, `execute` will return a `ReadableStream` as `body`.
6+
*
7+
* See https://aws.amazon.com/fr/blogs/compute/introducing-aws-lambda-response-streaming/ for reference.
8+
*/
9+
10+
import { PassThrough } from "stream";
11+
12+
function streamifyResponse(handler) {
13+
return (event, context) =>
14+
new Promise(async (resolve, reject) => {
15+
const body = new StreamingBody(resolve);
16+
17+
try {
18+
const metadata = await handler(event, body, context); // cb not supported
19+
if (!body.headersSent) {
20+
body.sendHeader(metadata)
21+
}
22+
} catch (error) {
23+
reject(error);
24+
}
25+
});
26+
}
27+
28+
class StreamingBody extends PassThrough {
29+
constructor(private readonly resolve: (metadata) => void) {
30+
super();
31+
}
32+
33+
public headersSent = false
34+
sendHeader(metadata: any = {}) {
35+
const headers = { ...metadata.headers }
36+
if (this.contentType) {
37+
headers["Content-Type"] = this.contentType
38+
}
39+
40+
this.resolve({
41+
...metadata,
42+
headers,
43+
body: this
44+
})
45+
this.headersSent = true
46+
}
47+
48+
private contentType: string;
49+
setContentType(contentType) {
50+
this.contentType = contentType;
51+
}
52+
}
53+
54+
class HttpResponseStream {
55+
static from(responseStream: StreamingBody, metadata) {
56+
responseStream.sendHeader(metadata);
57+
return responseStream;
58+
}
59+
}
60+
61+
globalThis.awslambda = globalThis.awslambda || {
62+
streamifyResponse,
63+
HttpResponseStream,
64+
};
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Lambda function used for streaming test.
3+
*/
4+
exports.handler = awslambda.streamifyResponse(
5+
async (event, responseStream, context) => {
6+
responseStream.setContentType("text/plain");
7+
8+
responseStream.write("foo");
9+
setTimeout(() => {
10+
responseStream.write("bar");
11+
responseStream.end();
12+
}, 100);
13+
}
14+
);
15+

test/functs/test-func-streaming.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Lambda function used for streaming test.
3+
*/
4+
exports.handler = awslambda.streamifyResponse(
5+
async (event, responseStream, context) => {
6+
const metadata = {
7+
statusCode: 200,
8+
headers: {
9+
"X-Foo": "Bar"
10+
}
11+
};
12+
13+
responseStream.setContentType("text/plain");
14+
responseStream = awslambda.HttpResponseStream.from(responseStream, metadata);
15+
16+
responseStream.write("foo");
17+
setTimeout(() => {
18+
responseStream.write("bar");
19+
responseStream.end();
20+
}, 100);
21+
}
22+
);
23+

test/test.js

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,67 @@ describe("- Testing lambdalocal.js", function () {
420420
});
421421
});
422422
});
423+
424+
describe('* Streaming Response', function () {
425+
it('should return a readable stream as `body`', function () {
426+
var lambdalocal = require(lambdalocal_path);
427+
lambdalocal.setLogger(winston);
428+
return lambdalocal.execute({
429+
event: require(path.join(__dirname, "./events/test-event.js")),
430+
lambdaPath: path.join(__dirname, "./functs/test-func-streaming.js"),
431+
lambdaHandler: functionName,
432+
callbackWaitsForEmptyEventLoop: false,
433+
timeoutMs: timeoutMs,
434+
verboseLevel: 1
435+
}).then(function (data) {
436+
assert.deepEqual(
437+
data.headers,
438+
{ "Content-Type": "text/plain", "X-Foo": "Bar" }
439+
);
440+
441+
return new Promise((resolve, reject) => {
442+
const chunks = []
443+
const times = []
444+
data.body.on('data', (chunk) => {
445+
chunks.push(chunk.toString())
446+
times.push(Date.now())
447+
});
448+
data.body.on("end", () => {
449+
assert.deepEqual(chunks, ["foo", "bar"])
450+
assert.closeTo(times[1] - times[0], 100, 50)
451+
resolve()
452+
});
453+
})
454+
})
455+
});
456+
457+
it('also works without calling HttpResponseStream.from', function () {
458+
var lambdalocal = require(lambdalocal_path);
459+
lambdalocal.setLogger(winston);
460+
return lambdalocal.execute({
461+
event: require(path.join(__dirname, "./events/test-event.js")),
462+
lambdaPath: path.join(__dirname, "./functs/test-func-streaming-simple.js"),
463+
lambdaHandler: functionName,
464+
callbackWaitsForEmptyEventLoop: false,
465+
timeoutMs: timeoutMs,
466+
verboseLevel: 1
467+
}).then(function (data) {
468+
return new Promise((resolve, reject) => {
469+
const chunks = []
470+
const times = []
471+
data.body.on('data', (chunk) => {
472+
chunks.push(chunk.toString())
473+
times.push(Date.now())
474+
});
475+
data.body.on("end", () => {
476+
assert.deepEqual(chunks, ["foo", "bar"])
477+
assert.closeTo(times[1] - times[0], 100, 50)
478+
resolve()
479+
});
480+
})
481+
})
482+
});
483+
});
423484
}
424485
});
425486
describe("- Testing cli.js", function () {

0 commit comments

Comments
 (0)