@@ -16,11 +16,10 @@ use async_stack_trace::StackTrace;
16
16
use chrono:: NaiveDateTime ;
17
17
use futures:: { pin_mut, StreamExt } ;
18
18
use futures_async_stream:: try_stream;
19
- use risingwave_common:: array:: { Op , StreamChunk } ;
19
+ use risingwave_common:: array:: { DataChunk , Op , StreamChunk } ;
20
20
use risingwave_common:: catalog:: { Field , Schema } ;
21
21
use risingwave_common:: row:: Row ;
22
22
use risingwave_common:: types:: { DataType , NaiveDateTimeWrapper , ScalarImpl } ;
23
- use risingwave_common:: util:: chunk_coalesce:: DataChunkBuilder ;
24
23
use risingwave_common:: util:: epoch:: Epoch ;
25
24
use risingwave_storage:: StateStore ;
26
25
use tokio:: sync:: mpsc:: UnboundedReceiver ;
@@ -106,18 +105,17 @@ impl<S: StateStore> NowExecutor<S> {
106
105
) ,
107
106
) ) ) ;
108
107
109
- let mut data_chunk_builder = DataChunkBuilder :: new (
110
- schema. data_types ( ) ,
111
- if last_timestamp. is_some ( ) { 2 } else { 1 } ,
108
+ let data_chunk = DataChunk :: from_rows (
109
+ & if last_timestamp. is_some ( ) {
110
+ vec ! [
111
+ Row :: new( vec![ last_timestamp. clone( ) ] ) ,
112
+ Row :: new( vec![ timestamp. clone( ) ] ) ,
113
+ ]
114
+ } else {
115
+ vec ! [ Row :: new( vec![ timestamp. clone( ) ] ) ]
116
+ } ,
117
+ & schema. data_types ( ) ,
112
118
) ;
113
- if last_timestamp. is_some ( ) {
114
- let chunk_popped = data_chunk_builder
115
- . append_one_row_from_datums ( [ & last_timestamp] . into_iter ( ) ) ;
116
- debug_assert ! ( chunk_popped. is_none( ) ) ;
117
- }
118
- let data_chunk = data_chunk_builder
119
- . append_one_row_from_datums ( [ & timestamp] . into_iter ( ) )
120
- . unwrap ( ) ;
121
119
let mut ops = if last_timestamp. is_some ( ) {
122
120
vec ! [ Op :: Delete ]
123
121
} else {
0 commit comments