diff --git a/.github/workflows/examples.yaml b/.github/workflows/examples.yaml index def8ce1fa1..50c47d76bc 100644 --- a/.github/workflows/examples.yaml +++ b/.github/workflows/examples.yaml @@ -17,7 +17,7 @@ jobs: strategy: fail-fast: false matrix: - ydb-version: [23.3, 24.1, trunk] + ydb-version: [24.1, trunk] services: ydb: image: ydbplatform/local-ydb:${{ matrix.ydb-version }} diff --git a/examples/basic_example/CMakeLists.txt b/examples/basic_example/CMakeLists.txt index 7c6f6227dc..75d2d1b253 100644 --- a/examples/basic_example/CMakeLists.txt +++ b/examples/basic_example/CMakeLists.txt @@ -3,13 +3,15 @@ add_executable(basic_example) target_link_libraries(basic_example PUBLIC yutil getopt - YDB-CPP-SDK::Table + YDB-CPP-SDK::Query + YDB-CPP-SDK::Params + YDB-CPP-SDK::Driver ) target_sources(basic_example PRIVATE - ${YDB_SDK_SOURCE_DIR}/examples/basic_example/main.cpp - ${YDB_SDK_SOURCE_DIR}/examples/basic_example/basic_example_data.cpp - ${YDB_SDK_SOURCE_DIR}/examples/basic_example/basic_example.cpp + main.cpp + basic_example_data.cpp + basic_example.cpp ) vcs_info(basic_example) diff --git a/examples/basic_example/basic_example.cpp b/examples/basic_example/basic_example.cpp index 5c506f39ad..5d3b901413 100644 --- a/examples/basic_example/basic_example.cpp +++ b/examples/basic_example/basic_example.cpp @@ -1,14 +1,13 @@ #include "basic_example.h" +#include + #include -#include #include using namespace NYdb; -using namespace NYdb::NTable; - -namespace { +using namespace NYdb::NQuery; class TYdbErrorException : public yexception { public: @@ -18,13 +17,13 @@ class TYdbErrorException : public yexception { TStatus Status; }; -void ThrowOnError(const TStatus& status) { +static void ThrowOnError(const TStatus& status) { if (!status.IsSuccess()) { throw TYdbErrorException(status) << status; } } -void PrintStatus(const TStatus& status) { +static void PrintStatus(const TStatus& status) { std::cerr << "Status: " << ToString(status.GetStatus()) << std::endl; std::cerr << status.GetIssues().ToString(); } @@ -45,86 +44,87 @@ std::string OptionalToString(const std::optional& opt) return "(NULL)"; } -std::string JoinPath(const std::string& basePath, const std::string& path) { - if (basePath.empty()) { - return path; - } - - std::filesystem::path prefixPathSplit(basePath); - prefixPathSplit /= path; - - return prefixPathSplit; -} - /////////////////////////////////////////////////////////////////////////////// -//! Creates sample tables with CrateTable API. -void CreateTables(TTableClient client, const std::string& path) { - ThrowOnError(client.RetryOperationSync([path](TSession session) { - auto seriesDesc = TTableBuilder() - .AddNullableColumn("series_id", EPrimitiveType::Uint64) - .AddNullableColumn("title", EPrimitiveType::Utf8) - .AddNullableColumn("series_info", EPrimitiveType::Utf8) - .AddNullableColumn("release_date", EPrimitiveType::Uint64) - .SetPrimaryKeyColumn("series_id") - .Build(); - - return session.CreateTable(JoinPath(path, "series"), std::move(seriesDesc)).GetValueSync(); +static void CreateTables(TQueryClient client, const std::string& path) { + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + CREATE TABLE series ( + series_id Uint64, + title Utf8, + series_info Utf8, + release_date Uint64, + PRIMARY KEY (series_id) + ); + )", path.c_str()); + return session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); })); - ThrowOnError(client.RetryOperationSync([path](TSession session) { - auto seasonsDesc = TTableBuilder() - .AddNullableColumn("series_id", EPrimitiveType::Uint64) - .AddNullableColumn("season_id", EPrimitiveType::Uint64) - .AddNullableColumn("title", EPrimitiveType::Utf8) - .AddNullableColumn("first_aired", EPrimitiveType::Uint64) - .AddNullableColumn("last_aired", EPrimitiveType::Uint64) - .SetPrimaryKeyColumns({"series_id", "season_id"}) - .Build(); - - return session.CreateTable(JoinPath(path, "seasons"), std::move(seasonsDesc)).GetValueSync(); + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + CREATE TABLE seasons ( + series_id Uint64, + season_id Uint64, + title Utf8, + first_aired Uint64, + last_aired Uint64, + PRIMARY KEY (series_id, season_id) + ); + )", path.c_str()); + return session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); })); - ThrowOnError(client.RetryOperationSync([path](TSession session) { - auto episodesDesc = TTableBuilder() - .AddNullableColumn("series_id", EPrimitiveType::Uint64) - .AddNullableColumn("season_id", EPrimitiveType::Uint64) - .AddNullableColumn("episode_id", EPrimitiveType::Uint64) - .AddNullableColumn("title", EPrimitiveType::Utf8) - .AddNullableColumn("air_date", EPrimitiveType::Uint64) - .SetPrimaryKeyColumns({"series_id", "season_id", "episode_id"}) - .Build(); - - return session.CreateTable(JoinPath(path, "episodes"), - std::move(episodesDesc)).GetValueSync(); + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + CREATE TABLE episodes ( + series_id Uint64, + season_id Uint64, + episode_id Uint64, + title Utf8, + air_date Uint64, + PRIMARY KEY (series_id, season_id, episode_id) + ); + )", path.c_str()); + return session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); })); } -//! Describe existing table. -void DescribeTable(TTableClient client, const std::string& path, const std::string& name) { - std::optional desc; - - ThrowOnError(client.RetryOperationSync([path, name, &desc](TSession session) { - auto result = session.DescribeTable(JoinPath(path, name)).GetValueSync(); - - if (result.IsSuccess()) { - desc = result.GetTableDescription(); - } +/////////////////////////////////////////////////////////////////////////////// - return result; +static void DropTables(TQueryClient client, const std::string& path) { + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + DROP TABLE series; + )", path.c_str()); + return session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); })); - std::cout << "> Describe table: " << name << std::endl; - for (auto& column : desc->GetColumns()) { - std::cout << "Column, name: " << column.Name << ", type: " << FormatType(column.Type) << std::endl; - } + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + DROP TABLE seasons; + )", path.c_str()); + return session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + })); + + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + DROP TABLE episodes; + )", path.c_str()); + return session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + })); } /////////////////////////////////////////////////////////////////////////////// -//! Fills sample tables with data in single parameterized data query. -TStatus FillTableDataTransaction(TSession session, const std::string& path) { - auto query = std::format(R"( +void FillTableData(TQueryClient client, const std::string& path) { + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( PRAGMA TablePathPrefix("{}"); DECLARE $seriesData AS List& resultSet) -{ - auto query = std::format(R"( - PRAGMA TablePathPrefix("{}"); - - SELECT series_id, title, CAST(CAST(release_date AS Date) AS String) AS release_date - FROM series - WHERE series_id = 1; - )", path); + )", path.c_str()); - auto txControl = - // Begin new transaction with SerializableRW mode - TTxControl::BeginTx(TTxSettings::SerializableRW()) - // Commit transaction at the end of the query - .CommitTx(); - - // Executes data query with specified transaction control settings. - auto result = session.ExecuteDataQuery(query, txControl).GetValueSync(); - - if (result.IsSuccess()) { - // Index of result set corresponds to its order in YQL query - resultSet = result.GetResultSet(0); - } - - return result; -} - -//! Shows basic usage of mutating operations. -TStatus UpsertSimpleTransaction(TSession session, const std::string& path) { - auto query = std::format(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("{}"); + auto params = GetTablesDataParams(); - UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES - (2, 6, 1, "TBD"); - )", path); - - return session.ExecuteDataQuery(query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); -} - -//! Shows usage of parameters in data queries. -TStatus SelectWithParamsTransaction(TSession session, const std::string& path, - uint64_t seriesId, uint64_t seasonId, std::optional& resultSet) -{ - auto query = std::format(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("{}"); - - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - - SELECT sa.title AS season_title, sr.title AS series_title - FROM seasons AS sa - INNER JOIN series AS sr - ON sa.series_id = sr.series_id - WHERE sa.series_id = $seriesId AND sa.season_id = $seasonId; - )", path); - - // Type of parameter values should be exactly the same as in DECLARE statements. - auto params = session.GetParamsBuilder() - .AddParam("$seriesId") - .Uint64(seriesId) - .Build() - .AddParam("$seasonId") - .Uint64(seasonId) - .Build() - .Build(); - - auto result = session.ExecuteDataQuery( - query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params).GetValueSync(); - - if (result.IsSuccess()) { - resultSet = result.GetResultSet(0); - } - - return result; + return session.ExecuteQuery( + query, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params).GetValueSync(); + })); } -//! Shows usage of prepared queries. -TStatus PreparedSelectTransaction(TSession session, const std::string& path, - uint64_t seriesId, uint64_t seasonId, uint64_t episodeId, std::optional& resultSet) -{ - // Once prepared, query data is stored in the session and identified by QueryId. - // Local query cache is used to keep track of queries, prepared in current session. - auto query = std::format(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("{}"); - - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; - - SELECT * - FROM episodes - WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; - )", path); - - // Prepare query or get result from query cache - auto prepareResult = session.PrepareDataQuery(query).GetValueSync(); - if (!prepareResult.IsSuccess()) { - return prepareResult; - } - - if (!prepareResult.IsQueryFromCache()) { - std::cerr << "+Finished preparing query: PreparedSelectTransaction" << std::endl; - } - - auto dataQuery = prepareResult.GetQuery(); - - auto params = dataQuery.GetParamsBuilder() - .AddParam("$seriesId") - .Uint64(seriesId) - .Build() - .AddParam("$seasonId") - .Uint64(seasonId) - .Build() - .AddParam("$episodeId") - .Uint64(episodeId) - .Build() - .Build(); - - auto result = dataQuery.Execute(TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params).GetValueSync(); - - if (result.IsSuccess()) { +void SelectSimple(TQueryClient client, const std::string& path) { + std::optional resultSet; + ThrowOnError(client.RetryQuerySync([&path, &resultSet](TSession session) { + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + + SELECT series_id, title, CAST(CAST(release_date AS Date) AS String) AS release_date + FROM series + WHERE series_id = 1; + )", path.c_str()); + + auto txControl = + // Begin new transaction with SerializableRW mode + TTxControl::BeginTx(TTxSettings::SerializableRW()) + // Commit transaction at the end of the query + .CommitTx(); + + auto result = session.ExecuteQuery(query, txControl).GetValueSync(); + if (!result.IsSuccess()) { + return result; + } resultSet = result.GetResultSet(0); - } - - return result; -} - -//! Shows usage of transactions consisting of multiple data queries with client logic between them. -TStatus MultiStepTransaction(TSession session, const std::string& path, uint64_t seriesId, uint64_t seasonId, - std::optional& resultSet) -{ - auto query1 = std::format(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("{}"); - - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - - SELECT first_aired AS from_date FROM seasons - WHERE series_id = $seriesId AND season_id = $seasonId; - )", path); - - auto params1 = session.GetParamsBuilder() - .AddParam("$seriesId") - .Uint64(seriesId) - .Build() - .AddParam("$seasonId") - .Uint64(seasonId) - .Build() - .Build(); - - // Execute first query to get the required values to the client. - // Transaction control settings don't set CommitTx flag to keep transaction active - // after query execution. - auto result = session.ExecuteDataQuery( - query1, - TTxControl::BeginTx(TTxSettings::SerializableRW()), - params1).GetValueSync(); - - if (!result.IsSuccess()) { return result; - } - - // Get active transaction id - auto tx = result.GetTransaction(); - - TResultSetParser parser(result.GetResultSet(0)); - parser.TryNextRow(); - auto date = parser.ColumnParser("from_date").GetOptionalUint64(); - - // Perform some client logic on returned values - auto userFunc = [] (const TInstant fromDate) { - return fromDate + TDuration::Days(15); - }; - - TInstant fromDate = TInstant::Days(*date); - TInstant toDate = userFunc(fromDate); - - // Construct next query based on the results of client logic - auto query2 = std::format(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("{}"); - - DECLARE $seriesId AS Uint64; - DECLARE $fromDate AS Uint64; - DECLARE $toDate AS Uint64; - - SELECT season_id, episode_id, title, air_date FROM episodes - WHERE series_id = $seriesId AND air_date >= $fromDate AND air_date <= $toDate; - )", path); - - auto params2 = session.GetParamsBuilder() - .AddParam("$seriesId") - .Uint64(seriesId) - .Build() - .AddParam("$fromDate") - .Uint64(fromDate.Days()) - .Build() - .AddParam("$toDate") - .Uint64(toDate.Days()) - .Build() - .Build(); - - // Execute second query. - // Transaction control settings continues active transaction (tx) and - // commits it at the end of second query execution. - result = session.ExecuteDataQuery( - query2, - TTxControl::Tx(*tx).CommitTx(), - params2).GetValueSync(); - - if (result.IsSuccess()) { - resultSet = result.GetResultSet(0); - } - - return result; -} - -// Show usage of explicit Begin/Commit transaction control calls. -// In most cases it's better to use transaction control settings in ExecuteDataQuery calls instead -// to avoid additional hops to YDB cluster and allow more efficient execution of queries. -TStatus ExplicitTclTransaction(TSession session, const std::string& path, const TInstant& airDate) { - auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW()).GetValueSync(); - if (!beginResult.IsSuccess()) { - return beginResult; - } - - // Get newly created transaction id - auto tx = beginResult.GetTransaction(); - - auto query = std::format(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("{}"); - - DECLARE $airDate AS Date; - - UPDATE episodes SET air_date = CAST($airDate AS Uint16) WHERE title = "TBD"; - )", path); - - auto params = session.GetParamsBuilder() - .AddParam("$airDate") - .Date(airDate) - .Build() - .Build(); - - // Execute data query. - // Transaction control settings continues active transaction (tx) - auto updateResult = session.ExecuteDataQuery(query, - TTxControl::Tx(tx), - params).GetValueSync(); - - if (!updateResult.IsSuccess()) { - return updateResult; - } - - // Commit active transaction (tx) - return tx.Commit().GetValueSync(); -} - -} - -/////////////////////////////////////////////////////////////////////////////// - -void SelectSimple(TTableClient client, const std::string& path) { - std::optional resultSet; - ThrowOnError(client.RetryOperationSync([path, &resultSet](TSession session) { - return SelectSimpleTransaction(session, path, resultSet); })); TResultSetParser parser(*resultSet); if (parser.TryNextRow()) { std::cout << "> SelectSimple:" << std::endl << "Series" - << ", Id: " << OptionalToString(parser.ColumnParser("series_id").GetOptionalUint64()) - << ", Title: " << OptionalToString(parser.ColumnParser("title").GetOptionalUtf8()) - << ", Release date: " << OptionalToString(parser.ColumnParser("release_date").GetOptionalString()) - << std::endl; + << ", Id: " << OptionalToString(parser.ColumnParser("series_id").GetOptionalUint64()) + << ", Title: " << OptionalToString(parser.ColumnParser("title").GetOptionalUtf8()) + << ", Release date: " << OptionalToString(parser.ColumnParser("release_date").GetOptionalString()) + << std::endl; } } -void UpsertSimple(TTableClient client, const std::string& path) { - ThrowOnError(client.RetryOperationSync([path](TSession session) { - return UpsertSimpleTransaction(session, path); +void UpsertSimple(TQueryClient client, const std::string& path) { + ThrowOnError(client.RetryQuerySync([path](TSession session) { + auto query = std::format(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("{}"); + + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES + (2, 6, 1, "TBD"); + )", path.c_str()); + + return session.ExecuteQuery(query, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); })); } -void SelectWithParams(TTableClient client, const std::string& path) { +void SelectWithParams(TQueryClient client, const std::string& path) { std::optional resultSet; - ThrowOnError(client.RetryOperationSync([path, &resultSet](TSession session) { - return SelectWithParamsTransaction(session, path, 2, 3, resultSet); + ThrowOnError(client.RetryQuerySync([&path, &resultSet](TSession session) { + ui64 seriesId = 2; + ui64 seasonId = 3; + auto query = std::format(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("{}"); + + DECLARE $seriesId AS Uint64; + DECLARE $seasonId AS Uint64; + + SELECT sa.title AS season_title, sr.title AS series_title + FROM seasons AS sa + INNER JOIN series AS sr + ON sa.series_id = sr.series_id + WHERE sa.series_id = $seriesId AND sa.season_id = $seasonId; + )", path.c_str()); + + auto params = TParamsBuilder() + .AddParam("$seriesId") + .Uint64(seriesId) + .Build() + .AddParam("$seasonId") + .Uint64(seasonId) + .Build() + .Build(); + + auto result = session.ExecuteQuery( + query, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params).GetValueSync(); + + if (!result.IsSuccess()) { + return result; + } + resultSet = result.GetResultSet(0); + return result; })); TResultSetParser parser(*resultSet); if (parser.TryNextRow()) { std::cout << "> SelectWithParams:" << std::endl << "Season" - << ", Title: " << OptionalToString(parser.ColumnParser("season_title").GetOptionalUtf8()) - << ", Series title: " << OptionalToString(parser.ColumnParser("series_title").GetOptionalUtf8()) - << std::endl; + << ", Title: " << OptionalToString(parser.ColumnParser("season_title").GetOptionalUtf8()) + << ", Series title: " << OptionalToString(parser.ColumnParser("series_title").GetOptionalUtf8()) + << std::endl; } } -void PreparedSelect(TTableClient client, const std::string& path, uint32_t seriesId, uint32_t seasonId, uint32_t episodeId) { +void MultiStep(TQueryClient client, const std::string& path) { std::optional resultSet; - ThrowOnError(client.RetryOperationSync([path, seriesId, seasonId, episodeId, &resultSet](TSession session) { - return PreparedSelectTransaction(session, path, seriesId, seasonId, episodeId, resultSet); - })); + ThrowOnError(client.RetryQuerySync([&path, &resultSet](TSession session) { + ui64 seriesId = 2; + ui64 seasonId = 5; + auto query1 = std::format(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("{}"); + + DECLARE $seriesId AS Uint64; + DECLARE $seasonId AS Uint64; + + SELECT first_aired AS from_date FROM seasons + WHERE series_id = $seriesId AND season_id = $seasonId; + )", path.c_str()); + + auto params1 = TParamsBuilder() + .AddParam("$seriesId") + .Uint64(seriesId) + .Build() + .AddParam("$seasonId") + .Uint64(seasonId) + .Build() + .Build(); - TResultSetParser parser(*resultSet); - if (parser.TryNextRow()) { - auto airDate = TInstant::Days(*parser.ColumnParser("air_date").GetOptionalUint64()); + // Execute first query to get the required values to the client. + // Transaction control settings don't set CommitTx flag to keep transaction active + // after query execution. + auto result = session.ExecuteQuery( + query1, + TTxControl::BeginTx(TTxSettings::SerializableRW()), + params1); - std::cout << "> PreparedSelect:" << std::endl << "Episode " << OptionalToString(parser.ColumnParser("episode_id").GetOptionalUint64()) - << ", Title: " << OptionalToString(parser.ColumnParser("title").GetOptionalUtf8()) - << ", Air date: " << airDate.FormatLocalTime("%a %b %d, %Y") - << std::endl; - } -} + auto resultValue = result.GetValueSync(); -void MultiStep(TTableClient client, const std::string& path) { - std::optional resultSet; - ThrowOnError(client.RetryOperationSync([path, &resultSet](TSession session) { - return MultiStepTransaction(session, path, 2, 5, resultSet); + if (!resultValue.IsSuccess()) { + return resultValue; + } + + // Get active transaction id + auto tx = resultValue.GetTransaction(); + + TResultSetParser parser(resultValue.GetResultSet(0)); + parser.TryNextRow(); + auto date = parser.ColumnParser("from_date").GetOptionalUint64(); + + // Perform some client logic on returned values + auto userFunc = [] (const TInstant fromDate) { + return fromDate + TDuration::Days(15); + }; + + TInstant fromDate = TInstant::Days(*date); + TInstant toDate = userFunc(fromDate); + + // Construct next query based on the results of client logic + auto query2 = std::format(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("{}"); + + DECLARE $seriesId AS Uint64; + DECLARE $fromDate AS Uint64; + DECLARE $toDate AS Uint64; + + SELECT season_id, episode_id, title, air_date FROM episodes + WHERE series_id = $seriesId AND air_date >= $fromDate AND air_date <= $toDate; + )", path.c_str()); + + auto params2 = TParamsBuilder() + .AddParam("$seriesId") + .Uint64(seriesId) + .Build() + .AddParam("$fromDate") + .Uint64(fromDate.Days()) + .Build() + .AddParam("$toDate") + .Uint64(toDate.Days()) + .Build() + .Build(); + + // Execute second query. + // Transaction control settings continues active transaction (tx) and + // commits it at the end of second query execution. + auto result2 = session.ExecuteQuery( + query2, + TTxControl::Tx(tx->GetId()).CommitTx(), + params2).GetValueSync(); + + if (!result2.IsSuccess()) { + return result2; + } + resultSet = result2.GetResultSet(0); + return result2; })); TResultSetParser parser(*resultSet); @@ -520,106 +382,147 @@ void MultiStep(TTableClient client, const std::string& path) { auto airDate = TInstant::Days(*parser.ColumnParser("air_date").GetOptionalUint64()); std::cout << "Episode " << OptionalToString(parser.ColumnParser("episode_id").GetOptionalUint64()) - << ", Season: " << OptionalToString(parser.ColumnParser("season_id").GetOptionalUint64()) - << ", Title: " << OptionalToString(parser.ColumnParser("title").GetOptionalUtf8()) - << ", Air date: " << airDate.FormatLocalTime("%a %b %d, %Y") - << std::endl; + << ", Season: " << OptionalToString(parser.ColumnParser("season_id").GetOptionalUint64()) + << ", Title: " << OptionalToString(parser.ColumnParser("title").GetOptionalUtf8()) + << ", Air date: " << airDate.FormatLocalTime("%a %b %d, %Y") + << std::endl; } } -void ExplicitTcl(TTableClient client, const std::string& path) { - ThrowOnError(client.RetryOperationSync([path](TSession session) { - return ExplicitTclTransaction(session, path, TInstant::Now()); - })); -} +void ExplicitTcl(TQueryClient client, const std::string& path) { + // Show usage of explicit Begin/Commit transaction control calls. + // In most cases it's better to use transaction control settings in ExecuteDataQuery calls instead + // to avoid additional hops to YDB cluster and allow more efficient execution of queries. + ThrowOnError(client.RetryQuerySync([&path](TQueryClient client) -> TStatus { + auto airDate = TInstant::Now(); + auto session = client.GetSession().GetValueSync().GetSession(); + auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW()).GetValueSync(); + if (!beginResult.IsSuccess()) { + return beginResult; + } -void ScanQuerySelect(TTableClient client, const std::string& path) { - auto query = std::format(R"( - --!syntax_v1 - PRAGMA TablePathPrefix("{}"); + // Get newly created transaction id + auto tx = beginResult.GetTransaction(); - DECLARE $series AS List; + auto query = std::format(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("{}"); - SELECT series_id, season_id, title, CAST(CAST(first_aired AS Date) AS String) AS first_aired - FROM seasons - WHERE series_id IN $series - )", path); + DECLARE $airDate AS Date; - auto parameters = TParamsBuilder() - .AddParam("$series") - .BeginList() - .AddListItem().Uint64(1) - .AddListItem().Uint64(10) - .EndList().Build() - .Build(); + UPDATE episodes SET air_date = CAST($airDate AS Uint16) WHERE title = "TBD"; + )", path.c_str()); - // Executes scan query - auto result = client.StreamExecuteScanQuery(query, parameters).GetValueSync(); + auto params = TParamsBuilder() + .AddParam("$airDate") + .Date(airDate) + .Build() + .Build(); - if (!result.IsSuccess()) { - std::cerr << "ScanQuery execution failure: " << result.GetIssues().ToString() << std::endl; - return; - } + // Execute query. + // Transaction control settings continues active transaction (tx) + auto updateResult = session.ExecuteQuery(query, + TTxControl::Tx(tx.GetId()), + params).GetValueSync(); - bool eos = false; - std::cout << "> ScanQuerySelect:" << std::endl; - while (!eos) { - auto streamPart = result.ReadNext().ExtractValueSync(); + if (!updateResult.IsSuccess()) { + return updateResult; + } + // Commit active transaction (tx) + return tx.Commit().GetValueSync(); + })); +} - if (!streamPart.IsSuccess()) { - eos = true; - if (!streamPart.EOS()) { - std::cerr << "ScanQuery execution failure: " << streamPart.GetIssues().ToString() << std::endl; - } - continue; +void StreamQuerySelect(TQueryClient client, const std::string& path) { + std::vector resultSets; + // WARNING: Do not use without RetryQuery!!! + ThrowOnError(client.RetryQuerySync([path, &resultSets](TQueryClient client) -> TStatus { + resultSets.clear(); + auto query = std::format(R"( + --!syntax_v1 + PRAGMA TablePathPrefix("{}"); + + DECLARE $series AS List; + + SELECT series_id, season_id, title, CAST(CAST(first_aired AS Date) AS String) AS first_aired + FROM seasons + WHERE series_id IN $series + ORDER BY season_id; + )", path.c_str()); + + auto parameters = TParamsBuilder() + .AddParam("$series") + .BeginList() + .AddListItem().Uint64(1) + .AddListItem().Uint64(10) + .EndList().Build() + .Build(); + + // Executes stream query + auto resultStreamQuery = client.StreamExecuteQuery(query, TTxControl::NoTx(), parameters).GetValueSync(); + + if (!resultStreamQuery.IsSuccess()) { + return resultStreamQuery; } - if (streamPart.HasResultSet()) { - auto rs = streamPart.ExtractResultSet(); - auto columns = rs.GetColumnsMeta(); - - TResultSetParser parser(rs); - while (parser.TryNextRow()) { - std::cout << "Season" - << ", SeriesId: " << OptionalToString(parser.ColumnParser("series_id").GetOptionalUint64()) - << ", SeasonId: " << OptionalToString(parser.ColumnParser("season_id").GetOptionalUint64()) - << ", Title: " << OptionalToString(parser.ColumnParser("title").GetOptionalUtf8()) - << ", Air date: " << OptionalToString(parser.ColumnParser("first_aired").GetOptionalString()) - << std::endl; + bool eos = false; + + while (!eos) { + auto streamPart = resultStreamQuery.ReadNext().ExtractValueSync(); + + if (!streamPart.IsSuccess()) { + eos = true; + if (!streamPart.EOS()) { + return streamPart; + } + continue; } + + if (streamPart.HasResultSet()) { + auto rs = streamPart.ExtractResultSet(); + resultSets.push_back(rs); + } + } + return TStatus(EStatus::SUCCESS, NYql::TIssues()); + })); + + std::cout << "> StreamQuery:" << std::endl; + for (auto rs : resultSets) { + TResultSetParser parser(rs); + while (parser.TryNextRow()) { + std::cout << "Season" + << ", SeriesId: " << OptionalToString(parser.ColumnParser("series_id").GetOptionalUint64()) + << ", SeasonId: " << OptionalToString(parser.ColumnParser("season_id").GetOptionalUint64()) + << ", Title: " << OptionalToString(parser.ColumnParser("title").GetOptionalUtf8()) + << ", Air date: " << OptionalToString(parser.ColumnParser("first_aired").GetOptionalString()) + << std::endl; } } } + /////////////////////////////////////////////////////////////////////////////// bool Run(const TDriver& driver, const std::string& path) { - TTableClient client(driver); + TQueryClient client(driver); try { CreateTables(client, path); - DescribeTable(client, path, "series"); - - ThrowOnError(client.RetryOperationSync([path](TSession session) { - return FillTableDataTransaction(session, path); - })); + FillTableData(client, path); SelectSimple(client, path); UpsertSimple(client, path); SelectWithParams(client, path); - PreparedSelect(client, path, 2, 3, 7); - PreparedSelect(client, path, 2, 3, 8); - MultiStep(client, path); ExplicitTcl(client, path); - PreparedSelect(client, path, 2, 6, 1); + StreamQuerySelect(client, path); - ScanQuerySelect(client, path); + DropTables(client, path); } catch (const TYdbErrorException& e) { std::cerr << "Execution failed due to fatal error:" << std::endl; @@ -628,4 +531,4 @@ bool Run(const TDriver& driver, const std::string& path) { } return true; -} +} \ No newline at end of file diff --git a/examples/basic_example/basic_example.h b/examples/basic_example/basic_example.h index 99e0804f4b..63c557cd66 100644 --- a/examples/basic_example/basic_example.h +++ b/examples/basic_example/basic_example.h @@ -1,7 +1,10 @@ #pragma once #include -#include +#include +#include +#include +#include NYdb::TParams GetTablesDataParams(); diff --git a/include/ydb-cpp-sdk/client/query/client.h b/include/ydb-cpp-sdk/client/query/client.h index a25f83531a..df44734b89 100644 --- a/include/ydb-cpp-sdk/client/query/client.h +++ b/include/ydb-cpp-sdk/client/query/client.h @@ -94,9 +94,9 @@ class TQueryClient { TAsyncStatus RetryQuery(TQueryWithoutSessionFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings()); - TStatus RetryQuery(const TQuerySyncFunc& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings()); + TStatus RetryQuerySync(const TQuerySyncFunc& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings()); - TStatus RetryQuery(const TQueryWithoutSessionSyncFunc& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings()); + TStatus RetryQuerySync(const TQueryWithoutSessionSyncFunc& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings()); TAsyncExecuteQueryResult RetryQuery(const std::string& query, const TTxControl& txControl, TDuration timeout, bool isIndempotent); diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 31732432b6..fac961ec62 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -594,12 +594,12 @@ TAsyncStatus TQueryClient::RetryQuery(TQueryWithoutSessionFunc&& queryFunc, TRet return ctx->Execute(); } -TStatus TQueryClient::RetryQuery(const TQuerySyncFunc& queryFunc, TRetryOperationSettings settings) { +TStatus TQueryClient::RetryQuerySync(const TQuerySyncFunc& queryFunc, TRetryOperationSettings settings) { NRetry::Sync::TRetryWithSession ctx(*this, queryFunc, settings); return ctx.Execute(); } -TStatus TQueryClient::RetryQuery(const TQueryWithoutSessionSyncFunc& queryFunc, TRetryOperationSettings settings) { +TStatus TQueryClient::RetryQuerySync(const TQueryWithoutSessionSyncFunc& queryFunc, TRetryOperationSettings settings) { NRetry::Sync::TRetryWithoutSession ctx(*this, queryFunc, settings); return ctx.Execute(); } diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 1cc47bf517..43c41f2faa 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -1187,7 +1187,17 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { (*Counters->BytesInflightCompressed) += block.Data.size(); PackedMessagesToSend.emplace(std::move(block)); - SendImpl(); + + if (!SendImplScheduled.exchange(true)) { + CompressionExecutor->Post([cbContext = SelfContext]() { + if (auto self = cbContext->LockShared()) { + self->SendImplScheduled = false; + with_lock (self->Lock) { + self->SendImpl(); + } + } + }); + } return memoryUsage; } diff --git a/src/client/topic/impl/write_session_impl.h b/src/client/topic/impl/write_session_impl.h index d8d78a487e..8f3283858f 100644 --- a/src/client/topic/impl/write_session_impl.h +++ b/src/client/topic/impl/write_session_impl.h @@ -455,6 +455,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility bool Connected = false; bool Started = false; + std::atomic SendImplScheduled = false; std::atomic Aborting = 0; bool SessionEstablished = false; ui32 PartitionId = 0; diff --git a/src/client/topic/ut/basic_usage_ut.cpp b/src/client/topic/ut/basic_usage_ut.cpp index 2743f13775..c1a7bf4a70 100644 --- a/src/client/topic/ut/basic_usage_ut.cpp +++ b/src/client/topic/ut/basic_usage_ut.cpp @@ -455,14 +455,15 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT(!futureWrite.HasValue()); Cerr << ">>>TEST: future write has no value " << Endl; - RunTasks(stepByStepExecutor, {0}); + RunTasks(stepByStepExecutor, {0}); // Run compression task. + RunTasks(stepByStepExecutor, {1}); // Run send task. futureWrite.GetValueSync(); UNIT_ASSERT(futureWrite.HasValue()); Cerr << ">>>TEST: future write has value " << Endl; UNIT_ASSERT(!futureRead.HasValue()); Cerr << ">>>TEST: future read has no value " << Endl; - RunTasks(stepByStepExecutor, {1}); + RunTasks(stepByStepExecutor, {2}); // Run decompression task. futureRead.GetValueSync(); UNIT_ASSERT(futureRead.HasValue()); Cerr << ">>>TEST: future read has value " << Endl;