Skip to content

Commit 4cd009c

Browse files
ti-chi-bot, lhy1024, Tema
authored
api: support to query whether pd has loaded region (#8749) (#9319)
close #8426, close #8748

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: lhy1024 <admin@liudos.us>
Signed-off-by: artem_danilov <artem.danilov@airbnb.com>
Co-authored-by: lhy1024 <liuhanyang@pingcap.com>
Co-authored-by: lhy1024 <admin@liudos.us>
Co-authored-by: Artem Danilov <artem.danilov@airbnb.com>
1 parent bac6558 commit 4cd009c

File tree

6 files changed

+208
-21
lines changed

6 files changed

+208
-21
lines changed

pkg/storage/storage.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ import (
1818
"context"
1919
"sync/atomic"
2020

21+
clientv3 "go.etcd.io/etcd/client/v3"
22+
2123
"github.com/pingcap/kvproto/pkg/metapb"
2224
"github.com/tikv/pd/pkg/core"
2325
"github.com/tikv/pd/pkg/encryption"
2426
"github.com/tikv/pd/pkg/storage/endpoint"
2527
"github.com/tikv/pd/pkg/storage/kv"
2628
"github.com/tikv/pd/pkg/utils/syncutil"
27-
clientv3 "go.etcd.io/etcd/client/v3"
2829
)
2930

3031
// Storage is the interface for the backend storage of the PD.
@@ -73,13 +74,21 @@ func NewRegionStorageWithLevelDBBackend(
7374

7475
// TODO: support other KV storage backends like BadgerDB in the future.
7576

77+
type regionSource int
78+
79+
const (
80+
unloaded regionSource = iota
81+
fromEtcd
82+
fromLeveldb
83+
)
84+
7685
type coreStorage struct {
7786
Storage
7887
regionStorage endpoint.RegionStorage
7988

80-
useRegionStorage int32
81-
regionLoaded bool
82-
mu syncutil.Mutex
89+
useRegionStorage atomic.Bool
90+
regionLoaded regionSource
91+
mu syncutil.RWMutex
8392
}
8493

8594
// NewCoreStorage creates a new core storage with the given default and region storage.
@@ -90,6 +99,7 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage
9099
return &coreStorage{
91100
Storage: defaultStorage,
92101
regionStorage: regionStorage,
102+
regionLoaded: unloaded,
93103
}
94104
}
95105

@@ -116,12 +126,12 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi
116126
if useLocalRegionStorage {
117127
// Switch the region storage to regionStorage, all region info will be read/saved by the internal
118128
// regionStorage, and in most cases it's LevelDB-backend.
119-
atomic.StoreInt32(&ps.useRegionStorage, 1)
129+
ps.useRegionStorage.Store(true)
120130
return ps.regionStorage
121131
}
122132
// Switch the region storage to defaultStorage, all region info will be read/saved by the internal
123133
// defaultStorage, and in most cases it's etcd-backend.
124-
atomic.StoreInt32(&ps.useRegionStorage, 0)
134+
ps.useRegionStorage.Store(false)
125135
return ps.Storage
126136
}
127137

@@ -133,48 +143,53 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi
133143
return s.LoadRegions(ctx, f)
134144
}
135145

136-
if atomic.LoadInt32(&ps.useRegionStorage) == 0 {
137-
return ps.Storage.LoadRegions(ctx, f)
138-
}
139-
140146
ps.mu.Lock()
141147
defer ps.mu.Unlock()
142-
if !ps.regionLoaded {
148+
149+
if !ps.useRegionStorage.Load() {
150+
err := ps.Storage.LoadRegions(ctx, f)
151+
if err == nil {
152+
ps.regionLoaded = fromEtcd
153+
}
154+
return err
155+
}
156+
157+
if ps.regionLoaded == unloaded {
143158
if err := ps.regionStorage.LoadRegions(ctx, f); err != nil {
144159
return err
145160
}
146-
ps.regionLoaded = true
161+
ps.regionLoaded = fromLeveldb
147162
}
148163
return nil
149164
}
150165

151166
// LoadRegion loads one region from storage.
152167
func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) {
153-
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
168+
if ps.useRegionStorage.Load() {
154169
return ps.regionStorage.LoadRegion(regionID, region)
155170
}
156171
return ps.Storage.LoadRegion(regionID, region)
157172
}
158173

159174
// LoadRegions loads all regions from storage to RegionsInfo.
160175
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
161-
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
176+
if ps.useRegionStorage.Load() {
162177
return ps.regionStorage.LoadRegions(ctx, f)
163178
}
164179
return ps.Storage.LoadRegions(ctx, f)
165180
}
166181

167182
// SaveRegion saves one region to storage.
168183
func (ps *coreStorage) SaveRegion(region *metapb.Region) error {
169-
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
184+
if ps.useRegionStorage.Load() {
170185
return ps.regionStorage.SaveRegion(region)
171186
}
172187
return ps.Storage.SaveRegion(region)
173188
}
174189

175190
// DeleteRegion deletes one region from storage.
176191
func (ps *coreStorage) DeleteRegion(region *metapb.Region) error {
177-
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
192+
if ps.useRegionStorage.Load() {
178193
return ps.regionStorage.DeleteRegion(region)
179194
}
180195
return ps.Storage.DeleteRegion(region)
@@ -197,3 +212,14 @@ func (ps *coreStorage) Close() error {
197212
}
198213
return nil
199214
}
215+
216+
// AreRegionsLoaded returns whether the regions are loaded.
217+
func AreRegionsLoaded(s Storage) bool {
218+
ps := s.(*coreStorage)
219+
ps.mu.RLock()
220+
defer ps.mu.RUnlock()
221+
if ps.useRegionStorage.Load() {
222+
return ps.regionLoaded == fromLeveldb
223+
}
224+
return ps.regionLoaded == fromEtcd
225+
}

server/api/health_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,22 @@ func TestHealthSlice(t *testing.T) {
4949
re := require.New(t)
5050
cfgs, svrs, clean := mustNewCluster(re, 3)
5151
defer clean()
52-
var leader, follow *server.Server
52+
var leader, follower *server.Server
5353

5454
for _, svr := range svrs {
5555
if !svr.IsClosed() && svr.GetMember().IsLeader() {
5656
leader = svr
5757
} else {
58-
follow = svr
58+
follower = svr
5959
}
6060
}
6161
mustBootstrapCluster(re, leader)
6262
addr := leader.GetConfig().ClientUrls + apiPrefix + "/api/v1/health"
63-
follow.Close()
63+
follower.Close()
6464
resp, err := testDialClient.Get(addr)
6565
re.NoError(err)
6666
defer resp.Body.Close()
6767
buf, err := io.ReadAll(resp.Body)
6868
re.NoError(err)
69-
checkSliceResponse(re, buf, cfgs, follow.GetConfig().Name)
69+
checkSliceResponse(re, buf, cfgs, follower.GetConfig().Name)
7070
}

server/apiv2/handlers/ready.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2024 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package handlers
16+
17+
import (
18+
"net/http"
19+
20+
"github.com/gin-gonic/gin"
21+
22+
"github.com/pingcap/failpoint"
23+
24+
"github.com/tikv/pd/pkg/storage"
25+
"github.com/tikv/pd/server"
26+
"github.com/tikv/pd/server/apiv2/middlewares"
27+
)
28+
29+
// ReadyStatus reflects the cluster's ready status.
30+
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
31+
type ReadyStatus struct {
32+
RegionLoaded bool `json:"region_loaded"`
33+
}
34+
35+
// Ready checks if the region is loaded.
36+
// @Summary It will return whether pd follower is ready to became leader. This request is always served by the instance that receives it and never forwarded to the leader.
37+
// @Router /ready [get]
38+
// @Param verbose query bool false "Whether to return details."
39+
// @Success 200
40+
// @Failure 500
41+
func Ready(c *gin.Context) {
42+
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
43+
s := svr.GetStorage()
44+
regionLoaded := storage.AreRegionsLoaded(s)
45+
failpoint.Inject("loadRegionSlow", func(val failpoint.Value) {
46+
if s, ok := val.(string); ok {
47+
if svr.GetAddr() == s {
48+
regionLoaded = false
49+
}
50+
}
51+
})
52+
if regionLoaded {
53+
c.Status(http.StatusOK)
54+
} else {
55+
c.Status(http.StatusInternalServerError)
56+
}
57+
58+
if _, ok := c.GetQuery("verbose"); !ok {
59+
return
60+
}
61+
resp := &ReadyStatus{
62+
RegionLoaded: regionLoaded,
63+
}
64+
if regionLoaded {
65+
c.IndentedJSON(http.StatusOK, resp)
66+
} else {
67+
c.AbortWithStatusJSON(http.StatusInternalServerError, resp)
68+
}
69+
}

server/apiv2/router.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, apiutil.
5050
c.Set(middlewares.ServerContextKey, svr)
5151
c.Next()
5252
})
53-
router.Use(middlewares.Redirector())
5453
root := router.Group(apiV2Prefix)
54+
// add ready handler before Redirector to avoid redirecting it to the leader
55+
root.GET("ready", handlers.Ready)
56+
root.Use(middlewares.Redirector())
5557
handlers.RegisterKeyspace(root)
5658
handlers.RegisterTSOKeyspaceGroup(root)
5759
handlers.RegisterMicroService(root)
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright 2024 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package handlers
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"io"
21+
"net/http"
22+
"testing"
23+
24+
"github.com/stretchr/testify/require"
25+
26+
"github.com/pingcap/failpoint"
27+
28+
"github.com/tikv/pd/server/apiv2/handlers"
29+
"github.com/tikv/pd/tests"
30+
)
31+
32+
func TestReadyAPI(t *testing.T) {
33+
re := require.New(t)
34+
ctx, cancel := context.WithCancel(context.Background())
35+
defer cancel()
36+
cluster, err := tests.NewTestCluster(ctx, 3)
37+
re.NoError(err)
38+
defer cluster.Destroy()
39+
re.NoError(cluster.RunInitialServers())
40+
re.NotEmpty(cluster.WaitLeader())
41+
leader := cluster.GetLeaderServer()
42+
re.NoError(leader.BootstrapCluster())
43+
leaderURL := leader.GetConfig().ClientUrls + v2Prefix + "/ready"
44+
followerServer := cluster.GetServer(cluster.GetFollower())
45+
followerURL := followerServer.GetConfig().ClientUrls + v2Prefix + "/ready"
46+
checkReadyAPI(re, leaderURL, true)
47+
checkReadyAPI(re, followerURL, true)
48+
49+
// check ready status when region is not loaded for leader
50+
failpoint.Enable("github.com/tikv/pd/server/apiv2/handlers/loadRegionSlow", `return("`+leader.GetAddr()+`")`)
51+
checkReadyAPI(re, leaderURL, false)
52+
checkReadyAPI(re, followerURL, true)
53+
54+
// check ready status when region is not loaded for follower
55+
failpoint.Enable("github.com/tikv/pd/server/apiv2/handlers/loadRegionSlow", `return("`+followerServer.GetAddr()+`")`)
56+
checkReadyAPI(re, leaderURL, true)
57+
checkReadyAPI(re, followerURL, false)
58+
59+
failpoint.Disable("github.com/tikv/pd/server/apiv2/handlers/loadRegionSlow")
60+
}
61+
62+
func checkReadyAPI(re *require.Assertions, url string, isReady bool) {
63+
expectCode := http.StatusOK
64+
if !isReady {
65+
expectCode = http.StatusInternalServerError
66+
}
67+
// check ready status
68+
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
69+
re.NoError(err)
70+
resp, err := tests.TestDialClient.Do(req)
71+
re.NoError(err)
72+
defer resp.Body.Close()
73+
buf, err := io.ReadAll(resp.Body)
74+
re.NoError(err)
75+
re.Empty(buf)
76+
re.Equal(expectCode, resp.StatusCode)
77+
// check ready status with verbose
78+
req, err = http.NewRequest(http.MethodGet, url+"?verbose", http.NoBody)
79+
re.NoError(err)
80+
resp, err = tests.TestDialClient.Do(req)
81+
re.NoError(err)
82+
defer resp.Body.Close()
83+
buf, err = io.ReadAll(resp.Body)
84+
re.NoError(err)
85+
r := &handlers.ReadyStatus{}
86+
re.NoError(json.Unmarshal(buf, &r))
87+
re.Equal(expectCode, resp.StatusCode)
88+
re.Equal(isReady, r.RegionLoaded)
89+
}

tests/server/apiv2/handlers/testutil.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
)
3131

3232
const (
33+
v2Prefix = "/pd/api/v2"
3334
keyspacesPrefix = "/pd/api/v2/keyspaces"
3435
keyspaceGroupsPrefix = "/pd/api/v2/tso/keyspace-groups"
3536
)

0 commit comments

Comments
 (0)