Skip to content

Commit 27f86ca

Browse files
committed
Use table definition of metastore in Delta Lake
1 parent 653fce3 commit 27f86ca

File tree

3 files changed

+560
-37
lines changed

3 files changed

+560
-37
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 139 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.collect.ImmutableSet;
2323
import com.google.common.collect.ImmutableTable;
2424
import com.google.common.collect.Sets;
25+
import com.google.common.collect.Streams;
2526
import dev.failsafe.Failsafe;
2627
import dev.failsafe.RetryPolicy;
2728
import io.airlift.json.JsonCodec;
@@ -50,6 +51,7 @@
5051
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
5152
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler.TableUpdateInfo;
5253
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
54+
import io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore;
5355
import io.trino.plugin.deltalake.metastore.NotADeltaLakeTableException;
5456
import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle;
5557
import io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId;
@@ -80,6 +82,7 @@
8082
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
8183
import io.trino.plugin.hive.TrinoViewHiveMetastore;
8284
import io.trino.plugin.hive.security.AccessControlMetadata;
85+
import io.trino.spi.ErrorCode;
8386
import io.trino.spi.NodeManager;
8487
import io.trino.spi.TrinoException;
8588
import io.trino.spi.block.Block;
@@ -107,6 +110,7 @@
107110
import io.trino.spi.connector.Constraint;
108111
import io.trino.spi.connector.ConstraintApplicationResult;
109112
import io.trino.spi.connector.ProjectionApplicationResult;
113+
import io.trino.spi.connector.RelationCommentMetadata;
110114
import io.trino.spi.connector.RetryMode;
111115
import io.trino.spi.connector.RowChangeParadigm;
112116
import io.trino.spi.connector.SaveMode;
@@ -162,6 +166,7 @@
162166
import java.util.List;
163167
import java.util.Map;
164168
import java.util.Map.Entry;
169+
import java.util.Objects;
165170
import java.util.Optional;
166171
import java.util.OptionalInt;
167172
import java.util.OptionalLong;
@@ -170,6 +175,7 @@
170175
import java.util.concurrent.atomic.AtomicInteger;
171176
import java.util.concurrent.atomic.AtomicReference;
172177
import java.util.function.Function;
178+
import java.util.function.UnaryOperator;
173179
import java.util.stream.Collectors;
174180
import java.util.stream.Stream;
175181

@@ -191,6 +197,7 @@
191197
import static io.trino.hive.formats.HiveClassNames.LAZY_SIMPLE_SERDE_CLASS;
192198
import static io.trino.hive.formats.HiveClassNames.SEQUENCEFILE_INPUT_FORMAT_CLASS;
193199
import static io.trino.metastore.StorageFormat.create;
200+
import static io.trino.metastore.Table.TABLE_COMMENT;
194201
import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain;
195202
import static io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
196203
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
@@ -219,6 +226,7 @@
219226
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
220227
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
221228
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
229+
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled;
222230
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled;
223231
import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
224232
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
@@ -230,11 +238,14 @@
230238
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getCheckpointInterval;
231239
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getLocation;
232240
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.getPartitionedBy;
241+
import static io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler.containsSchemaString;
242+
import static io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler.getLastTransactionVersion;
233243
import static io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler.isSameTransactionVersion;
234244
import static io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler.tableMetadataParameters;
235245
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_PROPERTY;
236246
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
237247
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.convertToDeltaMetastoreTable;
248+
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.verifyDeltaLakeTable;
238249
import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
239250
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
240251
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.COLUMN_MAPPING_PHYSICAL_NAME_CONFIGURATION_KEY;
@@ -280,13 +291,17 @@
280291
import static io.trino.plugin.hive.util.HiveUtil.escapeTableName;
281292
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
282293
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
294+
import static io.trino.spi.ErrorType.EXTERNAL;
283295
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
284296
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
285297
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
286298
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
287299
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
300+
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
288301
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
289302
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
303+
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
304+
import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE;
290305
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
291306
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
292307
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
@@ -324,7 +339,9 @@
324339
import static java.util.UUID.randomUUID;
325340
import static java.util.function.Function.identity;
326341
import static java.util.function.Predicate.not;
342+
import static java.util.stream.Collectors.collectingAndThen;
327343
import static java.util.stream.Collectors.partitioningBy;
344+
import static java.util.stream.Collectors.toUnmodifiableSet;
328345

329346
public class DeltaLakeMetadata
330347
implements ConnectorMetadata
@@ -835,6 +852,93 @@ public Optional<ConnectorTableLayout> getInsertLayout(ConnectorSession session,
835852
return Optional.of(new ConnectorTableLayout(partitionColumnNames));
836853
}
837854

855+
@Override
856+
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
857+
{
858+
Map<SchemaTableName, ConnectorViewDefinition> viewDefinitions = getViews(session, schemaName);
859+
ImmutableList.Builder<RelationCommentMetadata> commentMetadataBuilder = ImmutableList.builderWithExpectedSize(viewDefinitions.size());
860+
ImmutableSet.Builder<SchemaTableName> viewNamesBuilder = ImmutableSet.builderWithExpectedSize(viewDefinitions.size());
861+
for (Entry<SchemaTableName, ConnectorViewDefinition> viewDefinitionEntry : viewDefinitions.entrySet()) {
862+
RelationCommentMetadata relationCommentMetadata = RelationCommentMetadata.forRelation(viewDefinitionEntry.getKey(), viewDefinitionEntry.getValue().getComment());
863+
commentMetadataBuilder.add(relationCommentMetadata);
864+
viewNamesBuilder.add(relationCommentMetadata.name());
865+
}
866+
List<RelationCommentMetadata> views = commentMetadataBuilder.build();
867+
Set<SchemaTableName> viewNames = viewNamesBuilder.build();
868+
869+
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
870+
871+
Stream<RelationCommentMetadata> tables = listTables(session, schemaName).stream()
872+
.filter(tableName -> !viewNames.contains(tableName))
873+
.collect(collectingAndThen(toUnmodifiableSet(), relationFilter)).stream()
874+
.map(tableName -> getRelationCommentMetadata(session, fileSystem, tableName))
875+
.filter(Objects::nonNull);
876+
877+
Set<SchemaTableName> availableViews = relationFilter.apply(viewNames);
878+
return Streams.concat(views.stream().filter(commentMetadata -> availableViews.contains(commentMetadata.name())), tables)
879+
.iterator();
880+
}
881+
882+
private RelationCommentMetadata getRelationCommentMetadata(ConnectorSession session, TrinoFileSystem fileSystem, SchemaTableName tableName)
883+
{
884+
if (redirectTable(session, tableName).isPresent()) {
885+
return RelationCommentMetadata.forRedirectedTable(tableName);
886+
}
887+
888+
try {
889+
Optional<Table> metastoreTable = metastore.getRawMetastoreTable(tableName.getSchemaName(), tableName.getTableName());
890+
if (metastoreTable.isEmpty()) {
891+
// this may happen when table is being deleted concurrently
892+
return null;
893+
}
894+
895+
Table table = metastoreTable.get();
896+
verifyDeltaLakeTable(table);
897+
898+
String tableLocation = HiveMetastoreBackedDeltaLakeMetastore.getTableLocation(table);
899+
if (canUseTableParametersFromMetastore(session, fileSystem, table, tableLocation)) {
900+
// Don't check TABLE_COMMENT existence because it's not stored in case of null comment
901+
return RelationCommentMetadata.forRelation(tableName, Optional.ofNullable(table.getParameters().get(TABLE_COMMENT)));
902+
}
903+
904+
TableSnapshot snapshot = getSnapshot(session, tableName, tableLocation, Optional.empty());
905+
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
906+
return RelationCommentMetadata.forRelation(tableName, Optional.ofNullable(metadata.getDescription()));
907+
}
908+
catch (RuntimeException e) {
909+
boolean suppressed = false;
910+
if (e instanceof TrinoException trinoException) {
911+
ErrorCode errorCode = trinoException.getErrorCode();
912+
suppressed = errorCode.equals(UNSUPPORTED_TABLE_TYPE.toErrorCode()) ||
913+
// e.g. table deleted concurrently
914+
errorCode.equals(TABLE_NOT_FOUND.toErrorCode()) ||
915+
errorCode.equals(NOT_FOUND.toErrorCode()) ||
916+
// e.g. Delta table being deleted concurrently resulting in failure to load metadata from filesystem
917+
errorCode.getType() == EXTERNAL;
918+
}
919+
if (suppressed) {
920+
LOG.debug("Failed to get metadata for table: %s", tableName);
921+
}
922+
else {
923+
// getTableHandle or getTableMetadata failed call may fail if table disappeared during listing or is unsupported
924+
LOG.warn("Failed to get metadata for table: %s", tableName);
925+
}
926+
// Since the getTableHandle did not return null (i.e. succeeded or failed), we assume the table would be returned by listTables
927+
return RelationCommentMetadata.forRelation(tableName, Optional.empty());
928+
}
929+
}
930+
931+
private static boolean canUseTableParametersFromMetastore(ConnectorSession session, TrinoFileSystem fileSystem, Table table, String tableLocation)
932+
{
933+
if (!isStoreTableMetadataInMetastoreEnabled(session)) {
934+
return false;
935+
}
936+
937+
return getLastTransactionVersion(table)
938+
.map(version -> isLatestVersion(fileSystem, tableLocation, version))
939+
.orElse(false);
940+
}
941+
838942
@Override
839943
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
840944
{
@@ -848,39 +952,66 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
848952
.map(_ -> singletonList(prefix.toSchemaTableName()))
849953
.orElseGet(() -> listTables(session, prefix.getSchema()));
850954

955+
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
956+
851957
return tables.stream()
852-
.flatMap(table -> {
958+
.flatMap(tableName -> {
853959
try {
854-
if (redirectTable(session, table).isPresent()) {
960+
if (redirectTable(session, tableName).isPresent()) {
855961
// put "redirect marker" for current table
856-
return Stream.of(TableColumnsMetadata.forRedirectedTable(table));
962+
return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName));
857963
}
858964

859-
Optional<DeltaMetastoreTable> metastoreTable = metastore.getTable(table.getSchemaName(), table.getTableName());
965+
Optional<Table> metastoreTable = metastore.getRawMetastoreTable(tableName.getSchemaName(), tableName.getTableName());
860966
if (metastoreTable.isEmpty()) {
861967
// this may happen when table is being deleted concurrently,
862968
return Stream.of();
863969
}
864-
String tableLocation = metastoreTable.get().location();
865-
TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, table, tableLocation, Optional.empty());
970+
971+
Table table = metastoreTable.get();
972+
verifyDeltaLakeTable(table);
973+
974+
String tableLocation = HiveMetastoreBackedDeltaLakeMetastore.getTableLocation(table);
975+
if (containsSchemaString(table) && canUseTableParametersFromMetastore(session, fileSystem, table, tableLocation)) {
976+
List<ColumnMetadata> columnsMetadata = metadataScheduler.getColumnsMetadata(table);
977+
return Stream.of(TableColumnsMetadata.forTable(tableName, columnsMetadata));
978+
}
979+
// Don't store cache in streamTableColumns method for avoiding too many update calls
980+
981+
TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, tableName, tableLocation, Optional.empty());
866982
MetadataEntry metadata = transactionLogAccess.getMetadataEntry(session, snapshot);
867983
ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot);
868984
List<ColumnMetadata> columnMetadata = getTableColumnMetadata(metadata, protocol);
869-
return Stream.of(TableColumnsMetadata.forTable(table, columnMetadata));
985+
return Stream.of(TableColumnsMetadata.forTable(tableName, columnMetadata));
870986
}
871987
catch (NotADeltaLakeTableException | IOException e) {
872988
return Stream.empty();
873989
}
874990
catch (RuntimeException e) {
875991
// this may happen when table is being deleted concurrently, it still exists in metastore but TL is no longer present
876992
// there can be several different exceptions thrown this is why all RTE are caught and ignored here
877-
LOG.debug(e, "Ignored exception when trying to list columns from %s", table);
993+
LOG.debug(e, "Ignored exception when trying to list columns from %s", tableName);
878994
return Stream.empty();
879995
}
880996
})
881997
.iterator();
882998
}
883999

1000+
private static boolean isLatestVersion(TrinoFileSystem fileSystem, String tableLocation, long version)
1001+
{
1002+
String transactionLogDir = getTransactionLogDir(tableLocation);
1003+
Location transactionLogJsonEntryPath = getTransactionLogJsonEntryPath(transactionLogDir, version);
1004+
Location nextTransactionLogJsonEntryPath = getTransactionLogJsonEntryPath(transactionLogDir, version + 1);
1005+
try {
1006+
return !fileSystem.newInputFile(nextTransactionLogJsonEntryPath).exists() &&
1007+
fileSystem.newInputFile(transactionLogJsonEntryPath).exists();
1008+
}
1009+
catch (IOException e) {
1010+
LOG.debug(e, "Failed to check table location: %s", tableLocation);
1011+
return false;
1012+
}
1013+
}
1014+
8841015
private List<DeltaLakeColumnHandle> getColumns(MetadataEntry deltaMetadata, ProtocolEntry protocolEntry)
8851016
{
8861017
ImmutableList.Builder<DeltaLakeColumnHandle> columns = ImmutableList.builder();

0 commit comments

Comments
 (0)