diff --git a/pkg/stream/available_features.go b/pkg/stream/available_features.go index 2f9b787e..22f1a409 100644 --- a/pkg/stream/available_features.go +++ b/pkg/stream/available_features.go @@ -45,7 +45,7 @@ func (a *availableFeatures) BrokerFilterEnabled() bool { func (a *availableFeatures) IsBrokerSingleActiveConsumerEnabled() bool { lock.Lock() defer lock.Unlock() - return a.brokerSingleActiveConsumerEnabled == a.is311OrMore + return a.brokerSingleActiveConsumerEnabled } func (a *availableFeatures) SetVersion(version string) error { diff --git a/pkg/stream/client.go b/pkg/stream/client.go index a5adc857..7a713b1b 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -212,9 +212,13 @@ func (c *Client) connect() error { return err2 } - logs.LogDebug("Server properties: %s", c.serverProperties) - if serverProperties["version"] == "" { - logs.LogInfo( + err = c.availableFeatures.SetVersion(serverProperties["version"]) + if err != nil { + logs.LogWarn("Error checking server version: %s", err) + } + + if serverProperties["version"] == "" || !c.availableFeatures.Is311OrMore() { + logs.LogDebug( "Server version is less than 3.11.0, skipping command version exchange") } else { err := c.exchangeVersion(c.serverProperties["version"])