Skip to content

Commit e5b3ba9

Browse files
authored
Expose the RPC TImeout call (#313)
closes: #312 Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
1 parent 202c881 commit e5b3ba9

File tree

5 files changed

+40
-24
lines changed

5 files changed

+40
-24
lines changed

pkg/stream/client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ type Client struct {
7171
serverProperties map[string]string
7272
}
7373

74-
func newClient(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
74+
func newClient(connectionName string, broker *Broker,
75+
tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
7576
var clientBroker = broker
7677
if broker == nil {
7778
clientBroker = newBrokerDefault()
@@ -100,7 +101,7 @@ func newClient(connectionName string, broker *Broker, tcpParameters *TCPParamete
100101
mutex: &sync.Mutex{},
101102
destructor: &sync.Once{},
102103
},
103-
socketCallTimeout: defaultSocketCallTimeout,
104+
socketCallTimeout: rpcTimeOut,
104105
availableFeatures: newAvailableFeatures(),
105106
}
106107
c.setConnectionName(connectionName)

pkg/stream/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ var _ = Describe("Streaming testEnvironment", func() {
183183
})
184184

185185
It("Client.queryOffset won't hang if timeout happens", func() {
186-
cli := newClient("connName", nil, nil, nil)
186+
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
187187
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
188188
cli.socketCallTimeout = time.Millisecond
189189

@@ -193,7 +193,7 @@ var _ = Describe("Streaming testEnvironment", func() {
193193
})
194194

195195
It("Client.queryPublisherSequence won't hang if timeout happens", func() {
196-
cli := newClient("connName", nil, nil, nil)
196+
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
197197
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
198198
cli.socketCallTimeout = time.Millisecond
199199

@@ -203,7 +203,7 @@ var _ = Describe("Streaming testEnvironment", func() {
203203
})
204204

205205
It("Client.StreamStats won't hang if timeout happens", func() {
206-
cli := newClient("connName", nil, nil, nil)
206+
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
207207
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
208208
cli.socketCallTimeout = time.Millisecond
209209

pkg/stream/coordinator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ var _ = Describe("Coordinator", func() {
1414
client *Client
1515
)
1616
BeforeEach(func() {
17-
client = newClient("test-client", nil, nil, nil)
17+
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)
1818

1919
})
2020
AfterEach(func() {
@@ -131,7 +131,7 @@ var _ = Describe("Coordinator", func() {
131131
client *Client
132132
)
133133
BeforeEach(func() {
134-
client = newClient("test-client", nil, nil, nil)
134+
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)
135135

136136
})
137137
AfterEach(func() {
@@ -185,7 +185,7 @@ var _ = Describe("Coordinator", func() {
185185
client *Client
186186
)
187187
BeforeEach(func() {
188-
client = newClient("test-client", nil, nil, nil)
188+
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)
189189

190190
})
191191
AfterEach(func() {

pkg/stream/environment.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
2525
if options == nil {
2626
options = NewEnvironmentOptions()
2727
}
28-
client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration)
28+
29+
if options.RPCTimeout <= 0 {
30+
options.RPCTimeout = defaultSocketCallTimeout
31+
}
32+
33+
client := newClient("go-stream-locator", nil,
34+
options.TCPParameters, options.SaslConfiguration, options.RPCTimeout)
2935
defer func(client *Client) {
3036
err := client.Close()
3137
if err != nil {
@@ -82,7 +88,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
8288
}
8389
func (env *Environment) newReconnectClient() (*Client, error) {
8490
broker := env.options.ConnectionParameters[0]
85-
client := newClient("go-stream-locator", broker, env.options.TCPParameters, env.options.SaslConfiguration)
91+
client := newClient("go-stream-locator", broker, env.options.TCPParameters,
92+
env.options.SaslConfiguration, env.options.RPCTimeout)
8693

8794
err := client.connect()
8895
tentatives := 1
@@ -96,7 +103,7 @@ func (env *Environment) newReconnectClient() (*Client, error) {
96103
rand.Seed(time.Now().UnixNano())
97104
n := rand.Intn(len(env.options.ConnectionParameters))
98105
client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters,
99-
env.options.SaslConfiguration)
106+
env.options.SaslConfiguration, env.options.RPCTimeout)
100107
tentatives = tentatives + 1
101108
err = client.connect()
102109

@@ -139,7 +146,7 @@ func (env *Environment) NewProducer(streamName string, producerOptions *Producer
139146
return nil, err
140147
}
141148

142-
return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver)
149+
return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver, env.options.RPCTimeout)
143150
}
144151

145152
func (env *Environment) StreamExists(streamName string) (bool, error) {
@@ -229,7 +236,7 @@ func (env *Environment) NewConsumer(streamName string,
229236
return nil, err
230237
}
231238

232-
return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver)
239+
return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver, env.options.RPCTimeout)
233240
}
234241

235242
func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error) {
@@ -258,6 +265,7 @@ type EnvironmentOptions struct {
258265
MaxProducersPerClient int
259266
MaxConsumersPerClient int
260267
AddressResolver *AddressResolver
268+
RPCTimeout time.Duration
261269
}
262270

263271
func NewEnvironmentOptions() *EnvironmentOptions {
@@ -267,6 +275,7 @@ func NewEnvironmentOptions() *EnvironmentOptions {
267275
ConnectionParameters: []*Broker{},
268276
TCPParameters: newTCPParameterDefault(),
269277
SaslConfiguration: newSaslConfigurationDefault(),
278+
RPCTimeout: defaultSocketCallTimeout,
270279
}
271280
}
272281

@@ -429,6 +438,11 @@ func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptio
429438
return envOptions
430439
}
431440

441+
func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions {
442+
envOptions.RPCTimeout = timeout
443+
return envOptions
444+
}
445+
432446
type environmentCoordinator struct {
433447
mutex *sync.Mutex
434448
mutexContext *sync.RWMutex
@@ -510,7 +524,7 @@ func (c *Client) maybeCleanConsumers(streamName string) {
510524
}
511525

512526
func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string,
513-
options *ProducerOptions) (*Producer, error) {
527+
options *ProducerOptions, rpcTimeout time.Duration) (*Producer, error) {
514528
cc.mutex.Lock()
515529
defer cc.mutex.Unlock()
516530
cc.mutexContext.Lock()
@@ -528,7 +542,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
528542
}
529543

530544
if clientResult == nil {
531-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration)
545+
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
532546
}
533547

534548
err := clientResult.connect()
@@ -545,7 +559,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
545559
if err != nil {
546560
return nil, err
547561
}
548-
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration)
562+
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
549563
err = clientResult.connect()
550564
if err != nil {
551565
return nil, err
@@ -562,8 +576,8 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
562576
return producer, nil
563577
}
564578

565-
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
566-
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration)
579+
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
580+
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
567581
chMeta := make(chan metaDataUpdateEvent, 1)
568582
clientResult.metadataListener = chMeta
569583
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
@@ -584,7 +598,7 @@ func (cc *environmentCoordinator) newClientForProducer(connectionName string, le
584598

585599
func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
586600
streamName string, messagesHandler MessagesHandler,
587-
options *ConsumerOptions) (*Consumer, error) {
601+
options *ConsumerOptions, rpcTimeout time.Duration) (*Consumer, error) {
588602
cc.mutex.Lock()
589603
defer cc.mutex.Unlock()
590604
cc.mutexContext.Lock()
@@ -598,7 +612,7 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
598612
}
599613

600614
if clientResult == nil {
601-
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration)
615+
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
602616
chMeta := make(chan metaDataUpdateEvent)
603617
clientResult.metadataListener = chMeta
604618
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
@@ -663,7 +677,7 @@ func newProducers(maxItemsForClient int) *producersEnvironment {
663677
}
664678

665679
func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName string,
666-
options *ProducerOptions, resolver *AddressResolver) (*Producer, error) {
680+
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
667681
ps.mutex.Lock()
668682
defer ps.mutex.Unlock()
669683
leader, err := clientLocator.BrokerLeader(streamName)
@@ -683,7 +697,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
683697
leader.cloneFrom(clientLocator.broker, resolver)
684698

685699
producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters,
686-
clientLocator.saslConfiguration, streamName, options)
700+
clientLocator.saslConfiguration, streamName, options, rpcTimeOut)
687701
if err != nil {
688702
return nil, err
689703
}
@@ -728,7 +742,7 @@ func newConsumerEnvironment(maxItemsForClient int) *consumersEnvironment {
728742

729743
func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName string,
730744
messagesHandler MessagesHandler,
731-
consumerOptions *ConsumerOptions, resolver *AddressResolver) (*Consumer, error) {
745+
consumerOptions *ConsumerOptions, resolver *AddressResolver, rpcTimeout time.Duration) (*Consumer, error) {
732746
ps.mutex.Lock()
733747
defer ps.mutex.Unlock()
734748
consumerBroker, err := clientLocator.BrokerForConsumer(streamName)
@@ -753,7 +767,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
753767
consumer, err := ps.consumersCoordinator[coordinatorKey].
754768
newConsumer(clientProvidedName, consumerBroker, clientLocator.tcpParameters,
755769
clientLocator.saslConfiguration,
756-
streamName, messagesHandler, consumerOptions)
770+
streamName, messagesHandler, consumerOptions, rpcTimeout)
757771
if err != nil {
758772
return nil, err
759773
}

pkg/stream/environment_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ var _ = Describe("Environment test", func() {
217217
MaxProducersPerClient: 1,
218218
MaxConsumersPerClient: 1,
219219
AddressResolver: nil,
220+
RPCTimeout: defaultSocketCallTimeout,
220221
})
221222

222223
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)