@@ -25,6 +25,7 @@ describe('--Integration Tests--', () => {
25
25
} )
26
26
27
27
it ( '.createStream properly creates a stream' , async ( ) => {
28
+ await client . ksql ( 'DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;' )
28
29
const result = await client . createStream ( 'TESTJESTSTREAM' , [ 'name VARCHAR' , 'email varchar' , 'age INTEGER' ] , 'testJestTopic' , 'json' , 1 ) ;
29
30
const streams = await client . ksql ( 'LIST STREAMS;' ) ;
30
31
const allStreams = streams . streams ;
@@ -56,17 +57,19 @@ describe('--Integration Tests--', () => {
56
57
} )
57
58
58
59
it ( '.insertStream properly inserts a row into a stream' , async ( ) => {
59
- const response = await client . insertStream ( 'TESTJESTSTREAM' , [
60
- { "name" : "stab-rabbit" , "email" : "123@mail.com" , "age" : 100 }
61
- ] ) ;
60
+
62
61
const data = [ ] ;
63
62
await client . push ( 'SELECT * FROM TESTJESTSTREAM EMIT CHANGES;' , async ( chunk ) => {
64
63
data . push ( JSON . parse ( chunk ) ) ;
64
+ console . log ( data ) ;
65
65
if ( data [ 1 ] ) {
66
66
client . terminate ( data [ 0 ] . queryId ) ;
67
67
expect ( data [ 1 ] ) . toEqual ( [ "stab-rabbit" , "123@mail.com" , 100 ] )
68
68
}
69
69
} ) ;
70
+ const response = await client . insertStream ( 'TESTJESTSTREAM' , [
71
+ { "name" : "stab-rabbit" , "email" : "123@mail.com" , "age" : 100 }
72
+ ] ) ;
70
73
} )
71
74
72
75
it ( '.pull receives the correct data from a pull query' , async ( ) => {
@@ -98,24 +101,24 @@ describe('--Integration Tests--', () => {
98
101
beforeAll ( async ( ) => {
99
102
// await client.ksql('DROP STREAM IF EXISTS testAsStream;')
100
103
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
101
-
104
+
102
105
// await client.createStream('newTestStream', ['name VARCHAR', 'age INTEGER'], 'newTestTopic', 'json', 1);
103
106
testAsQueryId = await client . createStreamAs ( 'testAsStream' , [ 'name' , 'age' ] , 'newTestStream' , {
104
107
kafka_topic : 'newTestTopic' ,
105
108
value_format : 'json' ,
106
109
partitions : 1
107
- } , 'age > 50' ) ;
110
+ } , 'age > 50' ) ;
108
111
} )
109
-
112
+
110
113
afterAll ( async ( ) => {
111
114
await client . ksql ( 'DROP STREAM IF EXISTS testAsStream;' )
112
115
// await client.ksql('DROP STREAM IF EXISTS newTestStream DELETE TOPIC;');
113
116
} )
114
-
117
+
115
118
it ( 'creates materialized stream' , async ( ) => {
116
119
let streamFound = false ;
117
- const { streams} = await client . ksql ( 'LIST STREAMS;' ) ;
118
-
120
+ const { streams } = await client . ksql ( 'LIST STREAMS;' ) ;
121
+
119
122
for ( let i = 0 ; i < streams . length ; i ++ ) {
120
123
if ( streams [ i ] . name , streams [ i ] . name === 'TESTASSTREAM' ) {
121
124
streamFound = true ;
@@ -128,8 +131,8 @@ describe('--Integration Tests--', () => {
128
131
129
132
130
133
describe ( '--Materialized Tables Tests--' , ( ) => {
131
- beforeAll ( async ( ) => {
132
- await client . createTableAs ( 'testAsTable' , 'newTestStream' , [ 'name' , 'LATEST_BY_OFFSET(age) AS recentAge' ] , { topic :'newTestTopic' } , { WHERE : 'age >= 21' , GROUP_BY : 'name' } ) ;
134
+ beforeAll ( async ( ) => {
135
+ await client . createTableAs ( 'testAsTable' , 'newTestStream' , [ 'name' , 'LATEST_BY_OFFSET(age) AS recentAge' ] , { topic : 'newTestTopic' } , { WHERE : 'age >= 21' , GROUP_BY : 'name' } ) ;
133
136
} ) ;
134
137
afterAll ( async ( ) => {
135
138
await client . ksql ( 'DROP TABLE IF EXISTS testAsTable;' ) ;
@@ -138,29 +141,16 @@ describe('--Integration Tests--', () => {
138
141
} )
139
142
140
143
it ( 'creates a materialized table view of a stream' , async ( ) => {
141
- const { tables} = await client . ksql ( 'LIST TABLES;' ) ;
144
+ const { tables } = await client . ksql ( 'LIST TABLES;' ) ;
142
145
let tableFound = false ;
143
- for ( let i = 0 ; i < tables . length ; i ++ ) {
146
+ for ( let i = 0 ; i < tables . length ; i ++ ) {
144
147
if ( tables [ i ] . name === 'TESTASTABLE' ) {
145
148
tableFound = true ;
146
149
break ;
147
150
}
148
151
}
149
152
expect ( tableFound ) . toEqual ( true ) ;
150
153
} )
151
-
152
- // it('receives updates from source stream', async () => {
153
- // let rowReceived = false;
154
- // await client.push('SELECT * FROM testAsTable EMIT CHANGES LIMIT 1;', async (data) => {
155
- // if (Array.isArray(JSON.parse(data))){
156
- // if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25){
157
- // rowReceived = true;
158
- // }
159
- // }
160
- // })
161
- // await client.insertStream('NEWTESTSTREAM', [{"NAME":"firstTester", "AGE":25}]);
162
- // await waitForExpect(() => expect(rowReceived).toEqual(true))
163
- // })
164
154
} )
165
155
} )
166
156
} )
0 commit comments