Skip to content

Commit 2c34e9f

Browse files
authored
Fix loading session without cursor (#9246)
1 parent 4ee1a79 commit 2c34e9f

File tree

7 files changed

+57
-28
lines changed

7 files changed

+57
-28
lines changed

ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,17 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL
6767
NKikimrColumnShardDataSharingProto::TSourceSession protoSession;
6868
AFL_VERIFY(protoSession.ParseFromString(rowset.GetValue<Schema::SourceSessions::Details>()));
6969

70-
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic protoSessionCursorDynamic;
71-
AFL_VERIFY(protoSessionCursorDynamic.ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorDynamic>()));
70+
std::optional<NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic> protoSessionCursorDynamic;
71+
if (rowset.HaveValue<Schema::SourceSessions::CursorDynamic>()) {
72+
protoSessionCursorDynamic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic{};
73+
AFL_VERIFY(protoSessionCursorDynamic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorDynamic>()));
74+
}
7275

73-
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic protoSessionCursorStatic;
74-
AFL_VERIFY(protoSessionCursorStatic.ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
76+
std::optional<NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic> protoSessionCursorStatic;
77+
if (rowset.HaveValue<Schema::SourceSessions::CursorStatic>()) {
78+
protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{};
79+
AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
80+
}
7581

7682
AFL_VERIFY(index);
7783
session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate();

ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#include "source.h"
2-
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
2+
3+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
34
#include <ydb/core/tx/columnshard/data_sharing/destination/events/transfer.h>
5+
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
6+
47
#include <ydb/library/formats/arrow/hash/xx_hash.h>
58

69
namespace NKikimr::NOlap::NDataSharing {
@@ -130,18 +133,29 @@ NKikimr::TConclusionStatus TSourceCursor::DeserializeFromProto(const NKikimrColu
130133
PathPortionHashes.emplace(i.GetPathId(), i.GetHash());
131134
}
132135
AFL_VERIFY(PathPortionHashes.size());
133-
StaticSaved = true;
136+
IsStaticSaved = true;
134137
return TConclusionStatus::Success();
135138
}
136139

137140
TSourceCursor::TSourceCursor(const TTabletId selfTabletId, const std::set<ui64>& pathIds, const TTransferContext transferContext)
138141
: SelfTabletId(selfTabletId)
139142
, TransferContext(transferContext)
140-
, PathIds(pathIds)
141-
{
143+
, PathIds(pathIds) {
142144
}
143145

144-
bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManager, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index) {
146+
void TSourceCursor::SaveToDatabase(NIceDb::TNiceDb& db, const TString& sessionId) {
147+
using SourceSessions = NKikimr::NColumnShard::Schema::SourceSessions;
148+
db.Table<SourceSessions>().Key(sessionId).Update(
149+
NIceDb::TUpdate<SourceSessions::CursorDynamic>(SerializeDynamicToProto().SerializeAsString()));
150+
if (!IsStaticSaved) {
151+
db.Table<SourceSessions>().Key(sessionId).Update(
152+
NIceDb::TUpdate<SourceSessions::CursorStatic>(SerializeStaticToProto().SerializeAsString()));
153+
IsStaticSaved = true;
154+
}
155+
}
156+
157+
bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManager,
158+
const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index) {
145159
AFL_VERIFY(!IsStartedFlag);
146160
std::map<ui64, std::map<ui32, std::shared_ptr<TPortionInfo>>> local;
147161
std::vector<std::shared_ptr<TPortionInfo>> portionsLock;
@@ -177,5 +191,4 @@ bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManag
177191
IsStartedFlag = true;
178192
return true;
179193
}
180-
181-
}
194+
} // namespace NKikimr::NOlap::NDataSharing

ydb/core/tx/columnshard/data_sharing/source/session/cursor.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ class TColumnEngineForLogs;
88
class TVersionedIndex;
99
}
1010

11+
namespace NKikimr::NIceDb {
12+
class TNiceDb;
13+
}
14+
1115
namespace NKikimr::NOlap::NDataSharing {
1216

1317
class TSharedBlobsManager;
@@ -30,8 +34,11 @@ class TSourceCursor {
3034
std::set<ui64> PathIds;
3135
THashMap<ui64, TString> PathPortionHashes;
3236
bool IsStartedFlag = false;
33-
YDB_ACCESSOR(bool, StaticSaved, false);
37+
bool IsStaticSaved = false;
3438
void BuildSelection(const std::shared_ptr<IStoragesManager>& storagesManager, const TVersionedIndex& index);
39+
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic SerializeDynamicToProto() const;
40+
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic SerializeStaticToProto() const;
41+
3542
public:
3643
bool IsAckDataReceived() const {
3744
return AckReceivedForPackIdx == PackIdx;
@@ -96,11 +103,10 @@ class TSourceCursor {
96103

97104
TSourceCursor(const TTabletId selfTabletId, const std::set<ui64>& pathIds, const TTransferContext transferContext);
98105

99-
bool Start(const std::shared_ptr<IStoragesManager>& storagesManager, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index);
100-
101-
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic SerializeDynamicToProto() const;
102-
NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic SerializeStaticToProto() const;
106+
void SaveToDatabase(class NIceDb::TNiceDb& db, const TString& sessionId);
103107

108+
bool Start(const std::shared_ptr<IStoragesManager>& storagesManager,
109+
const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index);
104110
[[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic& proto,
105111
const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic& protoStatic);
106112
};

ydb/core/tx/columnshard/data_sharing/source/session/source.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#include "source.h"
2-
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_finish_ack_to_source.h>
2+
3+
#include <ydb/core/tx/columnshard/data_locks/locks/list.h>
34
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.h>
5+
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_finish_ack_to_source.h>
46
#include <ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.h>
5-
#include <ydb/core/tx/columnshard/data_locks/locks/list.h>
67
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
78

89
namespace NKikimr::NOlap::NDataSharing {
@@ -68,6 +69,10 @@ TConclusion<std::unique_ptr<NTabletFlatExecutor::ITransaction>> TSourceSession::
6869
}
6970
}
7071

72+
void TSourceSession::SaveCursorToDatabase(NIceDb::TNiceDb& db) {
73+
GetCursorVerified()->SaveToDatabase(db, GetSessionId());
74+
}
75+
7176
void TSourceSession::ActualizeDestination(const NColumnShard::TColumnShard& shard, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) {
7277
AFL_VERIFY(IsInProgress() || IsPrepared());
7378
AFL_VERIFY(Cursor);

ydb/core/tx/columnshard/data_sharing/source/session/source.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
#include <ydb/core/tx/columnshard/data_sharing/common/session/common.h>
44
#include <ydb/core/tx/columnshard/common/tablet_id.h>
55

6+
namespace NKikimr::NIceDb {
7+
class TNiceDb;
8+
}
9+
610
namespace NKikimr::NOlap::NDataSharing {
711

812
class TSharedBlobsManager;
@@ -58,7 +62,9 @@ class TSourceSession: public TCommonSession {
5862
AFL_VERIFY(!!Cursor);
5963
return Cursor;
6064
}
61-
/*
65+
66+
void SaveCursorToDatabase(NIceDb::TNiceDb& db);
67+
/*
6268
bool TryNextCursor(const ui32 packIdx, const std::shared_ptr<IStoragesManager>& storagesManager, const TVersionedIndex& index) {
6369
AFL_VERIFY(Cursor);
6470
if (packIdx != Cursor->GetPackIdx()) {

ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,7 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc
2222
}
2323

2424
NIceDb::TNiceDb db(txc.DB);
25-
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
26-
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorDynamic>(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString()));
27-
if (!Session->GetCursorVerified()->GetStaticSaved()) {
28-
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
29-
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorStatic>(Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString()));
30-
Session->GetCursorVerified()->SetStaticSaved(true);
31-
}
25+
Session->SaveCursorToDatabase(db);
3226
std::swap(SharedBlobIds, sharedTabletBlobIds);
3327
return true;
3428
}

ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ namespace NKikimr::NOlap::NDataSharing {
66
bool TTxWriteSourceCursor::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
77
using namespace NColumnShard;
88
NIceDb::TNiceDb db(txc.DB);
9-
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
10-
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorDynamic>(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString()));
9+
Session->SaveCursorToDatabase(db);
1110
return true;
1211
}
1312

0 commit comments

Comments
 (0)