Skip to content

Commit 5f708cb

Browse files
Modifying and incorporating createTableAs method integration test
1 parent e1aac93 commit 5f708cb

File tree

1 file changed

+71
-64
lines changed

1 file changed

+71
-64
lines changed

__tests__/integrationtests.js

Lines changed: 71 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ describe('--Integration Tests--', () => {
5959
const response = await client.insertStream('TESTJESTSTREAM', [
6060
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
6161
]);
62-
console.log(response);
6362
const data = [];
6463
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
6564
data.push(JSON.parse(chunk));
@@ -72,7 +71,6 @@ describe('--Integration Tests--', () => {
7271

7372
it('.pull receives the correct data from a pull query', async () => {
7473
const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
75-
console.log(pullData[1]);
7674
expect(pullData[1]).toEqual(["stab-rabbit", "123@mail.com", 100]);
7775
})
7876

@@ -84,81 +82,90 @@ describe('--Integration Tests--', () => {
8482
expect(expectPullData).toEqual(expectData);
8583
})
8684

87-
describe('materialized streams utilizing createStreamAs', () => {
85+
describe('--Materialized Streams and Tables--', () => {
8886
beforeAll(async () => {
8987
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
88+
await client.ksql('DROP TABLE IF EXISTS testAsTable;');
9089
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
91-
92-
newStreamQueryId = await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
93-
testAsQueryId = await client.createStreamAs('testAsStream', ['name', 'age'], 'newTestStream', {
94-
kafka_topic: 'newTestTopic',
95-
value_format: 'json',
96-
partitions: 1
97-
}, 'age > 50');
98-
})
90+
await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
91+
});
9992

10093
afterAll(async () => {
101-
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
10294
await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
10395
})
10496

105-
it('creates materialized stream', async () => {
106-
let streamFound = false;
107-
const {streams} = await client.ksql('LIST STREAMS;');
108-
109-
for (let i = 0; i < streams.length; i++) {
110-
if (streams[i].name, streams[i].name === 'TESTASSTREAM') streamFound = true;
111-
}
112-
expect(streamFound).toBe(true);
97+
describe('--Materialized Streams Tests--', () => {
98+
beforeAll(async () => {
99+
// await client.ksql('DROP STREAM IF EXISTS testAsStream;')
100+
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
101+
102+
// await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
103+
testAsQueryId = await client.createStreamAs('testAsStream', ['name', 'age'], 'newTestStream', {
104+
kafka_topic: 'newTestTopic',
105+
value_format: 'json',
106+
partitions: 1
107+
}, 'age > 50');
108+
})
109+
110+
afterAll(async () => {
111+
await client.ksql('DROP STREAM IF EXISTS testAsStream;')
112+
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
113+
})
114+
115+
it('creates materialized stream', async () => {
116+
let streamFound = false;
117+
const {streams} = await client.ksql('LIST STREAMS;');
118+
119+
for (let i = 0; i < streams.length; i++) {
120+
if (streams[i].name, streams[i].name === 'TESTASSTREAM') {
121+
streamFound = true;
122+
break;
123+
}
124+
}
125+
expect(streamFound).toBe(true);
126+
});
113127
});
128+
129+
130+
describe('--Materialized Tables Tests--', () => {
131+
beforeAll( async () => {
132+
await client.createTableAs('testAsTable', 'newTestStream', ['name', 'LATEST_BY_OFFSET(age) AS recentAge'], {topic:'newTestTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
133+
});
134+
afterAll(async () => {
135+
await client.ksql('DROP TABLE IF EXISTS testAsTable;');
136+
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
137+
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
138+
})
139+
140+
it('creates a materialized table view of a stream', async () => {
141+
const {tables} = await client.ksql('LIST TABLES;');
142+
let tableFound = false;
143+
for (let i = 0; i < tables.length; i++){
144+
if (tables[i].name === 'TESTASTABLE') {
145+
tableFound = true;
146+
break;
147+
}
148+
}
149+
expect(tableFound).toEqual(true);
150+
})
151+
152+
it('receives updates from source stream', async () => {
153+
let rowReceived = false;
154+
await client.push('SELECT * FROM testAsTable EMIT CHANGES LIMIT 1;', async (data) => {
155+
if (Array.isArray(JSON.parse(data))){
156+
if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25){
157+
rowReceived = true;
158+
}
159+
}
160+
})
161+
await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25}]);
162+
await waitForExpect(() => expect(rowReceived).toEqual(true))
163+
})
164+
})
114165
})
115166
})
116-
<<<<<<< HEAD
117-
118-
=======
119167

120-
// describe('--Materialized Views Test--', () => {
121-
// beforeAll( async () => {
122-
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088'});
123-
// const waitForExpect = require('wait-for-expect');
124-
// await client.ksql('CREATE STREAM NEWTESTSTREAM (NAME VARCHAR, AGE INTEGER, LOCATION VARCHAR, WEIGHT INTEGER) WITH (kafka_topic= \'testJestTopic2\', value_format=\'json\', partitions=1);')
125-
// });
126-
// afterAll(async () => {
127-
// await client.ksql('DROP TABLE IF EXISTS TABLEOFSTREAM DELETE TOPIC;')
128-
// await client.ksql('DROP STREAM IF EXISTS NEWTESTSTREAM DELETE TOPIC;')
129-
// })
130-
// it('creates a materialized table view of a stream', async () => {
131-
// await client.createTableAs('TABLEOFSTREAM', 'NEWTESTSTREAM', ['name', 'LATEST_BY_OFFSET(age) AS recentAge', 'LATEST_BY_OFFSET(weight) AS recentweight'], {topic:'newTopic'},{WHERE: 'age >= 21', GROUP_BY: 'name'});
132-
// const tables = await client.ksql('LIST TABLES;');
133-
// const allTables = tables.tables;
134-
// let tableCheck = false;
135-
// for (let i = 0; i < allTables.length; i++){
136-
// if (allTables[i].name === 'TABLEOFSTREAM') {
137-
// tableCheck = true;
138-
// break;
139-
// }
140-
// }
141-
// expect(tableCheck).toEqual(true);
142-
143-
// })
144-
// it('materialized table view updates with source stream', async () => {
145-
// let rowCheck = false;
146-
// // push query for the table
147-
// // console.log('testing materialized view')
148-
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
149-
// console.log('QUERY INFO',data)
150-
// if (Array.isArray(JSON.parse(data))){
151-
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === 130){
152-
// rowCheck = true;
153-
// }
154-
// }
155-
// })
156-
// await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
157-
// await waitForExpect(() => expect(rowCheck).toEqual(true))
158-
// })
159-
// })
160168

161-
>>>>>>> dev
162169
describe('--Health Tests--', () => {
163170
beforeAll((done) => {
164171
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });

0 commit comments

Comments
 (0)