These connectors allow data transfer between Apache Kafka and LittleHorse.
- LittleHorse Connectors for Kafka Connect
This connector allows you to run WfRuns in LittleHorse. It supports all the Variable Types provided by LittleHorse.
More about running workflows at LittleHorse Quickstart.
Message Part | Description | Type | Valid Values |
---|---|---|---|
key | Ignored | any | any |
value | Define the variables field of the workflow | map | key-value not null |
More about run workflow fields at RunWfRequest.
You can manipulate the message structure with Single Message Transformations (SMTs).
The following workflow executes a task that receives a String parameter called `name`:
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
wf.execute("greet", name);
});
There is a topic called `names` with this data:
key: null, value: {"name":"Anakin Skywalker"}
key: null, value: {"name":"Luke Skywalker"}
key: null, value: {"name":"Leia Organa"}
key: null, value: {"name":"Padme Amidala"}
The following connector configuration runs a `WfRun` for each record, passing the variable `name`:
{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.WfRunSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"wf.spec.name": "greetings"
}
More configurations at WfRun Sink Connector Configurations.
This connector allows you to send External Events to LittleHorse.
More about running external events at LittleHorse External Events.
Message Part | Description | Type | Valid Values |
---|---|---|---|
key | Define the associated wf_run_id | string | non-empty string |
value | Define the content of the external event | any | any not null |
More about external event fields at PutExternalEventRequest.
You can manipulate the message structure with Single Message Transformations (SMTs).
The following workflow waits for the event `set-name`, assigns its content to the variable `name`, and then executes the task `greet`:
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
WfRunVariable name = wf.declareStr("name");
name.assign(wf.waitForEvent("set-name"));
wf.execute("greet", name);
});
There is a topic called `names` with this data:
key: 64512de2a4b5470a9a8a2846b9a8a444, value: Anakin Skywalker
key: 79af0ae572bb4c19842c19dd7cad6598, value: Luke Skywalker
key: 30e1afe9a30748339594cadc3d537ecd, value: Leia Organa
key: e01547de3d294efdb6417abf35f3c960, value: Padme Amidala
The following configuration sends an external event for each record, where the message key is the `WfRunId` and the message value is the `Content` (more at PutExternalEventRequest):
{
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"external.event.name": "set-name"
}
More configurations at ExternalEvent Sink Connector Configurations.
This connector allows you to send Correlated Events to LittleHorse.
Message Part | Description | Type | Valid Values |
---|---|---|---|
key | Define the associated CorrelationId | string | non-empty string |
value | Define the Content of the correlated event | any | any not null |
More about correlated event fields at PutCorrelatedEventRequest.
You can manipulate the message structure with Single Message Transformations (SMTs).
The following workflow waits for the event `payment-id` with a specific id (`CorrelationId`). When a correlated event is triggered with the same id, the workflow is allowed to continue.
Workflow workflow = Workflow.newWorkflow("process-payment", wf -> {
WfRunVariable paymentId = wf.declareStr("payment-id");
wf.execute("process-payment", wf.waitForEvent("payment-id").withCorrelationId(paymentId));
});
There is a topic called `payments` with this data:
key: d1e912b0cffe40138e452d413dc8ab53, value: {"name":"R2-D2","credits":6279.0}
key: 8f8e36ef6cb7476fafcd95493d5a183d, value: {"name":"C-3PO","credits":6286.0}
key: b31289d3b1484ef4945b31baf6df58f3, value: {"name":"BB-8","credits":5047.0}
key: 9aa240b59cd74590a01939fa4c87ebea, value: {"name":"Super Battle Droid","credits":9607.0}
The following configuration sends a correlated event for each record, where the message key is the `CorrelationId` and the message value is the `Content` (more at PutCorrelatedEventRequest):
{
"tasks.max": 2,
"connector.class": "io.littlehorse.connect.CorrelatedEventSinkConnector",
"topics": "payments",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"lhc.api.port": 2024,
"lhc.api.host": "littlehorse",
"lhc.tenant.id": "default",
"external.event.name": "payment-id"
}
More configurations at CorrelatedEvent Sink Connector Configurations.
To ensure idempotency, we generate a unique id, in lowercase, for each request to LH using the following format:
{connector name}-{topic name}-{partition}-{offset}
The connector name must be in a valid hostname format, for example `my-littlehorse-connector1`.
The topic name is converted to a valid hostname format, for example `My_Topic` becomes `my-topic`.
A hostname is a lowercase alphanumeric string whose parts are separated by `-`.
LH does not support uppercase letters in WfRunIds, and the only special character allowed is `-`.
More at LittleHorse Variables.
If two topics generate the same unique id (for example, `My_Topic` and `My.Topic` both generate `my-topic`), it is recommended to create two different connectors.
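The id format described above can be illustrated with a minimal Java sketch. The class and method names here are hypothetical, and the normalization rule (every run of characters outside `a-z0-9` becomes a single `-`) is an assumption chosen to reproduce the `My_Topic` to `my-topic` example; the connector's actual implementation may differ.

```java
import java.util.Locale;

public class RequestIdSketch {

    // Assumption: lowercase the name, collapse every run of characters
    // outside [a-z0-9] into one '-', and trim any leading or trailing '-'.
    public static String toHostnameFormat(String raw) {
        return raw.toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9]+", "-")
                .replaceAll("^-+|-+$", "");
    }

    // Builds {connector name}-{topic name}-{partition}-{offset}.
    // The connector name is expected to already be in hostname format.
    public static String requestId(String connectorName, String topic, int partition, long offset) {
        return connectorName.toLowerCase(Locale.ROOT)
                + "-" + toHostnameFormat(topic)
                + "-" + partition
                + "-" + offset;
    }

    public static void main(String[] args) {
        // Both topics normalize to "my-topic", so the ids collide.
        System.out.println(requestId("my-littlehorse-connector1", "My_Topic", 0, 42L));
        System.out.println(requestId("my-littlehorse-connector1", "My.Topic", 0, 42L));
        // prints "my-littlehorse-connector1-my-topic-0-42" twice
    }
}
```

Because both records produce the same id under one connector name, giving each topic its own connector (and therefore its own connector name prefix) keeps the ids distinct.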
These connectors support parallelism by running more than one task.
Specify the number of tasks in the `tasks.max` configuration parameter.
More configurations at Configure Sink Connector.
These connectors support Dead Letter Queue (DLQ).
More about DLQs at Kafka Connect Dead Letter Queue.
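As a rough illustration, the standard Kafka Connect sink error-handling properties that enable a DLQ can be collected as below. This is a sketch, not a recommended setup: the class name, the helper method, and the topic name `names-dlq` are hypothetical, and the replication factor of 1 assumes a single-broker development cluster.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DlqConfigSketch {

    // Returns the Kafka Connect properties that route failed records to a DLQ topic.
    // These entries would be merged into the connector configuration shown earlier.
    public static Map<String, String> dlqProperties(String dlqTopic) {
        Map<String, String> props = new LinkedHashMap<>();
        // Keep the task running when a record fails instead of stopping it.
        props.put("errors.tolerance", "all");
        // Topic that receives the failed records (hypothetical name supplied by the caller).
        props.put("errors.deadletterqueue.topic.name", dlqTopic);
        // Attach error context (for example the exception) as record headers.
        props.put("errors.deadletterqueue.context.headers.enable", "true");
        // Assumption: single-broker development cluster.
        props.put("errors.deadletterqueue.topic.replication.factor", "1");
        return props;
    }

    public static void main(String[] args) {
        dlqProperties("names-dlq")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```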
Note that the LittleHorse kernel is data type aware. When reading data from the Kafka topic with either `WfRunSinkConnector` or `ExternalEventSinkConnector`, the data types in the topic must match the data types the LittleHorse kernel expects.
A common issue involves the Boolean data type. If the LittleHorse kernel expects a Boolean (`true` or `false`), the corresponding value in the schema of the topic must also be a Boolean.
For testing, it is common to use the `kafka-console-producer.sh` tool provided by Apache Kafka. This tool can only produce String or Integer values. To accurately send a primitive type other than String or Integer, you must use a converter or transformation in the Kafka Connect connector configuration.
Example:
{
"name": "external-identity-verified",
"config": {
"tasks.max": 2,
"topics": "names",
"connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"lhc.api.port": 2023,
"lhc.api.host": "localhost",
"lhc.tenant.id": "default",
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "boolean",
"external.event.name": "identity-verified"
}
}
Note the lines that begin with "transforms": they cast the String value sent by `kafka-console-producer.sh` to the primitive Boolean type.
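To make the effect of such a cast concrete, here is a small Java sketch of a String-to-Boolean conversion. It assumes the cast follows the semantics of Java's `Boolean.parseBoolean`, where only the text `true` (in any letter case) yields `true`. The class and method names are hypothetical and this is not the transformation's actual source.

```java
public class BooleanCastSketch {

    // Assumption: String values are converted with Boolean.parseBoolean,
    // so anything other than "true" (ignoring case) becomes false.
    public static boolean castToBoolean(Object value) {
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        throw new IllegalArgumentException("Unsupported type for Boolean cast: " + value);
    }

    public static void main(String[] args) {
        System.out.println(castToBoolean("true")); // prints "true"
        System.out.println(castToBoolean("True")); // prints "true"
        System.out.println(castToBoolean("yes"));  // prints "false"
    }
}
```

Under this assumption, a misspelled or unexpected value does not raise an error; it silently becomes `false`, so it is worth checking test data carefully.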
These connectors support Protobuf, JSON, and Avro through converters.
More about converters at Kafka Connect Converters.
Kafka Connect provisions secrets through the `ConfigProvider` interface, so these connectors support external secrets by default.
More about secrets at Externalize Secrets.
- WfRun Sink Connector Configurations.
- ExternalEvent Sink Connector Configurations.
- Kafka Sink Connector Configurations.
- LittleHorse Client Configurations.
For all available versions go to GitHub Releases.
These connectors keep the same versioning as LittleHorse.
For more examples go to examples.
For development instructions go to DEVELOPMENT.md.
- Java version 17 or greater.
- Apache Kafka version 3.8 or greater, equivalent to Confluent Platform 7.8 or greater (Interoperability for Confluent Platform).
- LittleHorse version 0.14 or greater.
All code in this repository is covered by the Server Side Public License, Version 1 and is copyright of LittleHorse Enterprises LLC.