Kafkaesque is an ergonomic Laravel package that provides a clean, object-oriented interface for producing and consuming Apache Kafka messages. Built on top of the robust mateusjunges/laravel-kafka package, Kafkaesque simplifies Kafka operations with environment-aware topic management, Avro schema registry integration, and type-safe message handling.
Kafkaesque abstracts the complexity of Kafka message handling by providing:
- Environment-aware Topic Management: Automatically handle topic names across different environments (local, development, staging, production)
- Type-safe Message Schemas: Built on Spatie's Laravel Data for robust data validation and serialization
- Avro Schema Registry Integration: Seamless integration with Confluent Schema Registry for schema evolution
- Producer/Consumer Abstractions: Clean interfaces for both message production and consumption
- Flexible Architecture: Contract-based design allowing easy extension and customization
The package follows Laravel conventions and integrates seamlessly with your existing Laravel applications.
- PHP 8.3 or higher
- Laravel 10.45, 11.x, or 12.x
- ext-rdkafka extension
Install via Composer:
composer require aryeo/kafkaesque
The package requires the php-rdkafka
extension. Install it using:
# On macOS with Homebrew
brew install librdkafka
pecl install rdkafka
# On Ubuntu/Debian
sudo apt-get install librdkafka-dev
pecl install rdkafka
# On CentOS/RHEL
yum install librdkafka-devel
pecl install rdkafka
Add the extension to your php.ini
:
extension=rdkafka
Publish the configuration file:
php artisan vendor:publish --tag="kafkaesque-config"
Configure your Kafka connection in your .env
file:
KAFKA_BROKERS=localhost:9092
KAFKA_CONSUMER_GROUP_ID=your-app-group
KAFKA_SECURITY_PROTOCOL=PLAINTEXT
KAFKA_SASL_MECHANISMS=
KAFKA_SASL_USERNAME=
KAFKA_SASL_PASSWORD=
# For Avro Schema Registry (optional)
KAFKA_SCHEMA_REGISTRY_URL=http://localhost:8081
Create message schemas using Spatie's Laravel Data. These schemas define the structure of your Kafka messages.
Start with a simple schema by extending KafkaesqueSchema
:
<?php
namespace App\Kafka\Schemas;
use Aryeo\Kafkaesque\Schemas\KafkaesqueSchema;
class UserRegisteredSchema extends KafkaesqueSchema
{
public function __construct(
public readonly string $userId,
public readonly string $email,
public readonly string $name,
public readonly \DateTimeInterface $registeredAt
) {}
}
To enable Avro schema registry support, implement the IsAvroSchema
contract and add the required methods:
<?php
namespace App\Kafka\Schemas;
use Aryeo\Kafkaesque\Schemas\KafkaesqueSchema;
use Aryeo\Kafkaesque\Schemas\Contracts\IsAvroSchema;
use Aryeo\Kafkaesque\Registries\Environments\Contracts\IsRegistryEnvironment;
class UserRegisteredSchema extends KafkaesqueSchema implements IsAvroSchema
{
public function __construct(
public readonly string $userId,
public readonly string $email,
public readonly string $name,
public readonly \DateTimeInterface $registeredAt
) {}
public function getSubject(): string
{
return 'user-registered-value';
}
public function getVersion(IsRegistryEnvironment $environment): int
{
return match ($environment->getName()) {
'production' => 2,
default => 1,
};
}
}
Create a producer by extending KafkaesqueProducer
. For detailed configuration options, see the Producing Messages documentation.
<?php
namespace App\Kafka\Producers;
use Aryeo\Kafkaesque\Producers\KafkaesqueProducer;
use Junges\Kafka\Facades\Kafka;
class UserEventsProducer extends KafkaesqueProducer
{
public function __construct()
{
$this->producer = Kafka::producer()
->withBrokers(config('kafka.brokers'))
->withSasl(
username: config('kafka.username'),
password: config('kafka.password'),
mechanisms: 'SCRAM-SHA-256',
securityProtocol: 'sasl_ssl',
);
}
}
Start with a basic topic that can send messages:
<?php
namespace App\Kafka\Topics;
use Aryeo\Kafkaesque\Topics\KafkaesqueTopic;
use Aryeo\Kafkaesque\Topics\Contracts\IsProducible;
use App\Kafka\Producers\UserEventsProducer;
class UserEventsTopic extends KafkaesqueTopic implements IsProducible
{
public function getProducer(): KafkaesqueProducer
{
return new UserEventsProducer();
}
protected function getLocalName(): string
{
return 'local.user-events';
}
protected function getDevelopmentName(): string
{
return 'dev.user-events';
}
protected function getStagingName(): string
{
return 'staging.user-events';
}
protected function getProductionName(): string
{
return 'user-events';
}
protected function getTestingName(): string
{
return 'test.user-events';
}
}
If your schemas implement IsAvroSchema
, you need to configure a registry environment and add it to your topic:
First, create a registry environment:
<?php
namespace App\Kafka\Registries\Environments;
use Aryeo\Kafkaesque\Registries\Environments\Contracts\IsAvroRegistryEnvironment;
class ProductionRegistryEnvironment implements IsAvroRegistryEnvironment
{
public function getBaseUri(): string
{
return config('kafka.schema_registry_url');
}
public function getName(): string
{
return app()->environment();
}
}
Then, update your topic to implement HasAvroRegistry
:
<?php
namespace App\Kafka\Topics;
use Aryeo\Kafkaesque\Topics\KafkaesqueTopic;
use Aryeo\Kafkaesque\Topics\Contracts\IsProducible;
use Aryeo\Kafkaesque\Topics\Contracts\HasAvroRegistry;
use Aryeo\Kafkaesque\Registries\KafkaesqueRegistry;
use Aryeo\Kafkaesque\Registries\AvroRegistry;
use App\Kafka\Producers\UserEventsProducer;
class UserEventsTopic extends KafkaesqueTopic implements IsProducible, HasAvroRegistry
{
public function getProducer(): KafkaesqueProducer
{
return new UserEventsProducer();
}
public function getRegistry(): KafkaesqueRegistry
{
return new AvroRegistry(
environment: new ProductionRegistryEnvironment()
);
}
// ... environment name methods remain the same
}
Create a message that can be sent to topics:
<?php
namespace App\Kafka\Messages;
use Aryeo\Kafkaesque\Messages\KafkaesqueMessage;
use Aryeo\Kafkaesque\Messages\Contracts\IsProducible;
use App\Kafka\Schemas\UserRegisteredSchema;
use App\Kafka\Topics\UserEventsTopic;
class UserRegisteredMessage extends KafkaesqueMessage implements IsProducible
{
protected array $defaultTopics = [
UserEventsTopic::class,
];
public function __construct(
UserRegisteredSchema $body,
?string $key = null
) {
parent::__construct($body, $key);
}
}
Send messages to your configured topics. You have two options:
Option 1: Produce via message (sends to default topics):
$schema = new UserRegisteredSchema(
userId: '12345',
email: 'user@example.com',
name: 'John Doe',
registeredAt: now()
);
$message = new UserRegisteredMessage($schema, key: '12345');
$message->produce(); // Products message on default topics
Option 2: Produce via topic (sends to specific topic):
$schema = new UserRegisteredSchema(
userId: '12345',
email: 'user@example.com',
name: 'John Doe',
registeredAt: now()
);
$message = new UserRegisteredMessage($schema, key: '12345');
$topic = new UserEventsTopic();
$topic->produce($message); // Produces message on specific topic
Create a consumer by extending KafkaesqueConsumer
. For detailed configuration options, see the Consuming Messages documentation.
<?php
namespace App\Kafka\Consumers;
use Aryeo\Kafkaesque\Consumers\KafkaesqueConsumer;
use Junges\Kafka\Facades\Kafka;
class UserEventsConsumer extends KafkaesqueConsumer
{
public function __construct()
{
$this->consumerBuilder = Kafka::consumer()
->withBrokers(config('kafka.brokers'))
->withSasl(
username: config('kafka.username'),
password: config('kafka.password'),
mechanisms: 'SCRAM-SHA-256',
securityProtocol: 'sasl_ssl',
)
->withConsumerGroupId(config('kafka.consumer_group_id'));
}
}
Start with a basic topic that can receive and route messages:
<?php
namespace App\Kafka\Topics;
use Aryeo\Kafkaesque\Topics\KafkaesqueTopic;
use Aryeo\Kafkaesque\Topics\Contracts\IsConsumable;
use App\Kafka\Consumers\UserEventsConsumer;
use App\Kafka\Messages\UserRegisteredHandler;
use App\Kafka\Messages\UserUpdatedHandler;
use App\Kafka\Schemas\UserEventSchema;
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\MessageConsumer;
class UserEventsTopic extends KafkaesqueTopic implements IsConsumable
{
public function getConsumer(): KafkaesqueConsumer
{
return new UserEventsConsumer();
}
protected function getDevelopmentName(): string
{
return 'dev.user-events';
}
protected function getLocalName(): string
{
return 'local.user-events';
}
protected function getProductionName(): string
{
return 'user-events';
}
protected function getStagingName(): string
{
return 'staging.user-events';
}
protected function getTestingName(): string
{
return 'test.user-events';
}
public function handleMessage(ConsumerMessage $message, MessageConsumer $consumer): void
{
$body = UserEventSchema::from($message->getBody());
// Route to specific message handlers
$messageClass = match ($body->eventType) {
'user.registered' => UserRegisteredHandler::class,
'user.updated' => UserUpdatedHandler::class,
default => null,
};
if ($messageClass) {
$handler = new $messageClass($body, $message->getKey());
$handler->handle();
}
}
}
If consuming Avro messages, ensure your topic implements HasAvroRegistry
(same as the producing setup above).
Create message handlers that process incoming data:
<?php
namespace App\Kafka\Messages;
use Aryeo\Kafkaesque\Messages\KafkaesqueMessage;
use Aryeo\Kafkaesque\Messages\Contracts\IsConsumable;
use App\Kafka\Schemas\UserEventSchema;
use App\Jobs\SendWelcomeEmail;
use App\Models\User;
class UserRegisteredHandler extends KafkaesqueMessage implements IsConsumable
{
public function __construct(
UserEventSchema $body,
?string $key = null
) {
parent::__construct($body, $key);
}
public function handle(): void
{
if (!$this->shouldHandle()) {
return;
}
// Dispatch asynchronous jobs to handle message outcomes
SendWelcomeEmail::dispatch($this->body->email);
}
protected function shouldHandle(): bool
{
return !is_null($this->body->userId) &&
!is_null($this->body->email);
}
}
Create an Artisan command to run your consumer:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use App\Kafka\Topics\UserEventsTopic;
class ConsumeUserEvents extends Command
{
protected $signature = 'kafka:consume-user-events';
public function handle(): void
{
(new UserEventsTopic())->consume(); // Uses UserEventsConsumer to process messages
}
}
<?php
namespace Tests\Feature\Kafka;
use Tests\TestCase;
use App\Kafka\Messages\UserRegisteredMessage;
use App\Kafka\Schemas\UserRegisteredSchema;
class UserRegisteredMessageTest extends TestCase
{
public function test_message_creation(): void
{
$schema = new UserRegisteredSchema(
userId: '12345',
email: 'test@example.com',
name: 'Test User',
registeredAt: now()
);
$message = new UserRegisteredMessage($schema, key: '12345');
$this->assertEquals('12345', $message->getKey());
$this->assertEquals('test@example.com', $message->getBody()->email);
}
}
Please see CHANGELOG for more information on what has changed recently.
Please see CONTRIBUTING for details.
Please review our security policy on how to report security vulnerabilities.
The MIT License (MIT). Please see License File for more information.