Skip to content

Commit 2d042c1

Browse files
authored
fix many scheduled events (#7098)
1 parent d518adf commit 2d042c1

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,11 @@ void TPartitionActor::MakeCommit(const TActorContext& ctx) {
149149
TPartitionActor::~TPartitionActor() = default;
150150

151151

152-
void TPartitionActor::Bootstrap(const TActorContext&) {
153-
152+
void TPartitionActor::Bootstrap(const TActorContext& ctx) {
154153
Become(&TThis::StateFunc);
154+
ctx.Schedule(PREWAIT_DATA, new TEvents::TEvWakeup());
155155
}
156156

157-
158157
void TPartitionActor::SendCommit(const ui64 readId, const ui64 offset, const TActorContext& ctx) {
159158
NKikimrClient::TPersQueueRequest request;
160159
request.MutablePartitionRequest()->SetTopic(Topic->GetPrimaryPath());
@@ -1014,7 +1013,6 @@ void TPartitionActor::WaitDataInPartition(const TActorContext& ctx) {
10141013

10151014
NTabletPipe::SendData(ctx, PipeClient, event.Release());
10161015

1017-
ctx.Schedule(PREWAIT_DATA, new TEvents::TEvWakeup());
10181016
ctx.Schedule(WAIT_DATA, new TEvPQProxy::TEvDeadlineExceeded(WaitDataCookie));
10191017

10201018
WaitDataInfly.insert(WaitDataCookie);
@@ -1213,13 +1211,18 @@ void TPartitionActor::HandlePoison(TEvents::TEvPoisonPill::TPtr&, const TActorCo
12131211
}
12141212

12151213
void TPartitionActor::Handle(TEvPQProxy::TEvDeadlineExceeded::TPtr& ev, const TActorContext& ctx) {
1216-
WaitDataInfly.erase(ev->Get()->Cookie);
1217-
HandleWakeup(ctx);
1214+
if (WaitDataInfly.erase(ev->Get()->Cookie)) {
1215+
DoWakeup(ctx);
1216+
}
12181217
}
12191218

12201219
void TPartitionActor::HandleWakeup(const TActorContext& ctx) {
1221-
if (ReadOffset >= EndOffset && WaitDataInfly.size() <= 1 && PipeClient) { //send one more
1222-
Y_ABORT_UNLESS(WaitForData);
1220+
DoWakeup(ctx);
1221+
ctx.Schedule(PREWAIT_DATA, new TEvents::TEvWakeup());
1222+
}
1223+
1224+
void TPartitionActor::DoWakeup(const TActorContext& ctx) {
1225+
if (WaitForData && ReadOffset >= EndOffset && WaitDataInfly.size() <= 1 && PipeClient) { //send one more
12231226
WaitDataInPartition(ctx);
12241227
}
12251228
}

ydb/services/persqueue_v1/actors/partition_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ class TPartitionActor : public NActors::TActorBootstrapped<TPartitionActor> {
125125

126126
void HandlePoison(NActors::TEvents::TEvPoisonPill::TPtr& ev, const NActors::TActorContext& ctx);
127127
void HandleWakeup(const NActors::TActorContext& ctx);
128+
void DoWakeup(const NActors::TActorContext& ctx);
128129

129130
void InitLockPartition(const NActors::TActorContext& ctx);
130131
void InitStartReading(const NActors::TActorContext& ctx);

0 commit comments

Comments
 (0)