Skip to content

Commit 58fbee1

Browse files
authored
Merge pull request #25 from oslabs-beta/jonTest
createTableAs Method
2 parents e8ff1fb + 7e6ffe2 commit 58fbee1

File tree

9 files changed

+6504
-11
lines changed

9 files changed

+6504
-11
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ dist
103103
# TernJS port file
104104
.tern-port
105105

106+
<<<<<<< HEAD
106107
# Local
107108
ksqljsTest.js
108109
package-lock.json
@@ -112,3 +113,7 @@ local_ignore/
112113
# KSQLDB docker server settings
113114
ksqldb_server_config/
114115

116+
=======
117+
# Local files
118+
./ksqljs.js
119+
>>>>>>> jonTest

__tests__/integrationtests.js

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1+
const { default: waitForExpect } = require('wait-for-expect');
12
const ksqljs = require('../ksqljs/ksqlJS.js');
2-
33
// Pre-requisite: start a docker container
44
/* To add to README: Prior to running test with 'npm test', please start the ksqlDB
55
server using the command 'docker-compose up'. This will spin up a ksqlDB server on
66
'http://localhost:8088'. If the command was run before, the created container might
77
need to be removed first.
88
*/
99

10-
describe('--Integration Tests--', () => {
10+
// ** INTEGRATION TEST INSTRUCTIONS **
11+
12+
// Prior to running the test files, please ensure an instance of the ksqldb server is running
13+
// Steps to starting the ksqldb server can be found here: (https://ksqldb.io/quickstart.html)
14+
// Once the ksqlDB server is running, tests can be run with terminal line: (npm test)
1115

16+
describe('--Integration Tests--', () => {
1217
describe('--Method Tests--', () => {
1318
beforeAll((done) => {
1419
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
@@ -80,6 +85,47 @@ describe('--Integration Tests--', () => {
8085
})
8186
})
8287

88+
// describe('--Materialized Views Test--', () => {
89+
// beforeAll( async () => {
90+
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
91+
// const waitForExpect = require('wait-for-expect');
92+
// await client.ksql('CREATE STREAM NEWTESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic2\', value_format=\'json\', partitions=1);')
93+
// });
94+
// afterAll(async () => {
95+
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
96+
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
97+
// })
98+
// it('creates a materialized table view of a stream', async () => {
99+
// await client.createTableAs('TABLEOFSTREAM', 'NEWTESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
100+
// const tables = await client.ksql('LIST TABLES;');
101+
// const allTables = tables.tables;
102+
// let tableCheck = false;
103+
// for (let i = 0; i < allTables.length; i++){
104+
// if (allTables[i].name === 'TABLEOFSTREAM') {
105+
// tableCheck = true;
106+
// break;
107+
// }
108+
// }
109+
// expect(tableCheck).toEqual(true);
110+
111+
// })
112+
// it('materialized table view updates with source stream', async () => {
113+
// let rowCheck = false;
114+
// // push query for the table
115+
// // console.log('testing materialized view')
116+
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
117+
// console.log('QUERY INFO',data)
118+
// if (Array.isArray(JSON.parse(data))){
119+
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === 130){
120+
// rowCheck = true;
121+
// }
122+
// }
123+
// })
124+
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
125+
// await waitForExpect(() => expect(rowCheck).toEqual(true))
126+
// })
127+
// })
128+
83129
describe('--Health Tests--', () => {
84130
beforeAll((done) => {
85131
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
version: "2"
2+
version: "3.2"
33

44
services:
55
zookeeper:

ksqldbJS_classTest.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const ksqljs = require('./ksqljs/ksqldbJS_class');
2+
3+
const client = new ksqljs('http://localhost:8088');
4+
5+
//---------------------Test PUll Queries-------------------
6+
/* const pullTest = async () => {
7+
const result = await client.pull('SELECT * FROM riderlocations;');
8+
console.log('this is the result', result);
9+
}
10+
11+
pullTest(); */
12+
13+
//---------------------Test Push Queries-------------------
14+
/* const pushTest = async () => {
15+
metadata = await client.push('SELECT * FROM riderlocations EMIT CHANGES LIMIT 1;', (row) => console.log(row));
16+
console.log('this is the metadata returned ', metadata);
17+
};
18+
19+
pushTest();
20+
21+
//---------------------Test Termination of Queries-------------------
22+
const terminateTest = async () => {
23+
client.terminate(metadata);
24+
};
25+
26+
setTimeout(() => terminateTest(metadata), 2000); */
27+
28+
//---------------------Test List Queries-------------------
29+
/* // const listQueries = async () => {
30+
// console.log(await client.ksql('LIST QUERIES;'));
31+
// console.log(await client.ksql('LIST STREAMS;'));
32+
// console.log(await client.ksql('LIST TABLES;'));
33+
// console.log(await client.ksql('LIST TOPICS;'));
34+
// }
35+
36+
// listQueries(); */
37+
38+
//---------------------Test Stream Creation-------------------
39+
/* const createStreamTest = () => {
40+
client.createStream('AnotherTestStream', ['name VARCHAR','email varchar','age INTEGER'], 'testTopic', 'json', 1);
41+
}
42+
43+
createStreamTest(); */
44+
45+
//---------------------Test Table Creation-------------------
46+
/* const createTableTest = () => {
47+
client.createTable();
48+
} */

ksqljs/ksqlJS.js

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,73 @@ class ksqljs {
305305
});
306306
}
307307

308+
//---------------------Create tables as select-----------------
308309
/**
310+
* Execute a query to create a new materialized table view of an existing table or stream
311+
*
312+
* <p>This method is used to create a materialized table view
313+
*
314+
* <p>This method is sql injection protected with the use of queryBuilder.
315+
*
316+
* @param {string} tableName name of the table to be created
317+
* @param {string} source name of the source stream / table materialized view is based on
318+
* @param {array} selectArray an array that contains the values (strings, aggregate functions) of the columns for the materialized view table
319+
* @param {object} propertiesObj an object containing key value pairs for supported table properties e.g {topic: 'myTopic', value_format: 'json', partitions: '1'}. {} for default values
320+
* @param {object} conditionsObj an object containing key value pairs for supported query conditions e.g {WHERE: 'a is not null', GROUP_BY: 'profileID', HAVING: 'COUNT(a) > 5' }
321+
* @returns {Promise} a promise that completes once the server response is received, returning a response object
322+
*/
323+
createTableAs = (tableName, source, selectArray, propertiesObj, conditionsObj) => {
324+
let selectColStr = selectArray.reduce((result, current) => result + ', ' + current);
325+
326+
// expect user to input properties object of format {topic: ... , value_format: ..., partitions: ...}
327+
// check for properties object, look for properties, if any are missing assign it a default value, if there's no property
328+
const defaultProps = {
329+
topic: tableName,
330+
value_format: 'json',
331+
partitions: 1
332+
};
333+
Object.assign(defaultProps, propertiesObj);
334+
335+
// if there's no properties Obj, assign them all default values
336+
337+
// expect user to input a conditions object of format {WHERE: condition, GROUP_BY: condition, HAVING: condition};
338+
// generate conditions string based on object
339+
// const builder = new queryBuilder();
340+
341+
let conditionQuery = '';
342+
if (conditionsObj){
343+
const conditionsArr = ['WHERE', 'GROUP_BY', 'HAVING'];
344+
const sqlClauses = [];
345+
346+
let i = 0;
347+
while (conditionsArr.length){
348+
if (conditionsObj[conditionsArr[0]]){
349+
sqlClauses[i] = [conditionsArr[0].replace('_',' ')]; // clause values are set as arrays for query builder
350+
sqlClauses[i+1] =[' ' + conditionsObj[conditionsArr[0]] + ' '];
351+
}
352+
else {
353+
sqlClauses[i] = [''];
354+
sqlClauses[i+1] = [''];
355+
}
356+
i+=2;
357+
conditionsArr.shift()
358+
}
359+
conditionQuery = builder.build('??????', sqlClauses[0], sqlClauses[1], sqlClauses[2], sqlClauses[3], sqlClauses[4], sqlClauses[5]);
360+
}
361+
362+
363+
// reformat for builder
364+
tableName = [tableName];
365+
selectColStr = [selectColStr];
366+
source = [source];
367+
conditionQuery = [conditionQuery]
368+
369+
370+
const query = builder.build(`CREATE TABLE ? WITH (kafka_topic=?, value_format=?, partitions=?) AS SELECT ? FROM ? ?EMIT CHANGES;`, tableName, defaultProps.topic, defaultProps.value_format, defaultProps.partitions, selectColStr, source, conditionQuery)
371+
return axios.post(this.ksqldbURL + '/ksql', { ksql: query })
372+
.catch(error => console.log(error));
373+
}
374+
/**
309375
* Inserts rows of data into a stream.
310376
*
311377
* <p>This method may be used to insert new rows of data into a stream.
@@ -316,6 +382,7 @@ class ksqljs {
316382
* @param {object} rows an array that contains data that is being inserted into the stream.
317383
* @return {Promise} this method returns a promise that resolves into an array describing the status of the row inserted.
318384
*/
385+
//---------------------Insert Rows Into Existing Streams-----------------
319386
insertStream = (stream, rows) => {
320387
return new Promise((resolve, reject) => {
321388
const msgOutput = [];

ksqljs/queryBuilder.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,7 @@ class queryBuilder {
3333
return param;
3434
case "object":
3535
if (Array.isArray(param)) {
36-
//check if spaces
37-
if (param[0].includes(" ")) {
38-
throw new InappropriateStringParamError("string params not wrapped in quotes should not include spaces");
39-
}
40-
else if (param[0].includes(";")) {
36+
if (param[0].includes(";")) {
4137
throw new InappropriateStringParamError("string params not wrapped in quotes should not include semi-colons");
4238
}
4339
return `${param[0].replaceAll("'", "''")}`

0 commit comments

Comments
 (0)