@@ -889,14 +889,17 @@ class TShardsInfo {
889
889
890
890
void MakeNextBatches (i64 maxDataSize, ui64 maxCount) {
891
891
YQL_ENSURE (BatchesInFlight == 0 );
892
+ YQL_ENSURE (!IsEmpty ());
892
893
i64 dataSize = 0 ;
894
+ // For columnshard batch can be slightly larger than the limit.
893
895
while (BatchesInFlight < maxCount
894
896
&& BatchesInFlight < Batches.size ()
895
- && dataSize + GetBatch (BatchesInFlight)->GetMemory () <= maxDataSize) {
897
+ && ( dataSize + GetBatch (BatchesInFlight)->GetMemory () <= maxDataSize || BatchesInFlight == 0 ) ) {
896
898
dataSize += GetBatch (BatchesInFlight)->GetMemory ();
897
899
++BatchesInFlight;
898
900
}
899
- YQL_ENSURE (BatchesInFlight == Batches.size () || GetBatch (BatchesInFlight)->GetMemory () <= maxDataSize);
901
+ YQL_ENSURE (BatchesInFlight != 0 );
902
+ YQL_ENSURE (BatchesInFlight == maxCount || BatchesInFlight == Batches.size () || dataSize + GetBatch (BatchesInFlight)->GetMemory () >= maxDataSize);
900
903
}
901
904
902
905
const IPayloadSerializer::IBatchPtr& GetBatch (size_t index) const {
@@ -1200,18 +1203,19 @@ class TShardedWriteController : public IShardedWriteController {
1200
1203
if (force) {
1201
1204
for (auto & [shardId, batches] : Serializer->FlushBatchesForce ()) {
1202
1205
for (auto & batch : batches) {
1203
- ShardsInfo.GetShard (shardId).PushBatch (std::move (batch));
1206
+ if (batch && !batch->IsEmpty ()) {
1207
+ ShardsInfo.GetShard (shardId).PushBatch (std::move (batch));
1208
+ }
1204
1209
}
1205
1210
}
1206
1211
} else {
1207
1212
for (const ui64 shardId : Serializer->GetShardIds ()) {
1208
1213
auto & shard = ShardsInfo.GetShard (shardId);
1209
1214
while (true ) {
1210
1215
auto batch = Serializer->FlushBatch (shardId);
1211
- if (! batch || batch->IsEmpty ()) {
1212
- break ;
1216
+ if (batch && ! batch->IsEmpty ()) {
1217
+ shard. PushBatch ( std::move (batch)) ;
1213
1218
}
1214
- shard.PushBatch (std::move (batch));
1215
1219
}
1216
1220
}
1217
1221
}
0 commit comments