@@ -1035,6 +1035,300 @@ mod tests {
1035
1035
Ok ( ( ) )
1036
1036
}
1037
1037
1038
+ // Create test data with multiple blocks, including spilled and sliced blocks
1039
+ async fn run_take_next_bounded_spillable < R : Rows > (
1040
+ spiller : impl Spill + Clone ,
1041
+ sort_desc : Arc < Vec < SortColumnDescription > > ,
1042
+ bound : Option < Scalar > ,
1043
+ expected_blocks : Vec < Column > ,
1044
+ // Flag to test with spilled blocks
1045
+ with_spilled : bool ,
1046
+ // Flag to test with sliced blocks
1047
+ with_sliced : bool ,
1048
+ ) -> Result < ( ) > {
1049
+ let ( schema, block) = test_data ( ) ;
1050
+ let block = DataBlock :: sort ( & block, & sort_desc, None ) ?;
1051
+ let sort_row_offset = schema. fields ( ) . len ( ) ;
1052
+
1053
+ // Create multiple blocks with different splits
1054
+ let mut blocks = VecDeque :: new ( ) ;
1055
+
1056
+ // First block: 0..2
1057
+ let mut block1 = block. slice ( 0 ..2 ) ;
1058
+ let col1 = convert_rows ( schema. clone ( ) , & sort_desc, block1. clone ( ) ) . unwrap ( ) ;
1059
+ block1. add_column ( BlockEntry :: new ( col1. data_type ( ) , Value :: Column ( col1) ) ) ;
1060
+ blocks. push_back ( SpillableBlock :: new ( block1, sort_row_offset) ) ;
1061
+
1062
+ // Second block: 2..5
1063
+ let mut block2 = block. slice ( 2 ..5 ) ;
1064
+ let col2 = convert_rows ( schema. clone ( ) , & sort_desc, block2. clone ( ) ) . unwrap ( ) ;
1065
+ block2. add_column ( BlockEntry :: new ( col2. data_type ( ) , Value :: Column ( col2) ) ) ;
1066
+ blocks. push_back ( SpillableBlock :: new ( block2, sort_row_offset) ) ;
1067
+
1068
+ // We'll add the third block only if we're not using sliced blocks
1069
+ // This is to avoid duplicating the data with additional_block
1070
+ if !with_sliced {
1071
+ // Third block: 5..8
1072
+ let mut block3 = block. slice ( 5 ..8 ) ;
1073
+ let col3 = convert_rows ( schema. clone ( ) , & sort_desc, block3. clone ( ) ) . unwrap ( ) ;
1074
+ block3. add_column ( BlockEntry :: new ( col3. data_type ( ) , Value :: Column ( col3) ) ) ;
1075
+ blocks. push_back ( SpillableBlock :: new ( block3, sort_row_offset) ) ;
1076
+ }
1077
+
1078
+ // Spill some blocks if requested
1079
+ if with_spilled {
1080
+ // Spill the second block
1081
+ blocks[ 1 ] . spill ( & spiller) . await ?;
1082
+ }
1083
+
1084
+ // Create a sliced block if requested
1085
+ if with_sliced {
1086
+ // Create a block for values 8..11 (the last part of the sorted data)
1087
+ let mut additional_block = block. slice ( 5 ..8 ) ;
1088
+ let col = convert_rows ( schema. clone ( ) , & sort_desc, additional_block. clone ( ) ) . unwrap ( ) ;
1089
+ additional_block. add_column ( BlockEntry :: new ( col. data_type ( ) , Value :: Column ( col) ) ) ;
1090
+ let mut spillable_block = SpillableBlock :: new ( additional_block, sort_row_offset) ;
1091
+
1092
+ // Use SpillableBlock::slice to create a sliced block
1093
+ // This tests the SpillableBlock::slice functionality by slicing at position 1
1094
+ // For ascending Int32: [8, 10, 11] -> [8] and [10, 11]
1095
+ // For descending String: ["d", "e", "f"] -> ["d"] and ["e", "f"]
1096
+ let sliced_data = spillable_block. slice ( 1 , sort_row_offset) ;
1097
+ let sliced_block = SpillableBlock :: new ( sliced_data, sort_row_offset) ;
1098
+
1099
+ // Add both blocks to maintain the order
1100
+ blocks. push_back ( sliced_block) ;
1101
+ blocks. push_back ( spillable_block) ;
1102
+ }
1103
+
1104
+ let mut stream = BoundBlockStream :: < R , _ > {
1105
+ blocks,
1106
+ bound,
1107
+ sort_row_offset,
1108
+ spiller : spiller. clone ( ) ,
1109
+ _r : Default :: default ( ) ,
1110
+ } ;
1111
+
1112
+ // Take blocks one by one and compare with expected
1113
+ let mut result_blocks = Vec :: new ( ) ;
1114
+ while let Some ( mut block) = stream. take_next_bounded_spillable ( ) . await ? {
1115
+ // If the block data is None (spilled), restore it first
1116
+ if block. data . is_none ( ) {
1117
+ block. data = Some ( spiller. restore ( block. location . as_ref ( ) . unwrap ( ) ) . await ?) ;
1118
+ }
1119
+
1120
+ let data = block. data . unwrap ( ) ;
1121
+ let col = sort_column ( & data, sort_row_offset) . clone ( ) ;
1122
+ result_blocks. push ( col) ;
1123
+ }
1124
+
1125
+ assert_eq ! (
1126
+ expected_blocks. len( ) ,
1127
+ result_blocks. len( ) ,
1128
+ "Number of blocks doesn't match"
1129
+ ) ;
1130
+ for ( expected, actual) in expected_blocks. iter ( ) . zip ( result_blocks. iter ( ) ) {
1131
+ assert_eq ! ( expected, actual, "Block content doesn't match" ) ;
1132
+ }
1133
+
1134
+ Ok ( ( ) )
1135
+ }
1136
+
1137
+ #[ tokio:: test]
1138
+ async fn test_take_next_bounded_spillable ( ) -> Result < ( ) > {
1139
+ let spiller = MockSpiller {
1140
+ map : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
1141
+ } ;
1142
+
1143
+ // Test with ascending Int32 type
1144
+ {
1145
+ let sort_desc = Arc :: new ( vec ! [ SortColumnDescription {
1146
+ offset: 0 ,
1147
+ asc: true ,
1148
+ nulls_first: false ,
1149
+ } ] ) ;
1150
+
1151
+ // Test 1: Basic test with bound = 5 (should return blocks with values <= 5)
1152
+ // No spilled blocks, no sliced blocks
1153
+ run_take_next_bounded_spillable :: < SimpleRowsAsc < Int32Type > > (
1154
+ spiller. clone ( ) ,
1155
+ sort_desc. clone ( ) ,
1156
+ Some ( Scalar :: Number ( NumberScalar :: Int32 ( 5 ) ) ) ,
1157
+ vec ! [ Int32Type :: from_data( vec![ 3 , 5 ] ) ] ,
1158
+ false , // no spilled blocks
1159
+ false , // no sliced blocks
1160
+ )
1161
+ . await ?;
1162
+
1163
+ // Test 2: With spilled blocks, bound = 8 (should return blocks with values <= 8)
1164
+ run_take_next_bounded_spillable :: < SimpleRowsAsc < Int32Type > > (
1165
+ spiller. clone ( ) ,
1166
+ sort_desc. clone ( ) ,
1167
+ Some ( Scalar :: Number ( NumberScalar :: Int32 ( 8 ) ) ) ,
1168
+ vec ! [
1169
+ Int32Type :: from_data( vec![ 3 , 5 ] ) ,
1170
+ Int32Type :: from_data( vec![ 7 , 7 , 8 ] ) ,
1171
+ ] ,
1172
+ true , // with spilled blocks
1173
+ false , // no sliced blocks
1174
+ )
1175
+ . await ?;
1176
+
1177
+ // Test 3: With sliced blocks, bound = 7 (should return blocks with values <= 7)
1178
+ run_take_next_bounded_spillable :: < SimpleRowsAsc < Int32Type > > (
1179
+ spiller. clone ( ) ,
1180
+ sort_desc. clone ( ) ,
1181
+ Some ( Scalar :: Number ( NumberScalar :: Int32 ( 7 ) ) ) ,
1182
+ vec ! [
1183
+ Int32Type :: from_data( vec![ 3 , 5 ] ) ,
1184
+ Int32Type :: from_data( vec![ 7 , 7 ] ) ,
1185
+ ] ,
1186
+ false , // no spilled blocks
1187
+ true , // with sliced blocks
1188
+ )
1189
+ . await ?;
1190
+
1191
+ // Test 4: With both spilled and sliced blocks, bound = 10
1192
+ run_take_next_bounded_spillable :: < SimpleRowsAsc < Int32Type > > (
1193
+ spiller. clone ( ) ,
1194
+ sort_desc. clone ( ) ,
1195
+ Some ( Scalar :: Number ( NumberScalar :: Int32 ( 10 ) ) ) ,
1196
+ vec ! [
1197
+ Int32Type :: from_data( vec![ 3 , 5 ] ) ,
1198
+ Int32Type :: from_data( vec![ 7 , 7 , 8 ] ) ,
1199
+ Int32Type :: from_data( vec![ 10 ] ) ,
1200
+ ] ,
1201
+ true , // with spilled blocks
1202
+ true , // with sliced blocks
1203
+ )
1204
+ . await ?;
1205
+
1206
+ // Test 5: With bound = 2 (should return no blocks as all values > 2)
1207
+ run_take_next_bounded_spillable :: < SimpleRowsAsc < Int32Type > > (
1208
+ spiller. clone ( ) ,
1209
+ sort_desc. clone ( ) ,
1210
+ Some ( Scalar :: Number ( NumberScalar :: Int32 ( 2 ) ) ) ,
1211
+ vec ! [ ] ,
1212
+ true , // with spilled blocks
1213
+ true , // with sliced blocks
1214
+ )
1215
+ . await ?;
1216
+
1217
+ // Test 6: With bound = 12 (should return all blocks as all values <= 12)
1218
+ run_take_next_bounded_spillable :: < SimpleRowsAsc < Int32Type > > (
1219
+ spiller. clone ( ) ,
1220
+ sort_desc. clone ( ) ,
1221
+ Some ( Scalar :: Number ( NumberScalar :: Int32 ( 12 ) ) ) ,
1222
+ vec ! [
1223
+ Int32Type :: from_data( vec![ 3 , 5 ] ) ,
1224
+ Int32Type :: from_data( vec![ 7 , 7 , 8 ] ) ,
1225
+ Int32Type :: from_data( vec![ 10 , 11 , 11 ] ) ,
1226
+ ] ,
1227
+ true , // with spilled blocks
1228
+ false , // no sliced blocks
1229
+ )
1230
+ . await ?;
1231
+
1232
+ // Test 7: With no bound (should return all blocks)
1233
+ run_take_next_bounded_spillable :: < SimpleRowsAsc < Int32Type > > (
1234
+ spiller. clone ( ) ,
1235
+ sort_desc. clone ( ) ,
1236
+ None ,
1237
+ vec ! [
1238
+ Int32Type :: from_data( vec![ 3 , 5 ] ) ,
1239
+ Int32Type :: from_data( vec![ 7 , 7 , 8 ] ) ,
1240
+ Int32Type :: from_data( vec![ 10 , 11 , 11 ] ) ,
1241
+ ] ,
1242
+ true , // with spilled blocks
1243
+ false , // no sliced blocks
1244
+ )
1245
+ . await ?;
1246
+ }
1247
+
1248
+ // Test with descending String type
1249
+ {
1250
+ let sort_desc = Arc :: new ( vec ! [ SortColumnDescription {
1251
+ offset: 1 ,
1252
+ asc: false ,
1253
+ nulls_first: false ,
1254
+ } ] ) ;
1255
+
1256
+ // Test 8: With bound = "f" (should return blocks with values >= "f")
1257
+ run_take_next_bounded_spillable :: < SimpleRowsDesc < StringType > > (
1258
+ spiller. clone ( ) ,
1259
+ sort_desc. clone ( ) ,
1260
+ Some ( Scalar :: String ( "f" . to_string ( ) ) ) ,
1261
+ vec ! [
1262
+ StringType :: from_data( vec![ "w" , "h" ] ) ,
1263
+ StringType :: from_data( vec![ "g" , "f" ] ) ,
1264
+ ] ,
1265
+ false , // no spilled blocks
1266
+ false , // no sliced blocks
1267
+ )
1268
+ . await ?;
1269
+
1270
+ // Test 9: With spilled blocks, bound = "e" (should return blocks with values >= "e")
1271
+ run_take_next_bounded_spillable :: < SimpleRowsDesc < StringType > > (
1272
+ spiller. clone ( ) ,
1273
+ sort_desc. clone ( ) ,
1274
+ Some ( Scalar :: String ( "e" . to_string ( ) ) ) ,
1275
+ vec ! [
1276
+ StringType :: from_data( vec![ "w" , "h" ] ) ,
1277
+ StringType :: from_data( vec![ "g" , "f" , "e" ] ) ,
1278
+ StringType :: from_data( vec![ "e" ] ) ,
1279
+ ] ,
1280
+ true , // with spilled blocks
1281
+ false , // no sliced blocks
1282
+ )
1283
+ . await ?;
1284
+
1285
+ // Test 10: With sliced blocks, bound = "d" (should return blocks with values >= "d")
1286
+ run_take_next_bounded_spillable :: < SimpleRowsDesc < StringType > > (
1287
+ spiller. clone ( ) ,
1288
+ sort_desc. clone ( ) ,
1289
+ Some ( Scalar :: String ( "d" . to_string ( ) ) ) ,
1290
+ vec ! [
1291
+ StringType :: from_data( vec![ "w" , "h" ] ) ,
1292
+ StringType :: from_data( vec![ "g" , "f" , "e" ] ) ,
1293
+ StringType :: from_data( vec![ "e" ] ) ,
1294
+ StringType :: from_data( vec![ "d" , "d" ] ) ,
1295
+ ] ,
1296
+ false , // no spilled blocks
1297
+ true , // with sliced blocks
1298
+ )
1299
+ . await ?;
1300
+
1301
+ // Test 11: With both spilled and sliced blocks, bound = "c" (should return all blocks)
1302
+ run_take_next_bounded_spillable :: < SimpleRowsDesc < StringType > > (
1303
+ spiller. clone ( ) ,
1304
+ sort_desc. clone ( ) ,
1305
+ Some ( Scalar :: String ( "c" . to_string ( ) ) ) ,
1306
+ vec ! [
1307
+ StringType :: from_data( vec![ "w" , "h" ] ) ,
1308
+ StringType :: from_data( vec![ "g" , "f" , "e" ] ) ,
1309
+ StringType :: from_data( vec![ "e" ] ) ,
1310
+ StringType :: from_data( vec![ "d" , "d" ] ) ,
1311
+ ] ,
1312
+ true , // with spilled blocks
1313
+ true , // with sliced blocks
1314
+ )
1315
+ . await ?;
1316
+
1317
+ // Test 12: With bound = "z" (should return no blocks as all values < "z")
1318
+ run_take_next_bounded_spillable :: < SimpleRowsDesc < StringType > > (
1319
+ spiller. clone ( ) ,
1320
+ sort_desc. clone ( ) ,
1321
+ Some ( Scalar :: String ( "z" . to_string ( ) ) ) ,
1322
+ vec ! [ ] ,
1323
+ true , // with spilled blocks
1324
+ true , // with sliced blocks
1325
+ )
1326
+ . await ?;
1327
+ }
1328
+
1329
+ Ok ( ( ) )
1330
+ }
1331
+
1038
1332
#[ derive( Clone ) ]
1039
1333
struct MockSpiller {
1040
1334
map : Arc < Mutex < HashMap < String , DataBlock > > > ,
0 commit comments