Skip to content

Commit 82ad81e

Browse files
committed
Fix minor actions
1 parent ad3c022 commit 82ad81e

File tree

6 files changed

+53
-47
lines changed

6 files changed

+53
-47
lines changed

lib/kafka_ex.ex

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ defmodule KafkaEx do
133133
def describe_group(consumer_group_name, opts \\ []) do
134134
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())
135135

136-
case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do
136+
case Server.call(worker_name, {:describe_groups, [consumer_group_name], opts}) do
137137
{:ok, [group]} -> {:ok, group}
138138
{:error, error} -> {:error, error}
139139
end
@@ -227,8 +227,9 @@ defmodule KafkaEx do
227227
"""
228228
@spec latest_offset(binary, integer, atom | pid) ::
229229
[OffsetResponse.t()] | :topic_not_found
230-
def latest_offset(topic, partition, name \\ Config.default_worker()),
231-
do: offset(topic, partition, :latest, name)
230+
def latest_offset(topic, partition, name \\ Config.default_worker()) do
231+
offset(topic, partition, :latest, name)
232+
end
232233

233234
@doc """
234235
Get the offset of the earliest message still persistent in Kafka
@@ -240,10 +241,10 @@ defmodule KafkaEx do
240241
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [0], partition: 0}], topic: "foo"}]
241242
```
242243
"""
243-
@spec earliest_offset(binary, integer, atom | pid) ::
244-
[OffsetResponse.t()] | :topic_not_found
245-
def earliest_offset(topic, partition, name \\ Config.default_worker()),
246-
do: offset(topic, partition, :earliest, name)
244+
@spec earliest_offset(binary, integer, atom | pid) :: [OffsetResponse.t()] | :topic_not_found
245+
def earliest_offset(topic, partition, name \\ Config.default_worker()) do
246+
offset(topic, partition, :earliest, name)
247+
end
247248

248249
@doc """
249250
Get the offset of the message sent at the specified date/time
@@ -255,14 +256,14 @@ defmodule KafkaEx do
255256
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: 0, offset: [256], partition: 0}], topic: "foo"}]
256257
```
257258
"""
258-
@spec offset(
259-
binary,
260-
number,
261-
:calendar.datetime() | :earliest | :latest,
262-
atom | pid
263-
) :: [OffsetResponse.t()] | :topic_not_found
259+
@type valid_timestamp :: :earliest | :latest | :calendar.datetime()
260+
@spec offset(binary, number, valid_timestamp, atom | pid) :: [OffsetResponse.t()] | :topic_not_found
264261
def offset(topic, partition, time, name \\ Config.default_worker()) do
265-
Server.call(name, {:offset, topic, partition, time})
262+
case Server.call(name, {:offset, topic, partition, time}) do
263+
{:ok, response} -> parse_offset_value(response)
264+
{:error, :topic_not_found} -> :topic_not_found
265+
result -> result
266+
end
266267
end
267268

268269
@wait_time 10
@@ -812,4 +813,25 @@ defmodule KafkaEx do
812813
end
813814
end
814815
end
816+
817+
# -------------------------------------------------------------------
818+
# Backwards compatibility
819+
# -------------------------------------------------------------------
820+
defp parse_offset_value(
821+
[%KafkaEx.New.Structs.Offset{topic: topic, partition_offsets: partition_offsets} | _] = offsets
822+
) do
823+
Enum.map(offsets, fn offset ->
824+
%OffsetResponse{
825+
topic: offset.topic,
826+
partition_offsets:
827+
Enum.map(offset.partition_offsets, fn value ->
828+
%{
829+
partition: value.partition,
830+
error_code: value.error_code,
831+
offset: [value.offset]
832+
}
833+
end)
834+
}
835+
end)
836+
end
815837
end

lib/kafka_ex/new/protocols/kayrock/list_offsets/v0_response_impl.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff
1313
end
1414

1515
defp build_offset(topic, %{partition: partition, error_code: 0, offsets: []}) do
16-
data = %{partition: partition, offset: 0}
16+
data = %{partition: partition, offset: 0, error_code: :no_error}
1717
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
1818
end
1919

2020
defp build_offset(topic, %{partition: partition, error_code: 0, offsets: [offset | _]}) do
21-
data = %{partition: partition, offset: offset}
21+
data = %{partition: partition, offset: offset, error_code: :no_error}
2222
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
2323
end
2424

lib/kafka_ex/new/protocols/kayrock/list_offsets/v1_response_impl.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff
1313
end
1414

1515
defp build_offset(topic, %{error_code: 0, partition: p, offset: o}) do
16-
data = %{partition: p, offset: o}
16+
data = %{partition: p, offset: o, error_code: :no_error}
1717
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
1818
end
1919

lib/kafka_ex/new/protocols/kayrock/list_offsets/v2_response_impl.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ defimpl KafkaEx.New.Protocols.Kayrock.ListOffsets.Response, for: Kayrock.ListOff
1313
end
1414

1515
defp build_offset(topic, %{error_code: 0, partition: p, offset: o, timestamp: t}) do
16-
data = %{partition: p, offset: o, timestamp: t}
16+
data = %{partition: p, offset: o, timestamp: t, error_code: :no_error}
1717
{:ok, KafkaEx.New.Structs.Offset.from_list_offset(topic, [data])}
1818
end
1919

lib/kafka_ex/new/structs/offset/partition_offset.ex

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ defmodule KafkaEx.New.Structs.Offset.PartitionOffset do
22
@moduledoc """
33
This module represents Offset value for a specific partition
44
"""
5-
defstruct [:partition, :offset, :timestamp]
5+
defstruct [:partition, :offset, :error_code, :timestamp]
66

77
@type partition :: KafkaEx.Types.partition()
88
@type offset :: KafkaEx.Types.offset()
99
@type timestamp :: KafkaEx.Types.timestamp()
10+
@type error_code :: KafkaEx.Types.error_code()
1011

1112
@type partition_response :: %{
1213
required(:partition) => partition,
14+
required(:error_code) => error_code,
1315
required(:offset) => offset,
1416
optional(:timestamp) => timestamp
1517
}
@@ -24,12 +26,14 @@ defmodule KafkaEx.New.Structs.Offset.PartitionOffset do
2426
For backward compatibility with kafka_ex, we will replace this nil values with -1
2527
"""
2628
@spec build(partition_response) :: __MODULE__.t()
27-
def build(%{partition: p, offset: o, timestamp: t}), do: do_build(p, o, t)
28-
def build(%{partition: p, offset: o}), do: do_build(p, o, -1)
29+
def build(%{partition: p, offset: o, error_code: e, timestamp: t}), do: do_build(p, o, e, t)
30+
def build(%{partition: p, offset: o, error_code: e}), do: do_build(p, o, e, -1)
31+
def build(%{partition: p, offset: o}), do: do_build(p, o, :no_error, -1)
2932

30-
defp do_build(partition, offset, timestamp) do
33+
defp do_build(partition, offset, error_code, timestamp) do
3134
%__MODULE__{
3235
partition: partition,
36+
error_code: error_code,
3337
offset: offset,
3438
timestamp: timestamp
3539
}

test/integration/kayrock/compatibility_test.exs

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do
4242
{:ok, %{consumer_group: consumer_group, topic: topic}}
4343
end
4444

45-
test "with new client - returns group metadata", %{
46-
client: client,
47-
consumer_group: consumer_group,
48-
topic: topic
49-
} do
45+
test "with new client - returns group metadata", %{client: client, consumer_group: consumer_group, topic: topic} do
5046
join_to_group(client, topic, consumer_group)
5147

5248
{:ok, group_metadata} = KafkaExAPI.describe_group(client, consumer_group)
@@ -57,11 +53,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do
5753
assert length(group_metadata.members) == 1
5854
end
5955

60-
test "with old client - returns group metadata", %{
61-
client: client,
62-
consumer_group: consumer_group,
63-
topic: topic
64-
} do
56+
test "with old client - returns group metadata", %{client: client, consumer_group: consumer_group, topic: topic} do
6557
join_to_group(client, topic, consumer_group)
6658

6759
{:ok, group_metadata} = KafkaEx.describe_group(consumer_group, worker_name: client)
@@ -109,21 +101,11 @@ defmodule KafkaEx.KayrockCompatibilityTest do
109101
end
110102

111103
test "produce/4 without an acq required returns :ok", %{client: client} do
112-
assert KafkaEx.produce("food", 0, "hey",
113-
worker_name: client,
114-
required_acks: 0
115-
) == :ok
104+
assert KafkaEx.produce("food", 0, "hey", worker_name: client, required_acks: 0) == :ok
116105
end
117106

118107
test "produce/4 with ack required returns an ack", %{client: client} do
119-
{:ok, offset} =
120-
KafkaEx.produce(
121-
"food",
122-
0,
123-
"hey",
124-
worker_name: client,
125-
required_acks: 1
126-
)
108+
{:ok, offset} = KafkaEx.produce("food",0,"hey",worker_name: client,required_acks: 1)
127109

128110
assert is_integer(offset)
129111
refute offset == nil
@@ -306,9 +288,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do
306288
assert offset == 0
307289
end
308290

309-
test "latest_offset retrieves offset of 0 for non-existing topic", %{
310-
client: client
311-
} do
291+
test "latest_offset retrieves offset of 0 for non-existing topic", %{client: client} do
312292
random_string = KafkaEx.TestHelpers.generate_random_string()
313293

314294
{:ok, produce_offset} =

0 commit comments

Comments
 (0)