diff --git a/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs index 43c856ce8..eb75dcabe 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/IKafkaEnvelopeMapper.cs @@ -6,7 +6,7 @@ namespace Wolverine.Kafka; -public interface IKafkaEnvelopeMapper : IEnvelopeMapper, Message>; +public interface IKafkaEnvelopeMapper : IEnvelopeMapper, Message>; /// /// Option to publish or receive raw JSON from or to Kafka topics @@ -23,17 +23,17 @@ public JsonOnlyMapper(KafkaTopic topic, JsonSerializerOptions options) _messageTypeName = topic.MessageType?.ToMessageTypeName(); } - public void MapEnvelopeToOutgoing(Envelope envelope, Message outgoing) + public void MapEnvelopeToOutgoing(Envelope envelope, Message outgoing) { outgoing.Key = envelope.GroupId; if (envelope.Data != null && envelope.Data.Any()) { - outgoing.Value = Encoding.Default.GetString(envelope.Data); + outgoing.Value = envelope.Data; } else if (envelope.Message != null) { - outgoing.Value = JsonSerializer.Serialize(envelope.Message, _options); + outgoing.Value = Encoding.Default.GetBytes(JsonSerializer.Serialize(envelope.Message, _options)); } else { @@ -42,9 +42,9 @@ public void MapEnvelopeToOutgoing(Envelope envelope, Message out } } - public void MapIncomingToEnvelope(Envelope envelope, Message incoming) + public void MapIncomingToEnvelope(Envelope envelope, Message incoming) { - envelope.Data = Encoding.Default.GetBytes(incoming.Value); + envelope.Data = incoming.Value; envelope.MessageType = _messageTypeName; } diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs index c7ff57881..a435907e5 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/InlineKafkaSender.cs @@ -6,7 +6,7 @@ namespace Wolverine.Kafka.Internals; public class InlineKafkaSender : ISender, IDisposable { private readonly KafkaTopic _topic; - private readonly IProducer _producer; + private readonly IProducer _producer; public InlineKafkaSender(KafkaTopic topic) { diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs index 9191d1b2f..0e661191a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaEnvelopeMapper.cs @@ -5,19 +5,19 @@ namespace Wolverine.Kafka.Internals; -internal class KafkaEnvelopeMapper : EnvelopeMapper, Message>, IKafkaEnvelopeMapper +internal class KafkaEnvelopeMapper : EnvelopeMapper, Message>, IKafkaEnvelopeMapper { public KafkaEnvelopeMapper(Endpoint endpoint) : base(endpoint) { } - protected override void writeOutgoingHeader(Message outgoing, string key, string value) + protected override void writeOutgoingHeader(Message outgoing, string key, string value) { outgoing.Headers.Add(key, Encoding.Default.GetBytes(value)); } - protected override bool tryReadIncomingHeader(Message incoming, string key, out string value) + protected override bool tryReadIncomingHeader(Message incoming, string key, out string value) { if (incoming.Headers.TryGetLastBytes(key, out var bytes)) { diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs index bc7eb0d1b..a6d244eec 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs @@ -9,7 +9,7 @@ namespace Wolverine.Kafka.Internals; public class KafkaListener : IListener, IDisposable { - private readonly IConsumer _consumer; + private readonly IConsumer _consumer; private CancellationTokenSource _cancellation = new(); private readonly Task _runner; private readonly IReceiver _receiver; @@ -17,7 +17,7 @@ public class KafkaListener : IListener, IDisposable private readonly QualityOfService _qualityOfService; public KafkaListener(KafkaTopic topic, ConsumerConfig config, - IConsumer consumer, IReceiver receiver, + IConsumer consumer, IReceiver receiver, ILogger logger) { Address = topic.Uri; diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs index cbd5bf588..43d20b96d 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaTransport.cs @@ -12,10 +12,10 @@ public class KafkaTransport : BrokerTransport public Cache Topics { get; } public ProducerConfig ProducerConfig { get; } = new(); - public Action> ConfigureProducerBuilders { get; internal set; } = _ => {}; + public Action> ConfigureProducerBuilders { get; internal set; } = _ => {}; public ConsumerConfig ConsumerConfig { get; } = new(); - public Action> ConfigureConsumerBuilders { get; internal set; } = _ => {}; + public Action> ConfigureConsumerBuilders { get; internal set; } = _ => {}; public AdminClientConfig AdminClientConfig { get; } = new(); public Action ConfigureAdminClientBuilders { get; internal set; } = _ => {}; @@ -76,16 +76,16 @@ public override IEnumerable DiagnosticColumns() yield break; } - internal IProducer CreateProducer(ProducerConfig? config) + internal IProducer CreateProducer(ProducerConfig? config) { - var producerBuilder = new ProducerBuilder(config ?? ProducerConfig); + var producerBuilder = new ProducerBuilder(config ?? ProducerConfig); ConfigureProducerBuilders(producerBuilder); return producerBuilder.Build(); } - internal IConsumer CreateConsumer(ConsumerConfig? config) + internal IConsumer CreateConsumer(ConsumerConfig? config) { - var consumerBuilder = new ConsumerBuilder(config ?? ConsumerConfig); + var consumerBuilder = new ConsumerBuilder(config ?? ConsumerConfig); ConfigureConsumerBuilders(consumerBuilder); return consumerBuilder.Build(); } diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs index b1e082d22..e783a7b00 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaSenderProtocol.cs @@ -7,7 +7,7 @@ namespace Wolverine.Kafka; public class KafkaSenderProtocol : ISenderProtocol, IDisposable { private readonly KafkaTopic _topic; - private readonly IProducer _producer; + private readonly IProducer _producer; public KafkaSenderProtocol(KafkaTopic topic) { diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index cd1a4dc63..8d665cc8a 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using Confluent.Kafka.Admin; using Microsoft.Extensions.Logging; +using System.Text; using Wolverine.Configuration; using Wolverine.Kafka.Internals; using Wolverine.Runtime; @@ -70,10 +71,10 @@ public async ValueTask CheckAsync() try { using var client = Parent.CreateProducer(ProducerConfig); - await client.ProduceAsync(TopicName, new Message + await client.ProduceAsync(TopicName, new Message { Key = "ping", - Value = "ping" + Value = Encoding.Default.GetBytes("ping") }); diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs index 485e92f07..c3172986e 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExpression.cs @@ -44,7 +44,7 @@ public KafkaTransportExpression ConfigureProducers(Action config /// /// /// - public KafkaTransportExpression ConfigureProducerBuilders(Action> configure) + public KafkaTransportExpression ConfigureProducerBuilders(Action> configure) { _transport.ConfigureProducerBuilders = configure; return this; @@ -67,7 +67,7 @@ public KafkaTransportExpression ConfigureConsumers(Action config /// /// /// - public KafkaTransportExpression ConfigureConsumerBuilders(Action> configure) + public KafkaTransportExpression ConfigureConsumerBuilders(Action> configure) { _transport.ConfigureConsumerBuilders = configure; return this; diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs index 71d07aaff..585153553 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs @@ -110,12 +110,12 @@ public static KafkaSubscriberConfiguration ToKafkaTopics(this IPublishToExpressi return new KafkaSubscriberConfiguration(topic); } - internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string topicName, Message message) + internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string topicName, Message message) { var envelope = new Envelope { PartitionKey = message.Key, - Data = Encoding.Default.GetBytes(message.Value), + Data = message.Value, TopicName = topicName }; @@ -126,12 +126,12 @@ internal static Envelope CreateEnvelope(this IKafkaEnvelopeMapper mapper, string return envelope; } - internal static Message CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope) + internal static Message CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope) { - var message = new Message + var message = new Message { Key = !string.IsNullOrEmpty(envelope.PartitionKey) ? envelope.PartitionKey : envelope.Id.ToString(), - Value = Encoding.Default.GetString(envelope.Data), + Value = envelope.Data, Headers = new Headers() };