Skip to content

Commit 25d8df0

Browse files
committed
merged Jon's merge into dev into testsbranch
2 parents a8b705d + 58fbee1 commit 25d8df0

File tree

9 files changed

+6534
-38
lines changed

9 files changed

+6534
-38
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ dist
103103
# TernJS port file
104104
.tern-port
105105

106+
<<<<<<< HEAD
106107
# Local
107108
ksqljsTest.js
108109
package-lock.json
@@ -112,3 +113,7 @@ local_ignore/
112113
# KSQLDB docker server settings
113114
ksqldb_server_config/
114115

116+
=======
117+
# Local files
118+
./ksqljs.js
119+
>>>>>>> jonTest

__tests__/integrationtests.js

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1+
const { default: waitForExpect } = require('wait-for-expect');
12
const ksqljs = require('../ksqljs/ksqlJS.js');
2-
33
// Pre-requisite: start a docker container
44
/* To add to README: Prior to running test with 'npm test', please start the ksqlDB
5-
server using the command 'docker compose-up'. This will spin up a ksqlDB server on
6-
'http://localhost:8088'
5+
server using the command 'docker-compose up'. This will spin up a ksqlDB server on
6+
'http://localhost:8088'. If the command was run before, the created container might
7+
need to be removed first.
78
*/
89

9-
describe('--Integration Tests--', () => {
10+
// ** INTEGRATION TEST INSTRUCTIONS **
1011

12+
// Prior to running the test files, please ensure an instance of the ksqldb server is running
13+
// Steps to starting the ksqldb server can be found here: (https://ksqldb.io/quickstart.html)
14+
// Once the ksqlDB server is running, tests can be run with terminal line: (npm test)
15+
16+
describe('--Integration Tests--', () => {
1117
describe('--Method Tests--', () => {
1218
beforeAll((done) => {
1319
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
@@ -31,8 +37,8 @@ describe('--Integration Tests--', () => {
3137
}
3238
expect(streamExists).toEqual(true);
3339
})
34-
35-
it('.push properly creates a push query', async () => {
40+
41+
it('.push properly creates a push query', () => {
3642
let pushActive = false;
3743
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
3844
if (JSON.parse(data).queryId) {
@@ -41,11 +47,10 @@ describe('--Integration Tests--', () => {
4147
expect(pushActive).toEqual(true)
4248
});
4349
})
44-
45-
it('.terminate properly terminates a push query', async () => {
46-
let terminateRes;
47-
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 3;', async (data) => {
48-
terminateRes = await client.terminate(JSON.parse(data).queryId);
50+
51+
it('.terminate properly terminates a push query', () => {
52+
client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 3;', async (data) => {
53+
const terminateRes = await client.terminate(JSON.parse(data).queryId);
4954
expect(terminateRes.wasTerminated).toEqual(true);
5055
})
5156
// console.log("this is terminate", terminateRes);
@@ -55,7 +60,7 @@ describe('--Integration Tests--', () => {
5560
// const response = await client.terminate(queryIdDelete);
5661
// console.log(response);
5762
})
58-
63+
5964
it('.insertStream properly inserts a row into a stream', async () => {
6065
const response = await client.insertStream('TESTJESTSTREAM', [
6166
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
@@ -69,7 +74,7 @@ describe('--Integration Tests--', () => {
6974
}
7075
});
7176
})
72-
77+
7378
it('.pull receives the correct data from a pull query', async () => {
7479
const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
7580
expect(pullData[1]).toEqual(["stab-rabbit", "123@mail.com", 100]);
@@ -84,12 +89,53 @@ describe('--Integration Tests--', () => {
8489
})
8590
})
8691

92+
// describe('--Materialized Views Test--', () => {
93+
// beforeAll( async () => {
94+
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
95+
// const waitForExpect = require('wait-for-expect');
96+
// await client.ksql('CREATE STREAM NEWTESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic2\', value_format=\'json\', partitions=1);')
97+
// });
98+
// afterAll(async () => {
99+
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
100+
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
101+
// })
102+
// it('creates a materialized table view of a stream', async () => {
103+
// await client.createTableAs('TABLEOFSTREAM', 'NEWTESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
104+
// const tables = await client.ksql('LIST TABLES;');
105+
// const allTables = tables.tables;
106+
// let tableCheck = false;
107+
// for (let i = 0; i < allTables.length; i++){
108+
// if (allTables[i].name === 'TABLEOFSTREAM') {
109+
// tableCheck = true;
110+
// break;
111+
// }
112+
// }
113+
// expect(tableCheck).toEqual(true);
114+
115+
// })
116+
// it('materialized table view updates with source stream', async () => {
117+
// let rowCheck = false;
118+
// // push query for the table
119+
// // console.log('testing materialized view')
120+
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
121+
// console.log('QUERY INFO',data)
122+
// if (Array.isArray(JSON.parse(data))){
123+
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === 130){
124+
// rowCheck = true;
125+
// }
126+
// }
127+
// })
128+
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
129+
// await waitForExpect(() => expect(rowCheck).toEqual(true))
130+
// })
131+
// })
132+
87133
describe('--Health Tests--', () => {
88134
beforeAll((done) => {
89135
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
90136
done();
91137
});
92-
138+
93139
afterAll(async () => {
94140
await client.ksql('DROP STREAM IF EXISTS TESTSTREAM2;');
95141
})
@@ -114,7 +160,7 @@ describe('--Integration Tests--', () => {
114160
queryId: null
115161
}));
116162
})
117-
163+
118164
it('.inspectServerInfo returns the server info and status', async () => {
119165
const status = await client.inspectServerInfo();
120166
// should return something like: {
@@ -133,7 +179,7 @@ describe('--Integration Tests--', () => {
133179
})
134180
}));
135181
})
136-
182+
137183
it('.inspectServerHealth returns the server health', async () => {
138184
const status = await client.inspectServerHealth();
139185
// should return something like: {
@@ -154,7 +200,7 @@ describe('--Integration Tests--', () => {
154200
})
155201
);
156202
})
157-
203+
158204
it('.inspectClusterStatus returns the cluster status', async () => {
159205
const status = await client.inspectClusterStatus();
160206
// should return something like: {
@@ -171,13 +217,13 @@ describe('--Integration Tests--', () => {
171217
})
172218
);
173219
})
174-
220+
175221
it('.isValidProperty returns true if a server configuration property is not prohibited from setting', async () => {
176222
const status = await client.isValidProperty('test');
177223
// should return true
178224
expect(status.data).toEqual(true);
179225
})
180-
226+
181227
// it('isValidProperty returns an error if the server property is prohibited from setting', async () => {
182228
// const status = await client.isValidProperty('ksql.connect.url');
183229
// // should return something like
@@ -193,4 +239,4 @@ describe('--Integration Tests--', () => {
193239
// }));
194240
// })
195241
})
196-
})
242+
})

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
version: "2"
2+
version: "3.2"
33

44
services:
55
zookeeper:

ksqldbJS_classTest.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const ksqljs = require('./ksqljs/ksqldbJS_class');
2+
3+
const client = new ksqljs('http://localhost:8088');
4+
5+
//---------------------Test PUll Queries-------------------
6+
/* const pullTest = async () => {
7+
const result = await client.pull('SELECT * FROM riderlocations;');
8+
console.log('this is the result', result);
9+
}
10+
11+
pullTest(); */
12+
13+
//---------------------Test Push Queries-------------------
14+
/* const pushTest = async () => {
15+
metadata = await client.push('SELECT * FROM riderlocations EMIT CHANGES LIMIT 1;', (row) => console.log(row));
16+
console.log('this is the metadata returned ', metadata);
17+
};
18+
19+
pushTest();
20+
21+
//---------------------Test Termination of Queries-------------------
22+
const terminateTest = async () => {
23+
client.terminate(metadata);
24+
};
25+
26+
setTimeout(() => terminateTest(metadata), 2000); */
27+
28+
//---------------------Test List Queries-------------------
29+
/* // const listQueries = async () => {
30+
// console.log(await client.ksql('LIST QUERIES;'));
31+
// console.log(await client.ksql('LIST STREAMS;'));
32+
// console.log(await client.ksql('LIST TABLES;'));
33+
// console.log(await client.ksql('LIST TOPICS;'));
34+
// }
35+
36+
// listQueries(); */
37+
38+
//---------------------Test Stream Creation-------------------
39+
/* const createStreamTest = () => {
40+
client.createStream('AnotherTestStream', ['name VARCHAR','email varchar','age INTEGER'], 'testTopic', 'json', 1);
41+
}
42+
43+
createStreamTest(); */
44+
45+
//---------------------Test Table Creation-------------------
46+
/* const createTableTest = () => {
47+
client.createTable();
48+
} */

ksqljs/ksqlJS.js

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,13 @@ class ksqljs {
265265
}
266266

267267
/**
268-
*
269-
* @param {string} streamName
270-
* @param {string[]} selectColumns
271-
* @param {string} sourceStream
272-
* @param {object} propertiesObj
273-
* @param {string} conditions
274-
* @param {string} partitionBy
268+
*
269+
* @param {string} streamName
270+
* @param {string[]} selectColumns
271+
* @param {string} sourceStream
272+
* @param {object} propertiesObj
273+
* @param {string} conditions
274+
* @param {string} partitionBy
275275
* @returns {Promise}
276276
*/
277277
createStreamAs = (streamName, selectColumns, sourceStream, propertiesObj, conditions, partitionBy) => {
@@ -346,7 +346,73 @@ class ksqljs {
346346
});
347347
}
348348

349+
//---------------------Create tables as select-----------------
349350
/**
351+
* Execute a query to create a new materialized table view of an existing table or stream
352+
*
353+
* <p>This method is used to create a materialized table view
354+
*
355+
* <p>This method is sql injection protected with the use of queryBuilder.
356+
*
357+
* @param {string} tableName name of the table to be created
358+
* @param {string} source name of the source stream / table materialized view is based on
359+
* @param {array} selectArray an array that contains the values (strings, aggregate functions) of the columns for the materialized view table
360+
* @param {object} propertiesObj an object containing key value pairs for supported table properties e.g {topic: 'myTopic', value_format: 'json', partitions: '1'}. {} for default values
361+
* @param {object} conditionsObj an object containing key value pairs for supported query conditions e.g {WHERE: 'a is not null', GROUP_BY: 'profileID', HAVING: 'COUNT(a) > 5' }
362+
* @returns {Promise} a promise that completes once the server response is received, returning a response object
363+
*/
364+
createTableAs = (tableName, source, selectArray, propertiesObj, conditionsObj) => {
365+
let selectColStr = selectArray.reduce((result, current) => result + ', ' + current);
366+
367+
// expect user to input properties object of format {topic: ... , value_format: ..., partitions: ...}
368+
// check for properties object, look for properties, if any are missing assign it a default value, if there's no property
369+
const defaultProps = {
370+
topic: tableName,
371+
value_format: 'json',
372+
partitions: 1
373+
};
374+
Object.assign(defaultProps, propertiesObj);
375+
376+
// if there's no properties Obj, assign them all default values
377+
378+
// expect user to input a conditions object of format {WHERE: condition, GROUP_BY: condition, HAVING: condition};
379+
// generate conditions string based on object
380+
// const builder = new queryBuilder();
381+
382+
let conditionQuery = '';
383+
if (conditionsObj){
384+
const conditionsArr = ['WHERE', 'GROUP_BY', 'HAVING'];
385+
const sqlClauses = [];
386+
387+
let i = 0;
388+
while (conditionsArr.length){
389+
if (conditionsObj[conditionsArr[0]]){
390+
sqlClauses[i] = [conditionsArr[0].replace('_',' ')]; // clause values are set as arrays for query builder
391+
sqlClauses[i+1] =[' ' + conditionsObj[conditionsArr[0]] + ' '];
392+
}
393+
else {
394+
sqlClauses[i] = [''];
395+
sqlClauses[i+1] = [''];
396+
}
397+
i+=2;
398+
conditionsArr.shift()
399+
}
400+
conditionQuery = builder.build('??????', sqlClauses[0], sqlClauses[1], sqlClauses[2], sqlClauses[3], sqlClauses[4], sqlClauses[5]);
401+
}
402+
403+
404+
// reformat for builder
405+
tableName = [tableName];
406+
selectColStr = [selectColStr];
407+
source = [source];
408+
conditionQuery = [conditionQuery]
409+
410+
411+
const query = builder.build(`CREATE TABLE ? WITH (kafka_topic=?, value_format=?, partitions=?) AS SELECT ? FROM ? ?EMIT CHANGES;`, tableName, defaultProps.topic, defaultProps.value_format, defaultProps.partitions, selectColStr, source, conditionQuery)
412+
return axios.post(this.ksqldbURL + '/ksql', { ksql: query })
413+
.catch(error => console.log(error));
414+
}
415+
/**
350416
* Inserts rows of data into a stream.
351417
*
352418
* <p>This method may be used to insert new rows of data into a stream.
@@ -357,6 +423,7 @@ class ksqljs {
357423
* @param {object} rows an array that contains data that is being inserted into the stream.
358424
* @return {Promise} this method returns a promise that resolves into an array describing the status of the row inserted.
359425
*/
426+
//---------------------Insert Rows Into Existing Streams-----------------
360427
insertStream = (stream, rows) => {
361428
return new Promise((resolve, reject) => {
362429
const msgOutput = [];
@@ -529,10 +596,13 @@ class ksqljs {
529596
*
530597
* <p>This method may be used to terminate a ksqlDB cluster. First, shut down all the servers except one.
531598
*
599+
* @param {string[]} topicsToDelete an array of topic names or regular expressions for topic names to delete.
532600
* @return {Promise} this method returns a promise that returns a response object.
533601
*/
534-
terminateCluster() {
535-
return axios.post(this.ksqldbURL + `/ksql/terminate`, {}, {
602+
terminateCluster(topicsToDelete = []) {
603+
return axios.post(this.ksqldbURL + `/ksql/terminate`, {
604+
"deleteTopicList": topicsToDelete
605+
}, {
536606
headers: {
537607
// 'application/json' is the modern content-type for JSON, but some
538608
// older servers may use 'text/json'.

ksqljs/queryBuilder.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,7 @@ class queryBuilder {
3333
return param;
3434
case "object":
3535
if (Array.isArray(param)) {
36-
//check if spaces
37-
if (param[0].includes(" ")) {
38-
throw new InappropriateStringParamError("string params not wrapped in quotes should not include spaces");
39-
}
40-
else if (param[0].includes(";")) {
36+
if (param[0].includes(";")) {
4137
throw new InappropriateStringParamError("string params not wrapped in quotes should not include semi-colons");
4238
}
4339
return `${param[0].replaceAll("'", "''")}`

0 commit comments

Comments
 (0)