Skip to content

Commit d3ecf25

Browse files
authored
Improve the producer creation step (#352)
* Fixes #351 * The producer creation will return an error in case the query sequence fails * Add Missing command descriptions --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent ad04b60 commit d3ecf25

File tree

4 files changed

+48
-30
lines changed

4 files changed

+48
-30
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ rabbitmq-ha-proxy:
9595
mv compose/ha_tls/tls-gen/basic/result/server_*_certificate.pem compose/ha_tls/tls-gen/basic/result/server_certificate.pem
9696
mv compose/ha_tls/tls-gen/basic/result/server_*key.pem compose/ha_tls/tls-gen/basic/result/server_key.pem
9797
cd compose/ha_tls; docker build -t haproxy-rabbitmq-cluster .
98-
cd compose/ha_tls; docker-compose down
99-
cd compose/ha_tls; docker-compose up
98+
cd compose/ha_tls; docker compose down
99+
cd compose/ha_tls; docker compose up
100100

101101
rabbitmq-server-tls:
102102
cd compose/tls; rm -rf tls-gen;

pkg/stream/client.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -595,13 +595,26 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
595595
}
596596

597597
func (c *Client) internalDeclarePublisher(streamName string, producer *Producer) responseError {
598+
598599
publisherReferenceSize := 0
599600
if producer.options != nil {
600601
if producer.options.Name != "" {
601602
publisherReferenceSize = len(producer.options.Name)
602603
}
603604
}
604605

606+
if publisherReferenceSize > 0 {
607+
v, err := c.queryPublisherSequence(producer.options.Name, streamName)
608+
if err != nil {
609+
// if the client can't get the sequence, the function will return an error
610+
// because is not able to set the sequence
611+
// in most of the case the error timeout is during the re-connection
612+
// in this case the producer can't be created and the client will return an error
613+
return responseError{Err: err}
614+
}
615+
producer.sequence = v
616+
}
617+
605618
length := 2 + 2 + 4 + 1 + 2 + publisherReferenceSize + 2 + len(streamName)
606619
resp := c.coordinator.NewResponse(commandDeclarePublisher, streamName)
607620
correlationId := resp.correlationid
@@ -618,11 +631,6 @@ func (c *Client) internalDeclarePublisher(streamName string, producer *Producer)
618631
writeString(b, streamName)
619632
res := c.handleWrite(b.Bytes(), resp)
620633

621-
if publisherReferenceSize > 0 {
622-
v, _ := c.queryPublisherSequence(producer.options.Name, streamName)
623-
producer.sequence = v
624-
}
625-
626634
return res
627635
}
628636

pkg/stream/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ var _ = Describe("Streaming testEnvironment", func() {
198198
cli.socketCallTimeout = time.Millisecond
199199

200200
res, err := cli.queryPublisherSequence("ref", "stream")
201-
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: Command not handled 5"))
201+
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: CommandQueryPublisherSequence"))
202202
Expect(res).To(BeZero())
203203
})
204204

@@ -208,7 +208,7 @@ var _ = Describe("Streaming testEnvironment", func() {
208208
cli.socketCallTimeout = time.Millisecond
209209

210210
res, err := cli.StreamStats("stream")
211-
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: Command not handled 28"))
211+
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: CommandStreamStatus"))
212212
Expect(res).To(BeNil())
213213
})
214214

pkg/stream/constants.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -197,27 +197,37 @@ func lookErrorCode(errorCode uint16) error {
197197

198198
func lookUpCommand(command uint16) string {
199199
var constLookup = map[uint16]string{
200-
commandPeerProperties: `commandPeerProperties`,
201-
commandSaslHandshake: `commandSaslHandshake`,
202-
commandSaslAuthenticate: `commandSaslAuthenticate`,
203-
commandTune: `commandTune`,
204-
commandOpen: `commandOpen`,
205-
commandHeartbeat: `commandHeartbeat`,
206-
CommandMetadataUpdate: `CommandMetadataUpdate`,
207-
commandMetadata: `CommandMetadata`,
208-
commandDeleteStream: `CommandDeleteStream`,
209-
commandCreateStream: `CommandCreateStream`,
210-
CommandUnsubscribe: `CommandUnsubscribe`,
211-
CommandQueryOffset: `CommandQueryOffset`,
212-
commandCredit: `CommandCredit`,
213-
commandDeliver: `CommandDeliver`,
214-
commandSubscribe: `CommandSubscribe`,
215-
CommandDeletePublisher: `CommandDeletePublisher`,
216-
commandPublishError: `CommandPublishError`,
217-
commandPublishConfirm: `CommandPublishConfirm`,
218-
commandDeclarePublisher: `CommandDeclarePublisher`,
219-
commandUnitTest: `UnitTest`,
220-
CommandClose: `CommandClose`,
200+
commandPeerProperties: `commandPeerProperties`,
201+
commandSaslHandshake: `commandSaslHandshake`,
202+
commandSaslAuthenticate: `commandSaslAuthenticate`,
203+
commandTune: `commandTune`,
204+
commandOpen: `commandOpen`,
205+
commandHeartbeat: `commandHeartbeat`,
206+
CommandMetadataUpdate: `CommandMetadataUpdate`,
207+
commandMetadata: `CommandMetadata`,
208+
commandDeleteStream: `CommandDeleteStream`,
209+
commandCreateStream: `CommandCreateStream`,
210+
CommandUnsubscribe: `CommandUnsubscribe`,
211+
CommandQueryOffset: `CommandQueryOffset`,
212+
commandCredit: `CommandCredit`,
213+
commandDeliver: `CommandDeliver`,
214+
commandSubscribe: `CommandSubscribe`,
215+
CommandDeletePublisher: `CommandDeletePublisher`,
216+
commandPublishError: `CommandPublishError`,
217+
commandPublishConfirm: `CommandPublishConfirm`,
218+
commandDeclarePublisher: `CommandDeclarePublisher`,
219+
commandPublish: `CommandPublish`,
220+
commandQueryPublisherSequence: `CommandQueryPublisherSequence`,
221+
commandQueryRoute: `CommandQueryRoute`,
222+
commandQueryPartition: `CommandQueryPartition`,
223+
commandExchangeVersion: `CommandExchangeVersion`,
224+
commandStreamStatus: `CommandStreamStatus`,
225+
commandCreateSuperStream: `CommandCreateSuperStream`,
226+
commandDeleteSuperStream: `CommandDeleteSuperStream`,
227+
commandConsumerUpdate: `CommandConsumerUpdate`,
228+
229+
commandUnitTest: `UnitTest`,
230+
CommandClose: `CommandClose`,
221231
}
222232
if constLookup[command] == "" {
223233
return fmt.Sprintf("Command not handled %d", command)

0 commit comments

Comments
 (0)