Skip to content

client, resource_manager: resource manager client #5810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ type Client interface {

// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// ResourceManagerClient manages resource groups and acquires token buckets.
ResourceManagerClient
// Close closes the client.
Close()
}
Expand Down Expand Up @@ -390,6 +392,8 @@ type client struct {
// dc-location -> *lastTSO
lastTSMap sync.Map // Same as map[string]*lastTSO

tokenDispatcher *tokenDispatcher

// For internal usage.
checkTSDeadlineCh chan struct{}
leaderNetworkFailure int32
Expand Down Expand Up @@ -417,6 +421,7 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi
}
// Start the daemons.
c.updateTSODispatcher()
c.createTokenispatcher()
c.wg.Add(3)
go c.tsLoop()
go c.tsCancelLoop()
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172
github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172 h1:FYgKV9znRQmzVrrJDZ0gUfMIvKLAMU1tu1UKJib8bEQ=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26 h1:Tw4afZ2Tyr8iT8Oln6/szMjh5IDs+GtlnLsDo/Y2HEE=
github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
265 changes: 265 additions & 0 deletions client/resoucemanager_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
package pd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file name needs to be "resource" rather than "resouce".


import (
"context"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// ResourceManagerClient manages resource groups and acquires token buckets
// from the resource manager.
type ResourceManagerClient interface {
// ListResourceGroups returns the resource groups reported by the resource manager.
ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
// GetResourceGroup returns the resource group with the given name.
GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error)
// AddResourceGroup sends the named group with the given settings through the
// AddResourceGroup RPC and returns the response body.
AddResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error)
// ModifyResourceGroup sends the named group with the given settings through the
// ModifyResourceGroup RPC and returns the response body.
ModifyResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error)
// DeleteResourceGroup requests deletion of the named group and returns the
// response body.
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
// AcquireTokenBuckets hands the request to the token dispatcher and waits for
// the token bucket responses, or for ctx or the client context to be done.
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
}

// resouceManagerClient returns a resource manager gRPC client built on the
// cached connection for the current leader address. It returns nil when no
// connection is cached for that address.
func (c *client) resouceManagerClient() rmpb.ResourceManagerClient {
	conn, found := c.clientConns.Load(c.GetLeaderAddr())
	if !found {
		return nil
	}
	return rmpb.NewResourceManagerClient(conn.(*grpc.ClientConn))
}

// ListResourceGroups loads and returns the resource groups reported by the resource manager.
func (c *client) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
req := &rmpb.ListResourceGroupsRequest{}
resp, err := c.resouceManagerClient().ListResourceGroups(ctx, req)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this cause a panic if c.resouceManagerClient() returns nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like leaderClient, I don't think c.resouceManagerClient() is going to return nil

if err != nil {
return nil, err
}
resErr := resp.GetError()
if resErr != nil {
return nil, errors.Errorf("[pd]" + resErr.Message)
}
return resp.GetGroups(), nil
}

func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) {
req := &rmpb.GetResourceGroupRequest{
ResourceGroupName: resourceGroupName,
}
resp, err := c.resouceManagerClient().GetResourceGroup(ctx, req)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

if err != nil {
return nil, err
}
resErr := resp.GetError()
if resErr != nil {
return nil, errors.Errorf("[pd]" + resErr.Message)
}
return resp.GetGroup(), nil
}

func (c *client) AddResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error) {
return c.putResourceGroup(ctx, resourceGroupName, settings, 0 /* type add resource group */)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using an enum type to distinguish the type rather than the comment?

}

// ModifyResourceGroup sends the named resource group with the given settings
// through putResourceGroup using type 1 (modify) and returns its result.
func (c *client) ModifyResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error) {
return c.putResourceGroup(ctx, resourceGroupName, settings, 1 /* type modify resource group */)
}

// putResourceGroup builds a PutResourceGroupRequest for the named group and
// sends it through the AddResourceGroup RPC when typ is 0, or through the
// ModifyResourceGroup RPC for any other typ.
//
// It returns the response body on success. It returns an error when no
// resource manager client is available, when the RPC fails, or when the
// response carries an error; in that last case the server message is
// prefixed with "[pd]".
func (c *client) putResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings, typ int) (str string, err error) {
	// resouceManagerClient returns nil when there is no cached connection to
	// the leader; calling a method on it would panic.
	cli := c.resouceManagerClient()
	if cli == nil {
		return "", errors.New("[pd] resource manager client is unavailable: no connection to the leader")
	}
	req := &rmpb.PutResourceGroupRequest{
		Group: &rmpb.ResourceGroup{
			Name:     resourceGroupName,
			Settings: settings,
		},
	}
	var resp *rmpb.PutResourceGroupResponse
	if typ == 0 {
		resp, err = cli.AddResourceGroup(ctx, req)
	} else {
		resp, err = cli.ModifyResourceGroup(ctx, req)
	}
	if err != nil {
		return "", err
	}
	if resErr := resp.GetError(); resErr != nil {
		// The server message is data, not a format string: a '%' in it must
		// not be interpreted as a formatting verb.
		return "", errors.Errorf("[pd]%s", resErr.Message)
	}
	return resp.GetBody(), nil
}

// DeleteResourceGroup asks the resource manager to delete the named resource
// group and returns the response body.
//
// It returns an error when no resource manager client is available, when the
// RPC fails, or when the response carries an error; in that last case the
// server message is prefixed with "[pd]".
func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
	// resouceManagerClient returns nil when there is no cached connection to
	// the leader; calling a method on it would panic.
	cli := c.resouceManagerClient()
	if cli == nil {
		return "", errors.New("[pd] resource manager client is unavailable: no connection to the leader")
	}
	req := &rmpb.DeleteResourceGroupRequest{
		ResourceGroupName: resourceGroupName,
	}
	resp, err := cli.DeleteResourceGroup(ctx, req)
	if err != nil {
		return "", err
	}
	if resErr := resp.GetError(); resErr != nil {
		// The server message is data, not a format string: a '%' in it must
		// not be interpreted as a formatting verb.
		return "", errors.Errorf("[pd]%s", resErr.Message)
	}
	return resp.GetBody(), nil
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
requestCtx: ctx,
clientCtx: c.ctx,
}
req.Requeset = request
c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this will easily be blocked since the tokenRequestCh's capacity is 1.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we have not implemented batch, so it will also be blocked by Wait()

grantedTokens, err := req.Wait()
if err != nil {
return nil, err
}
return grantedTokens, err
}

// tokenRequest is one AcquireTokenBuckets call queued for the token dispatcher.
type tokenRequest struct {
// clientCtx is the client's own context; Wait returns when it is done.
clientCtx context.Context
// requestCtx is the caller-supplied context; Wait returns when it is done.
requestCtx context.Context
// done receives the final error (nil on success) from finishTokenRequest.
done chan error
// Requeset is the request payload that is sent on the stream.
Requeset *rmpb.TokenBucketsRequest
// TokenBuckets holds the responses; finishTokenRequest sets it before
// signaling done.
TokenBuckets []*rmpb.TokenBucketResponse
}

// Wait blocks until the request has been finished, or until either the
// request context or the client context is done.
//
// On completion it returns the stored token buckets, or the stack-wrapped
// error delivered on done. On cancellation it returns the stack-wrapped error
// of the context that ended.
func (req *tokenRequest) Wait() (tokenBuckets []*rmpb.TokenBucketResponse, err error) {
	select {
	case result := <-req.done:
		if wrapped := errors.WithStack(result); wrapped != nil {
			return nil, wrapped
		}
		return req.TokenBuckets, nil
	case <-req.requestCtx.Done():
		cause := req.requestCtx.Err()
		return nil, errors.WithStack(cause)
	case <-req.clientCtx.Done():
		cause := req.clientCtx.Err()
		return nil, errors.WithStack(cause)
	}
}

// tokenBatchController holds the channel through which token requests are
// handed to the dispatcher goroutine.
type tokenBatchController struct {
// tokenRequestCh carries pending requests from AcquireTokenBuckets to the
// dispatcher loop.
tokenRequestCh chan *tokenRequest
}

// newTokenBatchController returns a controller that wraps the given request
// channel.
func newTokenBatchController(tokenRequestCh chan *tokenRequest) *tokenBatchController {
	controller := new(tokenBatchController)
	controller.tokenRequestCh = tokenRequestCh
	return controller
}

// tokenDispatcher bundles the request channel controller with the cancel
// function of the dispatcher's context.
type tokenDispatcher struct {
// dispatcherCancel cancels the context the dispatcher goroutine runs under.
dispatcherCancel context.CancelFunc
// tokenBatchController owns the channel that feeds the dispatcher goroutine.
tokenBatchController *tokenBatchController
}

// resourceManagerConnectionContext describes the current AcquireTokenBuckets
// stream together with the context it was opened under.
type resourceManagerConnectionContext struct {
// stream is the open token bucket stream; nil means there is no usable stream.
stream rmpb.ResourceManager_AcquireTokenBucketsClient
// ctx is the cancelable context the stream was created with.
ctx context.Context
// cancel cancels ctx and with it the stream.
cancel context.CancelFunc
}

func (c *client) createTokenispatcher() {
dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx)
dispatcher := &tokenDispatcher{
dispatcherCancel: dispatcherCancel,
tokenBatchController: newTokenBatchController(
make(chan *tokenRequest, 1)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using a larger size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we have not implemented batch and do not seem to need a larger size

}
go c.handleResouceTokenDispatcher(dispatcherCtx, dispatcher.tokenBatchController)
c.tokenDispatcher = dispatcher
}

// handleResouceTokenDispatcher is the dispatcher loop. It takes token requests
// from tbc.tokenRequestCh one at a time and serves each over the resource
// manager stream, until dispatcherCtx is done.
//
// Every request taken off the channel is finished in each branch of this
// loop, so a caller blocked in Wait is not left hanging by this function:
//   - with no stream, a reconnect is attempted and the request fails with
//     "no stream";
//   - with a canceled stream, the stream is discarded and the request fails
//     with the stream context's error;
//   - otherwise the request is passed to processTokenRequests, and a failure
//     there discards the stream so that the next request reconnects.
func (c *client) handleResouceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) {
	var connection resourceManagerConnectionContext
	if err := c.tryResourceManagerConnect(dispatcherCtx, &connection); err != nil {
		log.Warn("get stream error", zap.Error(err))
	}

	for {
		var firstRequest *tokenRequest
		select {
		case <-dispatcherCtx.Done():
			return
		case firstRequest = <-tbc.tokenRequestCh:
		}
		stream, streamCtx, cancel := connection.stream, connection.ctx, connection.cancel
		if stream == nil {
			if err := c.tryResourceManagerConnect(dispatcherCtx, &connection); err != nil {
				log.Warn("get stream error", zap.Error(err))
			}
			c.finishTokenRequest(firstRequest, nil, errors.Errorf("no stream"))
			continue
		}
		select {
		case <-streamCtx.Done():
			log.Info("[pd] resource manager stream is canceled")
			cancel()
			// Reset the shared connection, not just the local copy, so the
			// next request reconnects instead of hitting this branch forever.
			connection.stream = nil
			// Fail the request that was already dequeued; otherwise its
			// waiter would only return once its own context ended.
			c.finishTokenRequest(firstRequest, nil, errors.WithStack(streamCtx.Err()))
			continue
		default:
		}
		if err := c.processTokenRequests(stream, firstRequest); err != nil {
			log.Info("processTokenRequests error", zap.Error(err))
			cancel()
			connection.stream = nil
		}
	}
}

// processTokenRequests sends the request carried by t on stream, waits for one
// response, and finishes t with the outcome.
//
// t is finished exactly once on every path: with the stack-wrapped transport
// error when Send or Recv fails, with the "[pd]"-prefixed server message when
// the response carries an error, and with the token bucket responses
// otherwise. The same error is returned to the caller; nil is returned on
// success.
func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error {
	if err := stream.Send(t.Requeset); err != nil {
		err = errors.WithStack(err)
		c.finishTokenRequest(t, nil, err)
		return err
	}
	resp, err := stream.Recv()
	if err != nil {
		err = errors.WithStack(err)
		c.finishTokenRequest(t, nil, err)
		return err
	}
	if respErr := resp.GetError(); respErr != nil {
		// The server message is data, not a format string.
		err = errors.Errorf("[pd]%s", respErr.Message)
		// Finish the request here as well; returning without doing so would
		// leave the waiter blocked until one of its contexts is done.
		c.finishTokenRequest(t, nil, err)
		return err
	}
	c.finishTokenRequest(t, resp.GetResponses(), nil)
	return nil
}

// finishTokenRequest stores tokenBuckets on t and then sends err on t.done.
// The result is assigned before the send so that a receiver woken by done
// reads the stored value.
// NOTE(review): the send blocks once done's buffer is full; the only done
// channel created in this file has capacity 1, so each request should be
// finished at most once.
func (c *client) finishTokenRequest(t *tokenRequest, tokenBuckets []*rmpb.TokenBucketResponse, err error) {
t.TokenBuckets = tokenBuckets
t.done <- err
}

// tryResourceManagerConnect makes up to maxRetryTimes attempts to open an
// AcquireTokenBuckets stream, waiting retryInterval after each failed attempt.
//
// On success it fills connection with the stream, its context and its cancel
// function, and returns nil. If ctx is done while waiting, or all attempts
// fail, it returns the error of the last attempt.
func (c *client) tryResourceManagerConnect(ctx context.Context, connection *resourceManagerConnectionContext) error {
	var err error
	for i := 0; i < maxRetryTimes; i++ {
		if err = c.openTokenStream(ctx, connection); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-time.After(retryInterval):
		}
	}
	return err
}

// openTokenStream makes a single attempt to open an AcquireTokenBuckets stream
// under a cancelable child of ctx and stores it in connection on success.
func (c *client) openTokenStream(ctx context.Context, connection *resourceManagerConnectionContext) error {
	// resouceManagerClient returns nil when there is no cached connection to
	// the leader; calling a method on it would panic this goroutine.
	cli := c.resouceManagerClient()
	if cli == nil {
		return errors.New("[pd] resource manager client is unavailable: no connection to the leader")
	}
	cctx, cancel := context.WithCancel(ctx)
	stream, err := cli.AcquireTokenBuckets(cctx)
	if err != nil {
		cancel()
		return err
	}
	if stream == nil {
		// Report a nil stream as a failure so that callers never see a nil
		// error together with an unusable connection.
		cancel()
		return errors.New("[pd] resource manager returned a nil token bucket stream")
	}
	connection.cancel = cancel
	connection.ctx = cctx
	connection.stream = stream
	return nil
}
51 changes: 50 additions & 1 deletion pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package server

import (
"context"
"io"
"net/http"
"time"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/server"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -125,5 +129,50 @@ func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResource

// AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets.
//
// It serves the stream in a loop: it receives one TokenBucketsRequest, builds
// one TokenBucketResponse per requested resource group, and sends them back in
// a single TokenBucketsResponse. The loop ends with nil when the client closes
// the stream (io.EOF), and with an error when the server context is done, when
// Recv or Send fails, when a requested resource group does not exist, or when
// a group is in native mode.
func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error {
	for {
		// Stop serving once the server is shutting down.
		select {
		case <-s.ctx.Done():
			return errors.New("server closed")
		default:
		}
		request, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return errors.WithStack(err)
		}
		targetPeriodMs := request.GetTargetRequestPeriodMs()
		resps := &rmpb.TokenBucketsResponse{}
		for _, req := range request.Requests {
			rg := s.manager.GetResourceGroup(req.ResourceGroupName)
			if rg == nil {
				return errors.New("resource group not found")
			}
			now := time.Now()
			resp := &rmpb.TokenBucketResponse{
				ResourceGroupName: rg.Name,
			}
			switch rg.Mode {
			case rmpb.GroupMode_RUMode:
				// Update the matching bucket to now, then request tokens from
				// it, once per requested RU item.
				for _, re := range req.GetRuItems().GetRequestRU() {
					switch re.Type {
					case rmpb.RequestUnitType_RRU:
						rg.UpdateRRU(now)
						tokens := rg.RequestRRU(float64(re.Value), targetPeriodMs)
						resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
					case rmpb.RequestUnitType_WRU:
						rg.UpdateWRU(now)
						tokens := rg.RequestWRU(float64(re.Value), targetPeriodMs)
						resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
					}
				}
			case rmpb.GroupMode_NativeMode:
				return errors.New("not supports the resource type")
			}
			log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName))
			resps.Responses = append(resps.Responses, resp)
		}
		// A failed Send means the stream is broken; end the handler instead
		// of continuing to receive on it.
		if err := stream.Send(resps); err != nil {
			return errors.WithStack(err)
		}
	}
}
Loading