Skip to content

Commit e9946e4

Browse files
committed
Merge branch 'dev' of https://github.com/oslabs-beta/ksqljs into mx/jestTestsBranch
2 parents 25d8df0 + c108bc5 commit e9946e4

File tree

4 files changed

+112
-52
lines changed

4 files changed

+112
-52
lines changed

__tests__/integrationtests.js

Lines changed: 84 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ need to be removed first.
1515

1616
describe('--Integration Tests--', () => {
1717
describe('--Method Tests--', () => {
18-
beforeAll((done) => {
18+
beforeAll(async () => {
1919
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
20-
done();
20+
await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
2121
});
2222

2323
afterAll(async () => {
@@ -87,48 +87,90 @@ describe('--Integration Tests--', () => {
8787
const expectData = data[0].slice(0, 3);
8888
expect(expectPullData).toEqual(expectData);
8989
})
90+
91+
describe('--Materialized Streams and Tables--', () => {
92+
beforeAll(async () => {
93+
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
94+
await client.ksql('DROP TABLE IF EXISTS testAsTable;');
95+
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
96+
await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
97+
});
98+
99+
afterAll(async () => {
100+
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
101+
})
102+
103+
describe('--Materialized Streams Tests--', () => {
104+
beforeAll(async () => {
105+
// await client.ksql('DROP STREAM IF EXISTS testAsStream;')
106+
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
107+
108+
// await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
109+
testAsQueryId = await client.createStreamAs('testAsStream', ['name', 'age'], 'newTestStream', {
110+
kafka_topic: 'newTestTopic',
111+
value_format: 'json',
112+
partitions: 1
113+
}, 'age > 50');
114+
})
115+
116+
afterAll(async () => {
117+
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
118+
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
119+
})
120+
121+
it('creates materialized stream', async () => {
122+
let streamFound = false;
123+
const {streams} = await client.ksql('LIST STREAMS;');
124+
125+
for (let i = 0; i < streams.length; i++) {
126+
if (streams[i].name, streams[i].name === 'TESTASSTREAM') {
127+
streamFound = true;
128+
break;
129+
}
130+
}
131+
expect(streamFound).toBe(true);
132+
});
133+
});
134+
135+
136+
describe('--Materialized Tables Tests--', () => {
137+
beforeAll( async () => {
138+
await client.createTableAs('testAsTable', 'newTestStream', ['name', 'LATEST_BY_OFFSET(age) AS recentAge'], {topic:'newTestTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
139+
});
140+
afterAll(async () => {
141+
await client.ksql('DROP TABLE IF EXISTS testAsTable;');
142+
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
143+
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
144+
})
145+
146+
it('creates a materialized table view of a stream', async () => {
147+
const {tables} = await client.ksql('LIST TABLES;');
148+
let tableFound = false;
149+
for (let i = 0; i < tables.length; i++){
150+
if (tables[i].name === 'TESTASTABLE') {
151+
tableFound = true;
152+
break;
153+
}
154+
}
155+
expect(tableFound).toEqual(true);
156+
})
157+
158+
it('receives updates from source stream', async () => {
159+
let rowReceived = false;
160+
await client.push('SELECT * FROM testAsTable EMIT CHANGES LIMIT 1;', async (data) => {
161+
if (Array.isArray(JSON.parse(data))){
162+
if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25){
163+
rowReceived = true;
164+
}
165+
}
166+
})
167+
await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25}]);
168+
await waitForExpect(() => expect(rowReceived).toEqual(true))
169+
})
170+
})
171+
})
90172
})
91173

92-
// describe('--Materialized Views Test--', () => {
93-
// beforeAll( async () => {
94-
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
95-
// const waitForExpect = require('wait-for-expect');
96-
// await client.ksql('CREATE STREAM NEWTESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic2\', value_format=\'json\', partitions=1);')
97-
// });
98-
// afterAll(async () => {
99-
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
100-
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
101-
// })
102-
// it('creates a materialized table view of a stream', async () => {
103-
// await client.createTableAs('TABLEOFSTREAM', 'NEWTESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
104-
// const tables = await client.ksql('LIST TABLES;');
105-
// const allTables = tables.tables;
106-
// let tableCheck = false;
107-
// for (let i = 0; i < allTables.length; i++){
108-
// if (allTables[i].name === 'TABLEOFSTREAM') {
109-
// tableCheck = true;
110-
// break;
111-
// }
112-
// }
113-
// expect(tableCheck).toEqual(true);
114-
115-
// })
116-
// it('materialized table view updates with source stream', async () => {
117-
// let rowCheck = false;
118-
// // push query for the table
119-
// // console.log('testing materialized view')
120-
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
121-
// console.log('QUERY INFO',data)
122-
// if (Array.isArray(JSON.parse(data))){
123-
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === 130){
124-
// rowCheck = true;
125-
// }
126-
// }
127-
// })
128-
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
129-
// await waitForExpect(() => expect(rowCheck).toEqual(true))
130-
// })
131-
// })
132174

133175
describe('--Health Tests--', () => {
134176
beforeAll((done) => {

ksqljs/ksqlJS.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,14 @@ class ksqljs {
265265
}
266266

267267
/**
268-
*
269-
* @param {string} streamName
270-
* @param {string[]} selectColumns
271-
* @param {string} sourceStream
272-
* @param {object} propertiesObj
273-
* @param {string} conditions
274-
* @param {string} partitionBy
275-
* @returns {Promise}
268+
*
269+
* @param {string} streamName - the name of the stream to be created
270+
* @param {string[]} selectColumns - the columns from the underlying stream to be included in the new materialized stream
271+
* @param {string} sourceStream - the underlying stream from which the new materialized stream will be created
272+
* @param {object} propertiesObj - an object whose keys are property names and values are the associated values
273+
* @param {string} conditions - a string containing the conditional statement (i.e., the 'WHERE' statement)
274+
* @param {string} partitionBy - column by which data will be distributed
275+
* @returns {Promise} - a promise that completes once the server response is received, and returns a query ID
276276
*/
277277
createStreamAs = (streamName, selectColumns, sourceStream, propertiesObj, conditions, partitionBy) => {
278278
const propertiesArgs = [];
@@ -464,8 +464,8 @@ class ksqljs {
464464
});
465465

466466
req.on("end", () => {
467-
resolve(msgOutput);
468467
session.close();
468+
resolve(msgOutput);
469469
});
470470
})
471471
}

ksqljsTest.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@ pullFromTo();
146146
147147
// insertStreamTest();
148148
149+
/* const createStreamAsTest = async () => {
150+
const queryId = await client.createStreamAs('TestAsStream', ['latitude', 'longitude'], 'riderLocations', {
151+
kafka_topic: 'TestAsStream',
152+
value_format: 'json',
153+
partitions: 1
154+
},
155+
'latitude > 37');
156+
console.log('this is the queryId: ', queryId);
157+
};
158+
159+
createStreamAsTest(); */
160+
161+
// (streamName, selectColumns, sourceStream, propertiesObj, conditionsObj)
162+
// const defaultProps = {
163+
// kafka_topic: streamName,
164+
// value_format: 'json',
165+
// partitions: 1
166+
// };
149167

150168
//---------------------Test Inspect query status -------------------
151169
// const inspectQueryStatusTest = async () => {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"description": "",
55
"main": "ksqljsTest.js",
66
"scripts": {
7-
"test": "jest --verbose"
7+
"test": "jest --verbose --detectOpenHandles"
88
},
99
"keywords": [],
1010
"author": "",

0 commit comments

Comments
 (0)