Skip to content

Commit 2f83c26

Browse files
Resolving merge conflicts
2 parents 16a2285 + ca609e8 commit 2f83c26

12 files changed

+96
-6230
lines changed

.gitignore

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,4 @@ package-lock.json
109109
local_ignore/
110110
.gitignore
111111

112-
# KSQLDB docker server settings
113-
ksqldb_server_config/
114112

.npmignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#Test Files
2+
/__tests__/
3+
.yml

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ npm install ksqldb-js
4545
Create a client in the application file:
4646

4747
```
48-
const ksqljs = require('ksqldb-js');
49-
const client = new ksqljs({ksqldbURL: '<url to ksqlDB server>'})
48+
const ksqldb = require('ksqldb-js');
49+
const client = new ksqldb({ksqldbURL: '<url to ksqlDB server>'})
5050
```
5151

5252
To run tests initiate Docker containers included in yaml file:

__tests__/integrationtests.js

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
const { default: waitForExpect } = require('wait-for-expect');
2-
const ksqljs = require('../ksqljs/ksqlJS.js');
2+
const ksqldb = require('../ksqldb/ksqldb');
33
// Pre-requisite: start a docker container
44
/* To add to README: Prior to running test with 'npm test', please start the ksqlDB
55
server using the command 'docker-compose up'. This will spin up a ksqlDB server on
@@ -16,7 +16,7 @@ need to be removed first.
1616
describe('--Integration Tests--', () => {
1717
describe('--Method Tests--', () => {
1818
beforeAll((done) => {
19-
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
19+
client = new ksqldb({ ksqldbURL: 'http://localhost:8088' });
2020
done();
2121
});
2222

@@ -25,6 +25,7 @@ describe('--Integration Tests--', () => {
2525
})
2626

2727
it('.createStream properly creates a stream', async () => {
28+
await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;')
2829
const result = await client.createStream('TESTJESTSTREAM', ['name VARCHAR', 'email varchar', 'age INTEGER'], 'testJestTopic', 'json', 1);
2930
const streams = await client.ksql('LIST STREAMS;');
3031
const allStreams = streams.streams;
@@ -56,9 +57,7 @@ describe('--Integration Tests--', () => {
5657
})
5758

5859
it('.insertStream properly inserts a row into a stream', async () => {
59-
const response = await client.insertStream('TESTJESTSTREAM', [
60-
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
61-
]);
60+
6261
const data = [];
6362
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
6463
data.push(JSON.parse(chunk));
@@ -67,6 +66,9 @@ describe('--Integration Tests--', () => {
6766
expect(data[1]).toEqual(["stab-rabbit", "123@mail.com", 100])
6867
}
6968
});
69+
const response = await client.insertStream('TESTJESTSTREAM', [
70+
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
71+
]);
7072
})
7173

7274
it('.pull receives the correct data from a pull query', async () => {
@@ -98,24 +100,24 @@ describe('--Integration Tests--', () => {
98100
beforeAll(async () => {
99101
// await client.ksql('DROP STREAM IF EXISTS testAsStream;')
100102
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
101-
103+
102104
// await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
103105
testAsQueryId = await client.createStreamAs('testAsStream', ['name', 'age'], 'newTestStream', {
104106
kafka_topic: 'newTestTopic',
105107
value_format: 'json',
106108
partitions: 1
107-
}, 'age > 50');
109+
}, 'age > 50');
108110
})
109-
111+
110112
afterAll(async () => {
111113
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
112114
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
113115
})
114-
116+
115117
it('creates materialized stream', async () => {
116118
let streamFound = false;
117-
const {streams} = await client.ksql('LIST STREAMS;');
118-
119+
const { streams } = await client.ksql('LIST STREAMS;');
120+
119121
for (let i = 0; i < streams.length; i++) {
120122
if (streams[i].name, streams[i].name === 'TESTASSTREAM') {
121123
streamFound = true;
@@ -128,8 +130,8 @@ describe('--Integration Tests--', () => {
128130

129131

130132
describe('--Materialized Tables Tests--', () => {
131-
beforeAll( async () => {
132-
await client.createTableAs('testAsTable', 'newTestStream', ['name', 'LATEST_BY_OFFSET(age) AS recentAge'], {topic:'newTestTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
133+
beforeAll(async () => {
134+
await client.createTableAs('testAsTable', 'newTestStream', ['name', 'LATEST_BY_OFFSET(age) AS recentAge'], { topic: 'newTestTopic' }, { WHERE: 'age >= 21', GROUP_BY: 'name' });
133135
});
134136
afterAll(async () => {
135137
await client.ksql('DROP TABLE IF EXISTS testAsTable;');
@@ -138,37 +140,24 @@ describe('--Integration Tests--', () => {
138140
})
139141

140142
it('creates a materialized table view of a stream', async () => {
141-
const {tables} = await client.ksql('LIST TABLES;');
143+
const { tables } = await client.ksql('LIST TABLES;');
142144
let tableFound = false;
143-
for (let i = 0; i < tables.length; i++){
145+
for (let i = 0; i < tables.length; i++) {
144146
if (tables[i].name === 'TESTASTABLE') {
145147
tableFound = true;
146148
break;
147149
}
148150
}
149151
expect(tableFound).toEqual(true);
150152
})
151-
152-
// it('receives updates from source stream', async () => {
153-
// let rowReceived = false;
154-
// await client.push('SELECT * FROM testAsTable EMIT CHANGES LIMIT 1;', async (data) => {
155-
// if (Array.isArray(JSON.parse(data))){
156-
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25){
157-
// rowReceived = true;
158-
// }
159-
// }
160-
// })
161-
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25}]);
162-
// await waitForExpect(() => expect(rowReceived).toEqual(true))
163-
// })
164153
})
165154
})
166155
})
167156

168157

169158
describe('--Health Tests--', () => {
170159
beforeAll((done) => {
171-
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
160+
client = new ksqldb({ ksqldbURL: 'http://localhost:8088' });
172161
done();
173162
});
174163

__tests__/queryBuilderTests.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
const queryBuilder = require('../ksqljs/queryBuilder.js');
2-
const { QueryBuilderError, EmptyQueryError, NumParamsError, InappropriateStringParamError } = require('../ksqljs/customErrors.js');
1+
const queryBuilder = require('../ksqldb/queryBuilder.js');
2+
const { QueryBuilderError, EmptyQueryError, NumParamsError, InappropriateStringParamError } = require('../ksqldb/customErrors.js');
33

44

55
describe('Unit tests for query builder class', () => {

klip_64_ksqldb_js_client.md

Lines changed: 0 additions & 63 deletions
This file was deleted.
File renamed without changes.

ksqljs/ksqldb.js renamed to ksqldb/ksqldb.js

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,21 @@ const validateInputs = require('./validateInputs.js');
66
const queryBuilder = require('./queryBuilder.js');
77
const builder = new queryBuilder();
88

9-
class ksqljs {
9+
class ksqldb {
10+
/**
11+
* Constructor
12+
* @param {object} config
13+
*
14+
* Config object can have these properties
15+
*
16+
* ksqldbURL: Connection URL or address
17+
*
18+
* API: Username or API key for basic authentication
19+
*
20+
* secret: Password or secret for basic authentication
21+
*
22+
* httpsAgent: httpsAgent for setting TLS properties
23+
*/
1024
constructor(config) {
1125
this.ksqldbURL = config.ksqldbURL;
1226
this.API = config.API ? config.API : null;
@@ -274,7 +288,16 @@ class ksqljs {
274288
// utilize query with variables to build actual query
275289
const query = builder.build(builderQuery, [streamName], ...propertiesArgs, [sourceStream]);
276290

277-
return axios.post(this.ksqldbURL + '/ksql', { ksql: query })
291+
return axios.post(this.ksqldbURL + '/ksql', { ksql: query }, {
292+
headers:
293+
this.API && this.secret ?
294+
{
295+
"Authorization": `Basic ${Buffer.from(this.API + ":" + this.secret, 'utf8').toString('base64')}`,
296+
}
297+
:
298+
{},
299+
httpsAgent: this.httpsAgentAxios ? this.httpsAgentAxios : null,
300+
})
278301
.then(res => res.data[0].commandStatus.queryId)
279302
.catch(error => {
280303
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
@@ -349,34 +372,34 @@ class ksqljs {
349372
partitions: 1
350373
};
351374
Object.assign(defaultProps, propertiesObj);
352-
375+
353376
// if there's no properties Obj, assign them all default values
354377

355378
// expect user to input a conditions object of format {WHERE: condition, GROUP_BY: condition, HAVING: condition};
356379
// generate conditions string based on object
357380
// const builder = new queryBuilder();
358381

359382
let conditionQuery = '';
360-
if (conditionsObj){
383+
if (conditionsObj) {
361384
const conditionsArr = ['WHERE', 'GROUP_BY', 'HAVING'];
362385
const sqlClauses = [];
363-
386+
364387
let i = 0;
365-
while (conditionsArr.length){
366-
if (conditionsObj[conditionsArr[0]]){
367-
sqlClauses[i] = [conditionsArr[0].replace('_',' ')]; // clause values are set as arrays for query builder
368-
sqlClauses[i+1] =[' ' + conditionsObj[conditionsArr[0]] + ' '];
388+
while (conditionsArr.length) {
389+
if (conditionsObj[conditionsArr[0]]) {
390+
sqlClauses[i] = [conditionsArr[0].replace('_', ' ')]; // clause values are set as arrays for query builder
391+
sqlClauses[i + 1] = [' ' + conditionsObj[conditionsArr[0]] + ' '];
369392
}
370393
else {
371394
sqlClauses[i] = [''];
372-
sqlClauses[i+1] = [''];
395+
sqlClauses[i + 1] = [''];
373396
}
374-
i+=2;
397+
i += 2;
375398
conditionsArr.shift()
376399
}
377400
conditionQuery = builder.build('??????', sqlClauses[0], sqlClauses[1], sqlClauses[2], sqlClauses[3], sqlClauses[4], sqlClauses[5]);
378401
}
379-
402+
380403

381404
// reformat for builder
382405
tableName = [tableName];
@@ -386,23 +409,30 @@ class ksqljs {
386409

387410

388411
const query = builder.build(`CREATE TABLE ? WITH (kafka_topic=?, value_format=?, partitions=?) AS SELECT ? FROM ? ?EMIT CHANGES;`, tableName, defaultProps.topic, defaultProps.value_format, defaultProps.partitions, selectColStr, source, conditionQuery)
389-
return axios.post(this.ksqldbURL + '/ksql', { ksql: query })
390-
.catch(error =>{
391-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
392-
});
412+
return axios.post(this.ksqldbURL + '/ksql', { ksql: query }, {
413+
headers:
414+
this.API && this.secret ?
415+
{
416+
"Authorization": `Basic ${Buffer.from(this.API + ":" + this.secret, 'utf8').toString('base64')}`,
417+
}
418+
:
419+
{},
420+
httpsAgent: this.httpsAgentAxios ? this.httpsAgentAxios : null,
421+
})
422+
.catch(error => console.log(error));
393423
}
394424

395-
/**
396-
* Inserts rows of data into a stream.
397-
*
398-
* <p>This method may be used to insert new rows of data into a stream.
399-
*
400-
* <p>This method is sql injection protected with the use of queryBuilder.
401-
*
402-
* @param {string} stream the name of the stream to insert data into.
403-
* @param {object} rows an array that contains data that is being inserted into the stream.
404-
* @return {Promise} this method returns a promise that resolves into an array describing the status of the row inserted.
405-
*/
425+
/**
426+
* Inserts rows of data into a stream.
427+
*
428+
* <p>This method may be used to insert new rows of data into a stream.
429+
*
430+
* <p>This method is sql injection protected with the use of queryBuilder.
431+
*
432+
* @param {string} stream the name of the stream to insert data into.
433+
* @param {object} rows an array that contains data that is being inserted into the stream.
434+
* @return {Promise} this method returns a promise that resolves into an array describing the status of the row inserted.
435+
*/
406436
//---------------------Insert Rows Into Existing Streams-----------------
407437
insertStream = (stream, rows) => {
408438
validateInputs([stream, 'string', 'stream', true], [rows, 'array', 'rows', true]);
@@ -624,4 +654,4 @@ class ksqljs {
624654
}
625655
};
626656

627-
module.exports = ksqljs;
657+
module.exports = ksqldb;
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)