Skip to content

Expose the RPC TImeout call #313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type Client struct {
serverProperties map[string]string
}

func newClient(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
func newClient(connectionName string, broker *Broker,
tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
var clientBroker = broker
if broker == nil {
clientBroker = newBrokerDefault()
Expand Down Expand Up @@ -100,7 +101,7 @@ func newClient(connectionName string, broker *Broker, tcpParameters *TCPParamete
mutex: &sync.Mutex{},
destructor: &sync.Once{},
},
socketCallTimeout: defaultSocketCallTimeout,
socketCallTimeout: rpcTimeOut,
availableFeatures: newAvailableFeatures(),
}
c.setConnectionName(connectionName)
Expand Down
6 changes: 3 additions & 3 deletions pkg/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ var _ = Describe("Streaming testEnvironment", func() {
})

It("Client.queryOffset won't hang if timeout happens", func() {
cli := newClient("connName", nil, nil, nil)
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
cli.socketCallTimeout = time.Millisecond

Expand All @@ -193,7 +193,7 @@ var _ = Describe("Streaming testEnvironment", func() {
})

It("Client.queryPublisherSequence won't hang if timeout happens", func() {
cli := newClient("connName", nil, nil, nil)
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
cli.socketCallTimeout = time.Millisecond

Expand All @@ -203,7 +203,7 @@ var _ = Describe("Streaming testEnvironment", func() {
})

It("Client.StreamStats won't hang if timeout happens", func() {
cli := newClient("connName", nil, nil, nil)
cli := newClient("connName", nil, nil, nil, defaultSocketCallTimeout)
cli.socket.writer = bufio.NewWriter(bytes.NewBuffer([]byte{}))
cli.socketCallTimeout = time.Millisecond

Expand Down
6 changes: 3 additions & 3 deletions pkg/stream/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Coordinator", func() {
client *Client
)
BeforeEach(func() {
client = newClient("test-client", nil, nil, nil)
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)

})
AfterEach(func() {
Expand Down Expand Up @@ -131,7 +131,7 @@ var _ = Describe("Coordinator", func() {
client *Client
)
BeforeEach(func() {
client = newClient("test-client", nil, nil, nil)
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)

})
AfterEach(func() {
Expand Down Expand Up @@ -185,7 +185,7 @@ var _ = Describe("Coordinator", func() {
client *Client
)
BeforeEach(func() {
client = newClient("test-client", nil, nil, nil)
client = newClient("test-client", nil, nil, nil, defaultSocketCallTimeout)

})
AfterEach(func() {
Expand Down
46 changes: 30 additions & 16 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
if options == nil {
options = NewEnvironmentOptions()
}
client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration)

if options.RPCTimeout <= 0 {
options.RPCTimeout = defaultSocketCallTimeout
}

client := newClient("go-stream-locator", nil,
options.TCPParameters, options.SaslConfiguration, options.RPCTimeout)
defer func(client *Client) {
err := client.Close()
if err != nil {
Expand Down Expand Up @@ -82,7 +88,8 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
}
func (env *Environment) newReconnectClient() (*Client, error) {
broker := env.options.ConnectionParameters[0]
client := newClient("go-stream-locator", broker, env.options.TCPParameters, env.options.SaslConfiguration)
client := newClient("go-stream-locator", broker, env.options.TCPParameters,
env.options.SaslConfiguration, env.options.RPCTimeout)

err := client.connect()
tentatives := 1
Expand All @@ -96,7 +103,7 @@ func (env *Environment) newReconnectClient() (*Client, error) {
rand.Seed(time.Now().UnixNano())
n := rand.Intn(len(env.options.ConnectionParameters))
client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters,
env.options.SaslConfiguration)
env.options.SaslConfiguration, env.options.RPCTimeout)
tentatives = tentatives + 1
err = client.connect()

Expand Down Expand Up @@ -139,7 +146,7 @@ func (env *Environment) NewProducer(streamName string, producerOptions *Producer
return nil, err
}

return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver)
return env.producers.newProducer(client, streamName, producerOptions, env.options.AddressResolver, env.options.RPCTimeout)
}

func (env *Environment) StreamExists(streamName string) (bool, error) {
Expand Down Expand Up @@ -229,7 +236,7 @@ func (env *Environment) NewConsumer(streamName string,
return nil, err
}

return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver)
return env.consumers.NewSubscriber(client, streamName, messagesHandler, options, env.options.AddressResolver, env.options.RPCTimeout)
}

func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error) {
Expand Down Expand Up @@ -258,6 +265,7 @@ type EnvironmentOptions struct {
MaxProducersPerClient int
MaxConsumersPerClient int
AddressResolver *AddressResolver
RPCTimeout time.Duration
}

func NewEnvironmentOptions() *EnvironmentOptions {
Expand All @@ -267,6 +275,7 @@ func NewEnvironmentOptions() *EnvironmentOptions {
ConnectionParameters: []*Broker{},
TCPParameters: newTCPParameterDefault(),
SaslConfiguration: newSaslConfigurationDefault(),
RPCTimeout: defaultSocketCallTimeout,
}
}

Expand Down Expand Up @@ -429,6 +438,11 @@ func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptio
return envOptions
}

func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *EnvironmentOptions {
envOptions.RPCTimeout = timeout
return envOptions
}

type environmentCoordinator struct {
mutex *sync.Mutex
mutexContext *sync.RWMutex
Expand Down Expand Up @@ -510,7 +524,7 @@ func (c *Client) maybeCleanConsumers(streamName string) {
}

func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string,
options *ProducerOptions) (*Producer, error) {
options *ProducerOptions, rpcTimeout time.Duration) (*Producer, error) {
cc.mutex.Lock()
defer cc.mutex.Unlock()
cc.mutexContext.Lock()
Expand All @@ -528,7 +542,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
}

if clientResult == nil {
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration)
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
}

err := clientResult.connect()
Expand All @@ -545,7 +559,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
if err != nil {
return nil, err
}
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration)
clientResult = cc.newClientForProducer(options.ClientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
err = clientResult.connect()
if err != nil {
return nil, err
Expand All @@ -562,8 +576,8 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
return producer, nil
}

func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration)
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
chMeta := make(chan metaDataUpdateEvent, 1)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
Expand All @@ -584,7 +598,7 @@ func (cc *environmentCoordinator) newClientForProducer(connectionName string, le

func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
streamName string, messagesHandler MessagesHandler,
options *ConsumerOptions) (*Consumer, error) {
options *ConsumerOptions, rpcTimeout time.Duration) (*Consumer, error) {
cc.mutex.Lock()
defer cc.mutex.Unlock()
cc.mutexContext.Lock()
Expand All @@ -598,7 +612,7 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
}

if clientResult == nil {
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration)
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
chMeta := make(chan metaDataUpdateEvent)
clientResult.metadataListener = chMeta
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
Expand Down Expand Up @@ -663,7 +677,7 @@ func newProducers(maxItemsForClient int) *producersEnvironment {
}

func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName string,
options *ProducerOptions, resolver *AddressResolver) (*Producer, error) {
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
leader, err := clientLocator.BrokerLeader(streamName)
Expand All @@ -683,7 +697,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
leader.cloneFrom(clientLocator.broker, resolver)

producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters,
clientLocator.saslConfiguration, streamName, options)
clientLocator.saslConfiguration, streamName, options, rpcTimeOut)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -728,7 +742,7 @@ func newConsumerEnvironment(maxItemsForClient int) *consumersEnvironment {

func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName string,
messagesHandler MessagesHandler,
consumerOptions *ConsumerOptions, resolver *AddressResolver) (*Consumer, error) {
consumerOptions *ConsumerOptions, resolver *AddressResolver, rpcTimeout time.Duration) (*Consumer, error) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
consumerBroker, err := clientLocator.BrokerForConsumer(streamName)
Expand All @@ -753,7 +767,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
consumer, err := ps.consumersCoordinator[coordinatorKey].
newConsumer(clientProvidedName, consumerBroker, clientLocator.tcpParameters,
clientLocator.saslConfiguration,
streamName, messagesHandler, consumerOptions)
streamName, messagesHandler, consumerOptions, rpcTimeout)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ var _ = Describe("Environment test", func() {
MaxProducersPerClient: 1,
MaxConsumersPerClient: 1,
AddressResolver: nil,
RPCTimeout: defaultSocketCallTimeout,
})

Expect(err).NotTo(HaveOccurred())
Expand Down