From 03c2c547b6376f5b672113528879e9c765894646 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Sun, 27 Apr 2025 14:46:19 -0700 Subject: [PATCH 1/4] Enable CompactionScanner for flushes --- .../apache/phoenix/execute/MutationState.java | 4 +- .../org/apache/phoenix/util/ScanUtil.java | 39 ++++++++++++++++ .../phoenix/util/WALAnnotationUtil.java | 8 +++- .../coprocessor/CompactionScanner.java | 24 +++++----- .../coprocessor/GlobalIndexRegionScanner.java | 4 ++ .../coprocessor/IndexerRegionScanner.java | 2 +- .../UngroupedAggregateRegionObserver.java | 46 ++++++++++++++++++- .../UngroupedAggregateRegionScanner.java | 7 ++- .../hbase/index/IndexRegionObserver.java | 37 +++++++++++++++ .../end2end/MaxLookbackExtendedIT.java | 38 +++++++++++++++ .../apache/phoenix/end2end/TableTTLIT.java | 21 +++++++-- 11 files changed, 209 insertions(+), 21 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index 26503508de3..1b4a9493e62 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -856,7 +856,9 @@ private void annotateMutationWithMetadata(PTable table, Mutation mutation) { Bytes.toBytes(table.getExternalSchemaId()) : null; byte[] lastDDLTimestamp = table.getLastDDLTimestamp() != null ? Bytes.toBytes(table.getLastDDLTimestamp()) : null; - WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName, tableName, tableType, lastDDLTimestamp); + WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName, tableName, tableType, + lastDDLTimestamp, SchemaUtil.getEmptyColumnFamily(table), + SchemaUtil.getEmptyColumnQualifier(table)); } /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java index 2800f2edd4e..8b454c693d1 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -1626,6 +1626,16 @@ public static void annotateScanWithMetadataAttributes(PTable table, Scan scan) { table.getTableName().getBytes()); scan.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(), table.getType().getValue().getBytes()); + if (scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME) == + null) { + scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + SchemaUtil.getEmptyColumnFamily(table)); + } + if (scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME) == + null) { + scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + SchemaUtil.getEmptyColumnQualifier(table)); + } if (table.getLastDDLTimestamp() != null) { scan.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(), Bytes.toBytes(table.getLastDDLTimestamp())); @@ -1648,6 +1658,8 @@ public static void annotateScanWithMetadataAttributes(PTable table, Scan scan) { * @param logicalTableName logical table name. * @param tableType table type. * @param timestamp last ddl timestamp. + * @param emptyCF empty column family name. + * @param emptyCQ empty column qualifier name. * @param mutation mutation object to attach attributes. 
*/ public static void annotateMutationWithMetadataAttributes(byte[] tenantId, @@ -1655,6 +1667,8 @@ public static void annotateMutationWithMetadataAttributes(byte[] tenantId, byte[] logicalTableName, byte[] tableType, byte[] timestamp, + byte[] emptyCF, + byte[] emptyCQ, Mutation mutation) { if (tenantId != null) { mutation.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), @@ -1666,6 +1680,14 @@ public static void annotateMutationWithMetadataAttributes(byte[] tenantId, logicalTableName); mutation.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(), tableType); + if (emptyCF != null) { + mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + emptyCF); + } + if (emptyCQ != null) { + mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + emptyCQ); + } if (timestamp != null) { mutation.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(), timestamp); @@ -1690,6 +1712,19 @@ public static void annotateScanWithMetadataAttributes(Scan oldScan, Scan newScan oldScan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()); byte[] timestamp = oldScan.getAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString()); + byte[] emptyCF = + oldScan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); + byte[] emptyCQ = + oldScan.getAttribute( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); + if (emptyCF != null) { + newScan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + emptyCF); + } + if (emptyCQ != null) { + newScan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + emptyCQ); + } if (tenantId != null) { newScan.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), tenantId); } @@ -1723,6 +1758,10 @@ public static void annotateMutationWithMetadataAttributes(PTable table, Mutation mutation.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), table.getTenantId().getBytes()); } + mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + SchemaUtil.getEmptyColumnFamily(table)); + mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + SchemaUtil.getEmptyColumnQualifier(table)); mutation.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(), table.getSchemaName().getBytes()); mutation.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java index 69afe0eb721..14718e873b1 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java @@ -17,6 +17,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.execute.MutationState; /** @@ -42,7 +43,8 @@ public static void annotateMutation(Mutation m, byte[] externalSchemaId) { } public static void annotateMutation(Mutation m, byte[] tenantId, byte[] schemaName, - byte[] logicalTableName, byte[] tableType, byte[] ddlTimestamp) { + byte[] logicalTableName, byte[] tableType, + byte[] ddlTimestamp, byte[] emptyCF, byte[] emptyCQ) { if 
(!m.getDurability().equals(Durability.SKIP_WAL)) { if (tenantId != null) { m.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), tenantId); @@ -52,6 +54,10 @@ public static void annotateMutation(Mutation m, byte[] tenantId, byte[] schemaNa logicalTableName); m.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(), tableType); m.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(), ddlTimestamp); + m.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + emptyCF); + m.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + emptyCQ); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 232a4a20c25..cd1cb426357 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -167,12 +167,14 @@ public class CompactionScanner implements InternalScanner { private static boolean forceMinorCompaction = false; public CompactionScanner(RegionCoprocessorEnvironment env, - Store store, - InternalScanner storeScanner, - long maxLookbackAgeInMillis, - boolean major, - boolean keepDeleted, - PTable table) throws IOException { + Store store, + InternalScanner storeScanner, + long maxLookbackAgeInMillis, + boolean major, + boolean keepDeleted, + PTable table, + byte[] emptyCF, + byte[] emptyCQ) throws IOException { this.storeScanner = storeScanner; this.region = env.getRegion(); this.store = store; @@ -180,13 +182,13 @@ public CompactionScanner(RegionCoprocessorEnvironment env, // Empty column family and qualifier are always needed to compute which all empty cells to retain // even during minor compactions. If required empty cells are not retained during // minor compactions then we can run into the risk of partial row expiry on next major compaction. - this.emptyCF = SchemaUtil.getEmptyColumnFamily(table); - this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table); + this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : emptyCF; + this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : emptyCQ; compactionTime = EnvironmentEdgeManager.currentTimeMillis(); columnFamilyName = store.getColumnFamilyName(); storeColumnFamily = columnFamilyName.getBytes(); tableName = region.getRegionInfo().getTable().getNameAsString(); - String dataTableName = table.getName().toString(); + String dataTableName = table != null ? table.getName().toString() : "_NA_"; Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName); this.maxLookbackInMillis = overriddenMaxLookback == null ? maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis, overriddenMaxLookback); @@ -201,7 +203,7 @@ public CompactionScanner(RegionCoprocessorEnvironment env, this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells(); familyCount = region.getTableDescriptor().getColumnFamilies().length; localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX); - emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) + emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(this.emptyCF)) || localIndex; isCDCIndex = table != null ? 
CDCUtil.isCDCIndex(table) : false; @@ -232,7 +234,7 @@ public CompactionScanner(RegionCoprocessorEnvironment env, dataTableName, tableName, region.getRegionInfo().getEncodedName(), Bytes.toStringBinary(region.getRegionInfo().getStartKey()), Bytes.toStringBinary(region.getRegionInfo().getEndKey()), - Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ), + Bytes.toString(this.emptyCF), Bytes.toString(this.emptyCQ), this.minVersion, this.maxVersion, this.keepDeletedCells.name(), this.familyCount, this.localIndex, this.emptyCFStore, compactionTime, maxLookbackWindowStart, maxLookbackInMillis, this.major)); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index eb8f1eac205..af04c3a2696 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -170,6 +170,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { protected byte[] logicalTableName; protected byte[] tableType; protected byte[] lastDdlTimestamp; + protected byte[] emptyCF; + protected byte[] emptyCQ; private final CompiledTTLExpression ttlExpression; // This relies on Hadoop Configuration to handle warning about deprecated configs and @@ -213,6 +215,8 @@ public GlobalIndexRegionScanner(final RegionScanner innerScanner, } tenantId = scan.getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()); schemaName = scan.getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString()); + emptyCF = scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); + emptyCQ = scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); logicalTableName = scan.getAttribute( MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString()); tableType = scan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java index c64612ac04f..26a76a2b9c8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java @@ -319,7 +319,7 @@ private void verifyIndex() throws IOException { private void setMutationAttributes(Mutation m, byte[] uuidValue) { ScanUtil.annotateMutationWithMetadataAttributes(tenantId, schemaName, - logicalTableName, tableType, lastDdlTimestamp, m); + logicalTableName, tableType, lastDdlTimestamp, emptyCF, emptyCQ, m); m.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index b3314752a2a..ca9f3875991 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -31,7 +31,9 @@ import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; @@ -60,6 +62,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.Region; @@ -178,6 +181,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private Configuration compactionConfig; private Configuration indexWriteConfig; private ReadOnlyProps indexWriteProps; + private static final Map> TABLE_ATTRIBUTES = new HashMap<>(); + + public static Map> getTableAttributes() { + return TABLE_ATTRIBUTES; + } @Override public Optional getRegionObserver() { @@ -611,6 +619,42 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) { region.getTableDescriptor().getTableName().getName()) == 0); } + @Override + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner, FlushLifeCycleTracker tracker) + throws IOException { + if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { + return scanner; + } else { + TableName tableName = + c.getEnvironment().getRegion().getTableDescriptor().getTableName(); + Map tableAttributes = getTableAttributes().get(tableName); + if (tableAttributes == null) { + LOGGER.warn("Table {} has no table attributes for empty CF and empty CQ. " + + "IndexRegionObserver has not received any mutations yet? Do not" + + " perform Compaction now.", + tableName); + return scanner; + } + byte[] emptyCF = tableAttributes + .get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); + byte[] emptyCQ = tableAttributes + .get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); + if (emptyCF == null || emptyCQ == null) { + LOGGER.warn("Table {} has no empty CF and empty CQ attributes updated in cache. 
" + + "Do not perform Compaction now.", + tableName); + return scanner; + } + return User.runAsLoginUser( + (PrivilegedExceptionAction) () -> new CompactionScanner( + c.getEnvironment(), store, scanner, + BaseScannerRegionObserverConstants.getMaxLookbackInMillis( + c.getEnvironment().getConfiguration()), + false, true, null, emptyCF, emptyCQ)); + } + } + @Override public InternalScanner preCompact(ObserverContext c, Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, @@ -682,7 +726,7 @@ && isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { BaseScannerRegionObserverConstants.getMaxLookbackInMillis( c.getEnvironment().getConfiguration()), request.isMajor() || request.isAllFiles(), - keepDeleted, table + keepDeleted, table, null, null ); } else if (isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java index 32a837c0809..3569fb87080 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java @@ -793,9 +793,14 @@ private void annotateDataMutations(UngroupedAggregateRegionObserver.MutationList scan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString()); byte[] ddlTimestamp = scan.getAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString()); + byte[] emptyCF = + scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); + byte[] emptyCQ = + scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); for (Mutation m : mutationsList) { - annotateMutation(m, tenantId, schemaName, logicalTableName, tableType, ddlTimestamp); + annotateMutation(m, tenantId, schemaName, logicalTableName, tableType, ddlTimestamp, + emptyCF, emptyCQ); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 6dc5974534d..e776e67f961 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -55,6 +55,7 @@ import org.apache.htrace.TraceScope; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.exception.DataExceedsCapacityException; @@ -395,6 +396,7 @@ public int getMaxPendingRowCount() { private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100; private byte[] encodedRegionName; + private boolean initialMutation; @Override public Optional getRegionObserver() { @@ -405,6 +407,7 @@ public Optional getRegionObserver() { public void start(CoprocessorEnvironment e) throws IOException { try { final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; + initialMutation = true; encodedRegionName = 
env.getRegion().getRegionInfo().getEncodedNameAsBytes(); String serverName = env.getServerName().getServerName(); if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) { @@ -1232,6 +1235,8 @@ private void preparePreIndexMutations(BatchMutateContext context, byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); HTableInterfaceReference hTableInterfaceReference = new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); + updateEmptyColNamesToStaticCache(TableName.valueOf(indexMaintainer.getIndexTableName()), + emptyCF, emptyCQ); List > updates = context.indexUpdates.get(hTableInterfaceReference); for (Pair update : updates) { Mutation m = update.getFirst(); @@ -1450,6 +1455,7 @@ public void preBatchMutateWithExceptions(ObserverContext c, + MiniBatchOperationInProgress miniBatchOp) { + if (initialMutation) { + initialMutation = false; + byte[] emptyCF = miniBatchOp.getOperation(0) + .getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); + byte[] emptyCQ = miniBatchOp.getOperation(0) + .getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); + if (emptyCF != null && emptyCQ != null) { + updateEmptyColNamesToStaticCache( + c.getEnvironment().getRegion().getTableDescriptor().getTableName(), emptyCF, + emptyCQ); + } + } + } + + private static void updateEmptyColNamesToStaticCache(TableName table, byte[] emptyCF, + byte[] emptyCQ) { + UngroupedAggregateRegionObserver.getTableAttributes().computeIfAbsent( + table, tableName -> { + Map tableAttributes = new HashMap<>(); + tableAttributes.put( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + emptyCF); + tableAttributes.put( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + emptyCQ); + return tableAttributes; + }); + } + /** * In case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be * generated so release the row lock. 
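
PATCH 1 threads the empty column family/qualifier names from client to server: MutationState, ScanUtil and WALAnnotationUtil stamp them on mutations and scans as attributes, IndexRegionObserver captures them off the first mutation it sees and records them in a static per-table map, and preFlush reads that map so it can build a CompactionScanner for a flush even though no PTable is resolvable there. The sketch below is a minimal, standalone illustration of that caching half only; it uses plain String table keys and assumed attribute-name values ("_EmptyCFName"/"_EmptyCQName"), whereas the real code keys the map by HBase TableName and takes the constants from BaseScannerRegionObserverConstants (and, as of PATCH 3, switches the map to a ConcurrentHashMap, which this sketch already uses).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified, self-contained sketch of the per-table empty CF/CQ cache this patch adds.
// Table names are plain Strings and attribute keys are assumed values; the real code
// uses HBase TableName and BaseScannerRegionObserverConstants.
public final class EmptyColumnAttributeCache {
    static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";     // assumed key
    static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";  // assumed key

    // One entry per table, populated lazily from the first annotated mutation.
    private static final ConcurrentMap<String, Map<String, byte[]>> TABLE_ATTRIBUTES =
            new ConcurrentHashMap<>();

    // Write path: remember the empty CF/CQ for this table exactly once.
    static void cacheEmptyColumnNames(String table, byte[] emptyCF, byte[] emptyCQ) {
        if (emptyCF == null || emptyCQ == null) {
            return; // mutation was not annotated; nothing to cache yet
        }
        TABLE_ATTRIBUTES.computeIfAbsent(table, t -> {
            Map<String, byte[]> attrs = new HashMap<>();
            attrs.put(EMPTY_COLUMN_FAMILY_NAME, emptyCF);
            attrs.put(EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
            return attrs;
        });
    }

    // Flush path: returns null until the first annotated write has been seen.
    static Map<String, byte[]> getEmptyColumnNames(String table) {
        return TABLE_ATTRIBUTES.get(table);
    }

    public static void main(String[] args) {
        cacheEmptyColumnNames("NS.MY_TABLE",
                "0".getBytes(StandardCharsets.UTF_8),
                "_0".getBytes(StandardCharsets.UTF_8));
        System.out.println(getEmptyColumnNames("NS.MY_TABLE") != null);    // true
        System.out.println(getEmptyColumnNames("NS.OTHER_TABLE") != null); // false
    }
}
```

computeIfAbsent keeps the first writer's values and is a no-op afterwards, which matches the "cache once per table, never overwrite" behavior preFlush relies on: until a region has seen at least one annotated mutation, the flush simply skips the CompactionScanner wrapping.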
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java index 987d631bfe3..d9f65145c84 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java @@ -610,6 +610,44 @@ public void testRetainingLastRowVersion() throws Exception { } } + @Test(timeout = 60000) + public void testRetainingLastRowVersionForFlushes() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = generateUniqueName(); + createTable(tableName); + long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1; + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + TableName dataTableName = TableName.valueOf(tableName); + injectEdge.incrementValue(1); + Statement stmt = conn.createStatement(); + stmt.execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab1')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab2')"); + conn.commit(); + injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000); + stmt.execute("upsert into " + tableName + " values ('a', 'ab3')"); + conn.commit(); + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000); + TestUtil.dumpTable(conn, dataTableName); + TestUtil.flush(utility, dataTableName); + injectEdge.incrementValue(1); + TestUtil.dumpTable(conn, dataTableName); + majorCompact(dataTableName); + injectEdge.incrementValue(1); + TestUtil.dumpTable(conn, dataTableName); + ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'"); + while (rs.next()) { + assertNotNull(rs.getString(3)); + assertNotNull(rs.getString(4)); + } + } + } + private void flush(TableName table) throws IOException { Admin admin = getUtility().getAdmin(); admin.flush(table); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 1d68ce776c7..d66e01be205 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -133,7 +133,17 @@ public static synchronized Collection data() { { true, false, KeepDeletedCells.TTL, 5, 100, null}, { false, false, KeepDeletedCells.FALSE, 1, 100, 0}, { false, false, KeepDeletedCells.TRUE, 5, 50, 0}, - { false, false, KeepDeletedCells.TTL, 1, 25, 15}}); + { false, false, KeepDeletedCells.TTL, 1, 25, 15}, + { false, true, KeepDeletedCells.FALSE, 1, 100, null}, + { false, true, KeepDeletedCells.TRUE, 5, 50, null}, + { false, true, KeepDeletedCells.TTL, 1, 25, null}, + { true, true, KeepDeletedCells.FALSE, 5, 50, null}, + { true, true, KeepDeletedCells.TRUE, 1, 25, null}, + { true, true, KeepDeletedCells.TTL, 5, 100, null}, + { false, true, KeepDeletedCells.FALSE, 1, 100, 0}, + { false, true, KeepDeletedCells.TRUE, 5, 50, 0}, + { false, true, KeepDeletedCells.TTL, 1, 25, 15} + }); } /** @@ -273,10 +283,11 @@ public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled() Thread.sleep(1); } flush(TableName.valueOf(tableName)); - // Flushes dump and retain all the cells to HFile. 
- // Doing MAX_COLUMN_INDEX + 1 to account for empty cells - assertEquals(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row), - rowUpdateCounter * (MAX_COLUMN_INDEX + 1)); + // At every flush, extra cell versions should be removed. + // MAX_COLUMN_INDEX table columns and one empty column will be retained for + // each row version. + assertTrue(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row) + <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions); } // Run one minor compaction (in case no minor compaction has happened yet) TestUtil.minorCompact(utility, TableName.valueOf(tableName)); From 6b1d0ec39855a1da4ab9545efdb23cf252d03bae Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Sun, 27 Apr 2025 22:58:53 -0700 Subject: [PATCH 2/4] few fixes --- .../UngroupedAggregateRegionObserver.java | 47 ++++++++++++++---- .../hbase/index/IndexRegionObserver.java | 48 +++++-------------- 2 files changed, 50 insertions(+), 45 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index ca9f3875991..6f16069e315 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -181,11 +181,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private Configuration compactionConfig; private Configuration indexWriteConfig; private ReadOnlyProps indexWriteProps; - private static final Map> TABLE_ATTRIBUTES = new HashMap<>(); + private boolean isEmptyColumnNameCached; - public static Map> getTableAttributes() { - return TABLE_ATTRIBUTES; - } + private static final Map> TABLE_ATTRIBUTES = new HashMap<>(); @Override public Optional getRegionObserver() { @@ -194,6 +192,7 @@ public Optional getRegionObserver() { @Override public void start(CoprocessorEnvironment e) throws IOException { + this.isEmptyColumnNameCached = false; /* * We need to create a copy of region's configuration since we don't want any side effect of * setting the RpcControllerFactory. @@ -628,12 +627,11 @@ public InternalScanner preFlush(ObserverContext c, } else { TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName(); - Map tableAttributes = getTableAttributes().get(tableName); + Map tableAttributes = TABLE_ATTRIBUTES.get(tableName); if (tableAttributes == null) { LOGGER.warn("Table {} has no table attributes for empty CF and empty CQ. " - + "IndexRegionObserver has not received any mutations yet? Do not" - + " perform Compaction now.", - tableName); + + "UngroupedAggregateRegionObserver has not received any " + + "mutations yet? 
Do not perform Compaction now.", tableName); return scanner; } byte[] emptyCF = tableAttributes @@ -1106,6 +1104,7 @@ public void preBatchMutate(ObserverContext c, throws IOException { final Configuration conf = c.getEnvironment().getConfiguration(); try { + updateEmptyCfCqValuesToStaticCache(c, miniBatchOp); final HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf); if (haGroupStoreManager.isMutationBlocked()) { throw new IOException("Blocking Mutation as Some CRRs are in ACTIVE_TO_STANDBY " @@ -1115,4 +1114,36 @@ public void preBatchMutate(ObserverContext c, throw new IOException(e); } } + + private void updateEmptyCfCqValuesToStaticCache(ObserverContext c, + MiniBatchOperationInProgress miniBatchOp) { + if (!isEmptyColumnNameCached) { + byte[] emptyCF = miniBatchOp.getOperation(0) + .getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); + byte[] emptyCQ = miniBatchOp.getOperation(0) + .getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); + if (emptyCF != null && emptyCQ != null) { + updateEmptyColNamesToStaticCache( + c.getEnvironment().getRegion().getTableDescriptor().getTableName(), emptyCF, + emptyCQ); + isEmptyColumnNameCached = true; + } + } + } + + private static void updateEmptyColNamesToStaticCache(TableName table, byte[] emptyCF, + byte[] emptyCQ) { + TABLE_ATTRIBUTES.computeIfAbsent( + table, tableName -> { + Map tableAttributes = new HashMap<>(); + tableAttributes.put( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + emptyCF); + tableAttributes.put( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + emptyCQ); + return tableAttributes; + }); + } + } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index e776e67f961..31b255c36c7 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -55,7 +55,6 @@ import org.apache.htrace.TraceScope; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; -import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.exception.DataExceedsCapacityException; @@ -396,7 +395,6 @@ public int getMaxPendingRowCount() { private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100; private byte[] encodedRegionName; - private boolean initialMutation; @Override public Optional getRegionObserver() { @@ -407,7 +405,6 @@ public Optional getRegionObserver() { public void start(CoprocessorEnvironment e) throws IOException { try { final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - initialMutation = true; encodedRegionName = env.getRegion().getRegionInfo().getEncodedNameAsBytes(); String serverName = env.getServerName().getServerName(); if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) { @@ -1235,13 +1232,16 @@ private void preparePreIndexMutations(BatchMutateContext context, byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); HTableInterfaceReference hTableInterfaceReference = new 
HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); - updateEmptyColNamesToStaticCache(TableName.valueOf(indexMaintainer.getIndexTableName()), - emptyCF, emptyCQ); List > updates = context.indexUpdates.get(hTableInterfaceReference); for (Pair update : updates) { Mutation m = update.getFirst(); if (m instanceof Put) { // This will be done before the data table row is updated (i.e., in the first write phase) + m.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + emptyCF); + m.setAttribute( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + emptyCQ); context.preIndexUpdates.put(hTableInterfaceReference, m); } else if (IndexUtil.isDeleteFamily(m)) { // DeleteColumn is always accompanied by a Put so no need to make the index @@ -1250,6 +1250,12 @@ private void preparePreIndexMutations(BatchMutateContext context, Put unverifiedPut = new Put(m.getRow()); unverifiedPut.addColumn( emptyCF, emptyCQ, batchTimestamp, QueryConstants.UNVERIFIED_BYTES); + unverifiedPut.setAttribute( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, + emptyCF); + unverifiedPut.setAttribute( + BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, + emptyCQ); // This will be done before the data table row is updated (i.e., in the first write phase) context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut); } @@ -1455,7 +1461,6 @@ public void preBatchMutateWithExceptions(ObserverContext c, - MiniBatchOperationInProgress miniBatchOp) { - if (initialMutation) { - initialMutation = false; - byte[] emptyCF = miniBatchOp.getOperation(0) - .getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); - byte[] emptyCQ = miniBatchOp.getOperation(0) - .getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); - if (emptyCF != null && emptyCQ != null) { - updateEmptyColNamesToStaticCache( - c.getEnvironment().getRegion().getTableDescriptor().getTableName(), emptyCF, - emptyCQ); - } - } - } - - private static void updateEmptyColNamesToStaticCache(TableName table, byte[] emptyCF, - byte[] emptyCQ) { - UngroupedAggregateRegionObserver.getTableAttributes().computeIfAbsent( - table, tableName -> { - Map tableAttributes = new HashMap<>(); - tableAttributes.put( - BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, - emptyCF); - tableAttributes.put( - BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, - emptyCQ); - return tableAttributes; - }); - } - /** * In case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be * generated so release the row lock. 
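
PATCH 2 moves the caching responsibility: instead of IndexRegionObserver writing into UngroupedAggregateRegionObserver's map, the index mutations themselves now carry the empty CF/CQ attributes (both the regular index Puts and the "unverified" Put built for the delete-family case), and UngroupedAggregateRegionObserver populates its own static map from preBatchMutate. Below is a hedged sketch of the unverified-Put construction, assuming the HBase client API is on the classpath; the attribute keys and the UNVERIFIED_BYTES value are placeholders standing in for BaseScannerRegionObserverConstants and QueryConstants.

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the delete-family branch of preparePreIndexMutations after PATCH 2.
public class UnverifiedIndexPutExample {
    // Assumed stand-ins for the real constants in BaseScannerRegionObserverConstants
    // and QueryConstants.
    private static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
    private static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
    private static final byte[] UNVERIFIED_BYTES = Bytes.toBytes("2");

    static Put buildUnverifiedPut(byte[] indexRowKey, byte[] emptyCF, byte[] emptyCQ,
                                  long batchTimestamp) {
        Put unverifiedPut = new Put(indexRowKey);
        // Mark the index row unverified until the data table write commits.
        unverifiedPut.addColumn(emptyCF, emptyCQ, batchTimestamp, UNVERIFIED_BYTES);
        // New in this patch: carry the empty column names as attributes so the observer
        // on the index region can populate its per-table cache in preBatchMutate.
        unverifiedPut.setAttribute(EMPTY_COLUMN_FAMILY_NAME, emptyCF);
        unverifiedPut.setAttribute(EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
        return unverifiedPut;
    }

    public static void main(String[] args) {
        Put p = buildUnverifiedPut(Bytes.toBytes("idx-row-key"), Bytes.toBytes("0"),
                Bytes.toBytes("_0"), System.currentTimeMillis());
        System.out.println("attributes attached: " + p.getAttributesMap().size()); // 2
    }
}
```

Setting the attributes on the index mutations keeps the data-table and index-table paths symmetric: each region learns its own empty CF/CQ from the mutations it actually receives, so the cross-class dependency on UngroupedAggregateRegionObserver.getTableAttributes() can be dropped.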
From 081abd1c37c0797d18660bb518f51690b2996e44 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Mon, 28 Apr 2025 08:05:10 -0700 Subject: [PATCH 3/4] concurrent map updates --- .../coprocessor/UngroupedAggregateRegionObserver.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 6f16069e315..91fa160f75c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -37,6 +37,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.annotation.concurrent.GuardedBy; @@ -183,7 +185,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private ReadOnlyProps indexWriteProps; private boolean isEmptyColumnNameCached; - private static final Map> TABLE_ATTRIBUTES = new HashMap<>(); + private static final ConcurrentMap> TABLE_ATTRIBUTES = + new ConcurrentHashMap<>(); @Override public Optional getRegionObserver() { From c3bb58085a091c671910b5d076a2ff02af97f9aa Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Mon, 28 Apr 2025 09:36:03 -0700 Subject: [PATCH 4/4] worse case catch --- .../UngroupedAggregateRegionObserver.java | 62 ++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 91fa160f75c..5af800f0844 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -623,36 +623,42 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) { @Override public InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner, FlushLifeCycleTracker tracker) - throws IOException { - if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { - return scanner; - } else { - TableName tableName = - c.getEnvironment().getRegion().getTableDescriptor().getTableName(); - Map tableAttributes = TABLE_ATTRIBUTES.get(tableName); - if (tableAttributes == null) { - LOGGER.warn("Table {} has no table attributes for empty CF and empty CQ. " - + "UngroupedAggregateRegionObserver has not received any " - + "mutations yet? Do not perform Compaction now.", tableName); - return scanner; - } - byte[] emptyCF = tableAttributes - .get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); - byte[] emptyCQ = tableAttributes - .get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); - if (emptyCF == null || emptyCQ == null) { - LOGGER.warn("Table {} has no empty CF and empty CQ attributes updated in cache. 
" - + "Do not perform Compaction now.", - tableName); + InternalScanner scanner, FlushLifeCycleTracker tracker) { + try { + if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { return scanner; + } else { + TableName tableName = + c.getEnvironment().getRegion().getTableDescriptor().getTableName(); + Map tableAttributes = TABLE_ATTRIBUTES.get(tableName); + if (tableAttributes == null) { + LOGGER.warn("Table {} has no table attributes for empty CF and empty CQ. " + + "UngroupedAggregateRegionObserver has not received any " + + "mutations yet? Do not perform Compaction now.", tableName); + return scanner; + } + byte[] emptyCF = tableAttributes + .get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME); + byte[] emptyCQ = tableAttributes + .get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME); + if (emptyCF == null || emptyCQ == null) { + LOGGER.warn( + "Table {} has no empty CF and empty CQ attributes updated in cache. " + + "Do not perform Compaction now.", + tableName); + return scanner; + } + return User.runAsLoginUser( + (PrivilegedExceptionAction) () -> new CompactionScanner( + c.getEnvironment(), store, scanner, + BaseScannerRegionObserverConstants.getMaxLookbackInMillis( + c.getEnvironment().getConfiguration()), + false, true, null, emptyCF, emptyCQ)); } - return User.runAsLoginUser( - (PrivilegedExceptionAction) () -> new CompactionScanner( - c.getEnvironment(), store, scanner, - BaseScannerRegionObserverConstants.getMaxLookbackInMillis( - c.getEnvironment().getConfiguration()), - false, true, null, emptyCF, emptyCQ)); + } catch (Exception e) { + // this is not normal + LOGGER.error("Error in preFlush. Do not perform Compaction for now.", e); + return scanner; } }