Skip to content

Commit d41df0a

Browse files
authored
Merge branch 'main' into fix_multi_uris
2 parents f055cf8 + e5b3ba9 commit d41df0a

File tree

5 files changed

+40
-24
lines changed

5 files changed

+40
-24
lines changed

pkg/stream/client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ type Client struct {
7171
serverProperties map[string]string
7272
}
7373

74-
func newClient(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
74+
func newClient(connectionName string, broker *Broker,
75+
tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
7576
var clientBroker = broker
7677
if broker == nil {
7778
clientBroker = newBrokerDefault()
@@ -100,7 +101,7 @@ func newClient(connectionName string, broker *Broker, tcpParameters *TCPParamete
100101
mutex: &sync.Mutex{},
101102
destructor: &sync.Once{},
102103
},
103-
socketCallTimeout: defaultSocketCallTimeout,
104+
socketCallTimeout: rpcTimeOut,
104105
availableFeatures: newAvailableFeatures(),
105106
}
106107
c.setConnectionName(connectionName)

pkg/stream/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ var _ = Describe("Streaming testEnvironment", func() {
183183
})
184184

185185
It("Client.queryOffset won't hang if timeout happens", func() {
186-
cli := newClient("connName", nil, nil, nil)
186+
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
187187
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
188188
cli.socketCallTimeout = time.Millisecond
189189

@@ -193,7 +193,7 @@ var _ = Describe("Streaming testEnvironment", func() {
193193
})
194194

195195
It("Client.queryPublisherSequence won't hang if timeout happens", func() {
196-
cli := newClient("connName", nil, nil, nil)
196+
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
197197
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
198198
cli.socketCallTimeout = time.Millisecond
199199

@@ -203,7 +203,7 @@ var _ = Describe("Streaming testEnvironment", func() {
203203
})
204204

205205
It("Client.StreamStats won't hang if timeout happens", func() {
206-
cli := newClient("connName", nil, nil, nil)
206+
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
207207
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
208208
cli.socketCallTimeout = time.Millisecond
209209

pkg/stream/coordinator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ var _ = Describe("Coordinator", func() {
1414
client *Client
1515
)
1616
BeforeEach(func() {
17-
client = newClient("test-client", nil, nil, nil)
17+
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)
1818

1919
})
2020
AfterEach(func() {
@@ -131,7 +131,7 @@ var _ = Describe("Coordinator", func() {
131131
client *Client
132132
)
133133
BeforeEach(func() {
134-
client = newClient("test-client", nil, nil, nil)
134+
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)
135135

136136
})
137137
AfterEach(func() {
@@ -185,7 +185,7 @@ var _ = Describe("Coordinator", func() {
185185
client *Client
186186
)
187187
BeforeEach(func() {
188-
client = newClient("test-client", nil, nil, nil)
188+
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)
189189

190190
})
191191
AfterEach(func() {

pkg/stream/environment.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
2525
if options == nil {
2626
options = NewEnvironmentOptions()
2727
}
28-
client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration)
28+
29+
if options.RPCTimeout <= 0 {
30+
options.RPCTimeout = defaultSocketCallTimeout
31+
}
32+
33+
client := newClient("go-stream-locator", nil,
34+
options.TCPParameters, options.SaslConfiguration, options.RPCTimeout)
2935
defer func(client *Client) {
3036
err := client.Close()
3137
if err != nil {
@@ -96,7 +102,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
96102
}
97103
func (env *Environment) newReconnectClient() (*Client, error) {
98104
broker := env.options.ConnectionParameters[0]
99-
client := newClient("go-stream-locator", broker, env.options.TCPParameters, env.options.SaslConfiguration)
105+
client := newClient("go-stream-locator", broker, env.options.TCPParameters,
106+
env.options.SaslConfiguration, env.options.RPCTimeout)
100107

101108
err := client.connect()
102109
tentatives := 1
@@ -110,7 +117,7 @@ func (env *Environment) newReconnectClient() (*Client, error) {
110117
rand.Seed(time.Now().UnixNano())
111118
n := rand.Intn(len(env.options.ConnectionParameters))
112119
client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters,
113-
env.options.SaslConfiguration)
120+
env.options.SaslConfiguration, env.options.RPCTimeout)
114121
tentatives = tentatives + 1
115122
err = client.connect()
116123

@@ -153,7 +160,7 @@ func (env *Environment) NewProducer(streamName string, producerOptions *Producer
153160
return nil, err
154161
}
155162

156-
return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver)
163+
return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver, env.options.RPCTimeout)
157164
}
158165

159166
func (env *Environment) StreamExists(streamName string) (bool, error) {
@@ -243,7 +250,7 @@ func (env *Environment) NewConsumer(streamName string,
243250
return nil, err
244251
}
245252

246-
return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver)
253+
return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver, env.options.RPCTimeout)
247254
}
248255

249256
func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error) {
@@ -272,6 +279,7 @@ type EnvironmentOptions struct {
272279
MaxProducersPerClient int
273280
MaxConsumersPerClient int
274281
AddressResolver *AddressResolver
282+
RPCTimeout time.Duration
275283
}
276284

277285
func NewEnvironmentOptions() *EnvironmentOptions {
@@ -281,6 +289,7 @@ func NewEnvironmentOptions() *EnvironmentOptions {
281289
ConnectionParameters: []*Broker{},
282290
TCPParameters: newTCPParameterDefault(),
283291
SaslConfiguration: newSaslConfigurationDefault(),
292+
RPCTimeout: defaultSocketCallTimeout,
284293
}
285294
}
286295

@@ -443,6 +452,11 @@ func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptio
443452
return envOptions
444453
}
445454

455+
func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions {
456+
envOptions.RPCTimeout = timeout
457+
return envOptions
458+
}
459+
446460
type environmentCoordinator struct {
447461
mutex *sync.Mutex
448462
mutexContext *sync.RWMutex
@@ -524,7 +538,7 @@ func (c *Client) maybeCleanConsumers(streamName string) {
524538
}
525539

526540
func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string,
527-
options *ProducerOptions) (*Producer, error) {
541+
options *ProducerOptions, rpcTimeout time.Duration) (*Producer, error) {
528542
cc.mutex.Lock()
529543
defer cc.mutex.Unlock()
530544
cc.mutexContext.Lock()
@@ -542,7 +556,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
542556
}
543557

544558
if clientResult == nil {
545-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration)
559+
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
546560
}
547561

548562
err := clientResult.connect()
@@ -559,7 +573,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
559573
if err != nil {
560574
return nil, err
561575
}
562-
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration)
576+
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
563577
err = clientResult.connect()
564578
if err != nil {
565579
return nil, err
@@ -576,8 +590,8 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
576590
return producer, nil
577591
}
578592

579-
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
580-
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration)
593+
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
594+
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
581595
chMeta := make(chan metaDataUpdateEvent, 1)
582596
clientResult.metadataListener = chMeta
583597
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
@@ -598,7 +612,7 @@ func (cc *environmentCoordinator) newClientForProducer(connectionName string, le
598612

599613
func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
600614
streamName string, messagesHandler MessagesHandler,
601-
options *ConsumerOptions) (*Consumer, error) {
615+
options *ConsumerOptions, rpcTimeout time.Duration) (*Consumer, error) {
602616
cc.mutex.Lock()
603617
defer cc.mutex.Unlock()
604618
cc.mutexContext.Lock()
@@ -612,7 +626,7 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
612626
}
613627

614628
if clientResult == nil {
615-
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration)
629+
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
616630
chMeta := make(chan metaDataUpdateEvent)
617631
clientResult.metadataListener = chMeta
618632
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
@@ -677,7 +691,7 @@ func newProducers(maxItemsForClient int) *producersEnvironment {
677691
}
678692

679693
func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName string,
680-
options *ProducerOptions, resolver *AddressResolver) (*Producer, error) {
694+
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
681695
ps.mutex.Lock()
682696
defer ps.mutex.Unlock()
683697
leader, err := clientLocator.BrokerLeader(streamName)
@@ -697,7 +711,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
697711
leader.cloneFrom(clientLocator.broker, resolver)
698712

699713
producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters,
700-
clientLocator.saslConfiguration, streamName, options)
714+
clientLocator.saslConfiguration, streamName, options, rpcTimeOut)
701715
if err != nil {
702716
return nil, err
703717
}
@@ -742,7 +756,7 @@ func newConsumerEnvironment(maxItemsForClient int) *consumersEnvironment {
742756

743757
func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName string,
744758
messagesHandler MessagesHandler,
745-
consumerOptions *ConsumerOptions, resolver *AddressResolver) (*Consumer, error) {
759+
consumerOptions *ConsumerOptions, resolver *AddressResolver, rpcTimeout time.Duration) (*Consumer, error) {
746760
ps.mutex.Lock()
747761
defer ps.mutex.Unlock()
748762
consumerBroker, err := clientLocator.BrokerForConsumer(streamName)
@@ -767,7 +781,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
767781
consumer, err := ps.consumersCoordinator[coordinatorKey].
768782
newConsumer(clientProvidedName, consumerBroker, clientLocator.tcpParameters,
769783
clientLocator.saslConfiguration,
770-
streamName, messagesHandler, consumerOptions)
784+
streamName, messagesHandler, consumerOptions, rpcTimeout)
771785
if err != nil {
772786
return nil, err
773787
}

pkg/stream/environment_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ var _ = Describe("Environment test", func() {
217217
MaxProducersPerClient: 1,
218218
MaxConsumersPerClient: 1,
219219
AddressResolver: nil,
220+
RPCTimeout: defaultSocketCallTimeout,
220221
})
221222

222223
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)