-
Notifications
You must be signed in to change notification settings - Fork 735
client, resource_manager: resource manager client #5810
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
ca83b1d
497c67d
321c2ad
0fc2ca5
ef25725
6a13cab
6f18863
2d2fc55
887d221
1b769a6
356f118
7bc3bc9
f73f782
d5aa646
72079f2
29f44e1
eefe996
69a3ac0
6115bc6
d776fca
00d13b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
package pd | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. file name need to be
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/pingcap/errors" | ||
rmpb "github.com/pingcap/kvproto/pkg/resource_manager" | ||
"github.com/pingcap/log" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
// KeyspaceClient manages keyspace metadata. | ||
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
type ResourceManagerClient interface { | ||
ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) | ||
GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) | ||
AddResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error) | ||
ModifyResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error) | ||
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) | ||
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) | ||
} | ||
|
||
// leaderClient gets the client of current PD leader. | ||
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (c *client) resouceManagerClient() rmpb.ResourceManagerClient { | ||
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok { | ||
return rmpb.NewResourceManagerClient(cc.(*grpc.ClientConn)) | ||
} | ||
return nil | ||
} | ||
|
||
// ListResourceGroups loads and returns target keyspace's metadata. | ||
func (c *client) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { | ||
req := &rmpb.ListResourceGroupsRequest{} | ||
resp, err := c.resouceManagerClient().ListResourceGroups(ctx, req) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this cause panic if the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Like |
||
if err != nil { | ||
return nil, err | ||
} | ||
resErr := resp.GetError() | ||
if resErr != nil { | ||
return nil, errors.Errorf("[pd]" + resErr.Message) | ||
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return resp.GetGroups(), nil | ||
} | ||
|
||
func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) { | ||
req := &rmpb.GetResourceGroupRequest{ | ||
ResourceGroupName: resourceGroupName, | ||
} | ||
resp, err := c.resouceManagerClient().GetResourceGroup(ctx, req) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
if err != nil { | ||
return nil, err | ||
} | ||
resErr := resp.GetError() | ||
if resErr != nil { | ||
return nil, errors.Errorf("[pd]" + resErr.Message) | ||
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return resp.GetGroup(), nil | ||
} | ||
|
||
func (c *client) AddResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error) { | ||
return c.putResourceGroup(ctx, resourceGroupName, settings, 0 /* type add resource group */) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about using an enum type to distinguish the type rather than the comment? |
||
} | ||
|
||
func (c *client) ModifyResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings) (string, error) { | ||
return c.putResourceGroup(ctx, resourceGroupName, settings, 1 /* type modify resource group */) | ||
} | ||
|
||
func (c *client) putResourceGroup(ctx context.Context, resourceGroupName string, settings *rmpb.GroupSettings, typ int) (str string, err error) { | ||
group := &rmpb.ResourceGroup{ | ||
Name: resourceGroupName, | ||
Settings: settings, | ||
} | ||
req := &rmpb.PutResourceGroupRequest{ | ||
Group: group, | ||
} | ||
var resp *rmpb.PutResourceGroupResponse | ||
if typ == 0 { | ||
resp, err = c.resouceManagerClient().AddResourceGroup(ctx, req) | ||
} else { | ||
resp, err = c.resouceManagerClient().ModifyResourceGroup(ctx, req) | ||
} | ||
if err != nil { | ||
return str, err | ||
} | ||
resErr := resp.GetError() | ||
if resErr != nil { | ||
return str, errors.Errorf("[pd]" + resErr.Message) | ||
} | ||
str = resp.GetBody() | ||
return | ||
} | ||
|
||
func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { | ||
req := &rmpb.DeleteResourceGroupRequest{ | ||
ResourceGroupName: resourceGroupName, | ||
} | ||
resp, err := c.resouceManagerClient().DeleteResourceGroup(ctx, req) | ||
if err != nil { | ||
return "", err | ||
} | ||
resErr := resp.GetError() | ||
if resErr != nil { | ||
return "", errors.Errorf("[pd]" + resErr.Message) | ||
} | ||
return resp.GetBody(), nil | ||
} | ||
|
||
func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { | ||
req := &tokenRequest{ | ||
done: make(chan error, 1), | ||
requestCtx: ctx, | ||
clientCtx: c.ctx, | ||
} | ||
req.Requeset = request | ||
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that this will easily be blocked since the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently we have not implemented batch, so it will also be blocked by |
||
grantedTokens, err := req.Wait() | ||
if err != nil { | ||
return nil, err | ||
} | ||
return grantedTokens, err | ||
} | ||
|
||
type tokenRequest struct { | ||
clientCtx context.Context | ||
requestCtx context.Context | ||
done chan error | ||
Requeset *rmpb.TokenBucketsRequest | ||
TokenBuckets []*rmpb.TokenBucketResponse | ||
} | ||
|
||
func (req *tokenRequest) Wait() (tokenBuckets []*rmpb.TokenBucketResponse, err error) { | ||
select { | ||
case err = <-req.done: | ||
err = errors.WithStack(err) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tokenBuckets = req.TokenBuckets | ||
return | ||
case <-req.requestCtx.Done(): | ||
return nil, errors.WithStack(req.requestCtx.Err()) | ||
case <-req.clientCtx.Done(): | ||
return nil, errors.WithStack(req.clientCtx.Err()) | ||
} | ||
} | ||
|
||
type tokenBatchController struct { | ||
tokenRequestCh chan *tokenRequest | ||
} | ||
|
||
func newTokenBatchController(tokenRequestCh chan *tokenRequest) *tokenBatchController { | ||
return &tokenBatchController{ | ||
tokenRequestCh: tokenRequestCh, | ||
} | ||
} | ||
|
||
type tokenDispatcher struct { | ||
dispatcherCancel context.CancelFunc | ||
tokenBatchController *tokenBatchController | ||
} | ||
|
||
type resourceManagerConnectionContext struct { | ||
stream rmpb.ResourceManager_AcquireTokenBucketsClient | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
} | ||
|
||
func (c *client) createTokenispatcher() { | ||
dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) | ||
dispatcher := &tokenDispatcher{ | ||
dispatcherCancel: dispatcherCancel, | ||
tokenBatchController: newTokenBatchController( | ||
make(chan *tokenRequest, 1)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about using a larger size? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently we have not implemented batch and do not seem to need a larger size |
||
} | ||
go c.handleResouceTokenDispatcher(dispatcherCtx, dispatcher.tokenBatchController) | ||
c.tokenDispatcher = dispatcher | ||
} | ||
|
||
func (c *client) handleResouceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) { | ||
var connection resourceManagerConnectionContext | ||
if err := c.tryResourceManagerConnect(dispatcherCtx, &connection); err != nil { | ||
log.Warn("get stream error", zap.Error(err)) | ||
} | ||
|
||
for { | ||
var firstTSORequest *tokenRequest | ||
CabinfeverB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
select { | ||
case <-dispatcherCtx.Done(): | ||
return | ||
case firstTSORequest = <-tbc.tokenRequestCh: | ||
} | ||
stream, streamCtx, cancel := connection.stream, connection.ctx, connection.cancel | ||
if stream == nil { | ||
c.tryResourceManagerConnect(dispatcherCtx, &connection) | ||
c.finishTokenRequest(firstTSORequest, nil, errors.Errorf("no stream")) | ||
continue | ||
} | ||
select { | ||
case <-streamCtx.Done(): | ||
log.Info("[pd] resource manager stream is canceled") | ||
cancel() | ||
stream = nil | ||
continue | ||
default: | ||
} | ||
err := c.processTokenRequests(stream, firstTSORequest) | ||
if err != nil { | ||
log.Info("processTokenRequests error", zap.Error(err)) | ||
cancel() | ||
connection.stream = nil | ||
} | ||
} | ||
|
||
} | ||
|
||
func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error { | ||
req := t.Requeset | ||
if err := stream.Send(req); err != nil { | ||
err = errors.WithStack(err) | ||
c.finishTokenRequest(t, nil, err) | ||
return err | ||
} | ||
resp, err := stream.Recv() | ||
if err != nil { | ||
err = errors.WithStack(err) | ||
c.finishTokenRequest(t, nil, err) | ||
return err | ||
} | ||
if resp.GetError() != nil { | ||
return errors.Errorf("[pd]" + resp.GetError().Message) | ||
} | ||
tokenBuckets := resp.GetResponses() | ||
c.finishTokenRequest(t, tokenBuckets, nil) | ||
return nil | ||
} | ||
|
||
func (c *client) finishTokenRequest(t *tokenRequest, tokenBuckets []*rmpb.TokenBucketResponse, err error) { | ||
t.TokenBuckets = tokenBuckets | ||
t.done <- err | ||
} | ||
|
||
func (c *client) tryResourceManagerConnect(ctx context.Context, connection *resourceManagerConnectionContext) error { | ||
var ( | ||
err error | ||
stream rmpb.ResourceManager_AcquireTokenBucketsClient | ||
) | ||
for i := 0; i < maxRetryTimes; i++ { | ||
cctx, cancel := context.WithCancel(ctx) | ||
stream, err = c.resouceManagerClient().AcquireTokenBuckets(cctx) | ||
if err == nil && stream != nil { | ||
connection.cancel = cancel | ||
connection.ctx = cctx | ||
connection.stream = stream | ||
return nil | ||
} | ||
cancel() | ||
select { | ||
case <-ctx.Done(): | ||
return err | ||
case <-time.After(retryInterval): | ||
} | ||
} | ||
return err | ||
} |
Uh oh!
There was an error while loading. Please reload this page.