Skip to content

Commit 5e75a0b

Browse files
authored
fix: handle gaps in offsets (#2252)
1 parent 483c2a0 commit 5e75a0b

File tree

2 files changed

+161
-64
lines changed

2 files changed

+161
-64
lines changed

src/containers/Tenant/Diagnostics/TopicData/__test__/getData.test.ts

Lines changed: 132 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,6 @@ import {prepareResponse} from '../getData';
33
import {TOPIC_DATA_FETCH_LIMIT} from '../utils/constants';
44

55
describe('prepareResponse', () => {
6-
// Test case 1: Normal case with no removed messages
7-
test('should handle case with no removed messages', () => {
8-
const response: TopicDataResponse = {
9-
StartOffset: '100',
10-
EndOffset: '120',
11-
Messages: [{Offset: '100'}, {Offset: '101'}, {Offset: '102'}] as TopicMessage[],
12-
};
13-
14-
const result = prepareResponse(response, 100);
15-
16-
expect(result.start).toBe(100);
17-
expect(result.end).toBe(120);
18-
expect(result.messages.length).toBe(3);
19-
expect(result.messages[0]).toEqual({Offset: '100'});
20-
expect(result.messages[1]).toEqual({Offset: '101'});
21-
expect(result.messages[2]).toEqual({Offset: '102'});
22-
});
23-
24-
// Test case 2: Case with some removed messages
256
test('should handle case with some removed messages', () => {
267
const response: TopicDataResponse = {
278
StartOffset: '105',
@@ -33,22 +14,20 @@ describe('prepareResponse', () => {
3314

3415
expect(result.start).toBe(105);
3516
expect(result.end).toBe(120);
36-
expect(result.messages.length).toBe(8); // 5 removed + 3 actual
17+
expect(result.messages.length).toBe(20);
3718

38-
// Check removed messages
39-
expect(result.messages[0]).toEqual({Offset: '100', removed: true});
40-
expect(result.messages[1]).toEqual({Offset: '101', removed: true});
41-
expect(result.messages[2]).toEqual({Offset: '102', removed: true});
42-
expect(result.messages[3]).toEqual({Offset: '103', removed: true});
43-
expect(result.messages[4]).toEqual({Offset: '104', removed: true});
44-
45-
// Check actual messages
19+
expect(result.messages[0]).toEqual({Offset: 100, removed: true});
20+
expect(result.messages[1]).toEqual({Offset: 101, removed: true});
21+
expect(result.messages[2]).toEqual({Offset: 102, removed: true});
22+
expect(result.messages[3]).toEqual({Offset: 103, removed: true});
23+
expect(result.messages[4]).toEqual({Offset: 104, removed: true});
4624
expect(result.messages[5]).toEqual({Offset: '105'});
4725
expect(result.messages[6]).toEqual({Offset: '106'});
4826
expect(result.messages[7]).toEqual({Offset: '107'});
27+
expect(result.messages[8]).toEqual({Offset: 108, removed: true});
28+
expect(result.messages[19]).toEqual({Offset: 119, removed: true});
4929
});
5030

51-
// Test case 3: Case with more removed messages than the limit
5231
test('should handle case with more removed messages than the limit', () => {
5332
const response: TopicDataResponse = {
5433
StartOffset: '150',
@@ -64,11 +43,10 @@ describe('prepareResponse', () => {
6443

6544
// All messages should be "removed" placeholders since there are more than the limit
6645
for (let i = 0; i < TOPIC_DATA_FETCH_LIMIT; i++) {
67-
expect(result.messages[i]).toEqual({Offset: `${100 + i}`, removed: true});
46+
expect(result.messages[i]).toEqual({Offset: 100 + i, removed: true});
6847
}
6948
});
7049

71-
// Test case 4: Case with non-numeric offsets
7250
test('should handle case with non-numeric offsets', () => {
7351
const response: TopicDataResponse = {
7452
StartOffset: 'not-a-number',
@@ -82,33 +60,32 @@ describe('prepareResponse', () => {
8260
expect(result.start).toBe(0);
8361
expect(result.end).toBe(0);
8462

85-
// Since start (0) < offset (100), removedMessagesCount is negative
86-
// No removed messages should be added
87-
expect(result.messages.length).toBe(2);
88-
expect(result.messages[0]).toEqual({Offset: '100'});
89-
expect(result.messages[1]).toEqual({Offset: '101'});
63+
// Since end (0) <= offset (100), no messages should be processed
64+
expect(result.messages.length).toBe(0);
9065
});
9166

92-
// Test case 5: Case with empty Messages array
9367
test('should handle case with empty Messages array', () => {
9468
const response: TopicDataResponse = {
9569
StartOffset: '100',
96-
EndOffset: '100',
70+
EndOffset: '120',
9771
Messages: [],
9872
};
9973

10074
const result = prepareResponse(response, 100);
10175

10276
expect(result.start).toBe(100);
103-
expect(result.end).toBe(100);
104-
expect(result.messages.length).toBe(0);
77+
expect(result.end).toBe(120);
78+
// Should have placeholders for all offsets in range
79+
expect(result.messages.length).toBe(TOPIC_DATA_FETCH_LIMIT);
80+
// All should be marked as removed
81+
expect(result.messages[0]).toEqual({Offset: 100, removed: true});
82+
expect(result.messages[19]).toEqual({Offset: 119, removed: true});
10583
});
10684

107-
// Test case 6: Case with more messages than the limit
10885
test('should handle case with more messages than the limit', () => {
10986
// Create an array of 30 messages (more than TOPIC_DATA_FETCH_LIMIT)
11087
const messages: TopicMessage[] = [];
111-
for (let i = 0; i < TOPIC_DATA_FETCH_LIMIT + 1; i++) {
88+
for (let i = 0; i < TOPIC_DATA_FETCH_LIMIT + 10; i++) {
11289
messages.push({Offset: `${100 + i}`} as TopicMessage);
11390
}
11491

@@ -132,7 +109,6 @@ describe('prepareResponse', () => {
132109
expect(result.messages[2]).toEqual({Offset: '102'});
133110
});
134111

135-
// Test case 7: Case with both removed messages and actual messages within limit
136112
test('should handle case with both removed and actual messages within limit', () => {
137113
const response: TopicDataResponse = {
138114
StartOffset: '110',
@@ -147,5 +123,118 @@ describe('prepareResponse', () => {
147123

148124
// 10 removed + 10 actual = 20 (TOPIC_DATA_FETCH_LIMIT)
149125
expect(result.messages.length).toBe(TOPIC_DATA_FETCH_LIMIT);
126+
127+
// Check first 10 messages are marked as removed
128+
for (let i = 0; i < 10; i++) {
129+
expect(result.messages[i]).toEqual({Offset: 100 + i, removed: true});
130+
}
131+
132+
// Check next 10 messages are actual messages
133+
for (let i = 0; i < 10; i++) {
134+
expect(result.messages[i + 10]).toEqual({Offset: `${110 + i}`});
135+
}
136+
});
137+
138+
test('should handle case with gaps in message offsets', () => {
139+
const response: TopicDataResponse = {
140+
StartOffset: '100',
141+
EndOffset: '110',
142+
Messages: [
143+
{Offset: '100'} as TopicMessage,
144+
{Offset: '102'} as TopicMessage,
145+
{Offset: '105'} as TopicMessage,
146+
{Offset: '109'} as TopicMessage,
147+
],
148+
};
149+
150+
const result = prepareResponse(response, 100);
151+
152+
expect(result.start).toBe(100);
153+
expect(result.end).toBe(110);
154+
expect(result.messages.length).toBe(10);
155+
156+
// Check actual messages
157+
expect(result.messages[0]).toEqual({Offset: '100'});
158+
expect(result.messages[2]).toEqual({Offset: '102'});
159+
expect(result.messages[5]).toEqual({Offset: '105'});
160+
expect(result.messages[9]).toEqual({Offset: '109'});
161+
162+
// Check removed messages (gaps)
163+
expect(result.messages[1]).toEqual({Offset: 101, removed: true});
164+
expect(result.messages[3]).toEqual({Offset: 103, removed: true});
165+
expect(result.messages[4]).toEqual({Offset: 104, removed: true});
166+
expect(result.messages[6]).toEqual({Offset: 106, removed: true});
167+
expect(result.messages[7]).toEqual({Offset: 107, removed: true});
168+
expect(result.messages[8]).toEqual({Offset: 108, removed: true});
169+
});
170+
171+
test('should handle case with offset greater than EndOffset', () => {
172+
const response: TopicDataResponse = {
173+
StartOffset: '100',
174+
EndOffset: '110',
175+
Messages: [{Offset: '100'}, {Offset: '101'}, {Offset: '102'}] as TopicMessage[],
176+
};
177+
178+
const result = prepareResponse(response, 115);
179+
180+
expect(result.start).toBe(100);
181+
expect(result.end).toBe(110);
182+
// Since offset > end, no messages should be processed
183+
expect(result.messages.length).toBe(0);
184+
});
185+
186+
test('should handle case with offset equal to EndOffset', () => {
187+
const response: TopicDataResponse = {
188+
StartOffset: '100',
189+
EndOffset: '110',
190+
Messages: [{Offset: '100'}, {Offset: '101'}, {Offset: '102'}] as TopicMessage[],
191+
};
192+
193+
const result = prepareResponse(response, 110);
194+
195+
expect(result.start).toBe(100);
196+
expect(result.end).toBe(110);
197+
// Since offset = end, no messages should be processed
198+
expect(result.messages.length).toBe(0);
199+
});
200+
201+
test('should handle case with undefined Messages', () => {
202+
const response: TopicDataResponse = {
203+
StartOffset: '100',
204+
EndOffset: '110',
205+
// Messages is undefined
206+
};
207+
208+
const result = prepareResponse(response, 100);
209+
210+
expect(result.start).toBe(100);
211+
expect(result.end).toBe(110);
212+
// Should have placeholders for all offsets in range
213+
expect(result.messages.length).toBe(10);
214+
// All should be marked as removed
215+
for (let i = 0; i < 10; i++) {
216+
expect(result.messages[i]).toEqual({Offset: 100 + i, removed: true});
217+
}
218+
});
219+
220+
test('should handle zero StartOffset and EndOffset', () => {
221+
const response: TopicDataResponse = {
222+
StartOffset: '0',
223+
EndOffset: '10',
224+
Messages: [{Offset: '0'}, {Offset: '1'}, {Offset: '2'}] as TopicMessage[],
225+
};
226+
227+
const result = prepareResponse(response, 0);
228+
229+
expect(result.start).toBe(0);
230+
expect(result.end).toBe(10);
231+
// Since offset equals start, only actual messages should be returned
232+
expect(result.messages.length).toBe(10);
233+
// Check if the function correctly handles zero offsets
234+
expect(result.messages[0]).toEqual({Offset: '0'});
235+
expect(result.messages[1]).toEqual({Offset: '1'});
236+
expect(result.messages[2]).toEqual({Offset: '2'});
237+
expect(result.messages[3]).toEqual({Offset: 3, removed: true});
238+
expect(result.messages[9]).toEqual({Offset: 9, removed: true});
150239
});
151240
});

src/containers/Tenant/Diagnostics/TopicData/getData.ts

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {isNil} from 'lodash';
22

33
import type {FetchData} from '../../../../components/PaginatedTable';
4+
import {TOPIC_MESSAGE_SIZE_LIMIT} from '../../../../store/reducers/topic';
45
import type {
56
TopicDataRequest,
67
TopicDataResponse,
@@ -26,28 +27,34 @@ export function prepareResponse(response: TopicDataResponse, offset: number) {
2627
const start = safeParseNumber(StartOffset);
2728
const end = safeParseNumber(EndOffset);
2829

29-
const removedMessagesCount = start - offset;
30-
31-
const result: TopicMessageEnhanced[] = [];
32-
for (let i = 0; i < Math.min(TOPIC_DATA_FETCH_LIMIT, removedMessagesCount); i++) {
33-
result.push({
34-
Offset: String(offset + i),
35-
removed: true,
36-
});
37-
}
38-
for (
39-
let i = 0;
40-
i <
41-
Math.min(
42-
TOPIC_DATA_FETCH_LIMIT,
43-
TOPIC_DATA_FETCH_LIMIT - removedMessagesCount,
44-
Messages.length,
45-
);
46-
i++
47-
) {
48-
result.push(Messages[i]);
30+
const normalizedMessages: TopicMessageEnhanced[] = [];
31+
32+
const limit = Math.min(TOPIC_DATA_FETCH_LIMIT, Math.max(end - offset, 0));
33+
let i = 0;
34+
let j = 0;
35+
while (j < limit) {
36+
const currentOffset = offset + j;
37+
const currentMessage = Messages[i];
38+
if (currentOffset < start) {
39+
normalizedMessages.push({
40+
Offset: currentOffset,
41+
removed: true,
42+
});
43+
} else if (
44+
!isNil(currentMessage?.Offset) &&
45+
String(currentMessage.Offset) === String(currentOffset)
46+
) {
47+
normalizedMessages.push(currentMessage);
48+
i++;
49+
} else {
50+
normalizedMessages.push({
51+
Offset: currentOffset,
52+
removed: true,
53+
});
54+
}
55+
j++;
4956
}
50-
return {start, end, messages: result};
57+
return {start, end, messages: normalizedMessages};
5158
}
5259

5360
export const generateTopicDataGetter = ({
@@ -77,6 +84,7 @@ export const generateTopicDataGetter = ({
7784
partition,
7885
limit,
7986
last_offset: normalizedOffset + limit,
87+
message_size_limit: TOPIC_MESSAGE_SIZE_LIMIT,
8088
};
8189
queryParams.offset = normalizedOffset;
8290

0 commit comments

Comments
 (0)