Skip to content

Commit 3ce18cc

Browse files
authored
Issue #166 - Replica.updateMetastore calls metastore with batched partition lists. (#167)
* Issue 166 - call metastore with batched partition lists
1 parent 9e1da5e commit 3ce18cc

File tree

3 files changed

+209
-54
lines changed

3 files changed

+209
-54
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## [15.1.1] - 2020-02-06
2+
### Fixed
3+
* When replicating tables with large numbers of partitions, `Replica.updateMetadata` now calls add/alter partition in batches of 1000. See [#166](https://github.com/HotelsDotCom/circus-train/issues/166).
4+
15
## [15.1.0] - 2020-01-28
26
### Changed
37
* AVRO Schema Copier now re-uses the normal 'data' copier instead of its own. See [#162](https://github.com/HotelsDotCom/circus-train/issues/162).

circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright (C) 2016-2019 Expedia, Inc.
2+
* Copyright (C) 2016-2020 Expedia, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,9 +42,11 @@
4242
import org.slf4j.Logger;
4343
import org.slf4j.LoggerFactory;
4444

45+
import com.google.common.annotations.VisibleForTesting;
4546
import com.google.common.base.Enums;
4647
import com.google.common.base.Optional;
4748
import com.google.common.base.Supplier;
49+
import com.google.common.collect.Lists;
4850

4951
import com.hotels.bdp.circustrain.api.CircusTrainException;
5052
import com.hotels.bdp.circustrain.api.ReplicaLocationManager;
@@ -71,6 +73,7 @@ public class Replica extends HiveEndpoint {
7173
private final ReplicaCatalogListener replicaCatalogListener;
7274
private final ReplicationMode replicationMode;
7375
private final TableReplication tableReplication;
76+
private int partitionBatchSize = 1000;
7477

7578
/**
7679
* Use {@link ReplicaFactory}
@@ -91,6 +94,28 @@ public class Replica extends HiveEndpoint {
9194
this.tableReplication = tableReplication;
9295
}
9396

97+
/**
98+
* Use {@link ReplicaFactory}
99+
*/
100+
@VisibleForTesting
101+
Replica(
102+
ReplicaCatalog replicaCatalog,
103+
HiveConf replicaHiveConf,
104+
Supplier<CloseableMetaStoreClient> replicaMetaStoreClientSupplier,
105+
ReplicaTableFactory replicaTableFactory,
106+
HousekeepingListener housekeepingListener,
107+
ReplicaCatalogListener replicaCatalogListener,
108+
TableReplication tableReplication,
109+
int partitionBatchSize) {
110+
super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
111+
this.replicaCatalogListener = replicaCatalogListener;
112+
tableFactory = replicaTableFactory;
113+
this.housekeepingListener = housekeepingListener;
114+
replicationMode = tableReplication.getReplicationMode();
115+
this.tableReplication = tableReplication;
116+
this.partitionBatchSize = partitionBatchSize;
117+
}
118+
94119
public void updateMetadata(
95120
String eventId,
96121
TableAndStatistics sourceTable,
@@ -175,7 +200,13 @@ public void updateMetadata(
175200
if (!partitionsToCreate.isEmpty()) {
176201
LOG.info("Creating {} new partitions.", partitionsToCreate.size());
177202
try {
178-
client.add_partitions(partitionsToCreate);
203+
int counter = 0;
204+
for (List<Partition> sublist : Lists.partition(partitionsToCreate, partitionBatchSize)) {
205+
int start = counter * partitionBatchSize;
206+
LOG.info("Creating partitions {} through {}", start, start + sublist.size() - 1);
207+
client.add_partitions(sublist);
208+
counter++;
209+
}
179210
} catch (TException e) {
180211
throw new MetaStoreClientException("Unable to add partitions '"
181212
+ partitionsToCreate
@@ -189,7 +220,13 @@ public void updateMetadata(
189220
if (!partitionsToAlter.isEmpty()) {
190221
LOG.info("Altering {} existing partitions.", partitionsToAlter.size());
191222
try {
192-
client.alter_partitions(replicaDatabaseName, replicaTableName, partitionsToAlter);
223+
int counter = 0;
224+
for (List<Partition> sublist : Lists.partition(partitionsToAlter, partitionBatchSize)) {
225+
int start = counter * partitionBatchSize;
226+
LOG.info("Altering partitions {} through {}", start, start + sublist.size() - 1);
227+
client.alter_partitions(replicaDatabaseName, replicaTableName, sublist);
228+
counter++;
229+
}
193230
} catch (TException e) {
194231
throw new MetaStoreClientException("Unable to alter partitions '"
195232
+ partitionsToAlter
@@ -203,7 +240,13 @@ public void updateMetadata(
203240
if (!statisticsToSet.isEmpty()) {
204241
LOG.info("Setting column statistics for {} partitions.", statisticsToSet.size());
205242
try {
206-
client.setPartitionColumnStatistics(new SetPartitionsStatsRequest(statisticsToSet));
243+
int counter = 0;
244+
for (List<ColumnStatistics> sublist : Lists.partition(statisticsToSet, partitionBatchSize)) {
245+
int start = counter * partitionBatchSize;
246+
LOG.info("Setting column statistics for partitions {} through {}", start, start + sublist.size() - 1);
247+
client.setPartitionColumnStatistics(new SetPartitionsStatsRequest(sublist));
248+
counter++;
249+
}
207250
} catch (TException e) {
208251
throw new MetaStoreClientException(
209252
"Unable to set column statistics of replica table '" + replicaDatabaseName + "." + replicaTableName + "'",

circus-train-core/src/test/java/com/hotels/bdp/circustrain/core/replica/ReplicaTest.java

Lines changed: 158 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright (C) 2016-2019 Expedia, Inc.
2+
* Copyright (C) 2016-2020 Expedia, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import static org.mockito.Matchers.anyString;
2323
import static org.mockito.Matchers.eq;
2424
import static org.mockito.Mockito.never;
25+
import static org.mockito.Mockito.times;
2526
import static org.mockito.Mockito.verify;
2627
import static org.mockito.Mockito.verifyNoMoreInteractions;
2728
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@
3839
import java.util.List;
3940
import java.util.Map;
4041

42+
import org.apache.commons.collections.ListUtils;
4143
import org.apache.hadoop.fs.Path;
4244
import org.apache.hadoop.hive.common.StatsSetupConst;
4345
import org.apache.hadoop.hive.conf.HiveConf;
@@ -103,6 +105,7 @@ public class ReplicaTest {
103105
private static final FieldSchema FIELD_D = new FieldSchema(COLUMN_D, "string", null);
104106
private static final List<FieldSchema> FIELDS = Arrays.asList(FIELD_A, FIELD_B);
105107
private static final List<FieldSchema> PARTITIONS = Arrays.asList(FIELD_C, FIELD_D);
108+
private static final int TEST_PARTITION_BATCH_SIZE = 17;
106109

107110
public @Rule TemporaryFolder temporaryFolder = new TemporaryFolder();
108111

@@ -175,7 +178,7 @@ private void convertExistingReplicaTableToView() {
175178

176179
private Replica newReplica(TableReplication tableReplication) {
177180
return new Replica(replicaCatalog, hiveConf, metaStoreClientSupplier, tableFactory, houseKeepingListener,
178-
replicaCatalogListener, tableReplication);
181+
replicaCatalogListener, tableReplication, TEST_PARTITION_BATCH_SIZE);
179182
}
180183

181184
@Test
@@ -391,29 +394,69 @@ public void getName() throws Exception {
391394
assertThat(replica.getName(), is(NAME));
392395
}
393396

394-
@Test
395-
public void alteringExistingPartitionedReplicaTableWithPartitionsSucceeds() throws TException, IOException {
396-
Partition newPartition = newPartition("three", "four");
397-
ColumnStatistics newPartitionStatistics = newPartitionStatistics("three", "four");
398-
Partition modifiedPartition = new Partition(existingPartition);
399-
ColumnStatistics modifiedPartitionStatistics = newPartitionStatistics("one", "two");
400-
when(mockMetaStoreClient
401-
.getPartitionsByNames(DB_NAME, TABLE_NAME, Lists.newArrayList("c=one/d=two", "c=three/d=four")))
402-
.thenReturn(Arrays.asList(existingPartition));
397+
private void alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(
398+
int numTestAlterPartitions, int numTestAddPartitions) throws TException, IOException {
399+
List<Partition> existingPartitions = new ArrayList<>();
400+
List<Partition> newPartitions = new ArrayList<>();
401+
List<ColumnStatistics> modifiedPartitionStatisticsList = new ArrayList<>();
402+
List<ColumnStatistics> newPartitionStatisticsList = new ArrayList<>();
403+
404+
for (int i = 0; i < numTestAlterPartitions; i++) {
405+
existingPartitions.add(newPartition(String.format("exist_%s", i), String.format("exist_%s_sub", i)));
406+
modifiedPartitionStatisticsList.add(
407+
newPartitionStatistics(String.format("exist_%s", i), String.format("exist_%s_sub", i)));
408+
}
409+
410+
for (int i = 0; i < numTestAddPartitions; i++) {
411+
newPartitions.add(newPartition(String.format("new_%s", i), String.format("new_%s_sub", i)));
412+
newPartitionStatisticsList.add(
413+
newPartitionStatistics(String.format("new_%s", i), String.format("new_%s_sub", i)));
414+
}
415+
416+
List<List<Partition>> alterSublists = Lists.partition(existingPartitions, TEST_PARTITION_BATCH_SIZE);
417+
int numAlterBatches = alterSublists.size();
418+
int lastAlterBatchSize = numAlterBatches == 0 ? 0 : alterSublists.get(alterSublists.size()-1).size();
419+
420+
List<List<Partition>> addSublists = Lists.partition(newPartitions, TEST_PARTITION_BATCH_SIZE);
421+
int numAddBatches = addSublists.size();
422+
int lastAddBatchSize = numAddBatches == 0 ? 0 : addSublists.get(addSublists.size()-1).size();
423+
424+
List<List<ColumnStatistics>> statsSublists = Lists.partition(
425+
ListUtils.union(modifiedPartitionStatisticsList, newPartitionStatisticsList), TEST_PARTITION_BATCH_SIZE);
426+
int numStatisticsBatches = statsSublists.size();
427+
int lastStatisticsBatchSize = numStatisticsBatches == 0 ? 0 : statsSublists.get(statsSublists.size()-1).size();
428+
429+
430+
List<String> testPartitionNames = new ArrayList<>();
431+
for (Partition p : (List<Partition>) ListUtils.union(existingPartitions, newPartitions)) {
432+
testPartitionNames.add(partitionName((String[]) p.getValues().toArray()));
433+
}
434+
435+
when(mockMetaStoreClient.getPartitionsByNames(DB_NAME, TABLE_NAME, testPartitionNames))
436+
.thenReturn(existingPartitions);
403437

404438
Map<String, List<ColumnStatisticsObj>> partitionStatsMap = new HashMap<>();
405-
partitionStatsMap
406-
.put(Warehouse.makePartName(PARTITIONS, newPartition.getValues()), newPartitionStatistics.getStatsObj());
407-
partitionStatsMap
408-
.put(Warehouse.makePartName(PARTITIONS, modifiedPartition.getValues()),
409-
modifiedPartitionStatistics.getStatsObj());
439+
for (int i = 0; i < numTestAddPartitions; i++) {
440+
partitionStatsMap
441+
.put(Warehouse.makePartName(PARTITIONS, newPartitions.get(i).getValues()),
442+
newPartitionStatisticsList.get(i).getStatsObj());
443+
}
444+
for (int i = 0; i < numTestAlterPartitions; i++) {
445+
partitionStatsMap
446+
.put(Warehouse.makePartName(PARTITIONS, existingPartitions.get(i).getValues()),
447+
modifiedPartitionStatisticsList.get(i).getStatsObj());
448+
}
410449

411450
PartitionsAndStatistics partitionsAndStatistics = new PartitionsAndStatistics(sourceTable.getPartitionKeys(),
412-
Arrays.asList(modifiedPartition, newPartition), partitionStatsMap);
413-
when(mockReplicaLocationManager.getPartitionLocation(existingPartition))
414-
.thenReturn(new Path(tableLocation, "c=one/d=two"));
415-
when(mockReplicaLocationManager.getPartitionLocation(newPartition))
416-
.thenReturn(new Path(tableLocation, "c=three/d=four"));
451+
ListUtils.union(existingPartitions, newPartitions), partitionStatsMap);
452+
for (int i = 0; i < numTestAddPartitions; i++) {
453+
when(mockReplicaLocationManager.getPartitionLocation(newPartitions.get(i)))
454+
.thenReturn(new Path(tableLocation, String.format("c=new_%s/d=new_%s_sub", i, i)));
455+
}
456+
for (int i = 0; i < numTestAlterPartitions; i++) {
457+
when(mockReplicaLocationManager.getPartitionLocation(existingPartitions.get(i)))
458+
.thenReturn(new Path(tableLocation, String.format("c=exist_%s/d=exist_%s_sub", i, i)));
459+
}
417460

418461
existingReplicaTable.getParameters().put(REPLICATION_EVENT.parameterName(), "previousEventId");
419462

@@ -423,41 +466,106 @@ public void alteringExistingPartitionedReplicaTableWithPartitionsSucceeds() thro
423466

424467
verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
425468
verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics);
426-
verify(mockReplicaLocationManager).addCleanUpLocation(anyString(), any(Path.class));
427-
verify(mockMetaStoreClient).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture());
428-
verify(mockMetaStoreClient).add_partitions(addPartitionCaptor.capture());
469+
verify(mockReplicaLocationManager, times(numTestAlterPartitions)).addCleanUpLocation(anyString(), any(Path.class));
470+
verify(mockMetaStoreClient, times(numAlterBatches)).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture());
471+
verify(mockMetaStoreClient, times(numAddBatches)).add_partitions(addPartitionCaptor.capture());
472+
473+
// Validate that the args were expected number of batches , and expected batch sizes
474+
List<List<Partition>> addCaptorValues = addPartitionCaptor.getAllValues();
475+
assertThat(addCaptorValues.size(), is(numAddBatches));
476+
477+
for (int batchNdx = 0; batchNdx < numAddBatches; batchNdx++) {
478+
int thisBatchSize = batchNdx < (numAddBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastAddBatchSize;
479+
List<Partition> addBatch = addCaptorValues.get(batchNdx);
480+
assertThat(addBatch.size(), is(thisBatchSize));
481+
for (int entryInBatchNdx = 0; entryInBatchNdx < addBatch.size(); entryInBatchNdx++) {
482+
assertThat(addBatch.get(entryInBatchNdx).getValues(),
483+
is(Arrays.asList(String.format("new_%s", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx),
484+
String.format("new_%s_sub", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx))));
485+
}
486+
}
429487

430-
assertThat(alterPartitionCaptor.getValue().size(), is(1));
431-
assertThat(addPartitionCaptor.getValue().size(), is(1));
488+
List<List<Partition>> alterCaptorValues = alterPartitionCaptor.getAllValues();
489+
assertThat(alterCaptorValues.size(), is(numAlterBatches));
490+
for (int batchNdx = 0; batchNdx < numAlterBatches; batchNdx++) {
491+
int thisBatchSize = batchNdx < (numAlterBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastAlterBatchSize;
492+
List<Partition> alterBatch = alterCaptorValues.get(batchNdx);
493+
assertThat(alterBatch.size(), is(thisBatchSize));
494+
for (int entryInBatchNdx = 0; entryInBatchNdx < alterBatch.size(); entryInBatchNdx++) {
495+
assertThat(alterBatch.get(entryInBatchNdx).getValues(),
496+
is(Arrays.asList(String.format("exist_%s", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx),
497+
String.format("exist_%s_sub", (batchNdx * TEST_PARTITION_BATCH_SIZE) + entryInBatchNdx))));
498+
}
499+
}
432500

433-
Partition altered = alterPartitionCaptor.getValue().get(0);
434-
assertThat(altered.getValues(), is(Arrays.asList("one", "two")));
501+
verify(mockMetaStoreClient, times(numStatisticsBatches)).setPartitionColumnStatistics(setStatsRequestCaptor.capture());
502+
List<SetPartitionsStatsRequest> statsRequestList = setStatsRequestCaptor.getAllValues();
503+
assertThat(statsRequestList.size(), is(numStatisticsBatches));
435504

436-
Partition added = addPartitionCaptor.getValue().get(0);
437-
assertThat(added.getValues(), is(Arrays.asList("three", "four")));
505+
List<ColumnStatistics> columnStats = new ArrayList<>();
506+
for (int colStatNdx = 0; colStatNdx < numStatisticsBatches; colStatNdx++) {
507+
int thisBatchSize = colStatNdx < (numStatisticsBatches - 1) ? TEST_PARTITION_BATCH_SIZE : lastStatisticsBatchSize;
508+
assertThat(statsRequestList.get(colStatNdx).getColStats().size(), is(thisBatchSize));
509+
columnStats.addAll(statsRequestList.get(colStatNdx).getColStats());
510+
}
438511

439-
verify(mockMetaStoreClient).setPartitionColumnStatistics(setStatsRequestCaptor.capture());
440-
SetPartitionsStatsRequest statsRequest = setStatsRequestCaptor.getValue();
512+
assertThat(columnStats.size(), is(numTestAlterPartitions + numTestAddPartitions));
441513

442-
List<ColumnStatistics> columnStats = new ArrayList<>(statsRequest.getColStats());
443-
Collections.sort(columnStats, new Comparator<ColumnStatistics>() {
444-
@Override
445-
public int compare(ColumnStatistics o1, ColumnStatistics o2) {
446-
return o1.getStatsDesc().getPartName().compareTo(o2.getStatsDesc().getPartName());
447-
}
448-
});
449-
assertThat(columnStats.size(), is(2));
514+
for (int colStatNdx = 0; colStatNdx < numTestAlterPartitions; colStatNdx++) {
515+
assertThat(columnStats.get(colStatNdx).getStatsDesc().isIsTblLevel(), is(false));
516+
assertThat(columnStats.get(colStatNdx).getStatsDesc().getDbName(), is(DB_NAME));
517+
assertThat(columnStats.get(colStatNdx).getStatsDesc().getTableName(), is(TABLE_NAME));
518+
assertThat(columnStats.get(colStatNdx).getStatsDesc().getPartName(),
519+
is(String.format("c=exist_%s/d=exist_%s_sub", colStatNdx, colStatNdx)));
520+
assertThat(columnStats.get(colStatNdx).getStatsObj().size(), is(2));
521+
}
450522

451-
assertThat(columnStats.get(0).getStatsDesc().isIsTblLevel(), is(false));
452-
assertThat(columnStats.get(0).getStatsDesc().getDbName(), is(DB_NAME));
453-
assertThat(columnStats.get(0).getStatsDesc().getTableName(), is(TABLE_NAME));
454-
assertThat(columnStats.get(0).getStatsDesc().getPartName(), is("c=one/d=two"));
455-
assertThat(columnStats.get(0).getStatsObj().size(), is(2));
456-
assertThat(columnStats.get(1).getStatsDesc().isIsTblLevel(), is(false));
457-
assertThat(columnStats.get(1).getStatsDesc().getDbName(), is(DB_NAME));
458-
assertThat(columnStats.get(1).getStatsDesc().getTableName(), is(TABLE_NAME));
459-
assertThat(columnStats.get(1).getStatsDesc().getPartName(), is("c=three/d=four"));
460-
assertThat(columnStats.get(1).getStatsObj().size(), is(2));
523+
for (int colStatNdx = numTestAlterPartitions, addPartColStatNdx = 0;
524+
colStatNdx < numTestAlterPartitions + numTestAddPartitions;
525+
colStatNdx++, addPartColStatNdx++) {
526+
assertThat(columnStats.get(colStatNdx).getStatsDesc().isIsTblLevel(), is(false));
527+
assertThat(columnStats.get(colStatNdx).getStatsDesc().getDbName(), is(DB_NAME));
528+
assertThat(columnStats.get(colStatNdx).getStatsDesc().getTableName(), is(TABLE_NAME));
529+
assertThat(columnStats.get(colStatNdx).getStatsDesc().getPartName(),
530+
is(String.format("c=new_%s/d=new_%s_sub", addPartColStatNdx, addPartColStatNdx)));
531+
assertThat(columnStats.get(colStatNdx).getStatsObj().size(), is(2));
532+
}
533+
}
534+
535+
536+
@Test
537+
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_0_0() throws TException, IOException {
538+
alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(0,0);
539+
}
540+
541+
@Test
542+
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_0_1() throws TException, IOException {
543+
alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(0,1);
544+
}
545+
546+
@Test
547+
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_1_0() throws TException, IOException {
548+
alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(1,0);
549+
}
550+
551+
@Test
552+
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_1_1() throws TException, IOException {
553+
alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(1,1);
554+
}
555+
556+
@Test
557+
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_boundaries() throws TException, IOException {
558+
alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(TEST_PARTITION_BATCH_SIZE,TEST_PARTITION_BATCH_SIZE);
559+
}
560+
561+
@Test
562+
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_many() throws TException, IOException {
563+
alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(17,28);
564+
}
565+
566+
@Test
567+
public void alteringExistingPartitionedReplicaTableWithNewPartitionsInBatchesSucceeds_lots() throws TException, IOException {
568+
alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(172,333);
461569
}
462570

463571
@Test

0 commit comments

Comments
 (0)