Skip to content

Commit 80d888d

Browse files
authored
pq: add logbroker federation support (#16293)
1 parent b7a5a76 commit 80d888d

28 files changed

+1185
-204
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 447 additions & 87 deletions
Large diffs are not rendered by default.

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
1010
#include <ydb/library/yql/providers/pq/proto/dq_task_params.pb.h>
11+
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
1112

1213
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h>
1314

@@ -31,10 +32,13 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
3132
ui64 taskId,
3233
const THashMap<TString, TString>& secureParams,
3334
const THashMap<TString, TString>& taskParams,
35+
NYdb::TDriver driver,
36+
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
3437
const NActors::TActorId& computeActorId,
3538
const NActors::TActorId& localRowDispatcherActorId,
3639
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
3740
const ::NMonitoring::TDynamicCounterPtr& counters,
38-
i64 bufferSize = PQRdReadDefaultFreeSpace);
41+
i64 bufferSize,
42+
const IPqGateway::TPtr& pqGateway);
3943

4044
} // namespace NYql::NDq

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 316 additions & 96 deletions
Large diffs are not rendered by default.

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void TDqPqReadActorBase::LoadState(const TSourceState& state) {
7676
TStringStream str;
7777
str << "SessionId: " << GetSessionId() << " Restoring offset: ";
7878
for (const auto& [key, value] : PartitionToOffset) {
79-
str << "{" << key.first << "," << key.second << "," << value << "},";
79+
str << "{" << key << "," << value << "},";
8080
}
8181
SRC_LOG_D(str.Str());
8282
StartingMessageTimestamp = minStartingMessageTs;
@@ -90,4 +90,4 @@ ui64 TDqPqReadActorBase::GetInputIndex() const {
9090

9191
const NYql::NDq::TDqAsyncStats& TDqPqReadActorBase::GetIngressStats() const {
9292
return IngressStats;
93-
}
93+
}

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
4+
#include <ydb/library/yql/providers/pq/common/pq_partition_key.h>
45
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
56
#include <ydb/library/yql/providers/pq/proto/dq_task_params.pb.h>
67

@@ -9,16 +10,16 @@ namespace NYql::NDq::NInternal {
910
class TDqPqReadActorBase : public IDqComputeActorAsyncInput {
1011

1112
public:
12-
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.
13+
using TPartitionKey = ::NPq::TPartitionKey;
1314

1415
const ui64 InputIndex;
1516
THashMap<TPartitionKey, ui64> PartitionToOffset; // {cluster, partition} -> offset of next event.
1617
const TTxId TxId;
17-
const NPq::NProto::TDqPqTopicSource SourceParams;
18+
NPq::NProto::TDqPqTopicSource SourceParams;
1819
TDqAsyncStats IngressStats;
1920
TInstant StartingMessageTimestamp;
2021
TString LogPrefix;
21-
const NPq::NProto::TDqReadTaskParams ReadParams;
22+
NPq::NProto::TDqReadTaskParams ReadParams;
2223
const NActors::TActorId ComputeActorId;
2324
ui64 TaskId;
2425

ydb/library/yql/providers/pq/async_io/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ PEERDIR(
2323
yql/essentials/public/types
2424
yql/essentials/utils/log
2525
ydb/public/sdk/cpp/adapters/issue
26+
ydb/public/sdk/cpp/src/client/federated_topic
2627
ydb/public/sdk/cpp/src/client/driver
2728
ydb/public/sdk/cpp/src/client/topic
2829
ydb/public/sdk/cpp/src/client/types/credentials
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#include "pq_partition_key.h"
2+
namespace NPq {
3+
} // namespace NPq
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#pragma once
2+
#include <util/generic/hash.h>
3+
#include <util/generic/string.h>
4+
#include <util/stream/output.h>
5+
namespace NPq {
6+
struct TPartitionKey {
7+
TString Cluster;
8+
ui64 PartitionId;
9+
bool operator==(const TPartitionKey& other) const = default;
10+
ui64 Hash() const {
11+
return CombineHashes<ui64>(
12+
std::hash<TString>{} (Cluster),
13+
std::hash<ui64>{} (PartitionId)
14+
);
15+
}
16+
};
17+
}
18+
19+
template<>
20+
struct THash<NPq::TPartitionKey> {
21+
ui64 operator() (const NPq::TPartitionKey& x) const {
22+
return x.Hash();
23+
}
24+
};
25+
26+
template <>
27+
inline void Out<NPq::TPartitionKey>(IOutputStream& stream, TTypeTraits<NPq::TPartitionKey>::TFuncParam& t) {
28+
stream << t.PartitionId << '@' << t.Cluster;
29+
}

ydb/library/yql/providers/pq/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
pq_meta_fields.cpp
5+
pq_partition_key.cpp
56
yql_names.cpp
67
)
78

ydb/library/yql/providers/pq/common/yql_names.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace NYql {
66

77
constexpr TStringBuf PartitionsCountProp = "PartitionsCount";
8+
constexpr TStringBuf FederatedClustersProp = "FederatedClusters";
89
constexpr TStringBuf ConsumerSetting = "Consumer";
910
constexpr TStringBuf EndpointSetting = "Endpoint";
1011
constexpr TStringBuf SharedReading = "SharedReading";

0 commit comments

Comments
 (0)