Skip to content

Commit 18213f3

Browse files
committed
distributed tx commit on read session
1 parent 3be7fe7 commit 18213f3

18 files changed

+564
-218
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ void TPartition::ReplyOk(const TActorContext& ctx, const ui64 dst) {
156156
}
157157

158158
void TPartition::ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset,
159-
const TInstant writeTimestamp, const TInstant createTimestamp) {
160-
ctx.Send(Tablet, MakeReplyGetClientOffsetOk(dst, offset, writeTimestamp, createTimestamp).Release());
159+
const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits) {
160+
ctx.Send(Tablet, MakeReplyGetClientOffsetOk(dst, offset, writeTimestamp, createTimestamp, consumerHasAnyCommits).Release());
161161
}
162162

163163
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i) {
@@ -2266,19 +2266,22 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
22662266
Y_ABORT_UNLESS(userInfo.Offset == (i64)operation.GetCommitOffsetsBegin());
22672267
}
22682268

2269-
if (operation.GetCommitOffsetsEnd() < StartOffset) {
2269+
if (operation.GetCommitOffsetsEnd() <= StartOffset) {
2270+
userInfo.AnyCommits = false;
22702271
userInfo.Offset = StartOffset;
22712272
} else if (operation.GetCommitOffsetsEnd() > EndOffset) {
2273+
userInfo.AnyCommits = true;
22722274
userInfo.Offset = EndOffset;
22732275
} else {
2276+
userInfo.AnyCommits = true;
22742277
userInfo.Offset = operation.GetCommitOffsetsEnd();
22752278
}
22762279

2277-
if (operation.GetKillReadSession()) { // savnik check default
2280+
if (operation.GetKillReadSession()) {
22782281
userInfo.Session = "";
22792282
userInfo.PartitionSessionId = 0;
22802283
userInfo.Generation = 0;
2281-
userInfo.Step = 0; // savnik check need drop this
2284+
userInfo.Step = 0;
22822285
userInfo.PipeClient = {};
22832286
}
22842287
}
@@ -2404,6 +2407,9 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
24042407
userInfo.Generation = actual->Generation;
24052408
userInfo.Step = actual->Step;
24062409
userInfo.Offset = actual->Offset;
2410+
if (userInfo.Offset <= (i64)StartOffset) {
2411+
userInfo.AnyCommits = false;
2412+
}
24072413
userInfo.ReadRuleGeneration = actual->ReadRuleGeneration;
24082414
userInfo.ReadFromTimestamp = actual->ReadFromTimestamp;
24092415
userInfo.HasReadRule = true;
@@ -2588,7 +2594,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
25882594
return EProcessResult::Continue;
25892595
}
25902596

2591-
void TPartition::ExecImmediateTx(TTransaction& t)
2597+
void TPartition::ExecImmediateTx(TTransaction& t) // savnik
25922598
{
25932599
--ImmediateTxCount;
25942600
const auto& record = t.ProposeTransaction->GetRecord();
@@ -2620,7 +2626,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
26202626
"the consumer has been deleted");
26212627
return;
26222628
}
2623-
TUserInfoBase& userInfo = GetOrCreatePendingUser(user);
2629+
TUserInfoBase& pendingUserInfo = GetOrCreatePendingUser(user);
26242630

26252631
if (operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
26262632
ScheduleReplyPropose(record,
@@ -2630,7 +2636,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
26302636
return;
26312637
}
26322638

2633-
if (userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
2639+
if (pendingUserInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
26342640
ScheduleReplyPropose(record,
26352641
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
26362642
NKikimrPQ::TError::BAD_REQUEST,
@@ -2645,7 +2651,7 @@ void TPartition::ExecImmediateTx(TTransaction& t)
26452651
"incorrect offset range (commit to the future)");
26462652
return;
26472653
}
2648-
userInfo.Offset = operation.GetCommitOffsetsEnd();
2654+
pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
26492655
}
26502656
CommitWriteOperations(t);
26512657

@@ -2780,7 +2786,7 @@ void TPartition::CommitUserAct(TEvPQ::TEvSetClientInfo& act) {
27802786
userInfo.PipeClient = act.PipeClient;
27812787
ScheduleReplyGetClientOffsetOk(act.Cookie,
27822788
userInfo.Offset,
2783-
ts.first, ts.second);
2789+
ts.first, ts.second, ui->AnyCommits);
27842790

27852791
return;
27862792
}
@@ -2877,7 +2883,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
28772883
ui32 step = act.Step;
28782884
const ui64 readRuleGeneration = act.ReadRuleGeneration;
28792885

2880-
bool setSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION;
2886+
bool createSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION;
28812887
bool dropSession = act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_SESSION;
28822888
bool commitNotFromReadSession = (act.Type == TEvPQ::TEvSetClientInfo::ESCI_OFFSET && act.SessionId.empty());
28832889

@@ -2886,6 +2892,7 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
28862892
userInfo.Session = "";
28872893
userInfo.Generation = userInfo.Step = 0;
28882894
userInfo.Offset = 0;
2895+
userInfo.AnyCommits = false;
28892896

28902897
PQ_LOG_D("Topic '" << TopicName() << "' partition " << Partition << " user " << user
28912898
<< " drop done"
@@ -2901,24 +2908,25 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
29012908
userInfo.PartitionSessionId = 0;
29022909
userInfo.Generation = userInfo.Step = 0;
29032910
userInfo.Offset = 0;
2911+
userInfo.AnyCommits = false;
29042912

29052913
if (userInfo.Important) {
29062914
userInfo.Offset = StartOffset;
29072915
}
29082916
} else {
2909-
if (setSession || dropSession) {
2917+
if (createSession || dropSession) {
29102918
offset = userInfo.Offset;
29112919
auto *ui = UsersInfoStorage->GetIfExists(userInfo.User);
29122920
auto ts = ui ? GetTime(*ui, userInfo.Offset) : std::make_pair<TInstant, TInstant>(TInstant::Zero(), TInstant::Zero());
29132921

29142922
ScheduleReplyGetClientOffsetOk(act.Cookie,
29152923
offset,
2916-
ts.first, ts.second);
2924+
ts.first, ts.second, ui->AnyCommits);
29172925
} else {
29182926
ScheduleReplyOk(act.Cookie);
29192927
}
29202928

2921-
if (setSession) {
2929+
if (createSession) {
29222930
userInfo.Session = session;
29232931
userInfo.Generation = generation;
29242932
userInfo.Step = step;
@@ -2934,17 +2942,20 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
29342942

29352943
Y_ABORT_UNLESS(offset <= (ui64)Max<i64>(), "Unexpected Offset: %" PRIu64, offset);
29362944
PQ_LOG_D("Topic '" << TopicName() << "' partition " << Partition << " user " << user
2937-
<< (setSession || dropSession ? " session" : " offset")
2945+
<< (createSession || dropSession ? " session" : " offset")
29382946
<< " is set to " << offset << " (startOffset " << StartOffset << ") session " << session
29392947
);
29402948

29412949
userInfo.Offset = offset;
2950+
if (userInfo.Offset <= (i64)StartOffset) {
2951+
userInfo.AnyCommits = false;
2952+
}
29422953

29432954
if (LastOffsetHasBeenCommited(userInfo)) {
29442955
SendReadingFinished(user);
29452956
}
29462957

2947-
auto counter = setSession ? COUNTER_PQ_CREATE_SESSION_OK : (dropSession ? COUNTER_PQ_DELETE_SESSION_OK : COUNTER_PQ_SET_CLIENT_OFFSET_OK);
2958+
auto counter = createSession ? COUNTER_PQ_CREATE_SESSION_OK : (dropSession ? COUNTER_PQ_DELETE_SESSION_OK : COUNTER_PQ_SET_CLIENT_OFFSET_OK);
29482959
TabletCounters.Cumulative()[counter].Increment(1);
29492960
}
29502961
}
@@ -2957,12 +2968,16 @@ void TPartition::ScheduleReplyOk(const ui64 dst)
29572968

29582969
void TPartition::ScheduleReplyGetClientOffsetOk(const ui64 dst,
29592970
const i64 offset,
2960-
const TInstant writeTimestamp, const TInstant createTimestamp)
2971+
const TInstant writeTimestamp,
2972+
const TInstant createTimestamp,
2973+
bool consumerHasAnyCommits)
29612974
{
29622975
Replies.emplace_back(Tablet,
29632976
MakeReplyGetClientOffsetOk(dst,
29642977
offset,
2965-
writeTimestamp, createTimestamp).Release());
2978+
writeTimestamp,
2979+
createTimestamp,
2980+
consumerHasAnyCommits).Release());
29662981

29672982
}
29682983

@@ -3037,7 +3052,8 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
30373052
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated,
30383053
ui64 offset, ui32 gen, ui32 step, const TString& session,
30393054
ui64 readOffsetRewindSum,
3040-
ui64 readRuleGeneration)
3055+
ui64 readRuleGeneration,
3056+
bool anyCommits)
30413057
{
30423058
TBuffer idata;
30433059
{
@@ -3048,6 +3064,7 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
30483064
userData.SetSession(session);
30493065
userData.SetOffsetRewindSum(readOffsetRewindSum);
30503066
userData.SetReadRuleGeneration(readRuleGeneration);
3067+
userData.SetAnyCommits(anyCommits);
30513068

30523069
TString out;
30533070
Y_PROTOBUF_SUPPRESS_NODISCARD userData.SerializeToString(&out);
@@ -3108,7 +3125,8 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request)
31083125
userInfo->Offset, userInfo->Generation, userInfo->Step,
31093126
userInfo->Session,
31103127
ui ? ui->ReadOffsetRewindSum : 0,
3111-
userInfo->ReadRuleGeneration);
3128+
userInfo->ReadRuleGeneration,
3129+
userInfo->AnyCommits);
31123130
} else {
31133131
AddCmdDeleteRange(request,
31143132
ikey, ikeyDeprecated);
@@ -3137,27 +3155,27 @@ TUserInfoBase& TPartition::GetOrCreatePendingUser(const TString& user,
31373155
TMaybe<ui64> readRuleGeneration)
31383156
{
31393157
TUserInfoBase* userInfo = nullptr;
3140-
auto i = PendingUsersInfo.find(user);
3141-
if (i == PendingUsersInfo.end()) {
3142-
auto ui = UsersInfoStorage->GetIfExists(user);
3143-
auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user, readRuleGeneration));
3158+
auto pendingUserIt = PendingUsersInfo.find(user);
3159+
if (pendingUserIt == PendingUsersInfo.end()) {
3160+
auto userIt = UsersInfoStorage->GetIfExists(user);
3161+
auto [newPendingUserIt, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user, readRuleGeneration));
31443162

3145-
if (ui) {
3146-
p->second.Session = ui->Session;
3147-
p->second.PartitionSessionId = ui->PartitionSessionId;
3148-
p->second.PipeClient = ui->PipeClient;
3163+
if (userIt) {
3164+
newPendingUserIt->second.Session = userIt->Session;
3165+
newPendingUserIt->second.PartitionSessionId = userIt->PartitionSessionId;
3166+
newPendingUserIt->second.PipeClient = userIt->PipeClient;
31493167

3150-
p->second.Generation = ui->Generation;
3151-
p->second.Step = ui->Step;
3152-
p->second.Offset = ui->Offset;
3153-
p->second.ReadRuleGeneration = ui->ReadRuleGeneration;
3154-
p->second.Important = ui->Important;
3155-
p->second.ReadFromTimestamp = ui->ReadFromTimestamp;
3168+
newPendingUserIt->second.Generation = userIt->Generation;
3169+
newPendingUserIt->second.Step = userIt->Step;
3170+
newPendingUserIt->second.Offset = userIt->Offset;
3171+
newPendingUserIt->second.ReadRuleGeneration = userIt->ReadRuleGeneration;
3172+
newPendingUserIt->second.Important = userIt->Important;
3173+
newPendingUserIt->second.ReadFromTimestamp = userIt->ReadFromTimestamp;
31563174
}
31573175

3158-
userInfo = &p->second;
3176+
userInfo = &newPendingUserIt->second;
31593177
} else {
3160-
userInfo = &i->second;
3178+
userInfo = &pendingUserIt->second;
31613179
}
31623180
AffectedUsers.insert(user);
31633181

@@ -3186,7 +3204,9 @@ THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyOk(const ui64 dst)
31863204

31873205
THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyGetClientOffsetOk(const ui64 dst,
31883206
const i64 offset,
3189-
const TInstant writeTimestamp, const TInstant createTimestamp)
3207+
const TInstant writeTimestamp,
3208+
const TInstant createTimestamp,
3209+
bool consumerHasAnyCommits)
31903210
{
31913211
auto response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
31923212
NKikimrClient::TResponse& resp = *response->Response;
@@ -3206,6 +3226,7 @@ THolder<TEvPQ::TEvProxyResponse> TPartition::MakeReplyGetClientOffsetOk(const ui
32063226
user->SetEndOffset(EndOffset);
32073227
user->SetSizeLag(GetSizeLag(offset));
32083228
user->SetWriteTimestampEstimateMS(WriteTimestampEstimate.MilliSeconds());
3229+
user->SetClientHasAnyCommits(consumerHasAnyCommits);
32093230

32103231
return response;
32113232
}

ydb/core/persqueue/partition.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
155155
NKikimrPQ::TError::EKind kind, const TString& reason);
156156
void ReplyErrorForStoredWrites(const TActorContext& ctx);
157157

158-
void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
158+
void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp, bool consumerHasAnyCommits);
159159
void ReplyOk(const TActorContext& ctx, const ui64 dst);
160160
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);
161161

@@ -339,7 +339,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
339339
void ScheduleReplyOk(const ui64 dst);
340340
void ScheduleReplyGetClientOffsetOk(const ui64 dst,
341341
const i64 offset,
342-
const TInstant writeTimestamp, const TInstant createTimestamp);
342+
const TInstant writeTimestamp,
343+
const TInstant createTimestamp,
344+
bool consumerHasAnyCommits);
343345
void ScheduleReplyError(const ui64 dst,
344346
NPersQueue::NErrorCode::EErrorCode errorCode,
345347
const TString& error);
@@ -355,7 +357,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
355357
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated,
356358
ui64 offset, ui32 gen, ui32 step, const TString& session,
357359
ui64 readOffsetRewindSum,
358-
ui64 readRuleGeneration);
360+
ui64 readRuleGeneration,
361+
bool anyCommits);
359362
void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request);
360363
void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
361364
void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request);
@@ -368,7 +371,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
368371
THolder<TEvPQ::TEvProxyResponse> MakeReplyOk(const ui64 dst);
369372
THolder<TEvPQ::TEvProxyResponse> MakeReplyGetClientOffsetOk(const ui64 dst,
370373
const i64 offset,
371-
const TInstant writeTimestamp, const TInstant createTimestamp);
374+
const TInstant writeTimestamp,
375+
const TInstant createTimestamp,
376+
bool consumerHasAnyCommits);
372377
THolder<TEvPQ::TEvError> MakeReplyError(const ui64 dst,
373378
NPersQueue::NErrorCode::EErrorCode errorCode,
374379
const TString& error);

ydb/core/persqueue/partition_read.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
217217
}
218218
if (!userInfo) {
219219
userInfo = &UsersInfoStorage->Create(
220-
ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
220+
ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false
221221
);
222222
}
223223
if (userInfo->Offset < (i64)StartOffset)
@@ -279,7 +279,7 @@ void TPartition::Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext
279279
ui64 offset = Max<i64>(userInfo.Offset, 0);
280280
auto ts = GetTime(userInfo, offset);
281281
TabletCounters.Cumulative()[COUNTER_PQ_GET_CLIENT_OFFSET_OK].Increment(1);
282-
ReplyGetClientOffsetOk(ctx, ev->Get()->Cookie, userInfo.Offset, ts.first, ts.second);
282+
ReplyGetClientOffsetOk(ctx, ev->Get()->Cookie, userInfo.Offset, ts.first, ts.second, userInfo.AnyCommits);
283283
}
284284

285285
void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx) {

0 commit comments

Comments
 (0)