Skip to content

Commit 8437590

Browse files
committed
Initializing userInfo.AnyCommits
1 parent f755ba7 commit 8437590

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3263,9 +3263,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
32633263
);
32643264

32653265
userInfo.Offset = offset;
3266-
if (userInfo.Offset <= (i64)StartOffset) {
3267-
userInfo.AnyCommits = false;
3268-
}
3266+
userInfo.AnyCommits = userInfo.Offset > (i64)StartOffset;
32693267

32703268
if (LastOffsetHasBeenCommited(userInfo)) {
32713269
SendReadingFinished(user);

ydb/core/persqueue/partition_init.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,12 +433,12 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
433433
Y_ABORT_UNLESS(key);
434434
RequestInfoRange(ctx, Partition()->Tablet, PartitionId(), *key);
435435
} else {
436-
Done(ctx);
436+
PostProcessing(ctx);
437437
}
438438
break;
439439
}
440440
case NKikimrProto::NODATA:
441-
Done(ctx);
441+
PostProcessing(ctx);
442442
break;
443443
case NKikimrProto::ERROR:
444444
PQ_LOG_ERROR("read topic error");
@@ -450,6 +450,16 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
450450
};
451451
}
452452

453+
void TInitInfoRangeStep::PostProcessing(const TActorContext& ctx) {
454+
auto& usersInfoStorage = Partition()->UsersInfoStorage;
455+
for (auto& [_, userInfo] : usersInfoStorage->GetAll()) {
456+
userInfo.AnyCommits = userInfo.Offset > (i64)Partition()->StartOffset;
457+
}
458+
459+
Done(ctx);
460+
}
461+
462+
453463

454464
//
455465
// TInitDataRangeStep

ydb/core/persqueue/partition_init.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ class TInitInfoRangeStep: public TBaseKVStep {
140140

141141
void Execute(const TActorContext& ctx) override;
142142
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
143+
144+
void PostProcessing(const TActorContext& ctx);
143145
};
144146

145147
class TInitDataRangeStep: public TBaseKVStep {

0 commit comments

Comments
 (0)