Skip to content

Commit c108bc5

Browse files
authored
Merge pull request #26 from oslabs-beta/ms/createStreamAs
Ms/create stream as
2 parents 58fbee1 + 5f708cb commit c108bc5

File tree

4 files changed

+112
-54
lines changed

4 files changed

+112
-54
lines changed

__tests__/integrationtests.js

Lines changed: 84 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ need to be removed first.
1515

1616
describe('--Integration Tests--', () => {
1717
describe('--Method Tests--', () => {
18-
beforeAll((done) => {
18+
beforeAll(async () => {
1919
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
20-
done();
20+
await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
2121
});
2222

2323
afterAll(async () => {
@@ -59,7 +59,6 @@ describe('--Integration Tests--', () => {
5959
const response = await client.insertStream('TESTJESTSTREAM', [
6060
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
6161
]);
62-
console.log(response);
6362
const data = [];
6463
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
6564
data.push(JSON.parse(chunk));
@@ -72,7 +71,6 @@ describe('--Integration Tests--', () => {
7271

7372
it('.pull receives the correct data from a pull query', async () => {
7473
const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
75-
console.log(pullData[1]);
7674
expect(pullData[1]).toEqual(["stab-rabbit", "123@mail.com", 100]);
7775
})
7876

@@ -83,48 +81,90 @@ describe('--Integration Tests--', () => {
8381
const expectData = data[0].slice(0, 3);
8482
expect(expectPullData).toEqual(expectData);
8583
})
84+
85+
describe('--Materialized Streams and Tables--', () => {
86+
beforeAll(async () => {
87+
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
88+
await client.ksql('DROP TABLE IF EXISTS testAsTable;');
89+
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
90+
await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
91+
});
92+
93+
afterAll(async () => {
94+
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
95+
})
96+
97+
describe('--Materialized Streams Tests--', () => {
98+
beforeAll(async () => {
99+
// await client.ksql('DROP STREAM IF EXISTS testAsStream;')
100+
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
101+
102+
// await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
103+
testAsQueryId = await client.createStreamAs('testAsStream', ['name', 'age'], 'newTestStream', {
104+
kafka_topic: 'newTestTopic',
105+
value_format: 'json',
106+
partitions: 1
107+
}, 'age > 50');
108+
})
109+
110+
afterAll(async () => {
111+
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
112+
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
113+
})
114+
115+
it('creates materialized stream', async () => {
116+
let streamFound = false;
117+
const {streams} = await client.ksql('LIST STREAMS;');
118+
119+
for (let i = 0; i < streams.length; i++) {
120+
if (streams[i].name, streams[i].name === 'TESTASSTREAM') {
121+
streamFound = true;
122+
break;
123+
}
124+
}
125+
expect(streamFound).toBe(true);
126+
});
127+
});
128+
129+
130+
describe('--Materialized Tables Tests--', () => {
131+
beforeAll( async () => {
132+
await client.createTableAs('testAsTable', 'newTestStream', ['name', 'LATEST_BY_OFFSET(age) AS recentAge'], {topic:'newTestTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
133+
});
134+
afterAll(async () => {
135+
await client.ksql('DROP TABLE IF EXISTS testAsTable;');
136+
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
137+
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
138+
})
139+
140+
it('creates a materialized table view of a stream', async () => {
141+
const {tables} = await client.ksql('LIST TABLES;');
142+
let tableFound = false;
143+
for (let i = 0; i < tables.length; i++){
144+
if (tables[i].name === 'TESTASTABLE') {
145+
tableFound = true;
146+
break;
147+
}
148+
}
149+
expect(tableFound).toEqual(true);
150+
})
151+
152+
it('receives updates from source stream', async () => {
153+
let rowReceived = false;
154+
await client.push('SELECT * FROM testAsTable EMIT CHANGES LIMIT 1;', async (data) => {
155+
if (Array.isArray(JSON.parse(data))){
156+
if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25){
157+
rowReceived = true;
158+
}
159+
}
160+
})
161+
await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25}]);
162+
await waitForExpect(() => expect(rowReceived).toEqual(true))
163+
})
164+
})
165+
})
86166
})
87167

88-
// describe('--Materialized Views Test--', () => {
89-
// beforeAll( async () => {
90-
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
91-
// const waitForExpect = require('wait-for-expect');
92-
// await client.ksql('CREATE STREAM NEWTESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic2\', value_format=\'json\', partitions=1);')
93-
// });
94-
// afterAll(async () => {
95-
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
96-
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
97-
// })
98-
// it('creates a materialized table view of a stream', async () => {
99-
// await client.createTableAs('TABLEOFSTREAM', 'NEWTESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
100-
// const tables = await client.ksql('LIST TABLES;');
101-
// const allTables = tables.tables;
102-
// let tableCheck = false;
103-
// for (let i = 0; i < allTables.length; i++){
104-
// if (allTables[i].name === 'TABLEOFSTREAM') {
105-
// tableCheck = true;
106-
// break;
107-
// }
108-
// }
109-
// expect(tableCheck).toEqual(true);
110-
111-
// })
112-
// it('materialized table view updates with source stream', async () => {
113-
// let rowCheck = false;
114-
// // push query for the table
115-
// // console.log('testing materialized view')
116-
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
117-
// console.log('QUERY INFO',data)
118-
// if (Array.isArray(JSON.parse(data))){
119-
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === 130){
120-
// rowCheck = true;
121-
// }
122-
// }
123-
// })
124-
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
125-
// await waitForExpect(() => expect(rowCheck).toEqual(true))
126-
// })
127-
// })
128168

129169
describe('--Health Tests--', () => {
130170
beforeAll((done) => {

ksqljs/ksqlJS.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,14 @@ class ksqljs {
224224
}
225225

226226
/**
227-
*
228-
* @param {string} streamName
229-
* @param {string[]} selectColumns
230-
* @param {string} sourceStream
231-
* @param {object} propertiesObj
232-
* @param {string} conditions
233-
* @param {string} partitionBy
234-
* @returns {Promise}
227+
*
228+
* @param {string} streamName - the name of the stream to be created
229+
* @param {string[]} selectColumns - the columns from the underlying stream to be included in the new materialized stream
230+
* @param {string} sourceStream - the underlying stream from which the new materialized stream will be created
231+
* @param {object} propertiesObj - an object whose keys are property names and values are the associated values
232+
* @param {string} conditions - a string containing the conditional statement (i.e., the 'WHERE' statement)
233+
* @param {string} partitionBy - column by which data will be distributed
234+
* @returns {Promise} - a promise that completes once the server response is received, and returns a query ID
235235
*/
236236
createStreamAs = (streamName, selectColumns, sourceStream, propertiesObj, conditions, partitionBy) => {
237237
const propertiesArgs = [];
@@ -423,8 +423,8 @@ class ksqljs {
423423
});
424424

425425
req.on("end", () => {
426-
resolve(msgOutput);
427426
session.close();
427+
resolve(msgOutput);
428428
});
429429
})
430430
}

ksqljsTest.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@ pullFromTo();
146146
147147
// insertStreamTest();
148148
149+
/* const createStreamAsTest = async () => {
150+
const queryId = await client.createStreamAs('TestAsStream', ['latitude', 'longitude'], 'riderLocations', {
151+
kafka_topic: 'TestAsStream',
152+
value_format: 'json',
153+
partitions: 1
154+
},
155+
'latitude > 37');
156+
console.log('this is the queryId: ', queryId);
157+
};
158+
159+
createStreamAsTest(); */
160+
161+
// (streamName, selectColumns, sourceStream, propertiesObj, conditionsObj)
162+
// const defaultProps = {
163+
// kafka_topic: streamName,
164+
// value_format: 'json',
165+
// partitions: 1
166+
// };
149167

150168
//---------------------Test Inspect query status -------------------
151169
// const inspectQueryStatusTest = async () => {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"description": "",
55
"main": "ksqljsTest.js",
66
"scripts": {
7-
"test": "jest --verbose"
7+
"test": "jest --verbose --detectOpenHandles"
88
},
99
"keywords": [],
1010
"author": "",

0 commit comments

Comments
 (0)