|
| 1 | + |
| 2 | + |
| 3 | + |
| 4 | + |
| 5 | +| **Title** | **Kafka Connect Sink** | |
| 6 | +|----------------------|-------------------------------------| |
| 7 | +| **Nav Order** | 2 | |
| 8 | +| **Description** | Kafka Connect sink detailed doc | |
| 9 | +| **Parent** | Integration | |
| 10 | + |
| 11 | + |
| 12 | + |
| 13 | +## Get Started |
| 14 | + |
| 15 | +- [Obtaining the connector](#obtaining-the-connector) |
| 16 | +- [Configuring the connector](#configuring-the-connector) |
| 17 | +- [Kafka message format](#kafka-message-format) |
| 18 | + |
| 19 | +--- |
| 20 | + |
| 21 | +### **1️⃣ Obtaining the Connector** |
| 22 | + |
| 23 | +You can build the connector from [source](https://github.com/FalkorDB/falkordb-kafka-connect) or download the pre-built [JAR](https://github.com/FalkorDB/falkordb-kafka-connect/releases/download/v1.0.0/falkordb-kafka-connect-uber.jar) file from the releases. The GitHub repository includes a README with instructions for running the connector locally. The [GitHub](https://github.com/FalkorDB/falkordb-kafka-connect?tab=readme-ov-file#how-to-run-the-example) repository includes a README with instructions for running the connector locally. |
| 24 | + |
| 25 | +### **2️⃣ Configuring the Connector** |
| 26 | + |
| 27 | +Kafka Connector Properties Overview: |
| 28 | +This document explains the properties required to configure the FalkorDB Sink Connector for Apache Kafka. |
| 29 | +>Configurations should be specified in a properties file format. |
| 30 | +
|
| 31 | +#### Properties Overview |
| 32 | + |
| 33 | +| **Property** | **Description** | |
| 34 | +|--------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------| |
| 35 | +| `name` | Specifies the unique name of the connector instance, e.g., `falkordb-connector`. This name identifies the connector in the Kafka Connect framework. | |
| 36 | +| `connector.class` | Defines the Java class that implements the connector logic. Use `com.falkordb.FalkorDBSinkConnector` to write data from Kafka topics to FalkorDB. | |
| 37 | +| `tasks.max` | Sets the maximum number of tasks for the connector. A value of `1` uses a single task. Increasing this can boost throughput but requires resources. | |
| 38 | +| `topics` | Specifies the Kafka topic(s) to consume messages from. Set to `falkordb-topic` to read messages from this topic. | |
| 39 | +| `key.converter` | Defines the converter class for message keys. `StringConverter` treats keys as simple strings. | |
| 40 | +| `value.converter` | Specifies the converter for message values. `StringConverter` treats values as strings. | |
| 41 | +| `value.converter.schemas.enable` | Indicates whether schemas should be included with message values. Setting to `false` excludes schema information. | |
| 42 | +| `falkor.url` | Specifies the connection URL for FalkorDB. Example: `redis://localhost:6379`. Essential for connecting Kafka to FalkorDB. | |
| 43 | + |
| 44 | + |
| 45 | + |
| 46 | +>The above properties configure a Kafka Sink Connector that reads messages from a specified topic and writes them into |
| 47 | +FalkorDB using string conversion for both keys and values. Adjusting these properties allows you to tailor the |
| 48 | +connector's behavior according to your application's requirements. |
| 49 | + |
| 50 | + |
| 51 | +## Configuration Example |
| 52 | + |
| 53 | + ```properties |
| 54 | +name=falkordb-connector |
| 55 | +connector.class=com.falkordb.FalkorDBSinkConnector |
| 56 | +tasks.max=1 |
| 57 | +topics=falkordb-topic |
| 58 | +key.converter=org.apache.kafka.connect.storage.StringConverter |
| 59 | +value.converter=org.apache.kafka.connect.storage.StringConverter |
| 60 | +value.converter.schemas.enable=false |
| 61 | +falkor.url=redis://localhost:6379 |
| 62 | + ``` |
| 63 | + |
| 64 | +## Kafka Message Format |
| 65 | + |
| 66 | +#### JSON Structure Overview |
| 67 | + |
| 68 | +The message is an array containing multiple objects, each representing a command to be executed on the graph database. |
| 69 | +Below is a breakdown of the key components of each message object. |
| 70 | + |
| 71 | +Example: |
| 72 | + |
| 73 | +```json |
| 74 | +[ |
| 75 | + { |
| 76 | + "graphName": "falkordb", |
| 77 | + "command": "GRAPH_QUERY", |
| 78 | + "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p", |
| 79 | + "parameters": { |
| 80 | + "location_param": "Location 0", |
| 81 | + "age_param": 20, |
| 82 | + "name_param": "Person 0" |
| 83 | + } |
| 84 | + }, |
| 85 | + { |
| 86 | + "graphName": "falkordb", |
| 87 | + "command": "GRAPH_QUERY", |
| 88 | + "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p", |
| 89 | + "parameters": { |
| 90 | + "location_param": "Location 1", |
| 91 | + "age_param": 21, |
| 92 | + "name_param": "Person 1" |
| 93 | + } |
| 94 | + } |
| 95 | +] |
| 96 | + |
| 97 | +``` |
| 98 | + |
| 99 | +#### Key Components |
| 100 | + |
| 101 | +The table below explains essential properties for executing commands in FalkorDB through Kafka messages. |
| 102 | + |
| 103 | +| **Property** | **Description** | **Example** | **Explainer** | |
| 104 | +|-------------------|-----------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| |
| 105 | +| `graphName` | Specifies the name of the graph database where the command will be executed. | `"falkordb"`. Kafka messages can update multiple graphs. | | |
| 106 | +| `command` | Indicates the type of operation being performed. `"GRAPH_QUERY"` means a query will be executed against the graph database. | `"GRAPH_QUERY"` | | |
| 107 | +| `cypherCommand` | Contains the actual Cypher query to be executed. Cypher is a query language for graph databases. | ```cypher CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p ``` | Creates a `Person` node with `name`, `age`, and `location` properties. | |
| 108 | +| `parameters` | Holds key-value pairs for placeholders in the `cypherCommand`. | ```json {"name_param": "Person 0", "age_param": 20, "location_param": "Location 0"} ``` | Used to define properties for the new node. | |
| 109 | + |
0 commit comments