NATS Source is a user-defined source for Numaflow that facilitates reading messages from a NATS server.
- Quick Start
- Using NATS Source in Your Numaflow Pipeline
- JSON Configuration
- Environment Variables Configuration
- Debugging NATS Source
This quick start guide will help you to set up and run a NATS source in a Numaflow pipeline on your local kube cluster. Follow the steps below to get started:
In the current folder, run:
kubectl apply -k ./example
Execute the following command to verify the pipeline is up and running:
kubectl get pipeline nats-source-e2e
You should see:
NAME PHASE MESSAGE VERTICES AGE
nats-source-e2e Running 3 1m
Port-forward the NATS server to your local machine:
kubectl port-forward svc/nats 4222:4222
Next, send messages:
nats pub test-subject "Hello World" --user=testingtoken
Replace the "xxxxx" with the appropriate out vertex pod name:
kubectl logs nats-source-e2e-out-0-xxxxx
You should see the output similar to:
2023/09/05 19:18:44 (out) Payload - Hello World Keys - [] EventTime - 1693941455870
To delete the Numaflow pipeline and the NATS server, run:
kubectl delete -k ./example
Congratulations! You have successfully run a NATS source in a Numaflow pipeline on your local kube cluster.
To integrate the NATS source in your own Numaflow pipeline, follow these detailed steps:
Deploy your own NATS server to your cluster. Refer to the NATS Docs for guidance.
Define the NATS source configuration in a ConfigMap and mount it to the NATS source pod as a volume. Create a ConfigMap using the example below:
apiVersion: v1
data:
nats-config.yaml: |
url: nats
subject: test-subject
queue: my-queue
auth:
token:
localobjectreference:
name: nats-auth-fake-token
key: fake-token
kind: ConfigMap
metadata:
name: nats-config-map
The configuration contains the following fields:
url
: The NATS server URL.subject
: The NATS subject to subscribe to.queue
: The NATS queue group name.auth
: The NATS authentication information.token
: The NATS authentication token information.name
: The name of the secret that contains the authentication token.key
: The key of the authentication token in the secret.
Please notice that the fields declared above isn't the exhaustive list of all the fields that can be specified in the NATS source configuration. For more information, please refer to the NATS Source Configuration Struct.
Name your NATS Configuration ConfigMap as nats-config.yaml
and mount it to the NATS source pod as a volume under path /etc/config
.
Create all the secrets that are referenced in the NATS source configuration and mount them to the NATS source pod as volumes under path /etc/secrets/{secret-name}
.
Include the NATS Source in your pipeline using the template below:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: nats-source-e2e
spec:
vertices:
- name: in
scale:
min: 2
volumes:
- name: my-config-mount
configMap:
name: nats-config-map
- name: my-secret-mount
secret:
secretName: nats-auth-fake-token
source:
udsource:
container:
image: quay.io/numaio/numaflow-source/nats-source-go:v0.99.0
volumeMounts:
- name: my-config-mount
mountPath: /etc/config
- name: my-secret-mount
mountPath: /etc/secrets/nats-auth-fake-token
- name: out
sink:
log: {}
edges:
- from: in
to: out
Here is a template for creating the secret:
apiVersion: v1
kind: Secret
metadata:
name: nats-auth-fake-token
stringData:
fake-token: "testingtoken"
Now, execute the pipeline to start reading messages from the NATS server.
By default, Numaflow NATS Source uses YAML as configuration format.
You can also specify the NATS source configuration in JSON format. Find below a guide on how to set the configuration using JSON:
Here is how you can craft a ConfigMap in JSON format:
apiVersion: v1
data:
nats-config.json: |
{
"url":"nats",
"subject":"test-subject",
"queue":"my-queue",
"auth":{
"token":{
"name":"nats-auth-fake-token",
"key":"fake-token"
}
}
}
kind: ConfigMap
metadata:
name: nats-config-map
Adjust your pipeline template to facilitate JSON configuration as shown below:
source:
udsource:
container:
image: quay.io/numaio/numaflow-source/nats-source-go:v0.99.0
env:
- name: CONFIG_FORMAT
value: json
volumeMounts:
...
Remember to set the CONFIG_FORMAT
environment variable to json
.
You can also specify the NATS source configuration using environment variables, which saves you from creating the ConfigMap.
NATS source checks the environment variable NATS_CONFIG
for the configuration. The value of the environment variable should be a YAML or JSON string.
See an equivalent example below:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: nats-source-e2e
spec:
vertices:
- name: in
scale:
min: 2
volumes:
- name: my-secret-mount
secret:
secretName: nats-auth-fake-token
source:
udsource:
container:
image: quay.io/numaio/numaflow-source/nats-source-go:v0.99.0
env:
- name: NATS_CONFIG
value: |
url: nats
subject: test-subject
queue: my-queue
auth:
token:
localobjectreference:
name: nats-auth-fake-token
key: fake-token
volumeMounts:
- name: my-secret-mount
mountPath: /etc/secrets/nats-auth-fake-token
- name: out
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: out
To debug the NATS source, you can set the NUMAFLOW_DEBUG
environment variable to true
in the NATS source container.
source:
udsource:
container:
image: quay.io/numaio/numaflow-source/nats-source-go:v0.99.0
env:
- name: NUMAFLOW_DEBUG
value: "true"
volumeMounts:
...