Skip to content

Commit 991b218

Browse files
authored
Add support for jetstream domains (#256)
1 parent fb9cf64 commit 991b218

File tree

17 files changed

+151
-39
lines changed

17 files changed

+151
-39
lines changed

deploy/crds.yml

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,9 @@ spec:
291291
description: When true, the KV Store will initiate TLS before server INFO.
292292
type: boolean
293293
default: false
294+
jsDomain:
295+
description: The JetStream domain to use for the stream.
296+
type: string
294297
status:
295298
type: object
296299
properties:
@@ -318,7 +321,7 @@ spec:
318321
jsonPath: .status.conditions[?(@.type == 'Ready')].reason
319322
- name: Stream Name
320323
type: string
321-
description: The name of the Jetstream Stream.
324+
description: The name of the JetStream Stream.
322325
jsonPath: .spec.name
323326
- name: Subjects
324327
type: string
@@ -486,7 +489,7 @@ spec:
486489
jsonPath: .status.conditions[?(@.type == 'Ready')].reason
487490
- name: Stream Name
488491
type: string
489-
description: The name of the Jetstream Stream.
492+
description: The name of the JetStream Stream.
490493
jsonPath: .spec.name
491494
- name: Subjects
492495
type: string
@@ -678,6 +681,9 @@ spec:
678681
description: When true, the KV Store will initiate TLS before server INFO.
679682
type: boolean
680683
default: false
684+
jsDomain:
685+
description: The JetStream domain to use for the consumer.
686+
type: string
681687
status:
682688
type: object
683689
properties:
@@ -705,11 +711,11 @@ spec:
705711
jsonPath: .status.conditions[?(@.type == 'Ready')].reason
706712
- name: Stream
707713
type: string
708-
description: The name of the Jetstream Stream.
714+
description: The name of the JetStream Stream.
709715
jsonPath: .spec.streamName
710716
- name: Consumer
711717
type: string
712-
description: The name of the Jetstream Consumer.
718+
description: The name of the JetStream Consumer.
713719
jsonPath: .spec.durableName
714720
- name: Ack Policy
715721
type: string
@@ -829,11 +835,11 @@ spec:
829835
jsonPath: .status.conditions[?(@.type == 'Ready')].reason
830836
- name: Stream
831837
type: string
832-
description: The name of the Jetstream Stream.
838+
description: The name of the JetStream Stream.
833839
jsonPath: .spec.streamName
834840
- name: Consumer
835841
type: string
836-
description: The name of the Jetstream Consumer.
842+
description: The name of the JetStream Consumer.
837843
jsonPath: .spec.durableName
838844
- name: Ack Policy
839845
type: string
@@ -966,7 +972,7 @@ spec:
966972
jsonPath: .status.conditions[?(@.type == 'Ready')].reason
967973
- name: Stream Template Name
968974
type: string
969-
description: The name of the Jetstream Stream Template.
975+
description: The name of the JetStream Stream Template.
970976
jsonPath: .spec.name
971977
- name: Subjects
972978
type: string
@@ -1281,6 +1287,9 @@ spec:
12811287
description: When true, the KV Store will initiate TLS before server INFO.
12821288
type: boolean
12831289
default: false
1290+
jsDomain:
1291+
description: The JetStream domain to use for the KV store.
1292+
type: string
12841293
status:
12851294
type: object
12861295
properties:
@@ -1419,9 +1428,12 @@ spec:
14191428
items:
14201429
type: string
14211430
tlsFirst:
1422-
description: When true, the KV Store will initiate TLS before server INFO.
1431+
description: When true, the Object Store will initiate TLS before server INFO.
14231432
type: boolean
14241433
default: false
1434+
jsDomain:
1435+
description: The JetStream domain to use for the Object Store.
1436+
type: string
14251437
status:
14261438
type: object
14271439
properties:

internal/controller/client.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type NatsConfig struct {
2424
Token string `json:"token,omitempty"`
2525
User string `json:"username,omitempty"`
2626
Password string `json:"password,omitempty"`
27+
JsDomain string `json:"js_domain,omitempty"`
2728
}
2829

2930
func (o *NatsConfig) Copy() *NatsConfig {
@@ -97,6 +98,10 @@ func (o *NatsConfig) Overlay(overlay *NatsConfig) {
9798
o.ServerURL = overlay.ServerURL
9899
}
99100

101+
if overlay.JsDomain != "" {
102+
o.JsDomain = overlay.JsDomain
103+
}
104+
100105
if overlay.Certificate != "" && overlay.Key != "" {
101106
o.Certificate = overlay.Certificate
102107
o.Key = overlay.Key
@@ -191,7 +196,7 @@ type Closable interface {
191196
Close()
192197
}
193198

194-
func CreateJSMClient(conn *pooledConnection, pedantic bool) (*jsm.Manager, error) {
199+
func CreateJSMClient(conn *pooledConnection, pedantic bool, domain string) (*jsm.Manager, error) {
195200
if !conn.nc.IsConnected() {
196201
return nil, errors.New("not connected")
197202
}
@@ -210,6 +215,9 @@ func CreateJSMClient(conn *pooledConnection, pedantic bool) (*jsm.Manager, error
210215
if pedantic {
211216
jsmOpts = append(jsmOpts, jsm.WithPedanticRequests())
212217
}
218+
if domain != "" {
219+
jsmOpts = append(jsmOpts, jsm.WithDomain(domain))
220+
}
213221

214222
jsmClient, err := jsm.New(conn.nc, jsmOpts...)
215223
if err != nil {
@@ -222,8 +230,18 @@ func CreateJSMClient(conn *pooledConnection, pedantic bool) (*jsm.Manager, error
222230
// CreateJetStreamClient creates new Jetstream client with a connection based on the given NatsConfig.
223231
// Returns a jetstream.Jetstream client and the Closable of the underlying connection.
224232
// Close should be called when the client is no longer used.
225-
func CreateJetStreamClient(conn *pooledConnection, pedantic bool) (jetstream.JetStream, error) {
226-
js, err := jetstream.New(conn.nc)
233+
func CreateJetStreamClient(conn *pooledConnection, pedantic bool, domain string) (jetstream.JetStream, error) {
234+
var (
235+
err error
236+
js jetstream.JetStream
237+
)
238+
239+
if domain != "" {
240+
js, err = jetstream.NewWithDomain(conn.nc, domain)
241+
} else {
242+
js, err = jetstream.New(conn.nc)
243+
}
244+
227245
if err != nil {
228246
return nil, fmt.Errorf("new jetstream: %w", err)
229247
}

internal/controller/consumer_controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,9 +653,10 @@ var _ = Describe("Consumer Controller", func() {
653653
connPool := newConnPool(0)
654654
conn, err := connPool.Get(&NatsConfig{ServerURL: altServer.ClientURL()}, true)
655655
Expect(err).NotTo(HaveOccurred())
656+
domain := ""
656657

657658
// Setup altClient for alternate server
658-
altClient, err := CreateJetStreamClient(conn, true)
659+
altClient, err := CreateJetStreamClient(conn, true, domain)
659660
defer conn.Close()
660661
Expect(err).NotTo(HaveOccurred())
661662

internal/controller/jetstream_controller.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (c *jsController) WithJSMClient(opts api.ConnectionOpts, ns string, op func
106106
return err
107107
}
108108

109-
jsmClient, err := CreateJSMClient(conn, true)
109+
jsmClient, err := CreateJSMClient(conn, true, cfg.JsDomain)
110110
if err != nil {
111111
return fmt.Errorf("create jsm client: %w", err)
112112
}
@@ -126,7 +126,7 @@ func (c *jsController) WithJetStreamClient(opts api.ConnectionOpts, ns string, o
126126
return err
127127
}
128128

129-
jsClient, err := CreateJetStreamClient(conn, true)
129+
jsClient, err := CreateJetStreamClient(conn, true, cfg.JsDomain)
130130
if err != nil {
131131
return fmt.Errorf("create jetstream client: %w", err)
132132
}
@@ -381,6 +381,10 @@ func natsConfigFromOpts(opts api.ConnectionOpts) *NatsConfig {
381381
natsConfig.Key = opts.TLS.ClientKey
382382
}
383383

384+
if opts.JsDomain != "" {
385+
natsConfig.JsDomain = opts.JsDomain
386+
}
387+
384388
return natsConfig
385389
}
386390

internal/controller/keyvalue_controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,9 +665,10 @@ var _ = Describe("KeyValue Controller", func() {
665665
connPool := newConnPool(0)
666666
conn, err := connPool.Get(&NatsConfig{ServerURL: altServer.ClientURL()}, true)
667667
Expect(err).NotTo(HaveOccurred())
668+
domain := ""
668669

669670
By("checking if the keyvalue was created on the alternative server")
670-
altClient, err := CreateJetStreamClient(conn, true)
671+
altClient, err := CreateJetStreamClient(conn, true, domain)
671672
defer conn.Close()
672673
Expect(err).NotTo(HaveOccurred())
673674

internal/controller/objectstore_controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,9 +655,10 @@ var _ = Describe("ObjectStore Controller", func() {
655655
connPool := newConnPool(0)
656656
conn, err := connPool.Get(&NatsConfig{ServerURL: altServer.ClientURL()}, true)
657657
Expect(err).NotTo(HaveOccurred())
658+
domain := ""
658659

659660
By("checking if the objectstore was created on the alternative server")
660-
altClient, err := CreateJetStreamClient(conn, true)
661+
altClient, err := CreateJetStreamClient(conn, true, domain)
661662
defer conn.Close()
662663
Expect(err).NotTo(HaveOccurred())
663664

internal/controller/stream_controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,9 +672,10 @@ var _ = Describe("Stream Controller", func() {
672672
connPool := newConnPool(0)
673673
conn, err := connPool.Get(&NatsConfig{ServerURL: altServer.ClientURL()}, true)
674674
Expect(err).NotTo(HaveOccurred())
675+
domain := ""
675676

676677
By("checking if the stream was created on the alternative server")
677-
altClient, err := CreateJetStreamClient(conn, true)
678+
altClient, err := CreateJetStreamClient(conn, true, domain)
678679
defer conn.Close()
679680
Expect(err).NotTo(HaveOccurred())
680681

internal/controller/suite_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,9 @@ var _ = BeforeSuite(func() {
102102
connPool := newConnPool(0)
103103
conn, err := connPool.Get(testNatsConfig, true)
104104
Expect(err).NotTo(HaveOccurred())
105+
domain := ""
105106

106-
jsClient, err = CreateJetStreamClient(conn, true)
107+
jsClient, err = CreateJetStreamClient(conn, true, domain)
107108
Expect(err).NotTo(HaveOccurred())
108109
})
109110

pkg/jetstream/apis/jetstream/v1beta2/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type ConnectionOpts struct {
3535
Servers []string `json:"servers"`
3636
TLS TLS `json:"tls"`
3737
TLSFirst bool `json:"tlsFirst"`
38+
JsDomain string `json:"jsDomain"`
3839
}
3940

4041
type ConsumerLimits struct {

pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/connectionopts.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/jetstream/generated/clientset/versioned/fake/clientset_generated.go

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/jetstream/generated/clientset/versioned/typed/jetstream/v1beta2/jetstream_client.go

Lines changed: 3 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/jetstream/generated/informers/externalversions/jetstream/v1beta2/account.go

Lines changed: 14 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)