Skip to content

Commit 61f8eb6

Browse files
committed
transport/grpc: add WithConnPool dialer option
This "internal" option allows clients to share a connection pool for longrunning GAPIC clients. Most usages of WithGRPCConn internally should be changed to WithConnPool. Change-Id: I496a6e229c0d5695de8e1381704ac11ca078b162 Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/51532 Reviewed-by: kokoro <noreply+kokoro@google.com> Reviewed-by: Noah Dietz <ndietz@google.com> Reviewed-by: Tyler Bui-Palsulich <tbp@google.com>
1 parent b1dd698 commit 61f8eb6

File tree

5 files changed

+93
-59
lines changed

5 files changed

+93
-59
lines changed

internal/conn_pool.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2020 Google LLC.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package internal
6+
7+
import (
8+
"google.golang.org/grpc"
9+
)
10+
11+
// ConnPool is a pool of grpc.ClientConns.
12+
type ConnPool interface {
13+
// Conn returns a ClientConn from the pool.
14+
//
15+
// Conns aren't returned to the pool.
16+
Conn() *grpc.ClientConn
17+
18+
// Num returns the number of connections in the pool.
19+
//
20+
// It will always return the same value.
21+
Num() int
22+
23+
// Close closes every ClientConn in the pool.
24+
//
25+
// The error returned by Close may be a single error or multiple errors.
26+
Close() error
27+
28+
// ConnPool implements grpc.ClientConnInterface to enable it to be used directly with generated proto stubs.
29+
grpc.ClientConnInterface
30+
}

internal/settings.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ type DialSettings struct {
2929
HTTPClient *http.Client
3030
GRPCDialOpts []grpc.DialOption
3131
GRPCConn *grpc.ClientConn
32-
GRPCConnPool int
32+
GRPCConnPool ConnPool
33+
GRPCConnPoolSize int
3334
NoAuth bool
3435
TelemetryDisabled bool
3536

@@ -71,6 +72,12 @@ func (ds *DialSettings) Validate() error {
7172
if nCreds > 1 && !(nCreds == 2 && ds.TokenSource != nil && ds.CredentialsFile != "") {
7273
return errors.New("multiple credential options provided")
7374
}
75+
if ds.GRPCConn != nil && ds.GRPCConnPool != nil {
76+
return errors.New("WithGRPCConn is incompatible with WithConnPool")
77+
}
78+
if ds.HTTPClient != nil && ds.GRPCConnPool != nil {
79+
return errors.New("WithHTTPClient is incompatible with WithConnPool")
80+
}
7481
if ds.HTTPClient != nil && ds.GRPCConn != nil {
7582
return errors.New("WithHTTPClient is incompatible with WithGRPCConn")
7683
}

option/option.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func WithGRPCConnectionPool(size int) ClientOption {
150150
type withGRPCConnectionPool int
151151

152152
func (w withGRPCConnectionPool) Apply(o *internal.DialSettings) {
153-
o.GRPCConnPool = int(w)
153+
o.GRPCConnPoolSize = int(w)
154154
}
155155

156156
// WithAPIKey returns a ClientOption that specifies an API key to be used

transport/grpc/dial.go

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,16 @@ var timeoutDialerOption grpc.DialOption
3636
// Dial returns a GRPC connection for use communicating with a Google cloud
3737
// service, configured with the given ClientOptions.
3838
func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
39-
var o internal.DialSettings
40-
for _, opt := range opts {
41-
opt.Apply(&o)
39+
o, err := processAndValidateOpts(opts)
40+
if err != nil {
41+
return nil, err
4242
}
43-
if o.GRPCConnPool != 0 {
43+
if o.GRPCConnPool != nil {
44+
return o.GRPCConnPool.Conn(), nil
45+
}
46+
if o.GRPCConnPoolSize != 0 {
4447
// NOTE(cbro): RoundRobin and WithBalancer are deprecated and we need to remove usages of it.
45-
balancer := grpc.RoundRobin(internal.NewPoolResolver(o.GRPCConnPool, &o))
48+
balancer := grpc.RoundRobin(internal.NewPoolResolver(o.GRPCConnPoolSize, o))
4649
o.GRPCDialOpts = append(o.GRPCDialOpts, grpc.WithBalancer(balancer))
4750
}
4851
return dial(ctx, false, o)
@@ -52,9 +55,9 @@ func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, e
5255
// with fake or mock Google cloud service implementations, such as emulators.
5356
// The connection is configured with the given ClientOptions.
5457
func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
55-
var o internal.DialSettings
56-
for _, opt := range opts {
57-
opt.Apply(&o)
58+
o, err := processAndValidateOpts(opts)
59+
if err != nil {
60+
return nil, err
5861
}
5962
return dial(ctx, true, o)
6063
}
@@ -67,12 +70,15 @@ func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.Clien
6770
//
6871
// This API is subject to change as we further refine requirements. It will go away if gRPC stubs accept an interface instead of the concrete ClientConn type. See https://github.com/grpc/grpc-go/issues/1287.
6972
func DialPool(ctx context.Context, opts ...option.ClientOption) (ConnPool, error) {
70-
var o internal.DialSettings
71-
for _, opt := range opts {
72-
opt.Apply(&o)
73+
o, err := processAndValidateOpts(opts)
74+
if err != nil {
75+
return nil, err
7376
}
74-
poolSize := o.GRPCConnPool
75-
o.GRPCConnPool = 0 // we don't *need* to set this to zero, but it's safe to.
77+
if o.GRPCConnPool != nil {
78+
return o.GRPCConnPool, nil
79+
}
80+
poolSize := o.GRPCConnPoolSize
81+
o.GRPCConnPoolSize = 0 // we don't *need* to set this to zero, but it's safe to.
7682

7783
if poolSize == 0 || poolSize == 1 {
7884
// Fast path for common case for a connection pool with a single connection.
@@ -95,10 +101,7 @@ func DialPool(ctx context.Context, opts ...option.ClientOption) (ConnPool, error
95101
return pool, nil
96102
}
97103

98-
func dial(ctx context.Context, insecure bool, o internal.DialSettings) (*grpc.ClientConn, error) {
99-
if err := o.Validate(); err != nil {
100-
return nil, err
101-
}
104+
func dial(ctx context.Context, insecure bool, o *internal.DialSettings) (*grpc.ClientConn, error) {
102105
if o.HTTPClient != nil {
103106
return nil, errors.New("unsupported HTTP client specified")
104107
}
@@ -112,7 +115,7 @@ func dial(ctx context.Context, insecure bool, o internal.DialSettings) (*grpc.Cl
112115
if o.APIKey != "" {
113116
log.Print("API keys are not supported for gRPC APIs. Remove the WithAPIKey option from your client-creating call.")
114117
}
115-
creds, err := internal.Creds(ctx, &o)
118+
creds, err := internal.Creds(ctx, o)
116119
if err != nil {
117120
return nil, err
118121
}
@@ -180,7 +183,7 @@ func dial(ctx context.Context, insecure bool, o internal.DialSettings) (*grpc.Cl
180183
return grpc.DialContext(ctx, o.Endpoint, grpcOpts...)
181184
}
182185

183-
func addOCStatsHandler(opts []grpc.DialOption, settings internal.DialSettings) []grpc.DialOption {
186+
func addOCStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
184187
if settings.TelemetryDisabled {
185188
return opts
186189
}
@@ -255,3 +258,30 @@ func isDirectPathEnabled(endpoint string) bool {
255258
}
256259
return false
257260
}
261+
262+
func processAndValidateOpts(opts []option.ClientOption) (*internal.DialSettings, error) {
263+
var o internal.DialSettings
264+
for _, opt := range opts {
265+
opt.Apply(&o)
266+
}
267+
if err := o.Validate(); err != nil {
268+
return nil, err
269+
}
270+
return &o, nil
271+
}
272+
273+
type connPoolOption struct{ ConnPool }
274+
275+
// WithConnPool returns a ClientOption that specifies the ConnPool
276+
// connection to use as the basis of communications.
277+
//
278+
// This is only to be used by Google client libraries internally, for example
279+
// when creating a longrunning API client that shares the same connection pool
280+
// as a service client.
281+
func WithConnPool(p ConnPool) option.ClientOption {
282+
return connPoolOption{p}
283+
}
284+
285+
func (o connPoolOption) Apply(s *internal.DialSettings) {
286+
s.GRPCConnPool = o.ConnPool
287+
}

transport/grpc/pool.go

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,56 +9,23 @@ import (
99
"fmt"
1010
"sync/atomic"
1111

12+
"google.golang.org/api/internal"
1213
"google.golang.org/grpc"
1314
)
1415

1516
// ConnPool is a pool of grpc.ClientConns.
16-
type ConnPool interface {
17-
// Conn returns a ClientConn from the pool.
18-
//
19-
// Conns aren't returned to the pool.
20-
Conn() *grpc.ClientConn
21-
22-
// Num returns the number of connections in the pool.
23-
//
24-
// It will always return the same value.
25-
Num() int
26-
27-
// Close closes every ClientConn in the pool.
28-
//
29-
// The error returned by Close may be a single error or multiple errors.
30-
Close() error
31-
32-
grpc.ClientConnInterface
33-
}
17+
type ConnPool = internal.ConnPool // NOTE(cbro): type alias to export the type. It must live in internal to avoid a circular dependency.
3418

3519
var _ ConnPool = &roundRobinConnPool{}
3620
var _ ConnPool = &singleConnPool{}
3721

3822
// singleConnPool is a special case for a single connection.
3923
type singleConnPool struct {
40-
conn *grpc.ClientConn
41-
}
42-
43-
func (p *singleConnPool) Conn() *grpc.ClientConn {
44-
return p.conn
45-
}
46-
47-
func (p *singleConnPool) Num() int {
48-
return 1
24+
*grpc.ClientConn
4925
}
5026

51-
func (p *singleConnPool) Close() error {
52-
return p.conn.Close()
53-
}
54-
55-
func (p *singleConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
56-
return p.conn.Invoke(ctx, method, args, reply, opts...)
57-
}
58-
59-
func (p *singleConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
60-
return p.conn.NewStream(ctx, desc, method, opts...)
61-
}
27+
func (p *singleConnPool) Conn() *grpc.ClientConn { return p.ClientConn }
28+
func (p *singleConnPool) Num() int { return 1 }
6229

6330
type roundRobinConnPool struct {
6431
conns []*grpc.ClientConn

0 commit comments

Comments
 (0)