diff --git a/pkg/stream/client.go b/pkg/stream/client.go index f5da7c9e..44be73ae 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -71,7 +71,8 @@ type Client struct { serverProperties map[string]string } -func newClient(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client { +func newClient(connectionName string, broker *Broker, + tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client { var clientBroker = broker if broker == nil { clientBroker = newBrokerDefault() @@ -100,7 +101,7 @@ func newClient(connectionName string, broker *Broker, tcpParameters *TCPParamete mutex: &sync.Mutex{}, destructor: &sync.Once{}, }, - socketCallTimeout: defaultSocketCallTimeout, + socketCallTimeout: rpcTimeOut, availableFeatures: newAvailableFeatures(), } c.setConnectionName(connectionName) diff --git a/pkg/stream/client_test.go b/pkg/stream/client_test.go index db3eaf11..d6b229d6 100644 --- a/pkg/stream/client_test.go +++ b/pkg/stream/client_test.go @@ -183,7 +183,7 @@ var _ = Describe("Streaming testEnvironment", func() { }) It("Client.queryOffset won't hang if timeout happens", func() { - cli := newClient("connName", nil, nil, nil) + cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout) cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{})) cli.socketCallTimeout = time.Millisecond @@ -193,7 +193,7 @@ var _ = Describe("Streaming testEnvironment", func() { }) It("Client.queryPublisherSequence won't hang if timeout happens", func() { - cli := newClient("connName", nil, nil, nil) + cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout) cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{})) cli.socketCallTimeout = time.Millisecond @@ -203,7 +203,7 @@ var _ = Describe("Streaming testEnvironment", func() { }) It("Client.StreamStats won't hang if timeout happens", func() { - cli := newClient("connName", nil, nil, nil) + cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout) cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{})) cli.socketCallTimeout = time.Millisecond diff --git a/pkg/stream/coordinator_test.go b/pkg/stream/coordinator_test.go index 5b53bc64..bfc48771 100644 --- a/pkg/stream/coordinator_test.go +++ b/pkg/stream/coordinator_test.go @@ -14,7 +14,7 @@ var _ = Describe("Coordinator", func() { client *Client ) BeforeEach(func() { - client = newClient("test-client", nil, nil, nil) + client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout) }) AfterEach(func() { @@ -131,7 +131,7 @@ var _ = Describe("Coordinator", func() { client *Client ) BeforeEach(func() { - client = newClient("test-client", nil, nil, nil) + client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout) }) AfterEach(func() { @@ -185,7 +185,7 @@ var _ = Describe("Coordinator", func() { client *Client ) BeforeEach(func() { - client = newClient("test-client", nil, nil, nil) + client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout) }) AfterEach(func() { diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 91924056..68d68387 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -25,7 +25,13 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { if options == nil { options = NewEnvironmentOptions() } - client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration) + + if options.RPCTimeout <= 0 { + options.RPCTimeout = defaultSocketCallTimeout + } + + client := newClient("go-stream-locator", nil, + options.TCPParameters, options.SaslConfiguration, options.RPCTimeout) defer func(client *Client) { err := client.Close() if err != nil { @@ -82,7 +88,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { } func (env *Environment) newReconnectClient() (*Client, error) { broker := env.options.ConnectionParameters[0] - client := newClient("go-stream-locator", broker, env.options.TCPParameters, env.options.SaslConfiguration) + client := newClient("go-stream-locator", broker, env.options.TCPParameters, + env.options.SaslConfiguration, env.options.RPCTimeout) err := client.connect() tentatives := 1 @@ -96,7 +103,7 @@ func (env *Environment) newReconnectClient() (*Client, error) { rand.Seed(time.Now().UnixNano()) n := rand.Intn(len(env.options.ConnectionParameters)) client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters, - env.options.SaslConfiguration) + env.options.SaslConfiguration, env.options.RPCTimeout) tentatives = tentatives + 1 err = client.connect() @@ -139,7 +146,7 @@ func (env *Environment) NewProducer(streamName string, producerOptions *Producer return nil, err } - return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver) + return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver, env.options.RPCTimeout) } func (env *Environment) StreamExists(streamName string) (bool, error) { @@ -229,7 +236,7 @@ func (env *Environment) NewConsumer(streamName string, return nil, err } - return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver) + return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver, env.options.RPCTimeout) } func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error) { @@ -258,6 +265,7 @@ type EnvironmentOptions struct { MaxProducersPerClient int MaxConsumersPerClient int AddressResolver *AddressResolver + RPCTimeout time.Duration } func NewEnvironmentOptions() *EnvironmentOptions { @@ -267,6 +275,7 @@ func NewEnvironmentOptions() *EnvironmentOptions { ConnectionParameters: []*Broker{}, TCPParameters: newTCPParameterDefault(), SaslConfiguration: newSaslConfigurationDefault(), + RPCTimeout: defaultSocketCallTimeout, } } @@ -429,6 +438,11 @@ func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptio return envOptions } +func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions { + envOptions.RPCTimeout = timeout + return envOptions +} + type environmentCoordinator struct { mutex *sync.Mutex mutexContext *sync.RWMutex @@ -510,7 +524,7 @@ func (c *Client) maybeCleanConsumers(streamName string) { } func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, - options *ProducerOptions) (*Producer, error) { + options *ProducerOptions, rpcTimeout time.Duration) (*Producer, error) { cc.mutex.Lock() defer cc.mutex.Unlock() cc.mutexContext.Lock() @@ -528,7 +542,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP } if clientResult == nil { - clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration) + clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout) } err := clientResult.connect() @@ -545,7 +559,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP if err != nil { return nil, err } - clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration) + clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout) err = clientResult.connect() if err != nil { return nil, err @@ -562,8 +576,8 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP return producer, nil } -func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client { - clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration) +func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client { + clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut) chMeta := make(chan metaDataUpdateEvent, 1) clientResult.metadataListener = chMeta go func(ch <-chan metaDataUpdateEvent, cl *Client) { @@ -584,7 +598,7 @@ func (cc *environmentCoordinator) newClientForProducer(connectionName string, le func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, messagesHandler MessagesHandler, - options *ConsumerOptions) (*Consumer, error) { + options *ConsumerOptions, rpcTimeout time.Duration) (*Consumer, error) { cc.mutex.Lock() defer cc.mutex.Unlock() cc.mutexContext.Lock() @@ -598,7 +612,7 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro } if clientResult == nil { - clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration) + clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout) chMeta := make(chan metaDataUpdateEvent) clientResult.metadataListener = chMeta go func(ch <-chan metaDataUpdateEvent, cl *Client) { @@ -663,7 +677,7 @@ func newProducers(maxItemsForClient int) *producersEnvironment { } func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName string, - options *ProducerOptions, resolver *AddressResolver) (*Producer, error) { + options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) { ps.mutex.Lock() defer ps.mutex.Unlock() leader, err := clientLocator.BrokerLeader(streamName) @@ -683,7 +697,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st leader.cloneFrom(clientLocator.broker, resolver) producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters, - clientLocator.saslConfiguration, streamName, options) + clientLocator.saslConfiguration, streamName, options, rpcTimeOut) if err != nil { return nil, err } @@ -728,7 +742,7 @@ func newConsumerEnvironment(maxItemsForClient int) *consumersEnvironment { func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName string, messagesHandler MessagesHandler, - consumerOptions *ConsumerOptions, resolver *AddressResolver) (*Consumer, error) { + consumerOptions *ConsumerOptions, resolver *AddressResolver, rpcTimeout time.Duration) (*Consumer, error) { ps.mutex.Lock() defer ps.mutex.Unlock() consumerBroker, err := clientLocator.BrokerForConsumer(streamName) @@ -753,7 +767,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName consumer, err := ps.consumersCoordinator[coordinatorKey]. newConsumer(clientProvidedName, consumerBroker, clientLocator.tcpParameters, clientLocator.saslConfiguration, - streamName, messagesHandler, consumerOptions) + streamName, messagesHandler, consumerOptions, rpcTimeout) if err != nil { return nil, err } diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index e4144fc9..d6917aa1 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -217,6 +217,7 @@ var _ = Describe("Environment test", func() { MaxProducersPerClient: 1, MaxConsumersPerClient: 1, AddressResolver: nil, + RPCTimeout: defaultSocketCallTimeout, }) Expect(err).NotTo(HaveOccurred())