Skip to content

Commit 9f3d23f

Browse files
Jonathan LuuJonathan Luu
authored andcommitted
Merged in createTableAs
2 parents c393946 + f5e18ce commit 9f3d23f

File tree

7 files changed

+55
-168
lines changed

7 files changed

+55
-168
lines changed

__tests__/integrationtests.js

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const { default: waitForExpect } = require('wait-for-expect');
12
const ksqljs = require('../ksqljs/ksqlJS.js');
23
// Pre-requisite: start a docker container
34
/* To add to README: Prior to running test with 'npm test', please start the ksqlDB
@@ -86,14 +87,15 @@ describe('--Integration Tests--', () => {
8687
// describe('--Materialized Views Test--', () => {
8788
// beforeAll( async () => {
8889
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
89-
// await client.ksql('CREATE STREAM TESTJESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic\', value_format=\'json\', partitions=1);')
90+
// const waitForExpect = require('wait-for-expect');
91+
// await client.ksql('CREATE STREAM NEWTESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic2\', value_format=\'json\', partitions=1);')
9092
// });
9193
// afterAll(async () => {
9294
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
93-
// await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;')
95+
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
9496
// })
9597
// it('creates a materialized table view of a stream', async () => {
96-
// await client.createTableAs('TABLEOFSTREAM', 'TESTJESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
98+
// await client.createTableAs('TABLEOFSTREAM', 'NEWTESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
9799
// const tables = await client.ksql('LIST TABLES;');
98100
// const allTables = tables.tables;
99101
// let tableCheck = false;
@@ -109,20 +111,17 @@ describe('--Integration Tests--', () => {
109111
// it('materialized table view updates with source stream', async () => {
110112
// let rowCheck = false;
111113
// // push query for the table
114+
// // console.log('testing materialized view')
112115
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
116+
// console.log('QUERY INFO',data)
113117
// if (Array.isArray(JSON.parse(data))){
114-
// console.log(JSON.parse(data))
115-
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === "Seattle" && JSON.parse(data)[3] === 130){
118+
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === 130){
116119
// rowCheck = true;
117120
// }
118-
// expect
119121
// }
120122
// })
121-
// await client.insertStream('TESTJESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}])
122-
// const matTable = await client.pull('SELECT * FROM TABLEOFSTREAM;');
123-
// // console.log('table list', matTable)
124-
125-
// expect(rowCheck).toEqual(true);
123+
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
124+
// await waitForExpect(() => expect(rowCheck).toEqual(true))
126125
// })
127126
// })
128127

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
version: "2"
2+
version: "3.2"
33

44
services:
55
zookeeper:

ksqljs/ksqlJS.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,20 @@ class ksqljs {
306306
}
307307

308308
//---------------------Create tables as select-----------------
309+
/**
310+
* Execute a query to create a new materialized table view of an existing table or stream
311+
*
312+
* <p>This method is used to create a materialized table view
313+
*
314+
* <p>This method is sql injection protected with the use of queryBuilder.
315+
*
316+
* @param {string} tableName name of the table to be created
317+
* @param {string} source name of the source stream / table materialized view is based on
318+
* @param {array} selectArray an array that contains the values (strings, aggregate functions) of the columns for the materialized view table
319+
* @param {object} propertiesObj an object containing key value pairs for supported table properties e.g {topic: 'myTopic', value_format: 'json', partitions: '1'}. {} for default values
320+
* @param {object} conditionsObj an object containing key value pairs for supported query conditions e.g {WHERE: 'a is not null', GROUP_BY: 'profileID', HAVING: 'COUNT(a) > 5' }
321+
* @returns {Promise} a promise that completes once the server response is received, returning a response object
322+
*/
309323
createTableAs = (tableName, source, selectArray, propertiesObj, conditionsObj) => {
310324
let selectColStr = selectArray.reduce((result, current) => result + ', ' + current);
311325

@@ -317,6 +331,7 @@ class ksqljs {
317331
partitions: 1
318332
};
319333
Object.assign(defaultProps, propertiesObj);
334+
320335
// if there's no properties Obj, assign them all default values
321336

322337
// expect user to input a conditions object of format {WHERE: condition, GROUP_BY: condition, HAVING: condition};

ksqljs/queryBuilder.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,7 @@ class queryBuilder {
3333
return param;
3434
case "object":
3535
if (Array.isArray(param)) {
36-
//check if spaces
37-
if (param[0].includes(" ")) {
38-
throw new InappropriateStringParamError("string params not wrapped in quotes should not include spaces");
39-
}
40-
else if (param[0].includes(";")) {
36+
if (param[0].includes(";")) {
4137
throw new InappropriateStringParamError("string params not wrapped in quotes should not include semi-colons");
4238
}
4339
return `${param[0].replaceAll("'", "''")}`

ksqljsTest.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,17 @@ createTableTest(); */
8787
//---------------------Test Table Create As-------------------
8888
const createTableAsTest = async () => {
8989

90-
await client.ksql('CREATE STREAM RIDERLOCATIONS (PROFILEID VARCHAR, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (KAFKA_TOPIC=\'locations\', value_format=\'json\', partitions=1);')
91-
await client.createTableAs('currentLocation', 'riderlocations', ['profileId', 'LATEST_BY_OFFSET(latitude) AS la', 'LATEST_BY_OFFSET(longitude) AS lo'], {}, {GROUP_BY: 'profileId'})
90+
// await client.ksql('CREATE STREAM RIDERLOCATIONS (PROFILEID VARCHAR, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (KAFKA_TOPIC=\'locations\', value_format=\'json\', partitions=1);')
91+
// await client.createTableAs('currentlocation', 'riderlocations', ['profileid','LATEST_BY_OFFSET(latitude) AS la', 'LATEST_BY_OFFSET(longitude) AS lo'], {}, {GROUP_BY: 'profileId'})
9292
// let x;
93-
// x = await client.push('SELECT * FROM CURRENTLOCATION EMIT CHANGES LIMIT 20;', async (data) => {
94-
// console.log('push query data', JSON.parse(data))
93+
await client.push('SELECT * FROM CURRENTLOCATION EMIT CHANGES LIMIT 3;', async (data) => {
94+
console.log('push query data', JSON.parse(data))
95+
})
96+
// // x = await client.ksql('LIST QUERIES;');
97+
// // console.log('querylist', x.queries)
9598
// })
99+
100+
// console.log(await client.ksql('LIST QUERIES;'));
96101
// console.log('first x', x)
97102
// await client.insertStream('RIDERLOCATIONS', [{'PROFILEID':'abc123abcddzzz', 'latitude':9999999,'longitude':999999}])
98103
// console.log('second x', x)

0 commit comments

Comments
 (0)