Skip to content

Commit ae3ca4f

Browse files
authored
checkpoint storage: use more query parameters and settings (backport #13755) (#13978)
1 parent c009b69 commit ae3ca4f

File tree

2 files changed

+119
-51
lines changed

2 files changed

+119
-51
lines changed

ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp

Lines changed: 117 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,18 @@ struct TCheckpointContext : public TThrRefBase {
5757
TGenerationContextPtr GenerationContext;
5858
TCheckpointGraphDescriptionContextPtr CheckpointGraphDescriptionContext;
5959
IEntityIdGenerator::TPtr EntityIdGenerator;
60+
TExecDataQuerySettings Settings;
6061

6162
TCheckpointContext(const TCheckpointId& id,
6263
ECheckpointStatus status,
6364
ECheckpointStatus expected,
64-
ui64 stateSizeBytes)
65+
ui64 stateSizeBytes,
66+
TExecDataQuerySettings settings)
6567
: CheckpointId(id)
6668
, Status(status)
6769
, ExpectedStatus(expected)
6870
, StateSizeBytes(stateSizeBytes)
71+
, Settings(settings)
6972
{
7073
}
7174
};
@@ -218,7 +221,7 @@ TFuture<TStatus> CreateCheckpoint(const TCheckpointContextPtr& context) {
218221
}
219222

220223
auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
221-
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply(
224+
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply(
222225
[] (const TFuture<TDataQueryResult>& future) {
223226
TStatus status = future.GetValue();
224227
return status;
@@ -234,21 +237,41 @@ TFuture<TStatus> UpdateCheckpoint(const TCheckpointContextPtr& context) {
234237
auto query = Sprintf(R"(
235238
--!syntax_v1
236239
PRAGMA TablePathPrefix("%s");
237-
$ts = cast(%lu as Timestamp);
240+
DECLARE $graph_id AS String;
241+
DECLARE $coordinator_generation AS Uint64;
242+
DECLARE $seq_no AS Uint64;
243+
DECLARE $status AS Uint8;
244+
DECLARE $state_size AS Uint64;
245+
DECLARE $ts AS Timestamp;
238246
239247
UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, state_size, modified_by) VALUES
240-
("%s", %lu, %lu, %u, %lu, $ts);
248+
($graph_id, $coordinator_generation, $seq_no, $status, $state_size, $ts);
241249
)", generationContext->TablePathPrefix.c_str(),
242-
TInstant::Now().MicroSeconds(),
243-
CheckpointsMetadataTable,
244-
generationContext->PrimaryKey.c_str(),
245-
context->CheckpointId.CoordinatorGeneration,
246-
context->CheckpointId.SeqNo,
247-
(ui32)context->Status,
248-
context->StateSizeBytes);
250+
CheckpointsMetadataTable);
251+
252+
NYdb::TParamsBuilder params;
253+
params
254+
.AddParam("$graph_id")
255+
.String(generationContext->PrimaryKey)
256+
.Build()
257+
.AddParam("$coordinator_generation")
258+
.Uint64(context->CheckpointId.CoordinatorGeneration)
259+
.Build()
260+
.AddParam("$seq_no")
261+
.Uint64(context->CheckpointId.SeqNo)
262+
.Build()
263+
.AddParam("$status")
264+
.Uint8((ui8)context->Status)
265+
.Build()
266+
.AddParam("$state_size")
267+
.Uint64(context->StateSizeBytes)
268+
.Build()
269+
.AddParam("$ts")
270+
.Timestamp(TInstant::Now())
271+
.Build();
249272

250273
auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
251-
return generationContext->Session.ExecuteDataQuery(query, ttxControl).Apply(
274+
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply(
252275
[] (const TFuture<TDataQueryResult>& future) {
253276
TStatus status = future.GetValue();
254277
return status;
@@ -262,15 +285,20 @@ TFuture<TDataQueryResult> SelectGraphDescId(const TCheckpointContextPtr& context
262285
auto query = Sprintf(R"(
263286
--!syntax_v1
264287
PRAGMA TablePathPrefix("%s");
288+
DECLARE $graph_desc_id AS String;
265289
266290
SELECT ref_count
267291
FROM %s
268-
WHERE id = "%s";
292+
WHERE id = $graph_desc_id;
269293
)", generationContext->TablePathPrefix.c_str(),
270-
CheckpointsGraphsDescriptionTable,
271-
graphDescContext->GraphDescId.c_str());
294+
CheckpointsGraphsDescriptionTable);
295+
NYdb::TParamsBuilder params;
296+
params
297+
.AddParam("$graph_desc_id")
298+
.String(graphDescContext->GraphDescId)
299+
.Build();
272300

273-
return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction));
301+
return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build(), context->Settings);
274302
}
275303

276304
bool GraphDescIdExists(const TFuture<TDataQueryResult>& result) {
@@ -290,6 +318,7 @@ TFuture<TStatus> GenerateGraphDescId(const TCheckpointContextPtr& context) {
290318
if (!result.GetValue().IsSuccess()) {
291319
return MakeFuture<TStatus>(result.GetValue());
292320
}
321+
// TODO racing!
293322
if (!GraphDescIdExists(result)) {
294323
return MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues()));
295324
} else {
@@ -441,19 +470,33 @@ TFuture<TDataQueryResult> SelectCheckpoint(const TCheckpointContextPtr& context)
441470
auto query = Sprintf(R"(
442471
--!syntax_v1
443472
PRAGMA TablePathPrefix("%s");
473+
DECLARE $graph_id AS String;
474+
DECLARE $coordinator_generation AS Uint64;
475+
DECLARE $seq_no AS Uint64;
444476
445477
SELECT status
446478
FROM %s
447-
WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu;
479+
WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation AND seq_no = $seq_no;
448480
)", generationContext->TablePathPrefix.c_str(),
449-
CheckpointsMetadataTable,
450-
generationContext->PrimaryKey.c_str(),
451-
context->CheckpointId.CoordinatorGeneration,
452-
context->CheckpointId.SeqNo);
481+
CheckpointsMetadataTable);
482+
483+
NYdb::TParamsBuilder params;
484+
params
485+
.AddParam("$graph_id")
486+
.String(generationContext->PrimaryKey)
487+
.Build()
488+
.AddParam("$coordinator_generation")
489+
.Uint64(context->CheckpointId.CoordinatorGeneration)
490+
.Build()
491+
.AddParam("$seq_no")
492+
.Uint64(context->CheckpointId.SeqNo)
493+
.Build();
453494

454495
return generationContext->Session.ExecuteDataQuery(
455496
query,
456-
TTxControl::Tx(*generationContext->Transaction));
497+
TTxControl::Tx(*generationContext->Transaction),
498+
params.Build(),
499+
context->Settings);
457500
}
458501

459502
TFuture<TStatus> CheckCheckpoint(
@@ -766,7 +809,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
766809
ECheckpointStatus status)
767810
{
768811
Y_ABORT_UNLESS(graphDescId);
769-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul);
812+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
770813
checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive<TCheckpointGraphDescriptionContext>(graphDescId);
771814
return CreateCheckpointImpl(coordinator, checkpointContext);
772815
}
@@ -777,7 +820,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
777820
const NProto::TCheckpointGraphDescription& graphDesc,
778821
ECheckpointStatus status)
779822
{
780-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul);
823+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
781824
checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive<TCheckpointGraphDescriptionContext>(graphDesc);
782825
checkpointContext->EntityIdGenerator = EntityIdGenerator;
783826
return CreateCheckpointImpl(coordinator, checkpointContext);
@@ -818,7 +861,7 @@ TFuture<TIssues> TCheckpointStorage::UpdateCheckpointStatus(
818861
ECheckpointStatus prevStatus,
819862
ui64 stateSizeBytes)
820863
{
821-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus, stateSizeBytes);
864+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus, stateSizeBytes, DefaultExecDataQuerySettings());
822865
auto future = YdbConnection->TableClient.RetryOperation(
823866
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
824867
auto generationContext = MakeIntrusive<TGenerationContext>(
@@ -844,7 +887,7 @@ TFuture<TIssues> TCheckpointStorage::AbortCheckpoint(
844887
const TCoordinatorId& coordinator,
845888
const TCheckpointId& checkpointId)
846889
{
847-
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul);
890+
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
848891
auto future = YdbConnection->TableClient.RetryOperation(
849892
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
850893
auto generationContext = MakeIntrusive<TGenerationContext>(
@@ -903,28 +946,35 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp
903946

904947
TFuture<TIssues> TCheckpointStorage::DeleteGraph(const TString& graphId) {
905948
auto future = YdbConnection->TableClient.RetryOperation(
906-
[prefix = YdbConnection->TablePathPrefix, graphId] (TSession session) {
949+
[prefix = YdbConnection->TablePathPrefix, graphId, settings = DefaultExecDataQuerySettings()] (TSession session) {
907950
// TODO: use prepared queries
908951
auto query = Sprintf(R"(
909952
--!syntax_v1
910953
PRAGMA TablePathPrefix("%s");
954+
DECLARE $graph_id AS String;
911955
912956
DELETE
913957
FROM %s
914-
WHERE graph_id = "%s";
958+
WHERE graph_id = $graph_id;
915959
916960
DELETE
917961
FROM %s
918-
WHERE graph_id = "%s";
962+
WHERE graph_id = $graph_id;
919963
)", prefix.c_str(),
920964
CoordinatorsSyncTable,
921-
graphId.c_str(),
922-
CheckpointsMetadataTable,
923-
graphId.c_str());
965+
CheckpointsMetadataTable);
966+
967+
NYdb::TParamsBuilder params;
968+
params
969+
.AddParam("$graph_id")
970+
.String(graphId)
971+
.Build();
924972

925973
auto future = session.ExecuteDataQuery(
926974
query,
927-
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
975+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
976+
params.Build(),
977+
settings);
928978

929979
return future.Apply(
930980
[] (const TFuture<TDataQueryResult>& future) {
@@ -941,30 +991,48 @@ TFuture<TIssues> TCheckpointStorage::MarkCheckpointsGC(
941991
const TCheckpointId& checkpointUpperBound)
942992
{
943993
auto future = YdbConnection->TableClient.RetryOperation(
944-
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
994+
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) {
945995
// TODO: use prepared queries
946996
auto query = Sprintf(R"(
947997
--!syntax_v1
948998
PRAGMA TablePathPrefix("%s");
949-
$ts = cast(%lu as Timestamp);
999+
DECLARE $ts AS Timestamp;
1000+
DECLARE $status AS Uint8;
1001+
DECLARE $graph_id AS String;
1002+
DECLARE $coordinator_generation AS Uint64;
1003+
DECLARE $seq_no AS Uint64;
9501004
9511005
UPDATE %s
952-
SET status = %u, modified_by = $ts
953-
WHERE graph_id = "%s" AND
954-
(coordinator_generation < %lu OR
955-
(coordinator_generation = %lu AND seq_no < %lu));
1006+
SET status = $status, modified_by = $ts
1007+
WHERE graph_id = $graph_id AND
1008+
(coordinator_generation < $coordinator_generation OR
1009+
(coordinator_generation = $coordinator_generation AND seq_no < $seq_no));
9561010
)", prefix.c_str(),
957-
TInstant::Now().MicroSeconds(),
958-
CheckpointsMetadataTable,
959-
(ui32)ECheckpointStatus::GC,
960-
graphId.c_str(),
961-
checkpointUpperBound.CoordinatorGeneration,
962-
checkpointUpperBound.CoordinatorGeneration,
963-
checkpointUpperBound.SeqNo);
1011+
CheckpointsMetadataTable);
1012+
1013+
NYdb::TParamsBuilder params;
1014+
params
1015+
.AddParam("$graph_id")
1016+
.String(graphId)
1017+
.Build()
1018+
.AddParam("$coordinator_generation")
1019+
.Uint64(checkpointUpperBound.CoordinatorGeneration)
1020+
.Build()
1021+
.AddParam("$seq_no")
1022+
.Uint64(checkpointUpperBound.SeqNo)
1023+
.Build()
1024+
.AddParam("$status")
1025+
.Uint8((ui8)ECheckpointStatus::GC)
1026+
.Build()
1027+
.AddParam("$ts")
1028+
.Timestamp(TInstant::Now())
1029+
.Build();
9641030

9651031
auto future = session.ExecuteDataQuery(
9661032
query,
967-
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
1033+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
1034+
params.Build(),
1035+
thisPtr->DefaultExecDataQuerySettings());
9681036

9691037
return future.Apply(
9701038
[] (const TFuture<TDataQueryResult>& future) {
@@ -981,7 +1049,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
9811049
const TCheckpointId& checkpointUpperBound)
9821050
{
9831051
auto future = YdbConnection->TableClient.RetryOperation(
984-
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
1052+
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, settings = DefaultExecDataQuerySettings()] (TSession session) {
9851053
// TODO: use prepared queries
9861054
using namespace fmt::literals;
9871055
const TString query = fmt::format(R"sql(
@@ -1040,7 +1108,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
10401108

10411109
auto future = session.ExecuteDataQuery(
10421110
query,
1043-
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build());
1111+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build(), settings);
10441112

10451113
return future.Apply(
10461114
[] (const TFuture<TDataQueryResult>& future) {

ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -725,8 +725,8 @@ TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) {
725725
726726
DELETE
727727
FROM %s
728-
WHERE graph_id = "%s";
729-
)", prefix.c_str(), StatesTable, graphId.c_str());
728+
WHERE graph_id = $graph_id;
729+
)", prefix.c_str(), StatesTable);
730730

731731
auto future = session.ExecuteDataQuery(
732732
query,

0 commit comments

Comments
 (0)