Skip to content

Commit 5b97cf9

Browse files
author
Boris Granveaud
authored
Changed AlterPartitionReassignments request to support multiple topics (#1204)
* changed AlterPartitionReassignments request to support multiple topics * keep compatibility with existing code.
1 parent f48706e commit 5b97cf9

File tree

4 files changed

+147
-12
lines changed

4 files changed

+147
-12
lines changed

alterpartitionreassignments.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ type AlterPartitionReassignmentsRequest struct {
1313
// Address of the kafka broker to send the request to.
1414
Addr net.Addr
1515

16-
// Topic is the name of the topic to alter partitions in.
16+
// Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
17+
// reassign to multiple topics.
1718
Topic string
1819

1920
// Assignments is the list of partition reassignments to submit to the API.
@@ -26,10 +27,13 @@ type AlterPartitionReassignmentsRequest struct {
2627
// AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
2728
// partition.
2829
type AlterPartitionReassignmentsRequestAssignment struct {
30+
// Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used.
31+
Topic string
32+
2933
// PartitionID is the ID of the partition to make the reassignments in.
3034
PartitionID int
3135

32-
// BrokerIDs is a slice of brokers to set the partition replicas to.
36+
// BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition.
3337
BrokerIDs []int
3438
}
3539

@@ -46,6 +50,9 @@ type AlterPartitionReassignmentsResponse struct {
4650
// AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
4751
// doing reassignments for a single partition.
4852
type AlterPartitionReassignmentsResponsePartitionResult struct {
53+
// Topic is the topic name.
54+
Topic string
55+
4956
// PartitionID is the ID of the partition that was altered.
5057
PartitionID int
5158

@@ -58,16 +65,29 @@ func (c *Client) AlterPartitionReassignments(
5865
ctx context.Context,
5966
req *AlterPartitionReassignmentsRequest,
6067
) (*AlterPartitionReassignmentsResponse, error) {
61-
apiPartitions := []alterpartitionreassignments.RequestPartition{}
68+
apiTopicMap := make(map[string]*alterpartitionreassignments.RequestTopic)
6269

6370
for _, assignment := range req.Assignments {
71+
topic := assignment.Topic
72+
if topic == "" {
73+
topic = req.Topic
74+
}
75+
76+
apiTopic := apiTopicMap[topic]
77+
if apiTopic == nil {
78+
apiTopic = &alterpartitionreassignments.RequestTopic{
79+
Name: topic,
80+
}
81+
apiTopicMap[topic] = apiTopic
82+
}
83+
6484
replicas := []int32{}
6585
for _, brokerID := range assignment.BrokerIDs {
6686
replicas = append(replicas, int32(brokerID))
6787
}
6888

69-
apiPartitions = append(
70-
apiPartitions,
89+
apiTopic.Partitions = append(
90+
apiTopic.Partitions,
7191
alterpartitionreassignments.RequestPartition{
7292
PartitionIndex: int32(assignment.PartitionID),
7393
Replicas: replicas,
@@ -77,12 +97,10 @@ func (c *Client) AlterPartitionReassignments(
7797

7898
apiReq := &alterpartitionreassignments.Request{
7999
TimeoutMs: int32(req.Timeout.Milliseconds()),
80-
Topics: []alterpartitionreassignments.RequestTopic{
81-
{
82-
Name: req.Topic,
83-
Partitions: apiPartitions,
84-
},
85-
},
100+
}
101+
102+
for _, apiTopic := range apiTopicMap {
103+
apiReq.Topics = append(apiReq.Topics, *apiTopic)
86104
}
87105

88106
protoResp, err := c.roundTrip(
@@ -104,6 +122,7 @@ func (c *Client) AlterPartitionReassignments(
104122
resp.PartitionResults = append(
105123
resp.PartitionResults,
106124
AlterPartitionReassignmentsResponsePartitionResult{
125+
Topic: topicResult.Name,
107126
PartitionID: int(partitionResult.PartitionIndex),
108127
Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
109128
},

alterpartitionreassignments_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,64 @@ func TestClientAlterPartitionReassignments(t *testing.T) {
5656
)
5757
}
5858
}
59+
60+
func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) {
61+
if !ktesting.KafkaIsAtLeast("2.4.0") {
62+
return
63+
}
64+
65+
ctx := context.Background()
66+
client, shutdown := newLocalClient()
67+
defer shutdown()
68+
69+
topic1 := makeTopic()
70+
topic2 := makeTopic()
71+
createTopic(t, topic1, 2)
72+
createTopic(t, topic2, 2)
73+
defer func() {
74+
deleteTopic(t, topic1)
75+
deleteTopic(t, topic2)
76+
}()
77+
78+
// Local kafka only has 1 broker, so any partition reassignments are really no-ops.
79+
resp, err := client.AlterPartitionReassignments(
80+
ctx,
81+
&AlterPartitionReassignmentsRequest{
82+
Assignments: []AlterPartitionReassignmentsRequestAssignment{
83+
{
84+
Topic: topic1,
85+
PartitionID: 0,
86+
BrokerIDs: []int{1},
87+
},
88+
{
89+
Topic: topic1,
90+
PartitionID: 1,
91+
BrokerIDs: []int{1},
92+
},
93+
{
94+
Topic: topic2,
95+
PartitionID: 0,
96+
BrokerIDs: []int{1},
97+
},
98+
},
99+
},
100+
)
101+
102+
if err != nil {
103+
t.Fatal(err)
104+
}
105+
if resp.Error != nil {
106+
t.Error(
107+
"Unexpected error in response",
108+
"expected", nil,
109+
"got", resp.Error,
110+
)
111+
}
112+
if len(resp.PartitionResults) != 3 {
113+
t.Error(
114+
"Unexpected length of partition results",
115+
"expected", 3,
116+
"got", len(resp.PartitionResults),
117+
)
118+
}
119+
}

protocol/alterpartitionreassignments/alterpartitionreassignments.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type RequestTopic struct {
2323

2424
type RequestPartition struct {
2525
PartitionIndex int32 `kafka:"min=v0,max=v0"`
26-
Replicas []int32 `kafka:"min=v0,max=v0"`
26+
Replicas []int32 `kafka:"min=v0,max=v0,nullable"`
2727
}
2828

2929
func (r *Request) ApiKey() protocol.ApiKey {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package alterpartitionreassignments_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
7+
"github.com/segmentio/kafka-go/protocol/prototest"
8+
)
9+
10+
const (
11+
v0 = 0
12+
)
13+
14+
func TestAlterPartitionReassignmentsRequest(t *testing.T) {
15+
prototest.TestRequest(t, v0, &alterpartitionreassignments.Request{
16+
TimeoutMs: 1,
17+
Topics: []alterpartitionreassignments.RequestTopic{
18+
{
19+
Name: "topic-1",
20+
Partitions: []alterpartitionreassignments.RequestPartition{
21+
{
22+
PartitionIndex: 1,
23+
Replicas: []int32{1, 2, 3},
24+
},
25+
{
26+
PartitionIndex: 2,
27+
},
28+
},
29+
},
30+
},
31+
})
32+
}
33+
34+
func TestAlterPartitionReassignmentsResponse(t *testing.T) {
35+
prototest.TestResponse(t, v0, &alterpartitionreassignments.Response{
36+
ErrorCode: 1,
37+
ErrorMessage: "error",
38+
ThrottleTimeMs: 1,
39+
Results: []alterpartitionreassignments.ResponseResult{
40+
{
41+
Name: "topic-1",
42+
Partitions: []alterpartitionreassignments.ResponsePartition{
43+
{
44+
PartitionIndex: 1,
45+
ErrorMessage: "error",
46+
ErrorCode: 1,
47+
},
48+
{
49+
PartitionIndex: 2,
50+
},
51+
},
52+
},
53+
},
54+
})
55+
}

0 commit comments

Comments
 (0)