Skip to content

Commit 8877289

Browse files
authored
Init blobstorage Job instead of exec InPod (#182)
1 parent 621564d commit 8877289

File tree

21 files changed

+2041
-480
lines changed

21 files changed

+2041
-480
lines changed

api/v1alpha1/database_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,9 @@ type DatabaseServices struct {
264264
func init() {
265265
SchemeBuilder.Register(&Database{}, &DatabaseList{})
266266
}
267+
268+
func (r *Database) AnyCertificatesAdded() bool {
269+
return len(r.Spec.CABundle) > 0 ||
270+
r.Spec.Service.GRPC.TLSConfiguration.Enabled ||
271+
r.Spec.Service.Interconnect.TLSConfiguration.Enabled
272+
}

api/v1alpha1/storage_types.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@ type StorageSpec struct {
1313

1414
StorageNodeSpec `json:",inline"`
1515

16+
// (Optional) Operator connection settings
17+
// Default: (not specified)
18+
// +optional
19+
OperatorConnection *ConnectionOptions `json:"operatorConnection,omitempty"`
20+
21+
// (Optional) Init blobstorage Job settings
22+
// Default: (not specified)
23+
// +optional
24+
InitJob *StorageInitJobSpec `json:"initJob,omitempty"`
25+
1626
// (Optional) NodeSet inline configuration to split into multiple StatefulSets
1727
// Default: (not specified)
1828
// +optional
@@ -28,11 +38,6 @@ type StorageClusterSpec struct {
2838
// +optional
2939
Domain string `json:"domain"`
3040

31-
// (Optional) Operator connection settings
32-
// Default: (not specified)
33-
// +optional
34-
OperatorConnection *ConnectionOptions `json:"operatorConnection,omitempty"`
35-
3641
// Data storage topology mode
3742
// For details, see https://ydb.tech/docs/en/cluster/topology
3843
// FIXME mirror-3-dc is only supported with external configuration
@@ -162,6 +167,36 @@ type StorageNodeSpec struct {
162167
AdditionalAnnotations map[string]string `json:"additionalAnnotations,omitempty"`
163168
}
164169

170+
type StorageInitJobSpec struct {
171+
// (Optional) Container resource limits. Any container limits
172+
// can be specified.
173+
// Default: (not specified)
174+
// +optional
175+
Resources *corev1.ResourceRequirements `json:"resources,omitempty"`
176+
177+
// (Optional) NodeSelector is a selector which must be true for the pod to fit on a node.
178+
// Selector which must match a node's labels for the pod to be scheduled on that node.
179+
// More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
180+
// +optional
181+
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
182+
183+
// (Optional) If specified, the pod's scheduling constraints
184+
// +optional
185+
Affinity *corev1.Affinity `json:"affinity,omitempty"`
186+
187+
// (Optional) If specified, the pod's tolerations.
188+
// +optional
189+
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
190+
191+
// (Optional) Additional custom resource labels that are added to all resources
192+
// +optional
193+
AdditionalLabels map[string]string `json:"additionalLabels,omitempty"`
194+
195+
// (Optional) Additional custom resource annotations that are added to all resources
196+
// +optional
197+
AdditionalAnnotations map[string]string `json:"additionalAnnotations,omitempty"`
198+
}
199+
165200
// StorageStatus defines the observed state of Storage
166201
type StorageStatus struct {
167202
State constants.ClusterState `json:"state"`
@@ -203,3 +238,9 @@ type StorageServices struct {
203238
func init() {
204239
SchemeBuilder.Register(&Storage{}, &StorageList{})
205240
}
241+
242+
func (r *Storage) AnyCertificatesAdded() bool {
243+
return len(r.Spec.CABundle) > 0 ||
244+
r.Spec.Service.GRPC.TLSConfiguration.Enabled ||
245+
r.Spec.Service.Interconnect.TLSConfiguration.Enabled
246+
}

api/v1alpha1/storage_webhook.go

Lines changed: 95 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@ package v1alpha1
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
67

78
"github.com/google/go-cmp/cmp"
89
"github.com/google/go-cmp/cmp/cmpopts"
910
"gopkg.in/yaml.v3"
10-
v1 "k8s.io/api/core/v1"
11+
corev1 "k8s.io/api/core/v1"
1112
"k8s.io/apimachinery/pkg/runtime"
1213
"k8s.io/utils/strings/slices"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
1516
logf "sigs.k8s.io/controller-runtime/pkg/log"
1617
"sigs.k8s.io/controller-runtime/pkg/webhook"
1718

19+
"github.com/ydb-platform/ydb-kubernetes-operator/internal/configuration/schema"
1820
. "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/constants" //nolint:revive,stylecheck
1921
)
2022

@@ -29,15 +31,28 @@ func (r *Storage) SetupWebhookWithManager(mgr ctrl.Manager) error {
2931
}
3032

3133
func (r *Storage) GetStorageEndpointWithProto() string {
34+
return fmt.Sprintf("%s%s", r.GetStorageProto(), r.GetStorageEndpoint())
35+
}
36+
37+
func (r *Storage) GetStorageProto() string {
3238
proto := GRPCProto
3339
if r.IsStorageEndpointSecure() {
3440
proto = GRPCSProto
3541
}
3642

37-
return fmt.Sprintf("%s%s", proto, r.GetStorageEndpoint())
43+
return proto
3844
}
3945

4046
func (r *Storage) GetStorageEndpoint() string {
47+
endpoint := r.GetGRPCServiceEndpoint()
48+
if r.IsRemoteNodeSetsOnly() {
49+
endpoint = r.GetHostFromConfigEndpoint()
50+
}
51+
52+
return endpoint
53+
}
54+
55+
func (r *Storage) GetGRPCServiceEndpoint() string {
4156
host := fmt.Sprintf(GRPCServiceFQDNFormat, r.Name, r.Namespace)
4257
if r.Spec.Service.GRPC.ExternalHost != "" {
4358
host = r.Spec.Service.GRPC.ExternalHost
@@ -46,15 +61,44 @@ func (r *Storage) GetStorageEndpoint() string {
4661
return fmt.Sprintf("%s:%d", host, GRPCPort)
4762
}
4863

64+
// +k8s:deepcopy-gen=false
65+
type PartialHostsConfig struct {
66+
Hosts []schema.Host `yaml:"hosts,flow"`
67+
}
68+
69+
func (r *Storage) GetHostFromConfigEndpoint() string {
70+
// skip handle error because we already checked in webhook
71+
hostsConfig := PartialHostsConfig{}
72+
_ = yaml.Unmarshal([]byte(r.Spec.Configuration), &hostsConfig)
73+
74+
randNum := rand.Int31n(r.Spec.Nodes) // #nosec G404
75+
host := hostsConfig.Hosts[randNum].Host
76+
return fmt.Sprintf("%s:%d", host, GRPCPort)
77+
}
78+
4979
func (r *Storage) IsStorageEndpointSecure() bool {
5080
if r.Spec.Service.GRPC.TLSConfiguration != nil {
5181
return r.Spec.Service.GRPC.TLSConfiguration.Enabled
5282
}
5383
return false
5484
}
5585

86+
func (r *Storage) IsRemoteNodeSetsOnly() bool {
87+
if len(r.Spec.NodeSets) == 0 {
88+
return false
89+
}
90+
91+
for _, nodeSet := range r.Spec.NodeSets {
92+
if nodeSet.Remote == nil {
93+
return false
94+
}
95+
}
96+
97+
return true
98+
}
99+
56100
// +k8s:deepcopy-gen=false
57-
type PartialYamlConfig struct {
101+
type PartialDomainsConfig struct {
58102
DomainsConfig struct {
59103
SecurityConfig struct {
60104
EnforceUserTokenRequirement bool `yaml:"enforce_user_token_requirement"`
@@ -88,12 +132,12 @@ func (r *StorageDefaulter) Default(ctx context.Context, obj runtime.Object) erro
88132
}
89133

90134
if storage.Spec.Image.PullPolicyName == nil {
91-
policy := v1.PullIfNotPresent
135+
policy := corev1.PullIfNotPresent
92136
storage.Spec.Image.PullPolicyName = &policy
93137
}
94138

95139
if storage.Spec.Resources == nil {
96-
storage.Spec.Resources = &v1.ResourceRequirements{}
140+
storage.Spec.Resources = &corev1.ResourceRequirements{}
97141
}
98142

99143
if storage.Spec.Service == nil {
@@ -142,23 +186,35 @@ func (r *Storage) ValidateCreate() error {
142186
configuration := make(map[string]interface{})
143187
err := yaml.Unmarshal([]byte(r.Spec.Configuration), &configuration)
144188
if err != nil {
145-
return fmt.Errorf("failed to parse Storage.spec.configuration, error: %w", err)
189+
return fmt.Errorf("failed to parse .spec.configuration, error: %w", err)
190+
}
191+
192+
hostsConfig := PartialHostsConfig{}
193+
err = yaml.Unmarshal([]byte(r.Spec.Configuration), &hostsConfig)
194+
if err != nil {
195+
return fmt.Errorf("failed to parse YAML to determine `hosts`, error: %w", err)
146196
}
197+
147198
var nodesNumber int32
148-
if configuration["hosts"] == nil {
199+
if len(hostsConfig.Hosts) == 0 {
149200
nodesNumber = r.Spec.Nodes
150201
} else {
151-
hosts, ok := configuration["hosts"].([]interface{})
152-
if !ok {
153-
return fmt.Errorf("failed to parse Storage.spec.configuration, error: invalid hosts section")
154-
}
155-
nodesNumber = int32(len(hosts))
202+
nodesNumber = int32(len(hostsConfig.Hosts))
203+
}
204+
205+
minNodesPerErasure := map[ErasureType]int32{
206+
ErasureMirror3DC: 9,
207+
ErasureBlock42: 8,
208+
None: 1,
209+
}
210+
if nodesNumber < minNodesPerErasure[r.Spec.Erasure] {
211+
return fmt.Errorf("erasure type %v requires at least %v storage nodes", r.Spec.Erasure, minNodesPerErasure[r.Spec.Erasure])
156212
}
157213

158-
yamlConfig := PartialYamlConfig{}
214+
yamlConfig := PartialDomainsConfig{}
159215
err = yaml.Unmarshal([]byte(r.Spec.Configuration), &yamlConfig)
160216
if err != nil {
161-
return fmt.Errorf("failed to parse YAML to determine `enforce_user_token_requirement`")
217+
return fmt.Errorf("failed to parse YAML to determine `enforce_user_token_requirement`, error: %w", err)
162218
}
163219

164220
var authEnabled bool
@@ -170,15 +226,6 @@ func (r *Storage) ValidateCreate() error {
170226
return fmt.Errorf("field 'spec.operatorConnection' does not satisfy with config option `enforce_user_token_requirement: %t`", authEnabled)
171227
}
172228

173-
minNodesPerErasure := map[ErasureType]int32{
174-
ErasureMirror3DC: 9,
175-
ErasureBlock42: 8,
176-
None: 1,
177-
}
178-
if nodesNumber < minNodesPerErasure[r.Spec.Erasure] {
179-
return fmt.Errorf("erasure type %v requires at least %v storage nodes", r.Spec.Erasure, minNodesPerErasure[r.Spec.Erasure])
180-
}
181-
182229
if r.Spec.NodeSets != nil {
183230
var nodesInSetsCount int32
184231
for _, nodeSetInline := range r.Spec.NodeSets {
@@ -237,7 +284,29 @@ func (r *Storage) ValidateUpdate(old runtime.Object) error {
237284
configuration := make(map[string]interface{})
238285
err := yaml.Unmarshal([]byte(r.Spec.Configuration), &configuration)
239286
if err != nil {
240-
return fmt.Errorf("failed to parse Storage.spec.configuration, error: %w", err)
287+
return fmt.Errorf("failed to parse .spec.configuration, error: %w", err)
288+
}
289+
290+
hostsConfig := PartialHostsConfig{}
291+
err = yaml.Unmarshal([]byte(r.Spec.Configuration), &hostsConfig)
292+
if err != nil {
293+
return fmt.Errorf("failed to parse YAML to determine `hosts`, error: %w", err)
294+
}
295+
296+
var nodesNumber int32
297+
if len(hostsConfig.Hosts) == 0 {
298+
nodesNumber = r.Spec.Nodes
299+
} else {
300+
nodesNumber = int32(len(hostsConfig.Hosts))
301+
}
302+
303+
minNodesPerErasure := map[ErasureType]int32{
304+
ErasureMirror3DC: 9,
305+
ErasureBlock42: 8,
306+
None: 1,
307+
}
308+
if nodesNumber < minNodesPerErasure[r.Spec.Erasure] {
309+
return fmt.Errorf("erasure type %v requires at least %v storage nodes", r.Spec.Erasure, minNodesPerErasure[r.Spec.Erasure])
241310
}
242311

243312
if !r.Spec.OperatorSync {
@@ -258,10 +327,10 @@ func (r *Storage) ValidateUpdate(old runtime.Object) error {
258327
}
259328
}
260329

261-
yamlConfig := PartialYamlConfig{}
330+
yamlConfig := PartialDomainsConfig{}
262331
err = yaml.Unmarshal([]byte(r.Spec.Configuration), &yamlConfig)
263332
if err != nil {
264-
return fmt.Errorf("failed to parse YAML to determine `enforce_user_token_requirement`")
333+
return fmt.Errorf("failed to parse YAML to determine `enforce_user_token_requirement`, error: %w", err)
265334
}
266335

267336
var authEnabled bool

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 63 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)