Skip to content

Commit fca7a28

Browse files
authored
Add ddl to create an iceberg data source (#16652)
1 parent 6c875d7 commit fca7a28

File tree

22 files changed

+1317
-33
lines changed

22 files changed

+1317
-33
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
#include "external_source_builder.h"
2+
#include "validation_functions.h"
3+
4+
#include <util/string/join.h>
5+
#include <ydb/core/protos/flat_scheme_op.pb.h>
6+
7+
namespace NKikimr::NExternalSource {
8+
namespace {
9+
10+
class TValidatedExternalDataSource final : public IExternalSource {
11+
public:
12+
TValidatedExternalDataSource(
13+
const TString& name,
14+
const std::vector<TExternalSourceBuilder::TAuthHolder>& authMethods,
15+
const std::unordered_map<TString, TExternalSourceBuilder::TConditionalValidator>& availableProperties,
16+
const std::vector<TRegExMatch>& hostnamePatterns)
17+
: Name_(name)
18+
, AuthMethodsForCheck_(authMethods)
19+
, AvailableProperties_(availableProperties)
20+
, HostnamePatterns_(hostnamePatterns)
21+
{
22+
23+
}
24+
25+
virtual TString Pack(const NKikimrExternalSources::TSchema&,
26+
const NKikimrExternalSources::TGeneral&) const override {
27+
ythrow TExternalSourceException() << "Internal error. Only external table supports pack operation";
28+
}
29+
30+
virtual TString GetName() const override {
31+
return Name_;
32+
}
33+
34+
virtual bool HasExternalTable() const override {
35+
return false;
36+
}
37+
38+
virtual TVector<TString> GetAuthMethods() const override {
39+
TVector<TString> result;
40+
41+
for (auto a : AuthMethodsForCheck_) {
42+
result.push_back(a.Auth);
43+
}
44+
45+
return result;
46+
}
47+
48+
TVector<TString> GetAuthMethods(const TString& externalDataSourceDescription) const {
49+
NKikimrSchemeOp::TExternalDataSourceDescription proto;
50+
51+
if (!proto.ParseFromString(externalDataSourceDescription)) {
52+
ythrow TExternalSourceException()
53+
<< "Internal error. "
54+
<< "Couldn't parse protobuf with external data source description";
55+
}
56+
57+
TVector<TString> result;
58+
59+
for (auto a : AuthMethodsForCheck_) {
60+
if (a.UseCondition(proto.GetProperties().GetProperties())) {
61+
result.push_back(a.Auth);
62+
}
63+
}
64+
65+
return result;
66+
}
67+
68+
virtual TMap<TString, TVector<TString>> GetParameters(const TString&) const override {
69+
ythrow TExternalSourceException() << "Internal error. Only external table supports parameters";
70+
}
71+
72+
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
73+
NKikimrSchemeOp::TExternalDataSourceDescription proto;
74+
75+
if (!proto.ParseFromString(externalDataSourceDescription)) {
76+
ythrow TExternalSourceException()
77+
<< "Internal error. "
78+
<< "Couldn't parse protobuf with external data source description";
79+
}
80+
81+
auto properties = proto.GetProperties().GetProperties();
82+
std::unordered_set<TString> validatedProperties;
83+
84+
for (const auto& [key, value] : properties) {
85+
auto p = AvailableProperties_.find(key);
86+
87+
if (AvailableProperties_.end() == p) {
88+
throw TExternalSourceException() << "Unsupported property: " << key;
89+
}
90+
91+
// validate property value
92+
if (p->second.ApplyCondition(properties)) {
93+
p->second.Validator(key, value);
94+
}
95+
96+
validatedProperties.emplace(key);
97+
}
98+
99+
// validate properties that has been left
100+
for (const auto& [property, validator] : AvailableProperties_) {
101+
if (validatedProperties.contains(property)) {
102+
continue;
103+
}
104+
105+
if (validator.ApplyCondition(properties)) {
106+
validator.Validator(property, "");
107+
}
108+
}
109+
110+
ValidateHostname(HostnamePatterns_, proto.GetLocation());
111+
}
112+
113+
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
114+
return NThreading::MakeFuture(std::move(meta));
115+
}
116+
117+
virtual bool CanLoadDynamicMetadata() const override {
118+
return false;
119+
}
120+
121+
private:
122+
const TString Name_;
123+
const std::vector<TExternalSourceBuilder::TAuthHolder> AuthMethodsForCheck_;
124+
const std::unordered_map<TString, TExternalSourceBuilder::TConditionalValidator> AvailableProperties_;
125+
const std::vector<TRegExMatch> HostnamePatterns_;
126+
};
127+
128+
} // unnamed
129+
130+
/// Starts a builder for an external data source with the given name.
TExternalSourceBuilder::TExternalSourceBuilder(const TString& name)
    : Name_(name)
{}
134+
135+
///
/// Registers each name from authMethods as an auth method guarded by the
/// same condition. Returns the builder to allow call chaining.
///
TExternalSourceBuilder& TExternalSourceBuilder::Auth(const TVector<TString>& authMethods, TCondition condition) {
    // Iterate by reference so the only copy of the name is the one stored
    // in the holder; the condition has to be copied into every holder.
    for (const auto& method : authMethods) {
        AuthMethodsForCheck_.push_back(TExternalSourceBuilder::TAuthHolder{method, condition});
    }

    return *this;
}
142+
143+
///
/// Registers a single allowed property with its validator and the condition
/// under which the validator is applied.
///
/// If a property with the same name is already registered, the earlier
/// registration is kept (std::unordered_map::emplace does not overwrite).
///
TExternalSourceBuilder& TExternalSourceBuilder::Property(TString name, TValidator validator, TCondition condition) {
    // All three parameters are taken by value and are not used afterwards,
    // so move them into the map instead of copying.
    AvailableProperties_.emplace(
        std::move(name),
        TExternalSourceBuilder::TConditionalValidator{std::move(validator), std::move(condition)});
    return *this;
}
147+
148+
///
/// Registers every name from availableProperties via Property(), sharing
/// one validator and one condition between them.
///
TExternalSourceBuilder& TExternalSourceBuilder::Properties(const TSet<TString>& availableProperties, TValidator validator, TCondition condition) {
    // Property() takes the name by value, so iterating by reference avoids
    // a second, redundant copy per element.
    for (const auto& propertyName : availableProperties) {
        Property(propertyName, validator, condition);
    }

    return *this;
}
155+
156+
/// Appends the given patterns to the hostname patterns collected so far.
TExternalSourceBuilder& TExternalSourceBuilder::HostnamePatterns(const std::vector<TRegExMatch>& patterns) {
    for (const auto& pattern : patterns) {
        HostnamePatterns_.push_back(pattern);
    }
    return *this;
}
161+
162+
///
/// Creates the validated external data source from everything registered
/// in the builder so far.
///
IExternalSource::TPtr TExternalSourceBuilder::Build() {
    // NOTE(review): the TValidatedExternalDataSource constructor in this file
    // takes all four arguments by const reference, so these std::move calls
    // look like they end up as copies - confirm whether that is intended.
    IExternalSource::TPtr source = MakeIntrusive<TValidatedExternalDataSource>(
        std::move(Name_),
        std::move(AuthMethodsForCheck_),
        std::move(AvailableProperties_),
        std::move(HostnamePatterns_));
    return source;
}
166+
167+
///
/// Builds a condition that holds when the property map contains `property`
/// and its value equals `value`. Both strings are copied into the closure.
///
TCondition GetHasSettingCondition(const TString& property, const TString& value) {
    return [property, value](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>& properties) -> bool {
        const auto found = properties.find(property);

        if (properties.end() == found) {
            return false;
        }

        return value == found->second;
    };
}
173+
174+
///
/// Builds a validator that throws TExternalSourceException when the value
/// of the property is empty and accepts any non-empty value.
///
TValidator GetRequiredValidator() {
    return [](const TString& property, const TString& value) {
        if (value.empty()) {
            throw TExternalSourceException() << "required property: " << property << " is not set";
        }
    };
}
183+
184+
///
/// Builds a validator that accepts only values from `values`.
///
/// An empty value is rejected when `required` is true and accepted
/// otherwise. The list of allowed values for the error message is joined
/// once, when the validator is created.
///
TValidator GetIsInListValidator(const std::unordered_set<TString>& values, bool required) {
    auto allowedList = JoinSeq(", ", values);

    return [values, required, allowedList](const TString& property, const TString& value) {
        if (value.empty()) {
            if (required) {
                throw TExternalSourceException() << " required property: " << property << " is not set";
            }
            // optional property without a value: nothing to check
            return;
        }

        if (values.contains(value)) {
            return;
        }

        throw TExternalSourceException()
            << " property: " << property
            << " has wrong value: " << value
            << " allowed values: " << allowedList;
    };
}
204+
205+
} // NKikimr::NExternalSource
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#pragma once
2+
3+
#include "external_source.h"
4+
5+
#include <library/cpp/regex/pcre/regexp.h>
6+
#include <util/generic/set.h>
7+
8+
namespace NKikimr::NExternalSource {
9+
10+
typedef std::function<void(const TString&, const TString&)> TValidator;
11+
typedef std::function<bool(const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&)> TCondition;
12+
13+
///
14+
/// Builder to create an external data source with validations
15+
///
16+
class TExternalSourceBuilder {
17+
public:
18+
struct TAuthHolder {
19+
TString Auth;
20+
21+
// When auth has to be used
22+
TCondition UseCondition;
23+
};
24+
25+
struct TConditionalValidator {
26+
TValidator Validator;
27+
28+
// When validator has to be applied
29+
TCondition ApplyCondition;
30+
};
31+
32+
public:
33+
explicit TExternalSourceBuilder(const TString& name);
34+
35+
~TExternalSourceBuilder() = default;
36+
37+
///
38+
/// Add auth methods which are returned from the "source" only if a condition is true.
39+
/// A condition is applied to source's ddl in @sa IExternalSource::GetAuthMethods
40+
/// call.
41+
///
42+
TExternalSourceBuilder& Auth(const TVector<TString>& authMethods, TCondition condition);
43+
44+
TExternalSourceBuilder& Auth(const TVector<TString>& authMethods) {
45+
return Auth(authMethods, [](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&){
46+
return true;
47+
});
48+
}
49+
50+
///
51+
/// Add property which can be in a "source".
52+
///
53+
/// @param name name of a property
54+
/// @param validator validator which is applied to a property from a source's ddl
55+
/// in @sa IExternalSource::ValidateExternalDataSource call
56+
/// @param condition condition that defines to use validator or not, if condition returns true
57+
/// for source's ddl then validator is applied; otherwise, validator is skiped;
58+
/// condition is executed in @sa IExternalSource::ValidateExternalDataSource call
59+
/// before validator
60+
///
61+
TExternalSourceBuilder& Property(const TString name, TValidator validator, TCondition condition);
62+
63+
TExternalSourceBuilder& Properties(const TSet<TString>& properties, TValidator validator, TCondition condition);
64+
65+
TExternalSourceBuilder& HostnamePatterns(const std::vector<TRegExMatch>& patterns);
66+
67+
///
68+
/// Create external data source
69+
///
70+
IExternalSource::TPtr Build();
71+
72+
TExternalSourceBuilder& Property(const TString name, TValidator validator) {
73+
return Property(name, validator, [](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&){
74+
return true;
75+
});
76+
}
77+
78+
TExternalSourceBuilder& Property(const TString name) {
79+
return Property(name, [](const TString&, const TString&){});
80+
}
81+
82+
TExternalSourceBuilder& Properties(const TSet<TString>& properties, TValidator validator) {
83+
return Properties(properties, validator, [](const ::google::protobuf::Map<TProtoStringType, TProtoStringType>&){
84+
return true;
85+
});
86+
}
87+
88+
TExternalSourceBuilder& Properties(const TSet<TString>& properties) {
89+
return Properties(properties, [](const TString&, const TString&){});
90+
}
91+
92+
private:
93+
TString Name_;
94+
std::vector<TAuthHolder> AuthMethodsForCheck_;
95+
std::unordered_map<TString, TConditionalValidator> AvailableProperties_;
96+
std::vector<TRegExMatch> HostnamePatterns_;
97+
};
98+
99+
///
100+
/// Create a condition that returns "true" if a source's ddl has
101+
/// property "p" with a value equal to "v"
102+
///
103+
TCondition GetHasSettingCondition(const TString& p, const TString& v);
104+
105+
///
106+
/// Create a validator which checks that a source's ddl has a property with a non-empty value
107+
///
108+
TValidator GetRequiredValidator();
109+
110+
///
111+
/// Create a validator which checks that a source's ddl has a property with a value from the list
112+
///
113+
/// @param values list of allowed values
114+
/// @param required if true, a missing or empty value is rejected; if false, an empty value is accepted
115+
///
116+
TValidator GetIsInListValidator(const std::unordered_set<TString>& values, bool required);
117+
118+
} // NKikimr::NExternalSource

0 commit comments

Comments
 (0)