Skip to content

Commit 607beb3

Browse files
FloatingCrowbarnshestakov
authored andcommitted
New kafka messages for describeConfigs (#14235)
1 parent e6a0163 commit 607beb3

17 files changed

+4315
-598
lines changed

ydb/core/kafka_proxy/actors/actors.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ struct TContext {
5252
NKikimr::NPQ::TRlContext RlContext;
5353

5454
bool Authenticated() {
55-
55+
5656
return !RequireAuthentication || AuthenticationStep == SUCCESS;
57-
57+
5858
}
5959

6060
TActorId DiscoveryCacheActor;
@@ -186,6 +186,7 @@ NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, cons
186186
NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message);
187187
NActors::IActor* CreateKafkaCreateTopicsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreateTopicsRequestData>& message);
188188
NActors::IActor* CreateKafkaCreatePartitionsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreatePartitionsRequestData>& message);
189+
NActors::IActor* CreateKafkaDescribeConfigsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TDescribeConfigsRequestData>& message);
189190
NActors::IActor* CreateKafkaAlterConfigsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TAlterConfigsRequestData>& message);
190191

191192
} // namespace NKafka

ydb/core/kafka_proxy/actors/control_plane_common.h

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ inline TRetentionsConversionResult ConvertRetentions(std::optional<TString> rete
6363
RETENTION_MS_CONFIG_NAME,
6464
[&result](std::optional<ui64> retention) -> void { result.Ms = retention; }
6565
);
66-
66+
6767
convertRetention(
6868
retentionBytes,
6969
RETENTION_BYTES_CONFIG_NAME,
7070
[&result](std::optional<ui64> retention) -> void { result.Bytes = retention; }
7171
);
72-
72+
7373
return result;
7474
}
7575

@@ -107,7 +107,7 @@ inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTo
107107
} else {
108108
return std::optional<THolder<TEvKafka::TEvTopicModificationResponse>>();
109109
}
110-
}
110+
}
111111

112112
template<class T>
113113
inline std::unordered_set<TString> ExtractDuplicates(
@@ -134,15 +134,15 @@ class TAlterTopicActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<T, U
134134
public:
135135

136136
TAlterTopicActor(
137-
TActorId requester,
137+
TActorId requester,
138138
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
139139
TString topicPath,
140140
TString databaseName)
141141
: TBase(new U(
142142
userToken,
143143
topicPath,
144144
databaseName,
145-
[this](const EKafkaErrors status, const std::string& message) {
145+
[this](const EKafkaErrors status, const std::string& message, const ::google::protobuf::Message&) {
146146
this->SendResult(status,TString{message});
147147
})
148148
)
@@ -176,15 +176,15 @@ class TAlterTopicActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<T, U
176176
const std::shared_ptr<TString> SerializedToken;
177177
};
178178

179-
class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpCtx {
179+
class TKafkaTopicRequestCtx : public NKikimr::NGRpcService::IRequestOpCtx {
180180
public:
181-
using TRequest = TKafkaTopicModificationRequest;
181+
using TRequest = TKafkaTopicRequestCtx;
182182

183-
TKafkaTopicModificationRequest(
183+
TKafkaTopicRequestCtx(
184184
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
185185
TString topicPath,
186186
TString databaseName,
187-
const std::function<void(const EKafkaErrors, const std::string&)> sendResultCallback)
187+
const std::function<void(const EKafkaErrors, const std::string&, const google::protobuf::Message& result)> sendResultCallback)
188188
: UserToken(userToken)
189189
, TopicPath(topicPath)
190190
, DatabaseName(databaseName)
@@ -239,7 +239,7 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
239239
};
240240

241241
void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
242-
ProcessYdbStatusCode(status);
242+
ProcessYdbStatusCode(status, google::protobuf::Empty{});
243243
};
244244

245245
void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override {
@@ -334,26 +334,24 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
334334
}
335335

336336
void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
337-
Y_UNUSED(result);
338-
ProcessYdbStatusCode(status);
337+
ProcessYdbStatusCode(status, result);
339338
};
340339

341340
void SendResult(
342341
const google::protobuf::Message& result,
343342
Ydb::StatusIds::StatusCode status,
344343
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {
345344

346-
Y_UNUSED(result);
347345
Y_UNUSED(message);
348-
ProcessYdbStatusCode(status);
346+
ProcessYdbStatusCode(status, result);
349347
};
350348

351349
const Ydb::Operations::OperationParams& operation_params() const {
352350
return DummyParams;
353351
}
354352

355-
static TKafkaTopicModificationRequest* GetProtoRequest(std::shared_ptr<IRequestOpCtx> request) {
356-
return static_cast<TKafkaTopicModificationRequest*>(&(*request));
353+
static TKafkaTopicRequestCtx* GetProtoRequest(std::shared_ptr<IRequestOpCtx> request) {
354+
return static_cast<TKafkaTopicRequestCtx*>(&(*request));
357355
}
358356

359357
protected:
@@ -371,11 +369,12 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
371369
const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
372370
const TString TopicPath;
373371
const TString DatabaseName;
374-
const std::function<void(const EKafkaErrors status, const std::string& message)> SendResultCallback;
372+
const std::function<void(const EKafkaErrors status, const std::string& message, const google::protobuf::Message& result)> SendResultCallback;
375373
NYql::TIssue Issue;
376374

377-
void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
378-
SendResultCallback(Convert(status), Issue.GetMessage());
375+
void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status, const google::protobuf::Message& result) {
376+
SendResultCallback(Convert(status), Issue.GetMessage(), result);
379377
}
380378
};
381-
}
379+
380+
} //namespace NKafka

ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,22 @@
1111

1212
namespace NKafka {
1313

14-
class TKafkaAlterConfigsRequest: public TKafkaTopicModificationRequest {
14+
class TKafkaAlterConfigsRequest: public TKafkaTopicRequestCtx {
1515
public:
1616
TKafkaAlterConfigsRequest(
1717
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
1818
TString topicPath,
1919
TString databaseName,
20-
const std::function<void(const EKafkaErrors, const std::string&)> sendResultCallback)
21-
: TKafkaTopicModificationRequest(userToken, topicPath, databaseName, sendResultCallback)
20+
const std::function<void(const EKafkaErrors, const std::string&, const google::protobuf::Message&)> sendResultCallback)
21+
: TKafkaTopicRequestCtx(userToken, topicPath, databaseName, sendResultCallback)
2222
{
2323
};
2424

2525
protected:
2626
EKafkaErrors Convert(Ydb::StatusIds::StatusCode& status) override {
2727
return status == Ydb::StatusIds::BAD_REQUEST
2828
? INVALID_CONFIG
29-
: TKafkaTopicModificationRequest::Convert(status);
29+
: TKafkaTopicRequestCtx::Convert(status);
3030
}
3131
};
3232

ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
207207
}
208208
};
209209

210-
class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicModificationRequest> {
210+
class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicRequestCtx> {
211211
public:
212212

213213
TCreatePartitionsActor(
@@ -216,7 +216,7 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
216216
TString topicPath,
217217
TString databaseName,
218218
ui32 partitionsNumber)
219-
: TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicModificationRequest>(
219+
: TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicRequestCtx>(
220220
requester,
221221
userToken,
222222
topicPath,

ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212
namespace NKafka {
1313

14-
class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicModificationRequest> {
15-
using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicModificationRequest>;
14+
class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicRequestCtx> {
15+
using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreateTopicActor, TKafkaTopicRequestCtx>;
1616
public:
1717

1818
TCreateTopicActor(
@@ -23,11 +23,11 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
2323
ui32 partitionsNumber,
2424
std::optional<ui64> retentionMs,
2525
std::optional<ui64> retentionBytes)
26-
: TBase(new TKafkaTopicModificationRequest(
26+
: TBase(new TKafkaTopicRequestCtx(
2727
userToken,
2828
topicPath,
2929
databaseName,
30-
[this](EKafkaErrors status, const std::string& message) {
30+
[this](EKafkaErrors status, const std::string& message, const google::protobuf::Message&) {
3131
this->SendResult(status, TString{message});
3232
})
3333
)

0 commit comments

Comments
 (0)