Skip to content

Commit f2d9e08

Browse files
author
Boris Granveaud
authored
Implementation of ListPartitionReassignments API (#1203)
* implemented ListPartitionReassignments API. * fix nullable * fix lint * fix tag.
1 parent 5b97cf9 commit f2d9e08

File tree

4 files changed

+296
-0
lines changed

4 files changed

+296
-0
lines changed

listpartitionreassignments.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"net"
6+
"time"
7+
8+
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
9+
)
10+
11+
// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
12+
type ListPartitionReassignmentsRequest struct {
13+
// Address of the kafka broker to send the request to.
14+
Addr net.Addr
15+
16+
// Topics we want reassignments for, mapped by their name, or nil to list everything.
17+
Topics map[string]ListPartitionReassignmentsRequestTopic
18+
19+
// Timeout is the amount of time to wait for the request to complete.
20+
Timeout time.Duration
21+
}
22+
23+
// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single
24+
// topic.
25+
type ListPartitionReassignmentsRequestTopic struct {
26+
// The partitions to list partition reassignments for.
27+
PartitionIndexes []int
28+
}
29+
30+
// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
31+
type ListPartitionReassignmentsResponse struct {
32+
// Error is set to a non-nil value including the code and message if a top-level
33+
// error was encountered.
34+
Error error
35+
36+
// Topics contains results for each topic, mapped by their name.
37+
Topics map[string]ListPartitionReassignmentsResponseTopic
38+
}
39+
40+
// ListPartitionReassignmentsResponseTopic contains the detailed result of
41+
// ongoing reassignments for a topic.
42+
type ListPartitionReassignmentsResponseTopic struct {
43+
// Partitions contains result for topic partitions.
44+
Partitions []ListPartitionReassignmentsResponsePartition
45+
}
46+
47+
// ListPartitionReassignmentsResponsePartition contains the detailed result of
48+
// ongoing reassignments for a single partition.
49+
type ListPartitionReassignmentsResponsePartition struct {
50+
// PartitionIndex contains index of the partition.
51+
PartitionIndex int
52+
53+
// Replicas contains the current replica set.
54+
Replicas []int
55+
56+
// AddingReplicas contains the set of replicas we are currently adding.
57+
AddingReplicas []int
58+
59+
// RemovingReplicas contains the set of replicas we are currently removing.
60+
RemovingReplicas []int
61+
}
62+
63+
func (c *Client) ListPartitionReassignments(
64+
ctx context.Context,
65+
req *ListPartitionReassignmentsRequest,
66+
) (*ListPartitionReassignmentsResponse, error) {
67+
apiReq := &listpartitionreassignments.Request{
68+
TimeoutMs: int32(req.Timeout.Milliseconds()),
69+
}
70+
71+
for topicName, topicReq := range req.Topics {
72+
apiReq.Topics = append(
73+
apiReq.Topics,
74+
listpartitionreassignments.RequestTopic{
75+
Name: topicName,
76+
PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes),
77+
},
78+
)
79+
}
80+
81+
protoResp, err := c.roundTrip(
82+
ctx,
83+
req.Addr,
84+
apiReq,
85+
)
86+
if err != nil {
87+
return nil, err
88+
}
89+
apiResp := protoResp.(*listpartitionreassignments.Response)
90+
91+
resp := &ListPartitionReassignmentsResponse{
92+
Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
93+
Topics: make(map[string]ListPartitionReassignmentsResponseTopic),
94+
}
95+
96+
for _, topicResult := range apiResp.Topics {
97+
respTopic := ListPartitionReassignmentsResponseTopic{}
98+
for _, partitionResult := range topicResult.Partitions {
99+
respTopic.Partitions = append(
100+
respTopic.Partitions,
101+
ListPartitionReassignmentsResponsePartition{
102+
PartitionIndex: int(partitionResult.PartitionIndex),
103+
Replicas: int32ToIntArray(partitionResult.Replicas),
104+
AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas),
105+
RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas),
106+
},
107+
)
108+
}
109+
resp.Topics[topicResult.Name] = respTopic
110+
}
111+
112+
return resp, nil
113+
}
114+
115+
func intToInt32Array(arr []int) []int32 {
116+
if arr == nil {
117+
return nil
118+
}
119+
res := make([]int32, len(arr))
120+
for i := range arr {
121+
res[i] = int32(arr[i])
122+
}
123+
return res
124+
}
125+
126+
func int32ToIntArray(arr []int32) []int {
127+
if arr == nil {
128+
return nil
129+
}
130+
res := make([]int, len(arr))
131+
for i := range arr {
132+
res[i] = int(arr[i])
133+
}
134+
return res
135+
}

listpartitionreassignments_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting "github.com/segmentio/kafka-go/testing"
8+
)
9+
10+
func TestClientListPartitionReassignments(t *testing.T) {
11+
if !ktesting.KafkaIsAtLeast("2.4.0") {
12+
return
13+
}
14+
15+
ctx := context.Background()
16+
client, shutdown := newLocalClient()
17+
defer shutdown()
18+
19+
topic := makeTopic()
20+
createTopic(t, topic, 2)
21+
defer deleteTopic(t, topic)
22+
23+
// Can't really get an ongoing partition reassignment with local Kafka, so just do a superficial test here.
24+
resp, err := client.ListPartitionReassignments(
25+
ctx,
26+
&ListPartitionReassignmentsRequest{
27+
Topics: map[string]ListPartitionReassignmentsRequestTopic{
28+
topic: {PartitionIndexes: []int{0, 1}},
29+
},
30+
},
31+
)
32+
33+
if err != nil {
34+
t.Fatal(err)
35+
}
36+
if resp.Error != nil {
37+
t.Error(
38+
"Unexpected error in response",
39+
"expected", nil,
40+
"got", resp.Error,
41+
)
42+
}
43+
if len(resp.Topics) != 0 {
44+
t.Error(
45+
"Unexpected length of topic results",
46+
"expected", 0,
47+
"got", len(resp.Topics),
48+
)
49+
}
50+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package listpartitionreassignments
2+
3+
import "github.com/segmentio/kafka-go/protocol"
4+
5+
func init() {
6+
protocol.Register(&Request{}, &Response{})
7+
}
8+
9+
// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListPartitionReassignments.
10+
11+
type Request struct {
12+
// We need at least one tagged field to indicate that this is a "flexible" message
13+
// type.
14+
_ struct{} `kafka:"min=v0,max=v0,tag"`
15+
16+
TimeoutMs int32 `kafka:"min=v0,max=v0"`
17+
Topics []RequestTopic `kafka:"min=v0,max=v0,nullable"`
18+
}
19+
20+
type RequestTopic struct {
21+
// We need at least one tagged field to indicate that this is a "flexible" message
22+
// type.
23+
_ struct{} `kafka:"min=v0,max=v0,tag"`
24+
25+
Name string `kafka:"min=v0,max=v0"`
26+
PartitionIndexes []int32 `kafka:"min=v0,max=v0"`
27+
}
28+
29+
func (r *Request) ApiKey() protocol.ApiKey {
30+
return protocol.ListPartitionReassignments
31+
}
32+
33+
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
34+
return cluster.Brokers[cluster.Controller], nil
35+
}
36+
37+
type Response struct {
38+
// We need at least one tagged field to indicate that this is a "flexible" message
39+
// type.
40+
_ struct{} `kafka:"min=v0,max=v0,tag"`
41+
42+
ThrottleTimeMs int32 `kafka:"min=v0,max=v0"`
43+
ErrorCode int16 `kafka:"min=v0,max=v0"`
44+
ErrorMessage string `kafka:"min=v0,max=v0,nullable"`
45+
Topics []ResponseTopic `kafka:"min=v0,max=v0"`
46+
}
47+
48+
type ResponseTopic struct {
49+
// We need at least one tagged field to indicate that this is a "flexible" message
50+
// type.
51+
_ struct{} `kafka:"min=v0,max=v0,tag"`
52+
53+
Name string `kafka:"min=v0,max=v0"`
54+
Partitions []ResponsePartition `kafka:"min=v0,max=v0"`
55+
}
56+
57+
type ResponsePartition struct {
58+
// We need at least one tagged field to indicate that this is a "flexible" message
59+
// type.
60+
_ struct{} `kafka:"min=v0,max=v0,tag"`
61+
62+
PartitionIndex int32 `kafka:"min=v0,max=v0"`
63+
Replicas []int32 `kafka:"min=v0,max=v0"`
64+
AddingReplicas []int32 `kafka:"min=v0,max=v0"`
65+
RemovingReplicas []int32 `kafka:"min=v0,max=v0"`
66+
}
67+
68+
func (r *Response) ApiKey() protocol.ApiKey {
69+
return protocol.ListPartitionReassignments
70+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package listpartitionreassignments_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
7+
"github.com/segmentio/kafka-go/protocol/prototest"
8+
)
9+
10+
const (
11+
v0 = 0
12+
)
13+
14+
func TestListPartitionReassignmentsRequest(t *testing.T) {
15+
prototest.TestRequest(t, v0, &listpartitionreassignments.Request{
16+
Topics: []listpartitionreassignments.RequestTopic{
17+
{
18+
Name: "topic-1",
19+
PartitionIndexes: []int32{1, 2, 3},
20+
},
21+
},
22+
})
23+
}
24+
25+
func TestListPartitionReassignmentsResponse(t *testing.T) {
26+
prototest.TestResponse(t, v0, &listpartitionreassignments.Response{
27+
Topics: []listpartitionreassignments.ResponseTopic{
28+
{
29+
Name: "topic-1",
30+
Partitions: []listpartitionreassignments.ResponsePartition{
31+
{
32+
PartitionIndex: 1,
33+
Replicas: []int32{1, 2, 3},
34+
AddingReplicas: []int32{4, 5, 6},
35+
RemovingReplicas: []int32{7, 8, 9},
36+
},
37+
},
38+
},
39+
},
40+
})
41+
}

0 commit comments

Comments
 (0)