13
13
import lombok .AllArgsConstructor ;
14
14
import lombok .Setter ;
15
15
import lombok .SneakyThrows ;
16
- import lombok .Value ;
17
16
import lombok .experimental .Delegate ;
18
17
import org .assertj .core .api .Assertions ;
19
18
import org .junit .Assert ;
74
73
import tech .ydb .yoj .repository .ydb .sample .model .HintInt64Range ;
75
74
import tech .ydb .yoj .repository .ydb .sample .model .HintTablePreset ;
76
75
import tech .ydb .yoj .repository .ydb .sample .model .HintUniform ;
76
+ import tech .ydb .yoj .repository .ydb .statement .FindAllYqlStatement ;
77
77
import tech .ydb .yoj .repository .ydb .statement .FindStatement ;
78
78
import tech .ydb .yoj .repository .ydb .statement .YqlStatement ;
79
79
import tech .ydb .yoj .repository .ydb .table .YdbTable ;
@@ -195,6 +195,18 @@ public void throwConversionExceptionOnSerializationProblem() {
195
195
196
196
@ Test
197
197
public void readYqlListAndMap () {
198
+ record GroupByResult (
199
+ String id ,
200
+ List <String > items ,
201
+ Map <String , String > map ,
202
+
203
+ @ Column (flatten = false )
204
+ GroupByResult .Struct struct
205
+ ) {
206
+ record Struct (String name ) {
207
+ }
208
+ }
209
+
198
210
WithUnflattenableField entity = new WithUnflattenableField (
199
211
new WithUnflattenableField .Id ("id_yql_list" ),
200
212
new WithUnflattenableField .Unflattenable ("Hello, world!" , 100_500 )
@@ -203,7 +215,7 @@ public void readYqlListAndMap() {
203
215
db .tx (() -> {
204
216
EntitySchema <WithUnflattenableField > schema = EntitySchema .of (WithUnflattenableField .class );
205
217
var tableDescriptor = TableDescriptor .from (schema );
206
- List <GroupByResult > result = (( YdbRepositoryTransaction <?>) Tx . Current . get (). getRepositoryTransaction () )
218
+ List <GroupByResult > result = ydbRepositoryTransaction ( )
207
219
.execute (new YqlStatement <>(tableDescriptor , schema , ObjectSchema .of (GroupByResult .class )) {
208
220
@ Override
209
221
public String getQuery (String tablespace ) {
@@ -230,20 +242,6 @@ public QueryType getQueryType() {
230
242
});
231
243
}
232
244
233
- @ Value
234
- static class GroupByResult {
235
- String id ;
236
- List <String > items ;
237
- Map <String , String > map ;
238
- @ Column (flatten = false )
239
- Struct struct ;
240
-
241
- @ Value
242
- static class Struct {
243
- String name ;
244
- }
245
- }
246
-
247
245
@ Test
248
246
public void readViewFromCache () {
249
247
TypeFreak tf1 = newTypeFreak (0 , "AAA1" , "bbb" );
@@ -893,15 +891,15 @@ public void creatingRepositoryDoesNotConnect() {
893
891
public void ydbTransactionCompatibility () {
894
892
db .tx (() -> {
895
893
// No db tx or session yet!
896
- var sdkTx = (( YdbRepositoryTransaction <?>) Tx . Current . get (). getRepositoryTransaction () ).toSdkTransaction ();
894
+ var sdkTx = ydbRepositoryTransaction ( ).toSdkTransaction ();
897
895
assertThatIllegalStateException ().isThrownBy (sdkTx ::getSessionId );
898
896
assertThat (sdkTx .getId ()).isNull ();
899
897
assertThat (sdkTx .getTxMode ()).isEqualTo (TxMode .SERIALIZABLE_RW );
900
898
assertThatExceptionOfType (UnsupportedOperationException .class ).isThrownBy (sdkTx ::getStatusFuture );
901
899
902
900
// Perform any read - session and tx ID appear
903
901
db .projects ().countAll ();
904
- sdkTx = (( YdbRepositoryTransaction <?>) Tx . Current . get (). getRepositoryTransaction () ).toSdkTransaction ();
902
+ sdkTx = ydbRepositoryTransaction ( ).toSdkTransaction ();
905
903
assertThat (sdkTx .getSessionId ()).isNotNull ();
906
904
assertThat (sdkTx .getId ()).isNotNull ();
907
905
assertThat (sdkTx .getTxMode ()).isEqualTo (TxMode .SERIALIZABLE_RW );
@@ -919,15 +917,15 @@ public void ydbTransactionCompatibility() {
919
917
920
918
db .readOnly ().withStatementIsolationLevel (isolationLevel ).run (() -> {
921
919
// No db tx or session yet!
922
- var sdkTx = (( YdbRepositoryTransaction <?>) Tx . Current . get (). getRepositoryTransaction () ).toSdkTransaction ();
920
+ var sdkTx = ydbRepositoryTransaction ( ).toSdkTransaction ();
923
921
assertThatIllegalStateException ().isThrownBy (sdkTx ::getSessionId );
924
922
assertThat (sdkTx .getId ()).isNull ();
925
923
assertThat (sdkTx .getTxMode ()).isEqualTo (txMode );
926
924
assertThatExceptionOfType (UnsupportedOperationException .class ).isThrownBy (sdkTx ::getStatusFuture );
927
925
928
926
// Perform any read - session and tx ID appear
929
927
db .projects ().countAll ();
930
- sdkTx = (( YdbRepositoryTransaction <?>) Tx . Current . get (). getRepositoryTransaction () ).toSdkTransaction ();
928
+ sdkTx = ydbRepositoryTransaction ( ).toSdkTransaction ();
931
929
assertThat (sdkTx .getSessionId ()).isNotNull ();
932
930
// Read transactions might have no ID or might have an ID, depending on your YDB version (that's what YDB returns, folks!)
933
931
assertThat (sdkTx .getTxMode ()).isEqualTo (txMode );
@@ -1019,6 +1017,54 @@ public void queryStatsCollectionMode() {
1019
1017
assertThat (found ).hasSize (4 );
1020
1018
}
1021
1019
1020
+ @ Test
1021
+ public void streamingScanNotTruncatedOldSpliterator () {
1022
+ int maxPageSizeBiggerThatReal = 11_000 ;
1023
+
1024
+ db .tx (() -> IntStream .range (0 , maxPageSizeBiggerThatReal ).forEach (
1025
+ i -> db .projects ().save (new Project (new Project .Id ("id_" + i ), "name" ))
1026
+ ));
1027
+
1028
+ List <Project .Id > result = db .scan ().useNewSpliterator (false ).withMaxSize (maxPageSizeBiggerThatReal ).run (() -> {
1029
+ var schema = EntitySchema .of (Project .class );
1030
+ var desc = TableDescriptor .from (schema );
1031
+ var statement = new FindAllYqlStatement <>(desc , schema , schema );
1032
+
1033
+ var projectIds = new ArrayList <Project .Id >();
1034
+ try (var stream = ydbRepositoryTransaction ().executeScanQuery (statement , null )) {
1035
+ stream .forEach (p -> projectIds .add (p .getId ()));
1036
+ }
1037
+ return projectIds ;
1038
+ });
1039
+ assertEquals (maxPageSizeBiggerThatReal , result .size ());
1040
+ }
1041
+
1042
+ @ Test
1043
+ public void streamingScanNotTruncatedNewSpliterator () {
1044
+ int maxPageSizeBiggerThatReal = 11_000 ;
1045
+
1046
+ db .tx (() -> IntStream .range (0 , maxPageSizeBiggerThatReal ).forEach (
1047
+ i -> db .projects ().save (new Project (new Project .Id ("id_" + i ), "name" ))
1048
+ ));
1049
+
1050
+ List <Project .Id > result = db .scan ().useNewSpliterator (true ).withMaxSize (maxPageSizeBiggerThatReal ).run (() -> {
1051
+ var schema = EntitySchema .of (Project .class );
1052
+ var desc = TableDescriptor .from (schema );
1053
+ var statement = new FindAllYqlStatement <>(desc , schema , schema );
1054
+
1055
+ var projectIds = new ArrayList <Project .Id >();
1056
+ try (var stream = ydbRepositoryTransaction ().executeScanQuery (statement , null )) {
1057
+ stream .forEach (p -> projectIds .add (p .getId ()));
1058
+ }
1059
+ return projectIds ;
1060
+ });
1061
+ assertEquals (maxPageSizeBiggerThatReal , result .size ());
1062
+ }
1063
+
1064
+ private static YdbRepositoryTransaction <?> ydbRepositoryTransaction () {
1065
+ return (YdbRepositoryTransaction <?>) Tx .Current .get ().getRepositoryTransaction ();
1066
+ }
1067
+
1022
1068
@ AllArgsConstructor
1023
1069
private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc .SchemeServiceImplBase {
1024
1070
@ Delegate
0 commit comments