Skip to content

Commit f5e18ce

Browse files
Jonathan LuuJonathan Luu
authored andcommitted
functional createTableAs, working on integration test
1 parent 8d0648c commit f5e18ce

File tree

5 files changed

+134
-105
lines changed

5 files changed

+134
-105
lines changed

__tests__/integrationtests.js

Lines changed: 103 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const { default: waitForExpect } = require('wait-for-expect');
12
const ksqljs = require('../ksqljs/ksqlJS.js');
23
// Pre-requisite: start a docker container
34
/* To add to README: Prior to running test with 'npm test', please start the ksqlDB
@@ -12,117 +13,119 @@ server using the command 'docker compose-up'. This will spin up a ksqlDB server
1213
// Once the ksqlDB server is running, tests can be run with terminal line: (npm test)
1314

1415
describe('--Integration Tests--', () => {
15-
// describe('--Method Tests--', () => {
16-
// beforeAll((done) => {
17-
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
18-
// done();
19-
// });
20-
21-
// afterAll(async () => {
22-
// await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
23-
// })
24-
25-
// it('.createStream properly creates a stream', async () => {
26-
// const result = await client.createStream('TESTJESTSTREAM', ['name VARCHAR', 'email varchar', 'age INTEGER'], 'testJestTopic', 'json', 1);
27-
// const streams = await client.ksql('LIST STREAMS;');
28-
// const allStreams = streams.streams;
29-
// let streamExists = false;
30-
// for (let i = 0; i < allStreams.length; i++) {
31-
// if (allStreams[i].name === "TESTJESTSTREAM") {
32-
// streamExists = true;
33-
// break;
34-
// }
35-
// }
36-
// expect(streamExists).toEqual(true);
37-
// })
38-
39-
// it('.push properly creates a push query', () => {
40-
// let pushActive = false;
41-
// client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
42-
// if (JSON.parse(data).queryId) {
43-
// pushActive = true;
44-
// }
45-
// expect(pushActive).toEqual(true)
46-
// });
47-
// })
48-
49-
// it('.terminate properly terminates a push query', () => {
50-
// client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 3;', async (data) => {
51-
// const terminateRes = await client.terminate(JSON.parse(data).queryId);
52-
// expect(terminateRes.wasTerminated).toEqual(true);
53-
// })
54-
// })
55-
56-
// it('.insertStream properly inserts a row into a stream', async () => {
57-
// const response = await client.insertStream('TESTJESTSTREAM', [
58-
// { "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
59-
// ]);
60-
// console.log(response);
61-
// const data = [];
62-
// await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
63-
// data.push(JSON.parse(chunk));
64-
// if (data[1]) {
65-
// client.terminate(data[0].queryId);
66-
// expect(data[1]).toEqual(["stab-rabbit", "123@mail.com", 100])
67-
// }
68-
// });
69-
// })
70-
71-
// it('.pull receives the correct data from a pull query', async () => {
72-
// const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
73-
// console.log(pullData[1]);
74-
// expect(pullData[1]).toEqual(["stab-rabbit", "123@mail.com", 100]);
75-
// })
76-
77-
// it('.pullFromTo receives all the data', async () => {
78-
// const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
79-
// const data = await client.pullFromTo('TESTJESTSTREAM', 'America/Los_Angeles', ['2022-01-01', '00', '00', '00']);
80-
// const expectPullData = pullData[1];
81-
// const expectData = data[0].slice(0, 3);
82-
// expect(expectPullData).toEqual(expectData);
83-
// })
84-
// })
85-
86-
describe('--Materialized Views Test--', () => {
87-
beforeAll( async () => {
88-
client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
89-
await client.ksql('CREATE STREAM TESTJESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic\', value_format=\'json\', partitions=1);')
16+
describe('--Method Tests--', () => {
17+
beforeAll((done) => {
18+
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
19+
done();
9020
});
21+
9122
afterAll(async () => {
92-
await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
93-
await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;')
23+
await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
9424
})
95-
it('creates a materialized table view of a stream', async () => {
96-
await client.createTableAs('TABLEOFSTREAM', 'TESTJESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
97-
const tables = await client.ksql('LIST TABLES;');
98-
const allTables = tables.tables;
99-
let tableCheck = false;
100-
for (let i = 0; i < allTables.length; i++){
101-
if (allTables[i].name === 'TABLEOFSTREAM') {
102-
tableCheck = true;
25+
26+
it('.createStream properly creates a stream', async () => {
27+
const result = await client.createStream('TESTJESTSTREAM', ['name VARCHAR', 'email varchar', 'age INTEGER'], 'testJestTopic', 'json', 1);
28+
const streams = await client.ksql('LIST STREAMS;');
29+
const allStreams = streams.streams;
30+
let streamExists = false;
31+
for (let i = 0; i < allStreams.length; i++) {
32+
if (allStreams[i].name === "TESTJESTSTREAM") {
33+
streamExists = true;
10334
break;
10435
}
10536
}
106-
expect(tableCheck).toEqual(true);
107-
37+
expect(streamExists).toEqual(true);
10838
})
109-
it('materialized table view updates with source stream', async () => {
110-
let rowCheck = false;
111-
// push query for the table
112-
await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
113-
if (Array.isArray(JSON.parse(data))){
114-
console.log('PARSED DATA HERE', JSON.parse(data))
115-
if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === "Seattle" && JSON.parse(data)[3] === 130){
116-
rowCheck = true;
117-
}
39+
40+
it('.push properly creates a push query', () => {
41+
let pushActive = false;
42+
client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
43+
if (JSON.parse(data).queryId) {
44+
pushActive = true;
11845
}
46+
expect(pushActive).toEqual(true)
47+
});
48+
})
49+
50+
it('.terminate properly terminates a push query', () => {
51+
client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 3;', async (data) => {
52+
const terminateRes = await client.terminate(JSON.parse(data).queryId);
53+
expect(terminateRes.wasTerminated).toEqual(true);
11954
})
120-
await client.insertStream('TESTJESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
121-
await waitFor(() => expect(rowCheck).toEqual(true));
55+
})
56+
57+
it('.insertStream properly inserts a row into a stream', async () => {
58+
const response = await client.insertStream('TESTJESTSTREAM', [
59+
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
60+
]);
61+
console.log(response);
62+
const data = [];
63+
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
64+
data.push(JSON.parse(chunk));
65+
if (data[1]) {
66+
client.terminate(data[0].queryId);
67+
expect(data[1]).toEqual(["stab-rabbit", "123@mail.com", 100])
68+
}
69+
});
70+
})
71+
72+
it('.pull receives the correct data from a pull query', async () => {
73+
const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
74+
console.log(pullData[1]);
75+
expect(pullData[1]).toEqual(["stab-rabbit", "123@mail.com", 100]);
76+
})
77+
78+
it('.pullFromTo receives all the data', async () => {
79+
const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
80+
const data = await client.pullFromTo('TESTJESTSTREAM', 'America/Los_Angeles', ['2022-01-01', '00', '00', '00']);
81+
const expectPullData = pullData[1];
82+
const expectData = data[0].slice(0, 3);
83+
expect(expectPullData).toEqual(expectData);
12284
})
12385
})
12486

125-
/* describe('--Health Tests--', () => {
87+
// describe('--Materialized Views Test--', () => {
88+
// beforeAll( async () => {
89+
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
90+
// const waitForExpect = require('wait-for-expect');
91+
// await client.ksql('CREATE STREAM NEWTESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic2\', value_format=\'json\', partitions=1);')
92+
// });
93+
// afterAll(async () => {
94+
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
95+
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
96+
// })
97+
// it('creates a materialized table view of a stream', async () => {
98+
// await client.createTableAs('TABLEOFSTREAM', 'NEWTESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
99+
// const tables = await client.ksql('LIST TABLES;');
100+
// const allTables = tables.tables;
101+
// let tableCheck = false;
102+
// for (let i = 0; i < allTables.length; i++){
103+
// if (allTables[i].name === 'TABLEOFSTREAM') {
104+
// tableCheck = true;
105+
// break;
106+
// }
107+
// }
108+
// expect(tableCheck).toEqual(true);
109+
110+
// })
111+
// it('materialized table view updates with source stream', async () => {
112+
// let rowCheck = false;
113+
// // push query for the table
114+
// // console.log('testing materialized view')
115+
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
116+
// console.log('QUERY INFO',data)
117+
// if (Array.isArray(JSON.parse(data))){
118+
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === 130){
119+
// rowCheck = true;
120+
// }
121+
// }
122+
// })
123+
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
124+
// await waitForExpect(() => expect(rowCheck).toEqual(true))
125+
// })
126+
// })
127+
128+
describe('--Health Tests--', () => {
126129
beforeAll((done) => {
127130
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
128131
done();
@@ -230,5 +233,5 @@ describe('--Integration Tests--', () => {
230233
// message: expect.any(String),
231234
// }));
232235
// })
233-
}) */
236+
})
234237
})

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
version: "2"
2+
version: "3.2"
33

44
services:
55
zookeeper:

ksqljs/ksqlJS.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,20 @@ class ksqljs {
306306
}
307307

308308
//---------------------Create tables as select-----------------
309+
/**
310+
* Execute a query to create a new materialized table view of an existing table or stream
311+
*
312+
* <p>This method is used to create a materialized table view
313+
*
314+
* <p>This method is sql injection protected with the use of queryBuilder.
315+
*
316+
* @param {string} tableName name of the table to be created
317+
* @param {string} source name of the source stream / table materialized view is based on
318+
* @param {array} selectArray an array that contains the values (strings, aggregate functions) of the columns for the materialized view table
319+
* @param {object} propertiesObj an object containing key value pairs for supported table properties e.g {topic: 'myTopic', value_format: 'json', partitions: '1'}. {} for default values
320+
* @param {object} conditionsObj an object containing key value pairs for supported query conditions e.g {WHERE: 'a is not null', GROUP_BY: 'profileID', HAVING: 'COUNT(a) > 5' }
321+
* @returns {Promise} a promise that completes once the server response is received, returning a response object
322+
*/
309323
createTableAs = (tableName, source, selectArray, propertiesObj, conditionsObj) => {
310324
let selectColStr = selectArray.reduce((result, current) => result + ', ' + current);
311325

package-lock.json

Lines changed: 12 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
"author": "",
1111
"license": "ISC",
1212
"dependencies": {
13-
"axios": "^0.27.2"
13+
"axios": "^0.27.2",
14+
"wait-for-expect": "^3.0.2"
1415
},
1516
"devDependencies": {
16-
"jest": "^28.1.0",
17-
"dotenv": "^16.0.1"
17+
"dotenv": "^16.0.1",
18+
"jest": "^28.1.0"
1819
}
1920
}

0 commit comments

Comments
 (0)