Skip to content

Enable CompactionScanner for flushes with Mutation attributes #2131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,9 @@ private void annotateMutationWithMetadata(PTable table, Mutation mutation) {
Bytes.toBytes(table.getExternalSchemaId()) : null;
byte[] lastDDLTimestamp =
table.getLastDDLTimestamp() != null ? Bytes.toBytes(table.getLastDDLTimestamp()) : null;
WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName, tableName, tableType, lastDDLTimestamp);
WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName, tableName, tableType,
lastDDLTimestamp, SchemaUtil.getEmptyColumnFamily(table),
SchemaUtil.getEmptyColumnQualifier(table));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,16 @@ public static void annotateScanWithMetadataAttributes(PTable table, Scan scan) {
table.getTableName().getBytes());
scan.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(),
table.getType().getValue().getBytes());
if (scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME) ==
null) {
scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME,
SchemaUtil.getEmptyColumnFamily(table));
}
if (scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME) ==
null) {
scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME,
SchemaUtil.getEmptyColumnQualifier(table));
}
if (table.getLastDDLTimestamp() != null) {
scan.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(),
Bytes.toBytes(table.getLastDDLTimestamp()));
Expand All @@ -1648,13 +1658,17 @@ public static void annotateScanWithMetadataAttributes(PTable table, Scan scan) {
* @param logicalTableName logical table name.
* @param tableType table type.
* @param timestamp last ddl timestamp.
* @param emptyCF empty column family name.
* @param emptyCQ empty column qualifier name.
* @param mutation mutation object to attach attributes.
*/
public static void annotateMutationWithMetadataAttributes(byte[] tenantId,
byte[] schemaName,
byte[] logicalTableName,
byte[] tableType,
byte[] timestamp,
byte[] emptyCF,
byte[] emptyCQ,
Mutation mutation) {
if (tenantId != null) {
mutation.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(),
Expand All @@ -1666,6 +1680,14 @@ public static void annotateMutationWithMetadataAttributes(byte[] tenantId,
logicalTableName);
mutation.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(),
tableType);
if (emptyCF != null) {
mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME,
emptyCF);
}
if (emptyCQ != null) {
mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME,
emptyCQ);
}
if (timestamp != null) {
mutation.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(),
timestamp);
Expand All @@ -1690,6 +1712,19 @@ public static void annotateScanWithMetadataAttributes(Scan oldScan, Scan newScan
oldScan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString());
byte[] timestamp =
oldScan.getAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString());
byte[] emptyCF =
oldScan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME);
byte[] emptyCQ =
oldScan.getAttribute(
BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME);
if (emptyCF != null) {
newScan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME,
emptyCF);
}
if (emptyCQ != null) {
newScan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME,
emptyCQ);
}
if (tenantId != null) {
newScan.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), tenantId);
}
Expand Down Expand Up @@ -1723,6 +1758,10 @@ public static void annotateMutationWithMetadataAttributes(PTable table, Mutation
mutation.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(),
table.getTenantId().getBytes());
}
mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME,
SchemaUtil.getEmptyColumnFamily(table));
mutation.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME,
SchemaUtil.getEmptyColumnQualifier(table));
mutation.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(),
table.getSchemaName().getBytes());
mutation.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.execute.MutationState;

/**
Expand All @@ -42,7 +43,8 @@ public static void annotateMutation(Mutation m, byte[] externalSchemaId) {
}

public static void annotateMutation(Mutation m, byte[] tenantId, byte[] schemaName,
byte[] logicalTableName, byte[] tableType, byte[] ddlTimestamp) {
byte[] logicalTableName, byte[] tableType,
byte[] ddlTimestamp, byte[] emptyCF, byte[] emptyCQ) {
if (!m.getDurability().equals(Durability.SKIP_WAL)) {
if (tenantId != null) {
m.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), tenantId);
Expand All @@ -52,6 +54,10 @@ public static void annotateMutation(Mutation m, byte[] tenantId, byte[] schemaNa
logicalTableName);
m.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(), tableType);
m.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(), ddlTimestamp);
m.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME,
emptyCF);
m.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME,
emptyCQ);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,26 +167,28 @@ public class CompactionScanner implements InternalScanner {
private static boolean forceMinorCompaction = false;

public CompactionScanner(RegionCoprocessorEnvironment env,
Store store,
InternalScanner storeScanner,
long maxLookbackAgeInMillis,
boolean major,
boolean keepDeleted,
PTable table) throws IOException {
Store store,
InternalScanner storeScanner,
long maxLookbackAgeInMillis,
boolean major,
boolean keepDeleted,
PTable table,
byte[] emptyCF,
byte[] emptyCQ) throws IOException {
this.storeScanner = storeScanner;
this.region = env.getRegion();
this.store = store;
this.env = env;
// Empty column family and qualifier are always needed to compute which all empty cells to retain
// even during minor compactions. If required empty cells are not retained during
// minor compactions then we can run into the risk of partial row expiry on next major compaction.
this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : emptyCF;
this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : emptyCQ;
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
tableName = region.getRegionInfo().getTable().getNameAsString();
String dataTableName = table.getName().toString();
String dataTableName = table != null ? table.getName().toString() : "_NA_";
Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName);
this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis, overriddenMaxLookback);
Expand All @@ -201,7 +203,7 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells();
familyCount = region.getTableDescriptor().getColumnFamilies().length;
localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(this.emptyCF))
|| localIndex;

isCDCIndex = table != null ? CDCUtil.isCDCIndex(table) : false;
Expand Down Expand Up @@ -232,7 +234,7 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
dataTableName, tableName, region.getRegionInfo().getEncodedName(),
Bytes.toStringBinary(region.getRegionInfo().getStartKey()),
Bytes.toStringBinary(region.getRegionInfo().getEndKey()),
Bytes.toString(this.emptyCF), Bytes.toString(emptyCQ),
Bytes.toString(this.emptyCF), Bytes.toString(this.emptyCQ),
this.minVersion, this.maxVersion, this.keepDeletedCells.name(),
this.familyCount, this.localIndex, this.emptyCFStore,
compactionTime, maxLookbackWindowStart, maxLookbackInMillis, this.major));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
protected byte[] logicalTableName;
protected byte[] tableType;
protected byte[] lastDdlTimestamp;
protected byte[] emptyCF;
protected byte[] emptyCQ;
private final CompiledTTLExpression ttlExpression;

// This relies on Hadoop Configuration to handle warning about deprecated configs and
Expand Down Expand Up @@ -213,6 +215,8 @@ public GlobalIndexRegionScanner(final RegionScanner innerScanner,
}
tenantId = scan.getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString());
schemaName = scan.getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
emptyCF = scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME);
emptyCQ = scan.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME);
logicalTableName = scan.getAttribute(
MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
tableType = scan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private void verifyIndex() throws IOException {

private void setMutationAttributes(Mutation m, byte[] uuidValue) {
ScanUtil.annotateMutationWithMetadataAttributes(tenantId, schemaName,
logicalTableName, tableType, lastDdlTimestamp, m);
logicalTableName, tableType, lastDdlTimestamp, emptyCF, emptyCQ, m);
m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD,
indexMetaData);
m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@
import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.annotation.concurrent.GuardedBy;

Expand All @@ -60,6 +64,7 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
Expand Down Expand Up @@ -178,6 +183,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private Configuration compactionConfig;
private Configuration indexWriteConfig;
private ReadOnlyProps indexWriteProps;
private boolean isEmptyColumnNameCached;

private static final ConcurrentMap<TableName, Map<String, byte[]>> TABLE_ATTRIBUTES =
new ConcurrentHashMap<>();

@Override
public Optional<RegionObserver> getRegionObserver() {
Expand All @@ -186,6 +195,7 @@ public Optional<RegionObserver> getRegionObserver() {

@Override
public void start(CoprocessorEnvironment e) throws IOException {
this.isEmptyColumnNameCached = false;
/*
* We need to create a copy of region's configuration since we don't want any side effect of
* setting the RpcControllerFactory.
Expand Down Expand Up @@ -611,6 +621,47 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) {
region.getTableDescriptor().getTableName().getName()) == 0);
}

@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        InternalScanner scanner, FlushLifeCycleTracker tracker) {
    // Wrap the flush scanner in a CompactionScanner so Phoenix TTL/max-lookback handling
    // also runs at flush time. Since no PTable is available here, the empty column
    // family/qualifier are taken from the static per-table attribute cache, which is
    // populated by preBatchMutate from annotated mutations. If the cache has not been
    // warmed yet, fall back to the plain flush scanner.
    try {
        if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
            return scanner;
        }
        final TableName table =
                c.getEnvironment().getRegion().getTableDescriptor().getTableName();
        final Map<String, byte[]> attrs = TABLE_ATTRIBUTES.get(table);
        if (attrs == null) {
            LOGGER.warn("Table {} has no table attributes for empty CF and empty CQ. "
                    + "UngroupedAggregateRegionObserver has not received any "
                    + "mutations yet? Do not perform Compaction now.", table);
            return scanner;
        }
        final byte[] emptyColumnFamily =
                attrs.get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME);
        final byte[] emptyColumnQualifier =
                attrs.get(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME);
        if (emptyColumnFamily == null || emptyColumnQualifier == null) {
            LOGGER.warn(
                    "Table {} has no empty CF and empty CQ attributes updated in cache. "
                            + "Do not perform Compaction now.",
                    table);
            return scanner;
        }
        // CompactionScanner must be created as the login user, matching preCompact.
        // Args after the scanner: maxLookback, major=false, keepDeleted=true, table=null
        // (forces CompactionScanner to use the explicit emptyCF/emptyCQ).
        return User.runAsLoginUser(
                (PrivilegedExceptionAction<InternalScanner>) () -> new CompactionScanner(
                        c.getEnvironment(), store, scanner,
                        BaseScannerRegionObserverConstants.getMaxLookbackInMillis(
                                c.getEnvironment().getConfiguration()),
                        false, true, null, emptyColumnFamily, emptyColumnQualifier));
    } catch (Exception e) {
        // this is not normal
        LOGGER.error("Error in preFlush. Do not perform Compaction for now.", e);
        return scanner;
    }
}

@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
Expand Down Expand Up @@ -682,7 +733,7 @@ && isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(
c.getEnvironment().getConfiguration()),
request.isMajor() || request.isAllFiles(),
keepDeleted, table
keepDeleted, table, null, null
);
}
else if (isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
Expand Down Expand Up @@ -1062,6 +1113,7 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
throws IOException {
final Configuration conf = c.getEnvironment().getConfiguration();
try {
updateEmptyCfCqValuesToStaticCache(c, miniBatchOp);
final HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf);
if (haGroupStoreManager.isMutationBlocked()) {
throw new IOException("Blocking Mutation as Some CRRs are in ACTIVE_TO_STANDBY "
Expand All @@ -1071,4 +1123,36 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
throw new IOException(e);
}
}

/**
 * Caches this region's empty column family / empty column qualifier names, extracted
 * from the first mutation's attributes, into the static per-table cache so that
 * {@code preFlush} can build a {@code CompactionScanner} without a PTable.
 * <p>
 * The {@code isEmptyColumnNameCached} check is a benign race: concurrent batches may
 * both reach the cache write, but the write is idempotent ({@code computeIfAbsent}),
 * so at worst the attributes are extracted more than once.
 *
 * @param c observer context for the region receiving the batch
 * @param miniBatchOp in-progress mutation batch; only the first operation is inspected
 */
private void updateEmptyCfCqValuesToStaticCache(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) {
    // Guard against an empty batch before indexing operation 0.
    if (isEmptyColumnNameCached || miniBatchOp.size() == 0) {
        return;
    }
    Mutation first = miniBatchOp.getOperation(0);
    byte[] emptyCF =
            first.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME);
    byte[] emptyCQ =
            first.getAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME);
    // Only mark the cache warm once both attributes were actually present; mutations
    // from clients that do not annotate (older clients?) are skipped and we retry on
    // a later batch. NOTE(review): assumes annotated mutations always carry both
    // attributes together -- confirm against the annotation call sites.
    if (emptyCF != null && emptyCQ != null) {
        updateEmptyColNamesToStaticCache(
                c.getEnvironment().getRegion().getTableDescriptor().getTableName(), emptyCF,
                emptyCQ);
        isEmptyColumnNameCached = true;
    }
}

/**
 * Idempotently records the empty column family / qualifier names for {@code table} in
 * the static {@code TABLE_ATTRIBUTES} cache. An existing entry for the table is never
 * overwritten; the attribute map is only built when the table is absent.
 *
 * @param table the HBase table whose attributes are being cached
 * @param emptyCF empty column family name bytes
 * @param emptyCQ empty column qualifier name bytes
 */
private static void updateEmptyColNamesToStaticCache(TableName table, byte[] emptyCF,
        byte[] emptyCQ) {
    TABLE_ATTRIBUTES.computeIfAbsent(table, unused -> {
        Map<String, byte[]> attrs = new HashMap<>();
        attrs.put(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
        attrs.put(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
        return attrs;
    });
}

}
Loading