
Commit 72fe0b8

Support schema evolution (#1226)
1 parent d0450e2 commit 72fe0b8

2 files changed: +102 −61 lines changed

src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java

Lines changed: 58 additions & 48 deletions

@@ -24,12 +24,14 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -119,57 +121,65 @@ private void flush(Map<String, List<Record<GenericRecord>>> recordsToInsertByTopic
         final long timeStampForPartitioning = System.currentTimeMillis();
         for (Map.Entry<String, List<Record<GenericRecord>>> entry : recordsToInsertByTopic.entrySet()) {
             String topicName = entry.getKey();
-            List<Record<GenericRecord>> singleTopicRecordsToInsert = entry.getValue();
-            Record<GenericRecord> firstRecord = singleTopicRecordsToInsert.get(0);
-            Schema<GenericRecord> schema;
-            try {
-                schema = getPulsarSchema(firstRecord);
-            } catch (Exception e) {
-                log.error("Failed to retrieve message schema", e);
-                bulkHandleFailedRecords(e, singleTopicRecordsToInsert);
-                return;
-            }
-
-            if (!format.doSupportPulsarSchemaType(schema.getSchemaInfo().getType())) {
-                String errorMsg = "Sink does not support schema type of pulsar: " + schema.getSchemaInfo().getType();
-                log.error(errorMsg);
-                bulkHandleFailedRecords(new UnsupportedOperationException(errorMsg), singleTopicRecordsToInsert);
-                return;
-            }
+            Map<String, List<Record<GenericRecord>>> schemaVersionMap = entry.getValue().stream()
+                    .collect(Collectors.groupingBy(r -> {
+                                byte[] schemaVersionBytes = r.getValue().getSchemaVersion();
+                                return schemaVersionBytes != null ? Arrays.toString(schemaVersionBytes) : "null";
+                            }
+                    ));
+            for (List<Record<GenericRecord>> singleTopicRecordsToInsert : schemaVersionMap.values()) {
+                Record<GenericRecord> firstRecord = singleTopicRecordsToInsert.get(0);
+                Schema<GenericRecord> schema;
+                try {
+                    schema = getPulsarSchema(firstRecord);
+                } catch (Exception e) {
+                    log.error("Failed to retrieve message schema", e);
+                    bulkHandleFailedRecords(e, singleTopicRecordsToInsert);
+                    return;
+                }
 
-            String filepath = "";
-            try {
-                format.initSchema(schema);
-                final Iterator<Record<GenericRecord>> iter = singleTopicRecordsToInsert.iterator();
-                filepath = buildPartitionPath(firstRecord, partitioner, format, timeStampForPartitioning);
-                ByteBuffer payload = bindValue(iter, format);
-                int uploadSize = singleTopicRecordsToInsert.size();
-                long uploadBytes = getBytesSum(singleTopicRecordsToInsert);
-                log.info("Uploading blob {} from topic {} uploadSize:{} uploadBytes:{} currentBatchStatus:{}",
-                        filepath, topicName, uploadSize, uploadBytes, batchManager.getCurrentStatsStr());
-                long elapsedMs = System.currentTimeMillis();
-                blobWriter.uploadBlob(filepath, payload);
-                elapsedMs = System.currentTimeMillis() - elapsedMs;
-                log.debug("Uploading blob {} elapsed time in ms: {}", filepath, elapsedMs);
-                singleTopicRecordsToInsert.forEach(Record::ack);
-                if (sinkContext != null) {
-                    sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, singleTopicRecordsToInsert.size());
-                    sinkContext.recordMetric(METRICS_LATEST_UPLOAD_ELAPSED_TIME, elapsedMs);
+                if (!format.doSupportPulsarSchemaType(schema.getSchemaInfo().getType())) {
+                    String errorMsg = "Sink does not support schema type of pulsar: "
+                            + schema.getSchemaInfo().getType();
+                    log.error(errorMsg);
+                    bulkHandleFailedRecords(new UnsupportedOperationException(errorMsg), singleTopicRecordsToInsert);
+                    return;
                 }
-                log.info("Successfully uploaded blob {} from topic {} uploadSize {} uploadBytes {}",
-                        filepath, entry.getKey(),
-                        uploadSize, uploadBytes);
-            } catch (Exception e) {
-                if (e instanceof ContainerNotFoundException) {
-                    log.error("Blob {} is not found", filepath, e);
-                } else if (e instanceof IOException) {
-                    log.error("Failed to write to blob {}", filepath, e);
-                } else if (e instanceof UnsupportedOperationException || e instanceof IllegalArgumentException) {
-                    log.error("Failed to handle message schema {}", schema, e);
-                } else {
-                    log.error("Encountered unknown error writing to blob {}", filepath, e);
+
+                String filepath = "";
+                try {
+                    format.initSchema(schema);
+                    final Iterator<Record<GenericRecord>> iter = singleTopicRecordsToInsert.iterator();
+                    filepath = buildPartitionPath(firstRecord, partitioner, format, timeStampForPartitioning);
+                    ByteBuffer payload = bindValue(iter, format);
+                    int uploadSize = singleTopicRecordsToInsert.size();
+                    long uploadBytes = getBytesSum(singleTopicRecordsToInsert);
+                    log.info("Uploading blob {} from topic {} uploadSize:{} uploadBytes:{} currentBatchStatus:{}",
+                            filepath, topicName, uploadSize, uploadBytes, batchManager.getCurrentStatsStr());
+                    long elapsedMs = System.currentTimeMillis();
+                    blobWriter.uploadBlob(filepath, payload);
+                    elapsedMs = System.currentTimeMillis() - elapsedMs;
+                    log.debug("Uploading blob {} elapsed time in ms: {}", filepath, elapsedMs);
+                    singleTopicRecordsToInsert.forEach(Record::ack);
+                    if (sinkContext != null) {
+                        sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, singleTopicRecordsToInsert.size());
+                        sinkContext.recordMetric(METRICS_LATEST_UPLOAD_ELAPSED_TIME, elapsedMs);
+                    }
+                    log.info("Successfully uploaded blob {} from topic {} uploadSize {} uploadBytes {}",
+                            filepath, topicName,
+                            uploadSize, uploadBytes);
+                } catch (Exception e) {
+                    if (e instanceof ContainerNotFoundException) {
+                        log.error("Blob {} is not found", filepath, e);
+                    } else if (e instanceof IOException) {
+                        log.error("Failed to write to blob {}", filepath, e);
+                    } else if (e instanceof UnsupportedOperationException || e instanceof IllegalArgumentException) {
+                        log.error("Failed to handle message schema {}", schema, e);
+                    } else {
+                        log.error("Encountered unknown error writing to blob {}", filepath, e);
+                    }
+                    bulkHandleFailedRecords(e, singleTopicRecordsToInsert);
                 }
-                bulkHandleFailedRecords(e, singleTopicRecordsToInsert);
             }
         }
     }
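
What the change does: flush() used to take each topic's whole batch, read the schema from the first record, and write a single blob, so a batch that mixed schema versions was handled entirely under the first record's schema. It now splits each topic's batch into per-schema-version groups and writes one blob per group. Since byte[] has identity-based equals()/hashCode() it cannot serve as a grouping key directly, which is why the key is Arrays.toString(schemaVersionBytes). Below is a minimal, self-contained sketch of that idiom; Msg is a hypothetical stand-in for Record<GenericRecord>, not code from this repository.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SchemaVersionGroupingSketch {

    // Hypothetical stand-in for Record<GenericRecord>; only the
    // schema-version bytes matter for the grouping.
    record Msg(byte[] schemaVersion, String payload) { }

    public static void main(String[] args) {
        List<Msg> batch = List.of(
                new Msg(new byte[]{0x1}, "a"),
                new Msg(new byte[]{0x2}, "b"),
                new Msg(new byte[]{0x1}, "c"));

        // Same idiom as flush(): stringify the version bytes to get a
        // value-based map key; a raw byte[] key would compare by identity.
        Map<String, List<Msg>> byVersion = batch.stream()
                .collect(Collectors.groupingBy(m ->
                        m.schemaVersion() != null ? Arrays.toString(m.schemaVersion()) : "null"));

        // Two groups: [1] -> {a, c}, [2] -> {b}; each group becomes one blob.
        byVersion.forEach((key, msgs) ->
                System.out.println(key + " -> " + msgs.size() + " record(s)"));
    }
}

Note that the error paths keep their early return inside the new inner loop, so a failure on one schema-version group still abandons the remaining groups of that flush cycle.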

src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java

Lines changed: 44 additions & 13 deletions

@@ -20,6 +20,7 @@
 
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doReturn;
@@ -70,7 +71,9 @@ public class CloudStorageSinkBatchBlendTest {
     private BlobWriter mockBlobWriter;
 
     @Mock
-    private Record<GenericRecord> mockRecord;
+    private Record<GenericRecord> mockRecordSchemaV1;
+
+    private Record<GenericRecord> mockRecordSchemaV2;
 
     private Map<String, Object> config;
 
@@ -90,7 +93,8 @@ public void setup() throws Exception {
         this.sink = spy(new CloudStorageGenericRecordSink());
         this.mockSinkContext = mock(SinkContext.class);
         this.mockBlobWriter = mock(BlobWriter.class);
-        this.mockRecord = mock(Record.class);
+        this.mockRecordSchemaV1 = mock(Record.class);
+        this.mockRecordSchemaV2 = mock(Record.class);
 
         doReturn("a/test.json").when(sink)
                 .buildPartitionPath(any(Record.class), any(Partitioner.class), any(Format.class), any(Long.class));
@@ -105,10 +109,17 @@ public void setup() throws Exception {
         GenericRecord genericRecord = spy(createTestRecord(schema));
         doReturn(new byte[]{0x1}).when(genericRecord).getSchemaVersion();
 
-        when(mockRecord.getTopicName()).thenReturn(Optional.of("test-topic"));
-        when(mockRecord.getValue()).thenReturn(genericRecord);
-        when(mockRecord.getSchema()).thenAnswer((Answer<Schema>) invocationOnMock -> schema);
-        when(mockRecord.getMessage()).thenReturn(Optional.of(mockMessage));
+        when(mockRecordSchemaV1.getTopicName()).thenReturn(Optional.of("test-topic"));
+        when(mockRecordSchemaV1.getValue()).thenReturn(genericRecord);
+        when(mockRecordSchemaV1.getSchema()).thenAnswer((Answer<Schema>) invocationOnMock -> schema);
+        when(mockRecordSchemaV1.getMessage()).thenReturn(Optional.of(mockMessage));
+
+        GenericRecord genericRecord2 = spy(createTestRecord(schema));
+        doReturn(new byte[]{0x2}).when(genericRecord2).getSchemaVersion();
+        when(mockRecordSchemaV2.getTopicName()).thenReturn(Optional.of("test-topic"));
+        when(mockRecordSchemaV2.getValue()).thenReturn(genericRecord2);
+        when(mockRecordSchemaV2.getSchema()).thenAnswer((Answer<Schema>) invocationOnMock -> schema);
+        when(mockRecordSchemaV2.getMessage()).thenReturn(Optional.of(mockMessage));
     }
 
     @After
@@ -178,15 +189,15 @@ public void repeatedlyFlushOnMultiConditionTest() throws Exception {
             int randomMultiplier = ThreadLocalRandom.current().nextInt(1, 6);
            return PAYLOAD_BYTES * randomMultiplier;
         });
-        when(mockRecord.getMessage()).thenReturn(Optional.of(randomMessage));
+        when(mockRecordSchemaV1.getMessage()).thenReturn(Optional.of(randomMessage));
 
         int numberOfRecords = 100;
         for (int i = 0; i < numberOfRecords; i++) {
-            this.sink.write(mockRecord);
+            this.sink.write(mockRecordSchemaV1);
             Thread.sleep(ThreadLocalRandom.current().nextInt(1, 500));
         }
         await().atMost(Duration.ofSeconds(60)).untilAsserted(
-                () -> verify(mockRecord, times(numberOfRecords)).ack()
+                () -> verify(mockRecordSchemaV1, times(numberOfRecords)).ack()
         );
     }
 
@@ -198,7 +209,7 @@ public void testBatchCleanupWhenFlushCrashed() throws Exception {
         this.config.put("batchSize", 1);
 
         this.sink.open(this.config, this.mockSinkContext);
-        when(mockRecord.getSchema()).thenThrow(new OutOfMemoryError());
+        when(mockRecordSchemaV1.getSchema()).thenThrow(new OutOfMemoryError());
         sendMockRecord(1);
         await().atMost(Duration.ofSeconds(10)).untilAsserted(
                 () -> {
@@ -208,11 +219,31 @@ public void testBatchCleanupWhenFlushCrashed() throws Exception {
         );
     }
 
+    @Test
+    public void testSchemaEvolution() throws Exception {
+        this.config.put("batchTimeMs", 1000000);
+        this.config.put("maxBatchBytes", 100 * PAYLOAD_BYTES);
+        this.config.put("batchSize", 10);
+
+        this.sink.open(this.config, this.mockSinkContext);
+
+        for (int i = 0; i < 3; i++) {
+            this.sink.write(mockRecordSchemaV1);
+        }
+        for (int i = 0; i < 7; i++) {
+            this.sink.write(mockRecordSchemaV2);
+        }
+
+        await().atMost(Duration.ofSeconds(10)).untilAsserted(
+                () -> verify(mockBlobWriter, times(2)).uploadBlob(eq("a/test.json"), any(ByteBuffer.class))
+        );
+    }
+
     private void verifyRecordAck(int numberOfRecords) throws Exception {
         this.sink.open(this.config, this.mockSinkContext);
         sendMockRecord(numberOfRecords);
         await().atMost(Duration.ofSeconds(30)).untilAsserted(
-                () -> verify(mockRecord, times(numberOfRecords)).ack()
+                () -> verify(mockRecordSchemaV1, times(numberOfRecords)).ack()
         );
     }
 
@@ -227,13 +258,13 @@ private void verifySinkFlush() throws Exception {
                 () -> verify(mockBlobWriter, atLeastOnce()).uploadBlob(any(String.class), any(ByteBuffer.class))
         );
         await().atMost(Duration.ofSeconds(10)).untilAsserted(
-                () -> verify(mockRecord, atLeast(5)).ack()
+                () -> verify(mockRecordSchemaV1, atLeast(5)).ack()
         );
     }
 
     private void sendMockRecord(int numberOfRecords) throws Exception {
         for (int i = 0; i < numberOfRecords; i++) {
-            this.sink.write(mockRecord);
+            this.sink.write(mockRecordSchemaV1);
         }
     }
 
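The new testSchemaEvolution case stages one batch of ten records, three mocked with schema version 0x1 and seven with 0x2, and asserts that flushing yields exactly two uploadBlob calls, one per schema-version group. One property the grouping key relies on, sketched below under the same assumptions as the sketch above (illustrative only, not part of the commit): content-equal schema-version bytes held in distinct byte[] instances must land in the same group, which Arrays.toString provides and a raw byte[] key would not.

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Test;

public class SchemaVersionKeySketchTest {

    // Mirrors the key function used in BlobStoreAbstractSink.flush().
    private static String versionKey(byte[] schemaVersionBytes) {
        return schemaVersionBytes != null ? Arrays.toString(schemaVersionBytes) : "null";
    }

    @Test
    public void equalBytesInDistinctArraysShareAGroup() {
        // Three separate byte[] instances; the first two are content-equal.
        List<byte[]> versions = List.of(new byte[]{0x1}, new byte[]{0x1}, new byte[]{0x2});

        Map<String, List<byte[]>> groups = versions.stream()
                .collect(Collectors.groupingBy(SchemaVersionKeySketchTest::versionKey));

        // Identity-keyed grouping would produce three groups; the
        // stringified key collapses the two [1] arrays into one.
        assertEquals(2, groups.size());
        assertEquals(2, groups.get(Arrays.toString(new byte[]{0x1})).size());
    }
}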