A real-time data pipeline that ingests RSS feeds through Kafka Connect, processes them with built-in embedding functions, and stores them in Milvus vector database for GenAI applications.
Building real-time GenAI applications requires complex data pipelines that connect diverse source systems to vector databases.
Traditional approaches force developers to write extensive custom code for ETL processes and embedding pipelines, handling numerous pain points including type conversions, field filtering, timestamp parsing, null value management, and data transformation.
The complexity multiplies when dealing with dedicated compute resources, parallel processing, request throttling, and scheduling for embedding generation. Additionally, scheduled batch-based ETL pipelines introduce significant delays to insights, creating a lag between data availability and actionable intelligence. This creates a maintenance nightmare where teams constantly ask "Who's gonna maintain it?" while struggling with brittle custom code that's difficult to scale and migrate.
The solution requires "shifting left" - processing and acting on data as it arrives in real-time rather than waiting for batch processing cycles. This approach shortens the time to value and enables immediate insights for AI applications. By leveraging Free and Open Source Software (FOSS) tools with built-in capabilities, this project demonstrates how to subsume traditional ETL and embedding pipelines, practically removing the need to code or maintain complex data processing infrastructure.
This project creates a no-code data pipeline that:
- Ingests RSS feeds using Kafka Connect RSS Source Connector
- Processes data through Kafka topics
- Automatically generates embeddings using OpenAI's text-embedding-3-small model
- Stores structured data and embeddings in Milvus vector database
To run this pipeline you will need:
- Docker and Docker Compose
- Python 3.x with pymilvus==2.6.0
Before starting services, configure Milvus with your OpenAI API key for embedding functions:
cd cp-all-in-one/
cp sample_milvus.yaml milvus.yaml
Edit milvus.yaml and replace <INSERT_YOUR_API_KEY> with your actual OpenAI API key:
proxies:
  default:
    credential:
      openai_key:
        apikey: your-actual-openai-api-key-here
Spin up all services:
docker-compose up -d
This will start:
- Kafka broker
- Schema Registry
- Kafka Connect
- Control Center
- Milvus vector database
- MinIO (object storage for Milvus)
Run the topic creation script:
./02-create-topic.sh
This creates a test_topic for RSS feed data.
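The script presumably wraps the standard Kafka topic-creation CLI; if you prefer to do it from Python, a minimal equivalent using the confluent-kafka AdminClient (an extra dependency, not listed in the prerequisites) might look like this:

```python
from confluent_kafka.admin import AdminClient, NewTopic

# Assumes the broker is reachable on localhost:9092 (the compose default)
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# One partition and one replica are enough for this local demo
futures = admin.create_topics([NewTopic("test_topic", num_partitions=1, replication_factor=1)])
for topic, future in futures.items():
    try:
        future.result()  # raises if the topic could not be created
        print(f"Created topic {topic}")
    except Exception as exc:
        print(f"Topic {topic}: {exc}")
```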
Create the Milvus collection with schema and embedding functions:
python 01-create-milvus-collection.py
This script:
- Creates a collection called realtime_embeddings
- Defines fields for RSS data (title, content, date, etc.)
- Sets up OpenAI embedding function for automatic text vectorization
- Configures vector index for similarity search
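The authoritative schema is whatever 01-create-milvus-collection.py defines; the sketch below shows the general shape of such a script with pymilvus 2.6 (field names like content and content_vector, the index type, and the metric are illustrative assumptions):

```python
from pymilvus import MilvusClient, DataType, Function, FunctionType

client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")

# Scalar fields for the RSS data plus a vector field for the OpenAI embeddings
schema = client.create_schema(auto_id=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("title", DataType.VARCHAR, max_length=1024)
schema.add_field("content", DataType.VARCHAR, max_length=65535)
schema.add_field("date", DataType.INT64)
schema.add_field("content_vector", DataType.FLOAT_VECTOR, dim=1536)

# Embedding function: Milvus calls OpenAI to vectorize `content` into `content_vector`
schema.add_function(Function(
    name="openai_embedding",
    function_type=FunctionType.TEXTEMBEDDING,
    input_field_names=["content"],
    output_field_names=["content_vector"],
    params={"provider": "openai", "model_name": "text-embedding-3-small"},
))

# Vector index for similarity search
index_params = client.prepare_index_params()
index_params.add_index(field_name="content_vector", index_type="AUTOINDEX", metric_type="COSINE")

client.create_collection("realtime_embeddings", schema=schema, index_params=index_params)
```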
Start the RSS source connector:
./04-submit-rss-connector.sh
Then deploy the Milvus sink connector:
./05-submit-milvus-sink-connector.sh
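Both scripts presumably register their JSON configuration with the Kafka Connect REST API. Once they have run, you can confirm the connectors are RUNNING with a quick check (assuming the REST API is exposed on localhost:8083, the Confluent default):

```python
import requests

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST API

for name in ("rss-to-kafka", "kafka-to-milvus"):
    status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()
    task_states = [task["state"] for task in status.get("tasks", [])]
    print(name, status["connector"]["state"], task_states)
```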
- RSS Ingestion: RSS connector fetches news from Dow Jones RSS feed
- Data Processing: Kafka transforms flatten and clean the data
- Embedding Generation: Milvus automatically generates embeddings from content
- Vector Storage: Data and embeddings stored in Milvus for similarity search
- RSS Source Connector: Fetches from https://feeds.content.dowjones.io/public/rss/RSSWorldNews
- Milvus Sink Connector: Streams data from Kafka to Milvus
- Vector Database: Stores text embeddings for semantic search
- Embedding Function: OpenAI text-embedding-3-small for 1536-dimensional vectors
The RSS source connector uses a declarative JSON configuration that defines data ingestion and transformation:
{
"name": "rss-to-kafka",
"connector.class": "org.kaliy.kafka.connect.rss.RssSourceConnector",
"rss.urls": "https://feeds.content.dowjones.io/public/rss/RSSWorldNews",
"topic": "test_topic"
}
- name: Unique identifier for the connector instance
- connector.class: Java class implementing RSS source functionality
- rss.urls: Target RSS feed (Dow Jones World News)
- topic: Destination Kafka topic for RSS data
{
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
- value.converter: Serializes RSS data as JSON messages
- schemas.enable: false: Disables embedded schemas so raw JSON is sent
The connector applies three sequential transformations:
{
"transforms": "flatten,replacefield,TimestampConverter"
}
1. Flatten Transform
{
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
- Flattens nested RSS JSON structures
- Uses underscore delimiter (feed.title → feed_title)
- Essential for handling complex RSS/XML hierarchies
2. ReplaceField Transform
{
"transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.replacefield.exclude": "feed_url,author"
}
- Removes unnecessary fields (feed_url, author)
- Reduces message size and focuses on relevant content
- Optimizes downstream storage and processing
3. TimestampConverter Transform
{
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "date",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
"transforms.TimestampConverter.target.type": "Timestamp"
}
- Converts RSS date strings to proper timestamps
- Expects ISO 8601 format from RSS feeds
- Outputs Kafka Connect Timestamp type for compatibility
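To make the net effect of the chain concrete, here is an illustrative before/after view of a single record (field names and values are assumptions based on the transforms above, not captured output):

```python
# Record roughly as emitted by the RSS source connector (nested structure)
before = {
    "feed": {"title": "WSJ.com: World News",
             "url": "https://feeds.content.dowjones.io/public/rss/RSSWorldNews"},
    "title": "Example headline",
    "content": "Example article summary",
    "author": "Staff",
    "date": "2024-05-01T12:30:00Z",
}

# After flatten (underscore delimiter), replacefield (feed_url and author removed),
# and TimestampConverter (ISO 8601 string parsed into a Connect Timestamp)
after = {
    "feed_title": "WSJ.com: World News",
    "title": "Example headline",
    "content": "Example article summary",
    "date": 1714566600000,  # illustrative epoch-millis rendering of the timestamp
}
```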
The sink connector configuration handles data flow from Kafka to Milvus:
{
"name": "kafka-to-milvus",
"connector.class": "com.milvus.io.kafka.MilvusSinkConnector",
"public.endpoint": "http://standalone:19530",
"collection.name": "realtime_embeddings",
"topics": "test_topic",
"token": "root:Milvus",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
- connector.class: Official Zilliz/Milvus sink connector
- public.endpoint: Milvus server endpoint (Docker service name)
- collection.name: Target collection matching the Python schema
- topics: Source Kafka topic (matches the RSS connector output)
- token: Milvus authentication (username:password format)
- Data format: The same JSON converter settings ensure compatibility
These declarative configurations create a robust data pipeline with:
- Data Quality: Transformation chain cleanses and standardizes RSS data
- Schema Consistency: Flattening ensures predictable field names for Milvus
- Temporal Accuracy: Proper timestamp conversion enables time-based queries
- Storage Efficiency: Field exclusion reduces storage overhead
- Format Compatibility: Consistent JSON handling across source and sink
Access Confluent Control Center at http://localhost:9021 to monitor:
- Kafka topics and messages
- Connector status
- Data throughput
Other service endpoints:
- Milvus endpoint: http://localhost:19530
- Kafka broker: localhost:9092
- Schema Registry: http://localhost:8081
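Beyond Control Center, a quick way to confirm that records are actually landing in Milvus is to check the collection's row count (same endpoint and default credentials as above):

```python
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")

# The row count grows as the RSS connector publishes and the sink connector inserts
print(client.get_collection_stats("realtime_embeddings"))
```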
Once the pipeline is running, RSS feed data will be continuously:
- Ingested into Kafka
- Transformed and cleaned
- Embedded using OpenAI models
- Stored in Milvus for vector similarity search
You can then query the Milvus collection for semantic search across news articles.
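Because the collection has a text-embedding function attached, you can pass raw query text to search and Milvus will embed it with the same OpenAI model before running the vector comparison. A minimal sketch (content_vector, title, and date are assumed field names matching the collection sketch above):

```python
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")

results = client.search(
    collection_name="realtime_embeddings",
    data=["central bank interest rate decisions"],  # raw text, embedded server-side
    anns_field="content_vector",
    limit=5,
    output_fields=["title", "date"],
)

for hit in results[0]:
    print(round(hit["distance"], 4), hit["entity"].get("title"))
```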
This project builds upon the following open source components:
- Milvus - The vector database that powers semantic search capabilities
- Confluent Platform All-in-One - Docker Compose setup for Confluent Platform
- Zilliz Kafka Connect Milvus - Kafka Connect sink connector for Milvus
- Kaliy Kafka Connect RSS - Kafka Connect source connector for RSS feeds