Skip to content

Binary producer/consumer for kafka #1440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Wolverine.Kafka;

public interface IKafkaEnvelopeMapper : IEnvelopeMapper<Message<string, string>, Message<string, string>>;
public interface IKafkaEnvelopeMapper : IEnvelopeMapper<Message<string, byte[]>, Message<string, byte[]>>;

/// <summary>
/// Option to publish or receive raw JSON from or to Kafka topics
Expand All @@ -23,17 +23,17 @@ public JsonOnlyMapper(KafkaTopic topic, JsonSerializerOptions options)
_messageTypeName = topic.MessageType?.ToMessageTypeName();
}

public void MapEnvelopeToOutgoing(Envelope envelope, Message<string, string> outgoing)
public void MapEnvelopeToOutgoing(Envelope envelope, Message<string, byte[]> outgoing)
{
outgoing.Key = envelope.GroupId;

if (envelope.Data != null && envelope.Data.Any())
{
outgoing.Value = Encoding.Default.GetString(envelope.Data);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can skip this conversion

outgoing.Value = envelope.Data;
}
else if (envelope.Message != null)
{
outgoing.Value = JsonSerializer.Serialize(envelope.Message, _options);
outgoing.Value = Encoding.Default.GetBytes(JsonSerializer.Serialize(envelope.Message, _options));
}
else
{
Expand All @@ -42,9 +42,9 @@ public void MapEnvelopeToOutgoing(Envelope envelope, Message<string, string> out
}
}

public void MapIncomingToEnvelope(Envelope envelope, Message<string, string> incoming)
public void MapIncomingToEnvelope(Envelope envelope, Message<string, byte[]> incoming)
{
envelope.Data = Encoding.Default.GetBytes(incoming.Value);
envelope.Data = incoming.Value;
envelope.MessageType = _messageTypeName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Wolverine.Kafka.Internals;
public class InlineKafkaSender : ISender, IDisposable
{
private readonly KafkaTopic _topic;
private readonly IProducer<string,string> _producer;
private readonly IProducer<string, byte[]> _producer;

public InlineKafkaSender(KafkaTopic topic)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@

namespace Wolverine.Kafka.Internals;

internal class KafkaEnvelopeMapper : EnvelopeMapper<Message<string, string>, Message<string, string>>, IKafkaEnvelopeMapper
internal class KafkaEnvelopeMapper : EnvelopeMapper<Message<string, byte[]>, Message<string, byte[]>>, IKafkaEnvelopeMapper
{
public KafkaEnvelopeMapper(Endpoint endpoint) : base(endpoint)
{

}

protected override void writeOutgoingHeader(Message<string, string> outgoing, string key, string value)
protected override void writeOutgoingHeader(Message<string, byte[]> outgoing, string key, string value)
{
outgoing.Headers.Add(key, Encoding.Default.GetBytes(value));
}

protected override bool tryReadIncomingHeader(Message<string, string> incoming, string key, out string value)
protected override bool tryReadIncomingHeader(Message<string, byte[]> incoming, string key, out string value)
{
if (incoming.Headers.TryGetLastBytes(key, out var bytes))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace Wolverine.Kafka.Internals;

public class KafkaListener : IListener, IDisposable
{
private readonly IConsumer<string,string> _consumer;
private readonly IConsumer<string, byte[]> _consumer;
private CancellationTokenSource _cancellation = new();
private readonly Task _runner;
private readonly IReceiver _receiver;
private readonly string? _messageTypeName;
private readonly QualityOfService _qualityOfService;

public KafkaListener(KafkaTopic topic, ConsumerConfig config,
IConsumer<string, string> consumer, IReceiver receiver,
IConsumer<string, byte[]> consumer, IReceiver receiver,
ILogger<KafkaListener> logger)
{
Address = topic.Uri;
Expand Down
12 changes: 6 additions & 6 deletions src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ public class KafkaTransport : BrokerTransport<KafkaTopic>
public Cache<string, KafkaTopic> Topics { get; }

public ProducerConfig ProducerConfig { get; } = new();
public Action<ProducerBuilder<string, string>> ConfigureProducerBuilders { get; internal set; } = _ => {};
public Action<ProducerBuilder<string, byte[]>> ConfigureProducerBuilders { get; internal set; } = _ => {};

public ConsumerConfig ConsumerConfig { get; } = new();
public Action<ConsumerBuilder<string, string>> ConfigureConsumerBuilders { get; internal set; } = _ => {};
public Action<ConsumerBuilder<string, byte[]>> ConfigureConsumerBuilders { get; internal set; } = _ => {};

public AdminClientConfig AdminClientConfig { get; } = new();
public Action<AdminClientBuilder> ConfigureAdminClientBuilders { get; internal set; } = _ => {};
Expand Down Expand Up @@ -76,16 +76,16 @@ public override IEnumerable<PropertyColumn> DiagnosticColumns()
yield break;
}

internal IProducer<string, string> CreateProducer(ProducerConfig? config)
internal IProducer<string, byte[]> CreateProducer(ProducerConfig? config)
{
var producerBuilder = new ProducerBuilder<string, string>(config ?? ProducerConfig);
var producerBuilder = new ProducerBuilder<string, byte[]>(config ?? ProducerConfig);
ConfigureProducerBuilders(producerBuilder);
return producerBuilder.Build();
}

internal IConsumer<string, string> CreateConsumer(ConsumerConfig? config)
internal IConsumer<string, byte[]> CreateConsumer(ConsumerConfig? config)
{
var consumerBuilder = new ConsumerBuilder<string, string>(config ?? ConsumerConfig);
var consumerBuilder = new ConsumerBuilder<string, byte[]>(config ?? ConsumerConfig);
ConfigureConsumerBuilders(consumerBuilder);
return consumerBuilder.Build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Wolverine.Kafka;
public class KafkaSenderProtocol : ISenderProtocol, IDisposable
{
private readonly KafkaTopic _topic;
private readonly IProducer<string,string> _producer;
private readonly IProducer<string, byte[]> _producer;

public KafkaSenderProtocol(KafkaTopic topic)
{
Expand Down
5 changes: 3 additions & 2 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using System.Text;
using Wolverine.Configuration;
using Wolverine.Kafka.Internals;
using Wolverine.Runtime;
Expand Down Expand Up @@ -70,10 +71,10 @@ public async ValueTask<bool> CheckAsync()
try
{
using var client = Parent.CreateProducer(ProducerConfig);
await client.ProduceAsync(TopicName, new Message<string, string>
await client.ProduceAsync(TopicName, new Message<string, byte[]>
{
Key = "ping",
Value = "ping"
Value = Encoding.Default.GetBytes("ping")
});


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public KafkaTransportExpression ConfigureProducers(Action<ProducerConfig> config
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
public KafkaTransportExpression ConfigureProducerBuilders(Action<ProducerBuilder<string, string>> configure)
public KafkaTransportExpression ConfigureProducerBuilders(Action<ProducerBuilder<string, byte[]>> configure)
{
_transport.ConfigureProducerBuilders = configure;
return this;
Expand All @@ -67,7 +67,7 @@ public KafkaTransportExpression ConfigureConsumers(Action<ConsumerConfig> config
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
public KafkaTransportExpression ConfigureConsumerBuilders(Action<ConsumerBuilder<string, string>> configure)
public KafkaTransportExpression ConfigureConsumerBuilders(Action<ConsumerBuilder<string, byte[]>> configure)
{
_transport.ConfigureConsumerBuilders = configure;
return this;
Expand Down
10 changes: 5 additions & 5 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public static KafkaSubscriberConfiguration ToKafkaTopics(this IPublishToExpressi
return new KafkaSubscriberConfiguration(topic);
}

internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string topicName, Message<string, string> message)
internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string topicName, Message<string, byte[]> message)
{
var envelope = new Envelope
{
PartitionKey = message.Key,
Data = Encoding.Default.GetBytes(message.Value),
Data = message.Value,
TopicName = topicName
};

Expand All @@ -126,12 +126,12 @@ internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string
return envelope;
}

internal static Message<string, string> CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope)
internal static Message<string, byte[]> CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope)
{
var message = new Message<string, string>
var message = new Message<string, byte[]>
{
Key = !string.IsNullOrEmpty(envelope.PartitionKey) ? envelope.PartitionKey : envelope.Id.ToString(),
Value = Encoding.Default.GetString(envelope.Data),
Value = envelope.Data,
Headers = new Headers()
};

Expand Down
Loading