
# LittleHorse Connectors for Kafka Connect

These connectors allow data transfer between Apache Kafka and LittleHorse.


## WfRunSinkConnector

This connector allows you to launch WfRuns in LittleHorse. It supports all the variable types provided by LittleHorse.

More about running workflows at LittleHorse Quickstart.

### Expected Message Structure

| Message Part | Description | Type | Valid Values |
| --- | --- | --- | --- |
| key | Ignored | any | any |
| value | Defines the `variables` field of the workflow | map (key-value) | not null |

More about run workflow fields at RunWfRequest.

You can manipulate the message structure with Single Message Transformations (SMTs).
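For example, if the raw message value wraps the workflow variables in an envelope, an `ExtractField` SMT could unwrap it before the connector reads it. This fragment is an illustrative sketch; the field name `payload` is an assumption, not part of the connector:

```json
{
  "transforms": "Unwrap",
  "transforms.Unwrap.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.Unwrap.field": "payload"
}
```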

### Quick Example

The following workflow executes a task `greet` that receives a `String` parameter called `name`:

```java
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
    WfRunVariable name = wf.declareStr("name");
    wf.execute("greet", name);
});
```

There is a topic `names` with this data:

```
key: null, value: {"name":"Anakin Skywalker"}
key: null, value: {"name":"Luke Skywalker"}
key: null, value: {"name":"Leia Organa"}
key: null, value: {"name":"Padme Amidala"}
```

The following connector configuration will launch WfRuns with the variable `name`:

```json
{
  "tasks.max": 2,
  "topics": "names",
  "connector.class": "io.littlehorse.connect.WfRunSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false,
  "lhc.api.port": 2023,
  "lhc.api.host": "localhost",
  "lhc.tenant.id": "default",
  "wf.spec.name": "greetings"
}
```

More configurations at WfRun Sink Connector Configurations.

## ExternalEventSinkConnector

This connector allows you to post External Events to LittleHorse.

More about running external events at LittleHorse External Events.

### Expected Message Structure

| Message Part | Description | Type | Valid Values |
| --- | --- | --- | --- |
| key | Defines the associated `wf_run_id` | string | non-empty string |
| value | Defines the content of the external event | any | not null |

More about external event fields at PutExternalEventRequest.

You can manipulate the message structure with Single Message Transformations (SMTs).

### Quick Example

The following workflow waits for the event `set-name` to assign the variable `name`, and then executes the task `greet`:

```java
Workflow workflow = Workflow.newWorkflow("greetings", wf -> {
    WfRunVariable name = wf.declareStr("name");
    name.assign(wf.waitForEvent("set-name"));
    wf.execute("greet", name);
});
```

There is a topic `names` with this data:

```
key: 64512de2a4b5470a9a8a2846b9a8a444, value: Anakin Skywalker
key: 79af0ae572bb4c19842c19dd7cad6598, value: Luke Skywalker
key: 30e1afe9a30748339594cadc3d537ecd, value: Leia Organa
key: e01547de3d294efdb6417abf35f3c960, value: Padme Amidala
```

The following configuration will post external events where the message key is the `WfRunId` and the message value is the `Content` (more at PutExternalEventRequest):

```json
{
  "tasks.max": 2,
  "topics": "names",
  "connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "lhc.api.port": 2023,
  "lhc.api.host": "localhost",
  "lhc.tenant.id": "default",
  "external.event.name": "set-name"
}
```

More configurations at ExternalEvent Sink Connector Configurations.

## CorrelatedEventSinkConnector

This connector allows you to post Correlated Events to LittleHorse.

### Expected Message Structure

| Message Part | Description | Type | Valid Values |
| --- | --- | --- | --- |
| key | Defines the associated `CorrelationId` | string | non-empty string |
| value | Defines the content of the correlated event | any | not null |

More about correlated event fields at PutCorrelatedEventRequest.

You can manipulate the message structure with Single Message Transformations (SMTs).

### Quick Example

The following workflow waits for the event `payment-id` with a specific id (the `CorrelationId`); when a correlated event is triggered with the same id, the workflow is allowed to continue:

```java
Workflow workflow = Workflow.newWorkflow("process-payment", wf -> {
    WfRunVariable paymentId = wf.declareStr("payment-id");
    wf.execute("process-payment", wf.waitForEvent("payment-id").withCorrelationId(paymentId));
});
```

There is a topic `payments` with this data:

```
key: d1e912b0cffe40138e452d413dc8ab53, value: {"name":"R2-D2","credits":6279.0}
key: 8f8e36ef6cb7476fafcd95493d5a183d, value: {"name":"C-3PO","credits":6286.0}
key: b31289d3b1484ef4945b31baf6df58f3, value: {"name":"BB-8","credits":5047.0}
key: 9aa240b59cd74590a01939fa4c87ebea, value: {"name":"Super Battle Droid","credits":9607.0}
```

The following configuration will post correlated events where the message key is the `CorrelationId` and the message value is the `Content` (more at PutCorrelatedEventRequest):

```json
{
  "tasks.max": 2,
  "connector.class": "io.littlehorse.connect.CorrelatedEventSinkConnector",
  "topics": "payments",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false,
  "lhc.api.port": 2024,
  "lhc.api.host": "littlehorse",
  "lhc.tenant.id": "default",
  "external.event.name": "payment-id"
}
```

More configurations at CorrelatedEvent Sink Connector Configurations.

## Idempotent Writes

To ensure idempotency, the connectors generate a unique id for each request to LittleHorse, in lowercase and with the following format:

```
{connector name}-{topic name}-{partition}-{offset}
```

The connector name must be in a valid hostname format, for example my-littlehorse-connector1. The topic name will be converted to a valid hostname format, for example My_Topic becomes my-topic. A hostname here is a lowercase alphanumeric string whose segments are separated by `-`.

LH does not support uppercase letters for defining WfRunIds, and the only special character allowed is -. More at LittleHorse Variables.

If two topics generate the same unique id (for example, My_Topic and My.Topic both become my-topic), it is recommended to create two different connectors.
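The sanitization and id format described above can be sketched as follows. This is an illustrative assumption of how such an id could be derived, not the connector's actual code:

```java
import java.util.Locale;

public class IdempotencyKey {
    // Convert a name to a hostname-like string: lowercase, with every
    // character outside [a-z0-9] replaced by a hyphen (assumed rule).
    static String sanitize(String s) {
        return s.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9]", "-");
    }

    // Build "{connector name}-{topic name}-{partition}-{offset}".
    static String idempotencyKey(String connector, String topic, int partition, long offset) {
        return String.format("%s-%s-%d-%d", sanitize(connector), sanitize(topic), partition, offset);
    }

    public static void main(String[] args) {
        // prints: my-littlehorse-connector1-my-topic-0-42
        System.out.println(idempotencyKey("my-littlehorse-connector1", "My_Topic", 0, 42));
    }
}
```

Note how My_Topic collapses to my-topic, which is why two topics differing only in case or punctuation can collide.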

## Multiple Tasks

These connectors support parallelism by running more than one task. Specify the number of tasks in the tasks.max configuration parameter.

More configurations at Configure Sink Connector.

## Dead Letter Queue

These connectors support Dead Letter Queue (DLQ).

More about DLQs at Kafka Connect Dead Letter Queue.
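For example, a DLQ can be enabled with the standard Kafka Connect sink error-handling properties; the topic name `dlq-names` below is an illustrative assumption:

```json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-names",
  "errors.deadletterqueue.context.headers.enable": true
}
```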

## Data Types

Note that the LittleHorse kernel is data-type aware. When reading data from a Kafka topic with either WfRunSinkConnector or ExternalEventSinkConnector, the data types in the topic must match the data types the LittleHorse kernel expects.

A common issue involves the Boolean data type. If the LittleHorse kernel expects a Boolean (true or false), the value must be a Boolean in the schema of the topic.

For testing, it is common to use the kafka-console-producer.sh tool provided by Apache Kafka; this tool can only produce String or Integer values. To accurately send a primitive type other than String or Integer, you must cast the value with a transform in the Kafka Connect connector configuration.

Example:

```json
{
  "name": "external-identity-verified",
  "config": {
    "tasks.max": 2,
    "topics": "names",
    "connector.class": "io.littlehorse.connect.ExternalEventSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "lhc.api.port": 2023,
    "lhc.api.host": "localhost",
    "lhc.tenant.id": "default",
    "transforms": "Cast",
    "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.Cast.spec": "boolean",
    "external.event.name": "identity-verified"
  }
}
```

Note the lines that begin with `transforms`: they cast the String value sent by kafka-console-producer.sh to a primitive Boolean.

## Converters

These connectors support Protobuf, JSON, and Avro through converters.

More about converters at Kafka Connect Converters
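As an illustrative sketch, a connector configuration could use the Confluent Avro converter; the Schema Registry URL here is an assumption about your environment:

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
```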

## External Secrets

Kafka Connect supports provisioning secrets through the ConfigProvider interface, so these connectors support external secrets out of the box.

More about secrets at Externalize Secrets.
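For example, with the built-in FileConfigProvider registered on the Connect worker, connector configurations can reference secrets stored in a properties file (the provider name, file path, and key below are illustrative):

```properties
# Kafka Connect worker configuration: register FileConfigProvider as "file"
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
```

A connector configuration can then pull a value from the file, e.g. `"lhc.tenant.id": "${file:/etc/secrets/littlehorse.properties:tenant.id}"`.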

## Configurations

## Download


For all available versions go to GitHub Releases.

## Versioning

These connectors keep the same versioning as LittleHorse.

## Examples

For more examples go to examples.

## Development

For development instructions go to DEVELOPMENT.md.

## Dependencies

## License


All code in this repository is covered by the Server Side Public License, Version 1 and is copyright of LittleHorse Enterprises LLC.
