Skip to content

Commit 2e4cc09

Browse files
committed
fix database initializing conditions
1 parent 073ed86 commit 2e4cc09

File tree

4 files changed

+145
-84
lines changed

4 files changed

+145
-84
lines changed

internal/cms/operation.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package cms
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
9+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
10+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
11+
"github.com/ydb-platform/ydb-go-sdk/v3"
12+
"sigs.k8s.io/controller-runtime/pkg/log"
13+
14+
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
15+
)
16+
17+
const (
18+
GetOperationTimeoutSeconds = 10
19+
)
20+
21+
type Operation struct {
22+
StorageEndpoint string
23+
Domain string
24+
Id string
25+
}
26+
27+
func (op *Operation) GetOperation(
28+
ctx context.Context,
29+
opts ...ydb.Option,
30+
) (*Ydb_Operations.GetOperationResponse, error) {
31+
logger := log.FromContext(ctx)
32+
33+
endpoint := fmt.Sprintf("%s/%s", op.StorageEndpoint, op.Domain)
34+
ydbCtx, ydbCtxCancel := context.WithTimeout(ctx, time.Second)
35+
defer ydbCtxCancel()
36+
conn, err := connection.Open(ydbCtx, endpoint, ydb.MergeOptions(opts...))
37+
if err != nil {
38+
return nil, fmt.Errorf("error connecting to YDB: %w", err)
39+
}
40+
defer func() {
41+
connection.Close(ydbCtx, conn)
42+
}()
43+
44+
cmsCtx, cmsCtxCancel := context.WithTimeout(ctx, GetOperationTimeoutSeconds*time.Second)
45+
defer cmsCtxCancel()
46+
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
47+
request := &Ydb_Operations.GetOperationRequest{Id: op.Id}
48+
49+
logger.Info("CMS GetOperation", "endpoint", endpoint, "request", request)
50+
return client.GetOperation(cmsCtx, request)
51+
}
52+
53+
func (op *Operation) CheckGetOperationResponse(ctx context.Context, response *Ydb_Operations.GetOperationResponse) (bool, string, error) {
54+
logger := log.FromContext(ctx)
55+
56+
logger.Info("CMS GetOperation", "response", response)
57+
return CheckOperationStatus(response.GetOperation())
58+
}
59+
60+
func CheckOperationStatus(operation *Ydb_Operations.Operation) (bool, string, error) {
61+
if operation == nil {
62+
return false, "", ErrEmptyReplyFromStorage
63+
}
64+
65+
if !operation.GetReady() {
66+
return false, operation.Id, nil
67+
}
68+
69+
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
70+
return true, operation.Id, nil
71+
}
72+
73+
return true, operation.Id, fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
74+
}

internal/cms/tenant.go

Lines changed: 26 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
78

89
"github.com/ydb-platform/ydb-go-genproto/Ydb_Cms_V1"
9-
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
10-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1110
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Cms"
12-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1311
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
1412
"sigs.k8s.io/controller-runtime/pkg/log"
1513

1614
ydbv1alpha1 "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1"
1715
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
1816
)
1917

18+
const (
19+
CreateDatabaseTimeoutSeconds = 10
20+
)
21+
2022
var ErrEmptyReplyFromStorage = errors.New("empty reply from storage")
2123

2224
type Tenant struct {
@@ -28,31 +30,37 @@ type Tenant struct {
2830
SharedDatabasePath string
2931
}
3032

31-
func (t *Tenant) Create(
33+
func (t *Tenant) CreateDatabase(
3234
ctx context.Context,
3335
opts ...ydb.Option,
34-
) (string, error) {
36+
) (*Ydb_Cms.CreateDatabaseResponse, error) {
3537
logger := log.FromContext(ctx)
36-
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
37-
conn, err := connection.Open(ctx, url, opts...)
38+
39+
endpoint := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
40+
ydbCtx, ydbCtxCancel := context.WithTimeout(ctx, time.Second)
41+
defer ydbCtxCancel()
42+
conn, err := connection.Open(ydbCtx, endpoint, opts...)
3843
if err != nil {
39-
logger.Error(err, "Error connecting to YDB storage")
40-
return "", err
44+
logger.Error(err, "Error connecting to YDB")
45+
return nil, err
4146
}
4247
defer func() {
43-
connection.Close(ctx, conn)
48+
connection.Close(ydbCtx, conn)
4449
}()
4550

51+
cmsCtx, cmsCtxCancel := context.WithTimeout(ctx, CreateDatabaseTimeoutSeconds*time.Second)
52+
defer cmsCtxCancel()
4653
client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(conn))
47-
logger.Info(fmt.Sprintf("creating tenant, url: %s", url))
4854
request := t.makeCreateDatabaseRequest()
49-
logger.Info(fmt.Sprintf("creating tenant, request: %s", request))
50-
response, err := client.CreateDatabase(ctx, request)
51-
if err != nil {
52-
return "", err
53-
}
54-
logger.Info(fmt.Sprintf("creating tenant, response: %s", response))
55-
return processDatabaseCreationOperation(response.Operation)
55+
logger.Info("CMS CreateDatabase", "endpoint", endpoint, "request", request)
56+
return client.CreateDatabase(cmsCtx, request)
57+
}
58+
59+
func (t *Tenant) CheckCreateDatabaseResponse(ctx context.Context, response *Ydb_Cms.CreateDatabaseResponse) (bool, string, error) {
60+
logger := log.FromContext(ctx)
61+
62+
logger.Info("CMS CreateDatabase", "response", response)
63+
return CheckOperationStatus(response.GetOperation())
5664
}
5765

5866
func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
@@ -87,47 +95,3 @@ func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
8795
}
8896
return request
8997
}
90-
91-
func processDatabaseCreationOperation(operation *Ydb_Operations.Operation) (string, error) {
92-
if operation == nil {
93-
return "", ErrEmptyReplyFromStorage
94-
}
95-
if !operation.Ready {
96-
return operation.Id, nil
97-
}
98-
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
99-
return "", nil
100-
}
101-
return "", fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
102-
}
103-
104-
func (t *Tenant) CheckCreateOperation(
105-
ctx context.Context,
106-
operationID string,
107-
opts ...ydb.Option,
108-
) (bool, error, error) {
109-
logger := log.FromContext(ctx)
110-
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
111-
conn, err := connection.Open(ctx, url, opts...)
112-
if err != nil {
113-
logger.Error(err, "Error connecting to YDB storage")
114-
return false, nil, err
115-
}
116-
defer func() {
117-
connection.Close(ctx, conn)
118-
}()
119-
120-
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
121-
request := &Ydb_Operations.GetOperationRequest{Id: operationID}
122-
logger.Info(fmt.Sprintf("checking operation, url: %s, operationId: %s, request: %s", url, operationID, request))
123-
response, err := client.GetOperation(ctx, request)
124-
if err != nil {
125-
return false, nil, err
126-
}
127-
logger.Info(fmt.Sprintf("checking operation, response: %s", response))
128-
if response.Operation == nil {
129-
return false, nil, ErrEmptyReplyFromStorage
130-
}
131-
oid, err := processDatabaseCreationOperation(response.Operation)
132-
return len(oid) == 0, err, nil
133-
}

internal/controllers/constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const (
4545
ReasonInProgress = "InProgress"
4646
ReasonNotRequired = "NotRequired"
4747
ReasonCompleted = "Completed"
48+
ReasonFailed = "Failed"
4849

4950
DefaultRequeueDelay = 10 * time.Second
5051
StatusUpdateRequeueDelay = 1 * time.Second

internal/controllers/database/init.go

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/ydb-platform/ydb-go-sdk/v3"
7+
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
88
corev1 "k8s.io/api/core/v1"
99
apierrors "k8s.io/apimachinery/pkg/api/errors"
1010
"k8s.io/apimachinery/pkg/api/meta"
@@ -72,62 +72,71 @@ func (r *Reconciler) setInitDatabaseCompleted(
7272
return r.updateStatus(ctx, database, StatusUpdateRequeueDelay)
7373
}
7474

75-
func (r *Reconciler) checkCreateTenantOperation(
75+
func (r *Reconciler) checkCreateDatabaseOperation(
7676
ctx context.Context,
7777
database *resources.DatabaseBuilder,
7878
tenant *cms.Tenant,
7979
ydbOptions ydb.Option,
8080
) (bool, ctrl.Result, error) {
8181
condition := meta.FindStatusCondition(database.Status.Conditions, CreateDatabaseOperationCondition)
82-
if condition == nil || len(condition.Message) == 0 {
82+
if len(condition.Message) == 0 {
8383
// Something is wrong with the condition where we save operation id
8484
// retry create tenant
8585
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
8686
Type: CreateDatabaseOperationCondition,
87-
Status: metav1.ConditionTrue,
88-
Reason: ReasonNotRequired,
87+
Status: metav1.ConditionFalse,
88+
Reason: ReasonFailed,
8989
})
9090
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
9191
}
92-
operationID := condition.Message
93-
finished, operationErr, err := tenant.CheckCreateOperation(ctx, operationID, ydbOptions)
92+
93+
operation := &cms.Operation{
94+
StorageEndpoint: tenant.StorageEndpoint,
95+
Domain: tenant.Domain,
96+
Id: condition.Message,
97+
}
98+
response, err := operation.GetOperation(ctx, ydbOptions)
9499
if err != nil {
95100
r.Recorder.Event(
96101
database,
97102
corev1.EventTypeWarning,
98103
"InitializingFailed",
99-
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
104+
fmt.Sprintf("Failed to check creation operation, operationID %s: %s", operation.Id, err),
100105
)
101106
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
102107
}
103-
if operationErr != nil {
104-
// Creation operation failed - retry Create Tenant
108+
109+
finished, operationID, err := operation.CheckGetOperationResponse(ctx, response)
110+
if err != nil {
105111
r.Recorder.Event(
106112
database,
107113
corev1.EventTypeWarning,
108114
"InitializingFailed",
109-
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, operationErr),
115+
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
110116
)
111117
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
112-
Type: CreateDatabaseOperationCondition,
113-
Status: metav1.ConditionTrue,
114-
Reason: ReasonNotRequired,
118+
Type: CreateDatabaseOperationCondition,
119+
Status: metav1.ConditionFalse,
120+
Reason: ReasonFailed,
121+
Message: fmt.Sprintf("Failed to create tenant %s", tenant.Path),
115122
})
116-
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
123+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
117124
}
125+
118126
if !finished {
119127
r.Recorder.Event(
120128
database,
121129
corev1.EventTypeWarning,
122-
"Pending",
130+
string(DatabaseInitializing),
123131
fmt.Sprintf("Tenant creation operation is not completed, operationID: %s", operationID),
124132
)
125133
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, nil
126134
}
135+
127136
r.Recorder.Event(
128137
database,
129138
corev1.EventTypeNormal,
130-
"Initialized",
139+
string(DatabaseInitializing),
131140
fmt.Sprintf("Tenant %s created", tenant.Path),
132141
)
133142
return r.setInitDatabaseCompleted(ctx, database, "Database initialized successfully")
@@ -239,9 +248,10 @@ func (r *Reconciler) initializeTenant(
239248
ydbOpts := ydb.MergeOptions(ydb.WithCredentials(creds), tlsOptions)
240249

241250
if meta.IsStatusConditionFalse(database.Status.Conditions, CreateDatabaseOperationCondition) {
242-
return r.checkCreateTenantOperation(ctx, database, tenant, ydbOpts)
251+
return r.checkCreateDatabaseOperation(ctx, database, tenant, ydbOpts)
243252
}
244-
operationID, err := tenant.Create(ctx, ydb.WithCredentials(creds), tlsOptions)
253+
254+
response, err := tenant.CreateDatabase(ctx, ydbOpts)
245255
if err != nil {
246256
r.Recorder.Event(
247257
database,
@@ -251,11 +261,23 @@ func (r *Reconciler) initializeTenant(
251261
)
252262
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
253263
}
254-
if len(operationID) > 0 {
264+
265+
finished, operationID, err := tenant.CheckCreateDatabaseResponse(ctx, response)
266+
if err != nil {
267+
r.Recorder.Event(
268+
database,
269+
corev1.EventTypeWarning,
270+
"InitializingFailed",
271+
fmt.Sprintf("Failed %s: %s", tenant.Path, err),
272+
)
273+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
274+
}
275+
276+
if !finished {
255277
r.Recorder.Event(
256278
database,
257279
corev1.EventTypeWarning,
258-
"Pending",
280+
string(DatabaseInitializing),
259281
fmt.Sprintf("Tenant creation operation in progress, operationID: %s", operationID),
260282
)
261283
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
@@ -269,7 +291,7 @@ func (r *Reconciler) initializeTenant(
269291
r.Recorder.Event(
270292
database,
271293
corev1.EventTypeNormal,
272-
"Initialized",
294+
string(DatabaseInitializing),
273295
fmt.Sprintf("Tenant %s created", tenant.Path),
274296
)
275297

0 commit comments

Comments
 (0)