Skip to content

Commit 9345ee9

Browse files
committed
distributed tx commit on read session
1 parent 9fa6ad2 commit 9345ee9

18 files changed

+562
-216
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst) {
156156
}
157157

158158
void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset,
159-
const TInstant writeTimestamp, const TInstant createTimestamp) {
160-
ctx.Send(Tablet, MakeReplyGetClientOffsetOk(dst, offset, writeTimestamp, createTimestamp).Release());
159+
const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits) {
160+
ctx.Send(Tablet, MakeReplyGetClientOffsetOk(dst, offset, writeTimestamp, createTimestamp, consumerHasAnyCommits).Release());
161161
}
162162

163163
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) {
@@ -2285,19 +2285,22 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
22852285
Y_ABORT_UNLESS(userInfo.Offset == (i64)operation.GetCommitOffsetsBegin());
22862286
}
22872287

2288-
if (operation.GetCommitOffsetsEnd() < StartOffset) {
2288+
if (operation.GetCommitOffsetsEnd() <= StartOffset) {
2289+
userInfo.AnyCommits = false;
22892290
userInfo.Offset = StartOffset;
22902291
} else if (operation.GetCommitOffsetsEnd() > EndOffset) {
2292+
userInfo.AnyCommits = true;
22912293
userInfo.Offset = EndOffset;
22922294
} else {
2295+
userInfo.AnyCommits = true;
22932296
userInfo.Offset = operation.GetCommitOffsetsEnd();
22942297
}
22952298

2296-
if (operation.GetKillReadSession()) { // savnik check default
2299+
if (operation.GetKillReadSession()) {
22972300
userInfo.Session = "";
22982301
userInfo.PartitionSessionId = 0;
22992302
userInfo.Generation = 0;
2300-
userInfo.Step = 0; // savnik check need drop this
2303+
userInfo.Step = 0;
23012304
userInfo.PipeClient = {};
23022305
}
23032306
}
@@ -2423,6 +2426,9 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
24232426
userInfo.Generation = actual->Generation;
24242427
userInfo.Step = actual->Step;
24252428
userInfo.Offset = actual->Offset;
2429+
if (userInfo.Offset <= (i64)StartOffset) {
2430+
userInfo.AnyCommits = false;
2431+
}
24262432
userInfo.ReadRuleGeneration = actual->ReadRuleGeneration;
24272433
userInfo.ReadFromTimestamp = actual->ReadFromTimestamp;
24282434
userInfo.HasReadRule = true;
@@ -2607,7 +2613,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
26072613
return EProcessResult::Continue;
26082614
}
26092615

2610-
void TPartition::ExecImmediateTx(TTransaction& t)
2616+
void TPartition::ExecImmediateTx(TTransaction& t) // savnik
26112617
{
26122618
--ImmediateTxCount;
26132619
const auto& record = t.ProposeTransaction->GetRecord();
@@ -2639,7 +2645,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
26392645
"the consumer has been deleted");
26402646
return;
26412647
}
2642-
TUserInfoBase& userInfo = GetOrCreatePendingUser(user);
2648+
TUserInfoBase& pendingUserInfo = GetOrCreatePendingUser(user);
26432649

26442650
if (operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
26452651
ScheduleReplyPropose(record,
@@ -2649,7 +2655,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
26492655
return;
26502656
}
26512657

2652-
if (userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
2658+
if (pendingUserInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
26532659
ScheduleReplyPropose(record,
26542660
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
26552661
NKikimrPQ::TError::BAD_REQUEST,
@@ -2664,7 +2670,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
26642670
"incorrect offset range (commit to the future)");
26652671
return;
26662672
}
2667-
userInfo.Offset = operation.GetCommitOffsetsEnd();
2673+
pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
26682674
}
26692675
CommitWriteOperations(t);
26702676

@@ -2799,7 +2805,7 @@ void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
27992805
userInfo.PipeClient = act.PipeClient;
28002806
ScheduleReplyGetClientOffsetOk(act.Cookie,
28012807
userInfo.Offset,
2802-
ts.first, ts.second);
2808+
ts.first, ts.second, ui->AnyCommits);
28032809

28042810
return;
28052811
}
@@ -2896,7 +2902,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
28962902
ui32 step = act.Step;
28972903
const ui64 readRuleGeneration = act.ReadRuleGeneration;
28982904

2899-
bool setSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION;
2905+
bool createSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION;
29002906
bool dropSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_SESSION;
29012907
bool commitNotFromReadSession = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.SessionId.empty());
29022908

@@ -2905,6 +2911,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
29052911
userInfo.Session = "";
29062912
userInfo.Generation = userInfo.Step = 0;
29072913
userInfo.Offset = 0;
2914+
userInfo.AnyCommits = false;
29082915

29092916
PQ_LOG_D("Topic '" << TopicName() << "' partition " << Partition << " user " << user
29102917
<< " drop done"
@@ -2920,24 +2927,25 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
29202927
userInfo.PartitionSessionId = 0;
29212928
userInfo.Generation = userInfo.Step = 0;
29222929
userInfo.Offset = 0;
2930+
userInfo.AnyCommits = false;
29232931

29242932
if (userInfo.Important) {
29252933
userInfo.Offset = StartOffset;
29262934
}
29272935
} else {
2928-
if (setSession || dropSession) {
2936+
if (createSession || dropSession) {
29292937
offset = userInfo.Offset;
29302938
auto *ui = UsersInfoStorage->GetIfExists(userInfo.User);
29312939
auto ts = ui ? GetTime(*ui, userInfo.Offset) : std::make_pair<TInstant, TInstant>(TInstant::Zero(), TInstant::Zero());
29322940

29332941
ScheduleReplyGetClientOffsetOk(act.Cookie,
29342942
offset,
2935-
ts.first, ts.second);
2943+
ts.first, ts.second, ui->AnyCommits);
29362944
} else {
29372945
ScheduleReplyOk(act.Cookie);
29382946
}
29392947

2940-
if (setSession) {
2948+
if (createSession) {
29412949
userInfo.Session = session;
29422950
userInfo.Generation = generation;
29432951
userInfo.Step = step;
@@ -2953,17 +2961,20 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
29532961

29542962
Y_ABORT_UNLESS(offset <= (ui64)Max<i64>(), "Unexpected Offset: %" PRIu64, offset);
29552963
PQ_LOG_D("Topic '" << TopicName() << "' partition " << Partition << " user " << user
2956-
<< (setSession || dropSession ? " session" : " offset")
2964+
<< (createSession || dropSession ? " session" : " offset")
29572965
<< " is set to " << offset << " (startOffset " << StartOffset << ") session " << session
29582966
);
29592967

29602968
userInfo.Offset = offset;
2969+
if (userInfo.Offset <= (i64)StartOffset) {
2970+
userInfo.AnyCommits = false;
2971+
}
29612972

29622973
if (LastOffsetHasBeenCommited(userInfo)) {
29632974
SendReadingFinished(user);
29642975
}
29652976

2966-
auto counter = setSession ? COUNTER_PQ_CREATE_SESSION_OK : (dropSession ? COUNTER_PQ_DELETE_SESSION_OK : COUNTER_PQ_SET_CLIENT_OFFSET_OK);
2977+
auto counter = createSession ? COUNTER_PQ_CREATE_SESSION_OK : (dropSession ? COUNTER_PQ_DELETE_SESSION_OK : COUNTER_PQ_SET_CLIENT_OFFSET_OK);
29672978
TabletCounters.Cumulative()[counter].Increment(1);
29682979
}
29692980
}
@@ -2976,12 +2987,16 @@ void TPartition::ScheduleReplyOk(const ui64 dst)
29762987

29772988
void TPartition::ScheduleReplyGetClientOffsetOk(const ui64 dst,
29782989
const i64 offset,
2979-
const TInstant writeTimestamp, const TInstant createTimestamp)
2990+
const TInstant writeTimestamp,
2991+
const TInstant createTimestamp,
2992+
bool consumerHasAnyCommits)
29802993
{
29812994
Replies.emplace_back(Tablet,
29822995
MakeReplyGetClientOffsetOk(dst,
29832996
offset,
2984-
writeTimestamp, createTimestamp).Release());
2997+
writeTimestamp,
2998+
createTimestamp,
2999+
consumerHasAnyCommits).Release());
29853000

29863001
}
29873002

@@ -3056,7 +3071,8 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
30563071
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated,
30573072
ui64 offset, ui32 gen, ui32 step, const TString& session,
30583073
ui64 readOffsetRewindSum,
3059-
ui64 readRuleGeneration)
3074+
ui64 readRuleGeneration,
3075+
bool anyCommits)
30603076
{
30613077
TBuffer idata;
30623078
{
@@ -3067,6 +3083,7 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
30673083
userData.SetSession(session);
30683084
userData.SetOffsetRewindSum(readOffsetRewindSum);
30693085
userData.SetReadRuleGeneration(readRuleGeneration);
3086+
userData.SetAnyCommits(anyCommits);
30703087

30713088
TString out;
30723089
Y_PROTOBUF_SUPPRESS_NODISCARD userData.SerializeToString(&out);
@@ -3127,7 +3144,8 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request)
31273144
userInfo->Offset, userInfo->Generation, userInfo->Step,
31283145
userInfo->Session,
31293146
ui ? ui->ReadOffsetRewindSum : 0,
3130-
userInfo->ReadRuleGeneration);
3147+
userInfo->ReadRuleGeneration,
3148+
userInfo->AnyCommits);
31313149
} else {
31323150
AddCmdDeleteRange(request,
31333151
ikey, ikeyDeprecated);
@@ -3156,27 +3174,27 @@ TUserInfoBase& TPartition::GetOrCreatePendingUser(const TString& user,
31563174
TMaybe<ui64> readRuleGeneration)
31573175
{
31583176
TUserInfoBase* userInfo = nullptr;
3159-
auto i = PendingUsersInfo.find(user);
3160-
if (i == PendingUsersInfo.end()) {
3161-
auto ui = UsersInfoStorage->GetIfExists(user);
3162-
auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user, readRuleGeneration));
3177+
auto pendingUserIt = PendingUsersInfo.find(user);
3178+
if (pendingUserIt == PendingUsersInfo.end()) {
3179+
auto userIt = UsersInfoStorage->GetIfExists(user);
3180+
auto [newPendingUserIt, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user, readRuleGeneration));
31633181

3164-
if (ui) {
3165-
p->second.Session = ui->Session;
3166-
p->second.PartitionSessionId = ui->PartitionSessionId;
3167-
p->second.PipeClient = ui->PipeClient;
3182+
if (userIt) {
3183+
newPendingUserIt->second.Session = userIt->Session;
3184+
newPendingUserIt->second.PartitionSessionId = userIt->PartitionSessionId;
3185+
newPendingUserIt->second.PipeClient = userIt->PipeClient;
31683186

3169-
p->second.Generation = ui->Generation;
3170-
p->second.Step = ui->Step;
3171-
p->second.Offset = ui->Offset;
3172-
p->second.ReadRuleGeneration = ui->ReadRuleGeneration;
3173-
p->second.Important = ui->Important;
3174-
p->second.ReadFromTimestamp = ui->ReadFromTimestamp;
3187+
newPendingUserIt->second.Generation = userIt->Generation;
3188+
newPendingUserIt->second.Step = userIt->Step;
3189+
newPendingUserIt->second.Offset = userIt->Offset;
3190+
newPendingUserIt->second.ReadRuleGeneration = userIt->ReadRuleGeneration;
3191+
newPendingUserIt->second.Important = userIt->Important;
3192+
newPendingUserIt->second.ReadFromTimestamp = userIt->ReadFromTimestamp;
31753193
}
31763194

3177-
userInfo = &p->second;
3195+
userInfo = &newPendingUserIt->second;
31783196
} else {
3179-
userInfo = &i->second;
3197+
userInfo = &pendingUserIt->second;
31803198
}
31813199
AffectedUsers.insert(user);
31823200

@@ -3205,7 +3223,9 @@ THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyOk(const ui64 dst)
32053223

32063224
THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyGetClientOffsetOk(const ui64 dst,
32073225
const i64 offset,
3208-
const TInstant writeTimestamp, const TInstant createTimestamp)
3226+
const TInstant writeTimestamp,
3227+
const TInstant createTimestamp,
3228+
bool consumerHasAnyCommits)
32093229
{
32103230
auto response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
32113231
NKikimrClient::TResponse& resp = *response->Response;
@@ -3225,6 +3245,7 @@ THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyGetClientOffsetOk(const ui
32253245
user->SetEndOffset(EndOffset);
32263246
user->SetSizeLag(GetSizeLag(offset));
32273247
user->SetWriteTimestampEstimateMS(WriteTimestampEstimate.MilliSeconds());
3248+
user->SetClientHasAnyCommits(consumerHasAnyCommits);
32283249

32293250
return response;
32303251
}

ydb/core/persqueue/partition.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
156156
NKikimrPQ::TError::EKind kind, const TString& reason);
157157
void ReplyErrorForStoredWrites(const TActorContext& ctx);
158158

159-
void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
159+
void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits);
160160
void ReplyOk(const TActorContext& ctx, const ui64 dst);
161161
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);
162162

@@ -340,7 +340,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
340340
void ScheduleReplyOk(const ui64 dst);
341341
void ScheduleReplyGetClientOffsetOk(const ui64 dst,
342342
const i64 offset,
343-
const TInstant writeTimestamp, const TInstant createTimestamp);
343+
const TInstant writeTimestamp,
344+
const TInstant createTimestamp,
345+
bool consumerHasAnyCommits);
344346
void ScheduleReplyError(const ui64 dst,
345347
NPersQueue::NErrorCode::EErrorCode errorCode,
346348
const TString& error);
@@ -356,7 +358,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
356358
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated,
357359
ui64 offset, ui32 gen, ui32 step, const TString& session,
358360
ui64 readOffsetRewindSum,
359-
ui64 readRuleGeneration);
361+
ui64 readRuleGeneration,
362+
bool anyCommits);
360363
void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request);
361364
void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
362365
void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request);
@@ -369,7 +372,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
369372
THolder<TEvPQ::TEvProxyResponse> MakeReplyOk(const ui64 dst);
370373
THolder<TEvPQ::TEvProxyResponse> MakeReplyGetClientOffsetOk(const ui64 dst,
371374
const i64 offset,
372-
const TInstant writeTimestamp, const TInstant createTimestamp);
375+
const TInstant writeTimestamp,
376+
const TInstant createTimestamp,
377+
bool consumerHasAnyCommits);
373378
THolder<TEvPQ::TEvError> MakeReplyError(const ui64 dst,
374379
NPersQueue::NErrorCode::EErrorCode errorCode,
375380
const TString& error);

ydb/core/persqueue/partition_read.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
236236
}
237237
if (!userInfo) {
238238
userInfo = &UsersInfoStorage->Create(
239-
ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
239+
ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false
240240
);
241241
}
242242
if (userInfo->Offset < (i64)StartOffset)
@@ -298,7 +298,7 @@ void TPartition::Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext
298298
ui64 offset = Max<i64>(userInfo.Offset, 0);
299299
auto ts = GetTime(userInfo, offset);
300300
TabletCounters.Cumulative()[COUNTER_PQ_GET_CLIENT_OFFSET_OK].Increment(1);
301-
ReplyGetClientOffsetOk(ctx, ev->Get()->Cookie, userInfo.Offset, ts.first, ts.second);
301+
ReplyGetClientOffsetOk(ctx, ev->Get()->Cookie, userInfo.Offset, ts.first, ts.second, userInfo.AnyCommits);
302302
}
303303

304304
void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx) {

0 commit comments

Comments
 (0)