Synadia NATS to Flink Connector

Current Release: 2.2.0   Current Snapshot: 2.2.1-SNAPSHOT

Java Version

The connector requires Java 11 or later to be compatible with the Flink libraries. The JNats library is built with Java 8 and runs without issue on later Java versions.

Releases and Versioning

This project adheres to semver, except where a class is commented as "INTERNAL" and annotated with the @Internal annotation (org.apache.flink.annotation.Internal).

Version Note Regarding 2.1.4 and 2.2.0

There was technically a violation of semver: an API that was renamed in 2.1.4 should only have been deprecated. 2.2.0 fixes this by changing the name back.

In JetStreamSubjectConfiguration.Builder, ackMode() was incorrectly renamed to ackBehavior().

This has been "fixed" as in

Bob Slydell: "We, uh, we fixed the glitch. So he won't be receiving a paycheck anymore, so it'll just work itself out naturally."

Very sorry, we appreciate your understanding, at least we didn't steal your stapler. Red Swingline

Builders

There are fluent builders throughout the code.

1. The only way to supply connection information is through a Properties object instance or a Properties file. More details can be found in the Connection Properties section of this document.

  • If neither a Properties object nor a file is supplied, connections will be attempted insecurely to the default URL, nats://localhost:4222.
  • The properties are used to create a JNats client Options object.

2. When configuring JetStream, if your stream has a domain or prefix, you can supply options used to create a JetStreamOptions object. More details can be found in the JetStream Options section of this document.

3. You can configure with code or from JSON or YAML files, or a combination.

A configuration value can be set both from a property and directly with a fluent builder method; whichever is called last wins. For example, there is a property sink_converter_class_name where you can specify the sink converter class. If you first call jsonConfigFile(...) with a file that has that key, and then call sinkConverterClass(...), the value set with the sinkConverterClass method will be used (see the sketch after this list).

4. Some builders have both "set" and "add" methods.

  • If you set(...) you replace whatever was in place before with the new configuration.
  • If you add(...) you append the new configuration to whatever was already set or added.
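
Here is a minimal sketch of the precedence rule from item 3, using the jsonConfigFile(...) and sinkConverterClass(...) methods mentioned above. The file path and converter class are placeholders, and the exact parameter form of sinkConverterClass (class name vs. Class object) is an assumption, so check the builder javadoc:

NatsSink<String> sink = new NatsSinkBuilder<String>()
    .jsonConfigFile("/path/to/sink-config.json")       // the file sets sink_converter_class_name
    .sinkConverterClass("com.example.MySinkConverter") // called last, so this value wins
    .build();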

Connection Properties

There are two ways to configure connection properties.

1. Passing a file location via .connectionPropertiesFile(String). This is probably the most common way to configure the connection.

Configuration files must be available on all Flink nodes that might execute the job; otherwise an IOException will be thrown when trying to open the file, and the node will fail to run.

NatsSink<String> sink = new NatsSinkBuilder<String>()
    ...
    .connectionPropertiesFile("/path/to/connection.properties")
    ...        
    .build();

2. Passing a Properties object prepared in code via .connectionProperties(Properties)

When the properties are prepared in code and given as an object, the object is serialized during construction so it can be sent over the wire to the nodes that will run instances of the sink or source. This removes the requirement for a properties file on each node, at the tradeoff of having this information passed over the wire in normal Java object serialization form, which is not necessarily secure.

Properties connectionProperties = ...
NatsSink<String> sink = new NatsSinkBuilder<String>()
    ...
    .connectionProperties(connectionProperties)
    ...        
    .build();

For full connection properties, see the NATS Java Client README, Options Properties.

Example Connection Properties

This is a simple example Connection Properties File you can review.

This is an example of using properties to set up TLS:

io.nats.client.url=tls://myhost:4222
io.nats.client.keyStorePassword=kspassword
io.nats.client.keyStore=/path/to/keystore.jks
io.nats.client.trustStorePassword=tspassword
io.nats.client.trustStore=/path/to/truststore.jks

When a property references a file location, those files must be present on all nodes.

JetStream Options

The following properties can be set in the connection Properties object or file and are required if your streams live in domains. See the server documentation for more info.

Property     JetStreamOptions Builder Method
jso_prefix   prefix(String prefix)
jso_domain   domain(String domain)
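
For example, if your streams live under a JetStream domain, the connection properties file might include entries like these (the host and domain values are placeholders):

io.nats.client.url=nats://myhost:4222
jso_domain=hub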

Source and Sink Concepts

The output of a source becomes the input of a sink. This project's sources get messages from NATS and emit them in a form that can be received by a sink. They are generic, meaning you can provide a converter to emit the type expected by your target sink.

The sink receives input from a source. This project's sinks use converters to process the data received from the source and extract message data and headers, which are then published to NATS. The sinks are generic, meaning you can provide a converter to handle the emitted type.

There will probably be a time when you read data from a source other than NATS or write data to a sink other than NATS. It is up to you to understand what a foreign source outputs or what a foreign sink expects as input.

Maybe a non-NATS source emits an encrypted array of bytes. If you are just storing that data as-is as the payload of a NATS message, you can use the built-in ByteArraySourceConverter. Otherwise, using that class as your starting point, you can customize what it does with the encrypted input.

Maybe you want to sink a subject or stream of data to some non-NATS sink. If that foreign sink is just a passthrough for message data in either byte or String form, you can use the provided sink converters, for instance ByteArraySinkConverter. But maybe you have headers in your messages, and you want to combine those headers, the subject, and the payload into some JSON. Then you will need to provide a custom sink converter.

A converter will receive the entire NATS Message so you can extract all the information you need. If you are using the JetStreamSource, the message will contain metadata, including the stream sequence.

Converters

All converter implementations must implement either the SourceConverter or the SinkConverter interface.

A SourceConverter takes a message as input and outputs the type that sinks are expecting.

public interface SourceConverter<OutputT> extends Serializable, ResultTypeQueryable<OutputT> {
  /**
   * Read a message and create an instance of the output type.
   * @return the output object
   */
  OutputT convert(Message message);
}

A SinkConverter takes the input from the source and returns a SinkMessage that can have a byte[] for the message payload and headers in a Headers object.

public interface SinkConverter<InputT> extends Serializable {
  /**
   * Create a SinkMessage based on the input object given to the sink.
   * If you return null, no messages will be published for this input.
   * @param input the input object
   * @return The SinkMessage object or null.
   */
  SinkMessage convert(InputT input);
}

These converters are supplied with the project:

Class                        Use
AsciiStringSourceConverter   Message payload is an ASCII string.
Utf8StringSourceConverter    Message payload is a UTF-8 string.
ByteArraySourceConverter     Message payload is binary.
AsciiStringSinkConverter     Sink input is an ASCII string.
Utf8StringSinkConverter      Sink input is a UTF-8 string.
ByteArraySinkConverter       Sink input is a byte array.

It is likely that you will want to supply a custom converter. A good place to start is by looking at the provided converters.

Something important to remember is that your converters must be Serializable. If you don't have any state or configuration to save, this is a non-issue. If you do have some state to save, look at the String converters; their state is the name of the character set.
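
As a starting point, here is a minimal custom source converter sketch that combines the subject and the UTF-8 decoded payload into one String. It relies only on the SourceConverter interface shown above plus Flink's TypeInformation; the io.synadia.flink.message package for the interface import is an assumption based on the provided converter class names:

import java.nio.charset.StandardCharsets;

import io.nats.client.Message;
import io.synadia.flink.message.SourceConverter; // assumed package, matching the provided converters
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Emits "subject: payload" strings. Stateless, so serialization is a non-issue.
public class SubjectAndPayloadSourceConverter implements SourceConverter<String> {

    @Override
    public String convert(Message message) {
        byte[] data = message.getData();
        String payload = data == null ? "" : new String(data, StandardCharsets.UTF_8);
        return message.getSubject() + ": " + payload;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Required by ResultTypeQueryable so Flink knows the emitted type.
        return TypeInformation.of(String.class);
    }
}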

Sinks

Both the NatsSink and JetStreamSink expect input from a source and use a converter to extract data and optionally headers from that input and produce a SinkMessage. The SinkMessage is then published.

The difference between NatsSink and JetStreamSink is that NatsSink publishes a message to a subject that it assumes is not part of a JetStream stream. It "fires and forgets". On the other hand, the JetStreamSink publishes the message to what it assumes to be a JetStream subject and waits for a Publish Ack. If the publishing fails, the sink throws an exception.

Subjects may not have wildcards. The builder will not validate this; it is up to you to ensure you configure it properly, otherwise publishing will fail at runtime with an exception.

JetStream subjects must exist in your target NATS system, otherwise publishing will fail at runtime with an exception.

To construct a NatsSink, you must use the NatsSinkBuilder. To construct a JetStreamSink, you must use the JetStreamSinkBuilder. The sinks can be configured in code or from files in JSON or YAML format. They support these property keys:

  • sink_converter_class_name
  • subjects
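
As a sketch, assembling a NatsSink in code might look like the following. The subjects(...) method name is an assumption mirroring the subjects property key, and the sinkConverterClass(...) parameter form may differ, so verify against the NatsSinkBuilder javadoc:

NatsSink<String> sink = new NatsSinkBuilder<String>()
    .connectionPropertiesFile("/path/to/connection.properties")
    .sinkConverterClass("io.synadia.flink.message.Utf8StringSinkConverter") // sink_converter_class_name
    .subjects("subject1", "subject2")                                       // assumed method name
    .build();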

Sources

There are two types of Flink source implementations available.

  1. NatsSource, which subscribes to NATS core subjects.
  2. JetStreamSource, which consumes JetStream stream subjects.

Subjects with wildcards are allowed and are treated as a single subject when creating splits. This is normal NATS behavior. If a subject is invalid or the supplied stream does not have a matching subject, consuming will fail at runtime with an exception.

NatsSource

A NatsSource subscribes to one or more core NATS subjects and uses a Source Converter implementation to convert the messages to the output type emitted to sinks.

  • Each subject is subscribed in its own split, which Flink can run in different threads or in different nodes depending on your Flink configuration.

  • A NatsSource is currently only unbounded, meaning it runs forever. Making the source able to be bounded, meaning limited to a number of messages or a period of time, is on the TODO list.

To construct a NatsSource, you must use the NatsSourceBuilder. The source can be configured in code or from files in JSON or YAML format. It supports these property keys:

  • source_converter_class_name
  • subjects
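
As a sketch, a NatsSource can be built and handed to a Flink job roughly like this. The sourceConverterClass(...) and subjects(...) method names are assumptions mirroring the property keys above, so check the NatsSourceBuilder javadoc:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

NatsSource<String> source = new NatsSourceBuilder<String>()
    .connectionPropertiesFile("/path/to/connection.properties")
    .sourceConverterClass("io.synadia.flink.message.Utf8StringSourceConverter") // assumed method name
    .subjects("subject1", "subject2")                                           // assumed method name
    .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "nats-source");
stream.sinkTo(sink); // for example, the NatsSink built in the Sinks section, or any other Flink sink
env.execute();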

JetStreamSource

A JetStreamSource consumes one or more JetStream subjects and uses a SourceConverter implementation to convert the messages to the output type emitted to sinks.

  • Each subject is consumed in its own split, which Flink can run in different threads or in different nodes depending on your Flink configuration.

To construct a JetStreamSource, you must use the JetStreamSourceBuilder.

A JetStreamSource is composed of a SourceConverter and one or more JetStreamSubjectConfiguration instances.

All instances of JetStreamSubjectConfiguration must have the same boundedness: either all of them are configured with a maximum number of messages to read, or none of them are.

JetStreamSubjectConfiguration

A JetStreamSubjectConfiguration is created by using the JetStreamSubjectConfiguration.Builder:

JetStreamSubjectConfiguration subjectConfiguration = 
    JetStreamSubjectConfiguration.builder()
        ...
        .build();
  • A configuration requires a stream name and a subject.
  • You can optionally supply a start sequence or start time, which will have the effect of starting the consumption at that point in the stream. A start time can be in any format that java.time.ZonedDateTime can parse. See ZonedDateTime.parse
  • You can specify a maximum number of messages to read. This has the effect of making the source BOUNDED.
  • You can specify an "AckBehavior":
    • NoAck - Ordered consumer used, no acks, messages are not acknowledged
    • AckAll - Consumer uses AckPolicy.All. Messages are tracked as they are sourced and the last one is acked at checkpoint
    • AllButDoNotAck - Consumer uses AckPolicy.All but the source does not ack at the checkpoint, leaving acking up to the user. If messages are not acked in time, they will be redelivered to the source.
    • ExplicitButDoNotAck - Consumer uses AckPolicy.Explicit but the source does not ack at the checkpoint, leaving acking up to the user. If messages are not acked in time, they will be redelivered to the source.
  • You must specify an AckBehavior other than NoAck if your stream is a work queue.
  • AckBehaviors other than NoAck are generally slower to process messages.
  • You can specify the consumer batch size and threshold percent if you feel you need to tune the behavior of the consumer.
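
As a sketch, a bounded JetStreamSource could be assembled in code along these lines. The builder method names below (streamName, subject, startSequence, maxMessagesToRead, sourceConverterClass, addSubjectConfigurations) are assumptions inferred from the property keys, and ackMode(...) follows the 2.2.0 naming note above, so verify all of them against the javadoc:

JetStreamSubjectConfiguration subjectConfiguration =
    JetStreamSubjectConfiguration.builder()
        .streamName("streamName")          // assumed method name
        .subject("subject1")               // assumed method name
        .startSequence(999)                // assumed; start consuming at this stream sequence
        .maxMessagesToRead(10_000)         // assumed; makes the source BOUNDED
        .ackMode(AckBehavior.NoAck)        // ackMode(...) per the 2.2.0 note; enum form assumed
        .build();

JetStreamSource<String> source = new JetStreamSourceBuilder<String>()
    .connectionPropertiesFile("/path/to/connection.properties")
    .sourceConverterClass("io.synadia.flink.message.Utf8StringSourceConverter") // assumed method name
    .addSubjectConfigurations(subjectConfiguration)                             // assumed method name
    .build();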

The source can be configured in code or from files in JSON or YAML format. It supports these property keys, shown here first in JSON and then in YAML form:

{
  "source_converter_class_name": "io.synadia.flink.message.Utf8StringSourceConverter",
  "jetstream_subject_configurations": [
    {
      "stream_name": "streamName",
      "subject": "subject1",
      "start_sequence":  999,
      "start_time":  "2025-04-08T00:38:32.109526400Z",
      "max_messages_to_read": 10000,
      "ack_behavior":  "NoAck",
      "batch_size": 100,
      "threshold_percent": 25
    },
    {
      "stream_name": "streamName",
      "subject": "subject2"
    },
    {
      "stream_name": "anotherStream",
      "subject": "foo.>"
    },
    {
      "stream_name": "anotherStream",
      "subject": "bar.*"
    }
  ]
}
---
source_converter_class_name: io.synadia.flink.message.Utf8StringSourceConverter
jetstream_subject_configurations:
- stream_name: streamName
  subject: subject1
  start_sequence: 999
  start_time: '2025-04-08T00:38:32.109526400Z'
  max_messages_to_read: 10000
  ack_behavior: NoAck
  batch_size: 100
  threshold_percent: 25
- stream_name: streamName
  subject: subject2
- stream_name: anotherStream
  subject: foo.>
- stream_name: anotherStream
  subject: bar.*

Here are the JSON and YAML configuration files used in the examples
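
The same configuration can also be loaded from those files, along the lines of the jsonConfigFile(...) call mentioned in the Builders section; the yamlConfigFile(...) name for the YAML form is an assumption:

JetStreamSource<String> source = new JetStreamSourceBuilder<String>()
    .connectionPropertiesFile("/path/to/connection.properties")
    .jsonConfigFile("/path/to/source-config.json") // or yamlConfigFile(...) for the YAML form (assumed name)
    .build();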

Getting the Library

The library is available in the Maven Central repository and can be imported as a standard dependency in your build.gradle file:

Gradle

dependencies {
    implementation 'io.synadia:flink-connector-nats:{major.minor.patch}'
}

If you need the latest and greatest before Maven central updates, you can use:

repositories {
    mavenCentral()
    maven {
      url "https://repo1.maven.org/maven2/"
    }
}

If you need a snapshot version, you must add the url for the snapshots and change your dependency.

repositories {
    mavenCentral()
    maven {
      url "https://central.sonatype.com/repository/maven-snapshots/"
    }
}

dependencies {
   implementation 'io.synadia:flink-connector-nats:{major.minor.patch}-SNAPSHOT'
}

Maven

<dependency>
    <groupId>io.synadia</groupId>
    <artifactId>flink-connector-nats</artifactId>
    <version>{major.minor.patch}</version>
</dependency>

If you need the absolute latest, before it propagates to maven central, you can use the repository:

<repositories>
    <repository>
        <id>sonatype releases</id>
        <url>https://repo1.maven.org/maven2</url>
        <releases>
           <enabled>true</enabled>
        </releases>
    </repository>
</repositories>

If you need a snapshot version, you must enable snapshots and change your dependency.

<repositories>
    <repository>
        <id>sonatype snapshots</id>
        <url>https://central.sonatype.com/repository/maven-snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependency>
    <groupId>io.synadia</groupId>
    <artifactId>flink-connector-nats</artifactId>
    <version>{major.minor.patch}-SNAPSHOT</version>
</dependency>

License

Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved. See LICENSE and NOTICE file for details.
