Skip to content

Commit 69f0734

Browse files
Integration test and intellisense comments added for createStreamAs method
1 parent d075807 commit 69f0734

File tree

4 files changed

+36
-6
lines changed

4 files changed

+36
-6
lines changed

__tests__/integrationtests.js

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ server using the command 'docker compose-up'. This will spin up a ksqlDB server
99
describe('--Integration Tests--', () => {
1010

1111
describe('--Method Tests--', () => {
12-
beforeAll((done) => {
12+
beforeAll(async () => {
1313
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
14-
done();
14+
await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
1515
});
1616

1717
afterAll(async () => {
@@ -75,7 +75,37 @@ describe('--Integration Tests--', () => {
7575
const expectData = data[0].slice(0, 3);
7676
expect(expectPullData).toEqual(expectData);
7777
})
78+
79+
describe('materialized streams utilizing createStreamAs', () => {
80+
beforeAll(async () => {
81+
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
82+
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
83+
84+
newStreamQueryId = await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
85+
testAsQueryId = await client.createStreamAs('testAsStream', ['name', 'age'], 'newTestStream', {
86+
kafka_topic: 'newTestTopic',
87+
value_format: 'json',
88+
partitions: 1
89+
}, 'age > 50');
90+
})
91+
92+
afterAll(async () => {
93+
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
94+
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
95+
})
96+
97+
it('creates materialized stream', async () => {
98+
let streamFound = false;
99+
const {streams} = await client.ksql('LIST STREAMS;');
100+
101+
for (let i = 0; i < streams.length; i++) {
102+
if (streams[i].name, streams[i].name === 'TESTASSTREAM') streamFound = true;
103+
}
104+
expect(streamFound).toBe(true);
105+
});
106+
})
78107
})
108+
79109
describe('--Health Tests--', () => {
80110
beforeAll((done) => {
81111
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });

ksqljs/ksqlJS.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,8 @@ class ksqljs {
276276
});
277277

278278
req.on("end", () => {
279-
resolve(msgOutput);
280279
session.close();
280+
resolve(msgOutput);
281281
});
282282
})
283283
}

ksqljsTest.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pullFromTo(); */
8181

8282
//---------------------Test Stream Creation As-------------------
8383

84-
const createStreamAsTest = async () => {
84+
/* const createStreamAsTest = async () => {
8585
const queryId = await client.createStreamAs('TestAsStream', ['latitude', 'longitude'], 'riderLocations', {
8686
kafka_topic: 'TestAsStream',
8787
value_format: 'json',
@@ -91,7 +91,7 @@ const createStreamAsTest = async () => {
9191
console.log('this is the queryId: ', queryId);
9292
};
9393
94-
createStreamAsTest();
94+
createStreamAsTest(); */
9595

9696
// (streamName, selectColumns, sourceStream, propertiesObj, conditionsObj)
9797
// const defaultProps = {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"description": "",
55
"main": "ksqljsTest.js",
66
"scripts": {
7-
"test": "jest --verbose"
7+
"test": "jest --verbose --detectOpenHandles"
88
},
99
"keywords": [],
1010
"author": "",

0 commit comments

Comments
 (0)