Skip to content

Commit f516154

Browse files
Created invalidArgumentTypes custom error as well as a validateInputs function to verify client is passign in correct argument types
1 parent afe3066 commit f516154

File tree

5 files changed

+91
-26
lines changed

5 files changed

+91
-26
lines changed

ksqljs/customErrors.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,22 @@ class InappropriateStringParamError extends QueryBuilderError {
4747
}
4848
}
4949

50+
class invalidArgumentTypes extends Error {
51+
constructor(message) {
52+
super(message);
53+
54+
this.name = this.constructor.name;
55+
// necessary?
56+
Error.captureStackTrace(this, this.constructor);
57+
58+
}
59+
}
60+
5061
module.exports = {
5162
ksqlDBError,
5263
QueryBuilderError,
5364
EmptyQueryError,
5465
NumParamsError,
55-
InappropriateStringParamError
66+
InappropriateStringParamError,
67+
invalidArgumentTypes
5668
};

ksqljs/ksqlJS.js

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const axios = require("axios");
22
const https = require('node:https');
33
const http2 = require("http2");
44
const { ksqlDBError } = require("./customErrors.js");
5+
const validateInputs = require('./validateInputs.js');
56
const queryBuilder = require('./queryBuilder.js');
67
const builder = new queryBuilder();
78

@@ -34,10 +35,14 @@ class ksqljs {
3435
* Example: [{object that contains the metadata}, [data], [data], ...}]
3536
*/
3637
pull = (query) => {
38+
validateInputs([query, 'string', 'query']);
39+
40+
const validatedQuery = builder.build(query);
41+
3742
return axios
3843
.post(this.ksqldbURL + "/query-stream",
3944
{
40-
sql: query,
45+
sql: validatedQuery,
4146
},
4247
{
4348
headers:
@@ -70,6 +75,9 @@ class ksqljs {
7075
* result if successful.
7176
*/
7277
push(query, cb) {
78+
validateInputs([query, 'string', 'query', true], [cb, 'function', 'cb', true]);
79+
const validatedQuery = builder.build(query);
80+
7381
return new Promise((resolve, reject) => {
7482
let sentQueryId = false;
7583
const session = http2.connect(
@@ -94,7 +102,7 @@ class ksqljs {
94102
);
95103

96104
const reqBody = {
97-
sql: query,
105+
sql: validatedQuery,
98106
Accept: "application/json, application/vnd.ksqlapi.delimited.v1",
99107
};
100108

@@ -133,6 +141,8 @@ class ksqljs {
133141
* if the termination was successful.
134142
*/
135143
terminate(queryId) {
144+
validateInputs([queryId, 'string', 'queryId']);
145+
136146
return axios.post(this.ksqldbURL + '/ksql',
137147
{
138148
ksql: `TERMINATE ${queryId};`
@@ -164,9 +174,13 @@ class ksqljs {
164174
* @return {Promise} a promise that completes once the server response is received, and returns the requested data.
165175
*/
166176
ksql(query) {
177+
validateInputs([query, 'string', 'query']);
178+
179+
const validatedQuery = builder.build(query);
180+
167181
return axios.post(this.ksqldbURL + '/ksql',
168182
{
169-
ksql: query
183+
ksql: validatedQuery
170184
},
171185
{
172186
headers:
@@ -200,9 +214,8 @@ class ksqljs {
200214
* @return {Promise} a promise that completes once the server response is received, and returns a response object.
201215
*/
202216
createStream(name, columnsType, topic, value_format = 'json', partitions = 1, key) {
203-
if (typeof name !== 'string' || typeof columnsType !== 'object' || typeof topic !== 'string' || typeof partitions !== 'number') {
204-
return console.log("invalid input(s)")
205-
}
217+
validateInputs([name, 'string', 'name', true], [columnsType, 'object', 'columnsType', true], [topic, 'string', 'topic'], [partitions, 'number', 'partitions']);
218+
206219
const columnsTypeString = columnsType.reduce((result, currentType) => result + ', ' + currentType);
207220
const query = `CREATE STREAM ${name} (${columnsTypeString}) WITH (kafka_topic='${topic}', value_format='${value_format}', partitions=${partitions});`;
208221

@@ -216,7 +229,6 @@ class ksqljs {
216229
{},
217230
httpsAgent: this.httpsAgentAxios ? this.httpsAgentAxios : null,
218231
})
219-
.then(res => res)
220232
.catch(error => {
221233
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
222234
});
@@ -233,6 +245,8 @@ class ksqljs {
233245
* @returns {Promise} - a promise that completes once the server response is received, and returns a query ID
234246
*/
235247
createStreamAs = (streamName, selectColumns, sourceStream, propertiesObj, conditions, partitionBy) => {
248+
validateInputs([streamName, 'string', 'streamName', true], [selectColumns, 'array', 'selectColumns', true], [sourceStream, 'string', 'sourceStream', true], [propertiesObj, 'object', 'propertiesObj'], [conditions, 'string', 'conditions'], [partitionBy, 'string', 'partitionBy']);
249+
236250
const propertiesArgs = [];
237251
const selectColStr = selectColumns.reduce((result, current) => result + ', ' + current);
238252
// begin with first consistent portion of query
@@ -283,6 +297,8 @@ class ksqljs {
283297
* @return {Promise} a promise that completes once the server response is received, and returns a response object.
284298
*/
285299
createTable = (name, columnsType, topic, value_format = 'json', partitions) => {
300+
validateInputs([name, 'string', 'name', true], [columnsType, 'array', 'columnsType', true], [topic, 'string', 'topic', true], [value_format, 'string', 'value_format', true], [partitions, 'number', 'partitions']);
301+
286302
const columnsTypeString = columnsType.reduce((result, currentType) => result + ', ' + currentType);
287303
const query = `CREATE TABLE ${name} (${columnsTypeString}) WITH (kafka_topic='${topic}', value_format='${value_format}', partitions=${partitions});`
288304

@@ -321,6 +337,8 @@ class ksqljs {
321337
* @returns {Promise} a promise that completes once the server response is received, returning a response object
322338
*/
323339
createTableAs = (tableName, source, selectArray, propertiesObj, conditionsObj) => {
340+
validateInputs([tableName, 'string', 'tableName', true], [source, 'string', 'source', true], [selectArray, 'array', 'selectArray', true], [propertiesObj, 'object', 'propertiesObj'], [conditionsObj, 'object', 'conditionsObj']);
341+
324342
let selectColStr = selectArray.reduce((result, current) => result + ', ' + current);
325343

326344
// expect user to input properties object of format {topic: ... , value_format: ..., partitions: ...}
@@ -387,6 +405,8 @@ class ksqljs {
387405
*/
388406
//---------------------Insert Rows Into Existing Streams-----------------
389407
insertStream = (stream, rows) => {
408+
validateInputs([stream, 'string', 'stream', true], [rows, 'array', 'rows', true]);
409+
390410
return new Promise((resolve, reject) => {
391411
const msgOutput = [];
392412

@@ -453,13 +473,8 @@ class ksqljs {
453473
* the end of the array that includes the time that the data was inserted into the ksqldb.
454474
*/
455475
pullFromTo = async (streamName, timezone = 'Greenwich', from = [undefined, '00', '00', '00'], to = ['2200-03-14', '00', '00', '00']) => {
456-
if (!streamName || typeof timezone !== 'string' || !from
457-
|| typeof from[0] !== 'string' || typeof from[1] !== 'string' || typeof from[2] !== 'string' || typeof from[3] !== 'string'
458-
|| typeof to[0] !== 'string' || typeof to[1] !== 'string' || typeof to[2] !== 'string' || typeof to[3] !== 'string'
459-
|| from[0].length !== 10 || to[0].length !== 10 || from[1].length !== 2 || to[1].length !== 2 || from[2].length !== 2 || to[2].length !== 2 || from[3].length !== 2 || to[3].length !== 2
460-
) {
461-
return new Error('invalid inputs');
462-
}
476+
validateInputs([streamName, 'string', 'streamName', true], [timezone, 'string', 'timezone', true], [from, 'array', 'from', true], [to, 'array', 'to', true]);
477+
463478
const userFrom = `${from[0]}T${from[1]}:${from[2]}:${from[3]}`;
464479
const userTo = `${to[0]}T${to[1]}:${to[2]}:${to[3]}`;
465480
const userFromUnix = new Date(userFrom).getTime();
@@ -491,6 +506,8 @@ class ksqljs {
491506
* message (string): Detailed message regarding the status of the execution statement.
492507
*/
493508
inspectQueryStatus(commandId) {
509+
validateInputs([commandId, 'string', 'commandId', true]);
510+
494511
return axios.get(this.ksqldbURL + `/status/${commandId}`)
495512
.then(response => response)
496513
.catch(error => {
@@ -561,6 +578,8 @@ class ksqljs {
561578
* @return {Promise} this method returns a promise that returns a response object.
562579
*/
563580
terminateCluster(topicsToDelete = []) {
581+
validateInputs([topicsToDelete, 'array', 'topicsToDelete', true]);
582+
564583
return axios.post(this.ksqldbURL + `/ksql/terminate`, {
565584
"deleteTopicList": topicsToDelete
566585
}, {
@@ -591,9 +610,12 @@ class ksqljs {
591610
* "message": "One or more properties overrides set locally are prohibited by the KSQL server (use UNSET to reset their default value): [ksql.service.id]"
592611
* }
593612
*
613+
* @param {string} propertyName - the name of the property to validate
594614
* @return {Promise} this method returns a promise that resolves to a boolean true if the property is allowed to be changed.
595615
*/
596616
isValidProperty(propertyName) {
617+
validateInputs([propertyName, 'string', 'propertyName', true]);
618+
597619
return axios.get(this.ksqldbURL + `/is_valid_property/${propertyName}`)
598620
.then(response => response)
599621
.catch(error => {

ksqljs/queryBuilder.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ class queryBuilder {
55
}
66

77
build = (query, ...params) => {
8-
// consider building custom errors
8+
// check for empty query
99
if (this._checkEmptyQuery(query)) throw new EmptyQueryError();
10+
1011
let output = this._bind(query, ...params);
12+
1113
return output;
1214
}
1315
_bind = (query, ...params) => {
@@ -33,10 +35,10 @@ class queryBuilder {
3335
return param;
3436
case "object":
3537
if (Array.isArray(param)) {
36-
if (param[0].includes(";")) {
38+
if (param[0]?.includes(";")) {
3739
throw new InappropriateStringParamError("string params not wrapped in quotes should not include semi-colons");
3840
}
39-
return `${param[0].replaceAll("'", "''")}`
41+
return `${param[0]?.replaceAll("'", "''")}`
4042
}
4143
throw new QueryBuilderError("object should not be passed in as query argument");
4244
case "function":

ksqljs/validateInputs.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
const { invalidArgumentTypes } = require("./customErrors");
2+
3+
const validateInputs = (...args) => {
4+
const invalidArguments = [];
5+
6+
// iterate through args to verify allowed types are provided
7+
for (let i = 0; i < args.length; i++) {
8+
const [currentArg, intendedType, actualType, required] = [args[i][0], args[i][1], typeof args[i][0], args[i][3] || false];
9+
10+
if (intendedType === 'array') {
11+
if (!Array.isArray(currentArg)) invalidArguments.push(args[i]);
12+
}
13+
else if (required && actualType !== intendedType) invalidArguments.push(args[i]);
14+
else if (!required && currentArg !== undefined && currentArg !== null && actualType !== intendedType) invalidArguments.push(args[i]);
15+
}
16+
17+
// craft error message if error needs to be thrown
18+
if (invalidArguments.length) {
19+
let errorMessage = '';
20+
21+
for (let i = 0; i < invalidArguments.length; i++) {
22+
errorMessage += `argument "${invalidArguments[i][2]}" must be of type ${invalidArguments[i][1]}`;
23+
if (i < invalidArguments.length - 1) errorMessage += ', '
24+
}
25+
throw new invalidArgumentTypes(errorMessage);
26+
}
27+
}
28+
29+
module.exports = validateInputs;

ksqljsTest.js

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ const client = new ksqljs({ ksqldbURL: 'http://localhost:8088' })
1212
let metadata;
1313

1414
//---------------------Test PUll Queries-------------------
15-
const pullTest = async () => {
16-
const result = await client.pull('SELECT * FROM riderlocations;');
15+
/* const pullTest = async () => {
16+
const result = await client.pull('SELECT * FROM whatever;');
1717
console.log('this is the result', result);
1818
}
1919
20-
pullTest();
20+
pullTest(); */
2121

2222
//---------------------Test Push Queries-------------------
2323
/* const pushTest = async () => {
2424
try {
25-
metadata = await client.push('SELECT * FROM riderlocations EMIT CHANGES LIMIT 1;', (row) => console.log(row));
26-
console.log('this is the metadata returned ', metadata);
25+
queryId = await client.push('SELECT * FROM whatever EMIT CHANGES LIMIT 1;', (row) => console.log(row));
26+
console.log('this is the queryId returned ', queryId);
2727
} catch (error) {
2828
console.log(error);
2929
}
@@ -33,10 +33,10 @@ pushTest(); */
3333

3434
//---------------------Test Termination of Queries-------------------
3535
/* const terminateTest = async () => {
36-
client.terminate(metadata);
36+
client.terminate(queryId);
3737
};
3838
39-
setTimeout(() => terminateTest(metadata), 2000); */
39+
setTimeout(() => terminateTest(queryId), 2000); */
4040

4141
//---------------------Test List Queries-------------------
4242
/* const listQueries = async () => {
@@ -67,7 +67,7 @@ createTableTest(); */
6767
await client.createTableAs('currentlocation', 'riderlocations', ['profileid','LATEST_BY_OFFSET(latitude) AS la', 'LATEST_BY_OFFSET(longitude) AS lo'], {}, {GROUP_BY: 'profileId'})
6868
}
6969
70-
createTableAsTest() */
70+
createTableAsTest(); */
7171

7272
//---------------------Test Insert Stream-------------------
7373
/* const insertStreamTest = async () => {

0 commit comments

Comments
 (0)