Skip to content

Commit 171d4a9

Browse files
authored
Merge pull request #18017 from nshestakov/Kafka-improvements-25-1
Kafka improvements
2 parents 54ccdcb + 247e41c commit 171d4a9

36 files changed

+9633
-2688
lines changed

ydb/core/grpc_services/rpc_login.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,15 @@ class TLoginRPC : public TRpcRequestActor<TLoginRPC, TEvLoginRequest, true> {
105105
}
106106
}
107107

108+
void HandleDestroyed(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
109+
ReplyErrorAndPassAway(Ydb::StatusIds::UNAVAILABLE, "SchemeShard is unavailable");
110+
}
111+
108112
STATEFN(StateWork) {
109113
switch (ev->GetTypeRewrite()) {
110114
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
111115
hFunc(TEvTabletPipe::TEvClientConnected, HandleConnect);
116+
hFunc(TEvTabletPipe::TEvClientDestroyed, HandleDestroyed);
112117
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNavigate);
113118
hFunc(TEvSchemeShard::TEvLoginResult, HandleResult);
114119
hFunc(TEvLdapAuthProvider::TEvAuthenticateResponse, Handle);

ydb/core/kafka_proxy/actors/actors.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ struct TContext {
5252
NKikimr::NPQ::TRlContext RlContext;
5353

5454
bool Authenticated() {
55-
55+
5656
return !RequireAuthentication || AuthenticationStep == SUCCESS;
57-
57+
5858
}
5959

6060
TActorId DiscoveryCacheActor;
@@ -176,6 +176,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui
176176
const TActorId& discoveryCacheActor);
177177
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
178178
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
179+
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
179180
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
180181
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
181182
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);
@@ -185,6 +186,7 @@ NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, cons
185186
NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TOffsetFetchRequestData>& message);
186187
NActors::IActor* CreateKafkaCreateTopicsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreateTopicsRequestData>& message);
187188
NActors::IActor* CreateKafkaCreatePartitionsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TCreatePartitionsRequestData>& message);
189+
NActors::IActor* CreateKafkaDescribeConfigsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TDescribeConfigsRequestData>& message);
188190
NActors::IActor* CreateKafkaAlterConfigsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TAlterConfigsRequestData>& message);
189191

190192
} // namespace NKafka

ydb/core/kafka_proxy/actors/control_plane_common.h

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ inline TRetentionsConversionResult ConvertRetentions(std::optional<TString> rete
6363
RETENTION_MS_CONFIG_NAME,
6464
[&result](std::optional<ui64> retention) -> void { result.Ms = retention; }
6565
);
66-
66+
6767
convertRetention(
6868
retentionBytes,
6969
RETENTION_BYTES_CONFIG_NAME,
7070
[&result](std::optional<ui64> retention) -> void { result.Bytes = retention; }
7171
);
72-
72+
7373
return result;
7474
}
7575

@@ -107,7 +107,7 @@ inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTo
107107
} else {
108108
return std::optional<THolder<TEvKafka::TEvTopicModificationResponse>>();
109109
}
110-
}
110+
}
111111

112112
template<class T>
113113
inline std::unordered_set<TString> ExtractDuplicates(
@@ -134,15 +134,15 @@ class TAlterTopicActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<T, U
134134
public:
135135

136136
TAlterTopicActor(
137-
TActorId requester,
137+
TActorId requester,
138138
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
139139
TString topicPath,
140140
TString databaseName)
141141
: TBase(new U(
142142
userToken,
143143
topicPath,
144144
databaseName,
145-
[this](const EKafkaErrors status, const std::string& message) {
145+
[this](const EKafkaErrors status, const std::string& message, const ::google::protobuf::Message&) {
146146
this->SendResult(status,TString{message});
147147
})
148148
)
@@ -176,15 +176,15 @@ class TAlterTopicActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<T, U
176176
const std::shared_ptr<TString> SerializedToken;
177177
};
178178

179-
class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpCtx {
179+
class TKafkaTopicRequestCtx : public NKikimr::NGRpcService::IRequestOpCtx {
180180
public:
181-
using TRequest = TKafkaTopicModificationRequest;
181+
using TRequest = TKafkaTopicRequestCtx;
182182

183-
TKafkaTopicModificationRequest(
183+
TKafkaTopicRequestCtx(
184184
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
185185
TString topicPath,
186186
TString databaseName,
187-
const std::function<void(const EKafkaErrors, const std::string&)> sendResultCallback)
187+
const std::function<void(const EKafkaErrors, const std::string&, const google::protobuf::Message& result)> sendResultCallback)
188188
: UserToken(userToken)
189189
, TopicPath(topicPath)
190190
, DatabaseName(databaseName)
@@ -239,7 +239,7 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
239239
};
240240

241241
void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
242-
ProcessYdbStatusCode(status);
242+
ProcessYdbStatusCode(status, google::protobuf::Empty{});
243243
};
244244

245245
void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override {
@@ -334,26 +334,24 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
334334
}
335335

336336
void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
337-
Y_UNUSED(result);
338-
ProcessYdbStatusCode(status);
337+
ProcessYdbStatusCode(status, result);
339338
};
340339

341340
void SendResult(
342341
const google::protobuf::Message& result,
343342
Ydb::StatusIds::StatusCode status,
344343
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {
345344

346-
Y_UNUSED(result);
347345
Y_UNUSED(message);
348-
ProcessYdbStatusCode(status);
346+
ProcessYdbStatusCode(status, result);
349347
};
350348

351349
const Ydb::Operations::OperationParams& operation_params() const {
352350
return DummyParams;
353351
}
354352

355-
static TKafkaTopicModificationRequest* GetProtoRequest(std::shared_ptr<IRequestOpCtx> request) {
356-
return static_cast<TKafkaTopicModificationRequest*>(&(*request));
353+
static TKafkaTopicRequestCtx* GetProtoRequest(std::shared_ptr<IRequestOpCtx> request) {
354+
return static_cast<TKafkaTopicRequestCtx*>(&(*request));
357355
}
358356

359357
protected:
@@ -371,11 +369,12 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
371369
const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
372370
const TString TopicPath;
373371
const TString DatabaseName;
374-
const std::function<void(const EKafkaErrors status, const std::string& message)> SendResultCallback;
372+
const std::function<void(const EKafkaErrors status, const std::string& message, const google::protobuf::Message& result)> SendResultCallback;
375373
NYql::TIssue Issue;
376374

377-
void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
378-
SendResultCallback(Convert(status), Issue.GetMessage());
375+
void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status, const google::protobuf::Message& result) {
376+
SendResultCallback(Convert(status), Issue.GetMessage(), result);
379377
}
380378
};
381-
}
379+
380+
} //namespace NKafka

ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,22 @@
1111

1212
namespace NKafka {
1313

14-
class TKafkaAlterConfigsRequest: public TKafkaTopicModificationRequest {
14+
class TKafkaAlterConfigsRequest: public TKafkaTopicRequestCtx {
1515
public:
1616
TKafkaAlterConfigsRequest(
1717
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
1818
TString topicPath,
1919
TString databaseName,
20-
const std::function<void(const EKafkaErrors, const std::string&)> sendResultCallback)
21-
: TKafkaTopicModificationRequest(userToken, topicPath, databaseName, sendResultCallback)
20+
const std::function<void(const EKafkaErrors, const std::string&, const google::protobuf::Message&)> sendResultCallback)
21+
: TKafkaTopicRequestCtx(userToken, topicPath, databaseName, sendResultCallback)
2222
{
2323
};
2424

2525
protected:
2626
EKafkaErrors Convert(Ydb::StatusIds::StatusCode& status) override {
2727
return status == Ydb::StatusIds::BAD_REQUEST
2828
? INVALID_CONFIG
29-
: TKafkaTopicModificationRequest::Convert(status);
29+
: TKafkaTopicRequestCtx::Convert(status);
3030
}
3131
};
3232

0 commit comments

Comments
 (0)