Skip to content

Commit 653fce3

Browse files
committed
Add support for storing metadata to metastore in Delta Lake
1 parent 18b4486 commit 653fce3

32 files changed

+1493
-30
lines changed

docs/src/main/sphinx/connector/delta-lake.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,13 @@ values. Typical usage does not require you to configure them.
161161
- Maximum number of metastore data objects per transaction in the Hive
162162
metastore cache.
163163
- `1000`
164+
* - `delta.metastore.store-table-metadata`
165+
- Store table comments and column definitions in the metastore. The write
166+
permission is required to update the metastore.
167+
- `false`
168+
* - `delta.metastore.store-table-metadata-threads`
169+
- Number of threads used for storing table metadata in the metastore.
170+
- `5`
164171
* - `delta.delete-schema-locations-fallback`
165172
- Whether schema locations are deleted when Trino can't determine whether they
166173
contain external files.

plugin/trino-delta-lake/pom.xml

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@
130130
<artifactId>trino-plugin-toolkit</artifactId>
131131
</dependency>
132132

133+
<dependency>
134+
<groupId>io.trino.hive</groupId>
135+
<artifactId>hive-thrift</artifactId>
136+
</dependency>
137+
133138
<dependency>
134139
<groupId>jakarta.annotation</groupId>
135140
<artifactId>jakarta.annotation-api</artifactId>
@@ -181,6 +186,21 @@
181186
<artifactId>jmxutils</artifactId>
182187
</dependency>
183188

189+
<dependency>
190+
<groupId>software.amazon.awssdk</groupId>
191+
<artifactId>aws-core</artifactId>
192+
</dependency>
193+
194+
<dependency>
195+
<groupId>software.amazon.awssdk</groupId>
196+
<artifactId>glue</artifactId>
197+
</dependency>
198+
199+
<dependency>
200+
<groupId>software.amazon.awssdk</groupId>
201+
<artifactId>utils</artifactId>
202+
</dependency>
203+
184204
<dependency>
185205
<groupId>com.fasterxml.jackson.core</groupId>
186206
<artifactId>jackson-annotations</artifactId>
@@ -275,12 +295,6 @@
275295
<scope>runtime</scope>
276296
</dependency>
277297

278-
<dependency>
279-
<groupId>software.amazon.awssdk</groupId>
280-
<artifactId>glue</artifactId>
281-
<scope>runtime</scope>
282-
</dependency>
283-
284298
<dependency>
285299
<groupId>com.github.docker-java</groupId>
286300
<artifactId>docker-java-api</artifactId>

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class DeltaLakeConfig
7474
private boolean collectExtendedStatisticsOnWrite = true;
7575
private HiveCompressionCodec compressionCodec = HiveCompressionCodec.SNAPPY;
7676
private long perTransactionMetastoreCacheMaximumSize = 1000;
77+
private boolean storeTableMetadataEnabled;
78+
private int storeTableMetadataThreads = 5;
7779
private boolean deleteSchemaLocationsFallback;
7880
private String parquetTimeZone = TimeZone.getDefault().getID();
7981
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
@@ -377,6 +379,33 @@ public DeltaLakeConfig setPerTransactionMetastoreCacheMaximumSize(long perTransa
377379
return this;
378380
}
379381

382+
public boolean isStoreTableMetadataEnabled()
383+
{
384+
return storeTableMetadataEnabled;
385+
}
386+
387+
@Config("delta.metastore.store-table-metadata")
388+
@ConfigDescription("Store table metadata in metastore")
389+
public DeltaLakeConfig setStoreTableMetadataEnabled(boolean storeTableMetadataEnabled)
390+
{
391+
this.storeTableMetadataEnabled = storeTableMetadataEnabled;
392+
return this;
393+
}
394+
395+
@Min(0) // Allow 0 to use the same thread for testing purpose
396+
public int getStoreTableMetadataThreads()
397+
{
398+
return storeTableMetadataThreads;
399+
}
400+
401+
@Config("delta.metastore.store-table-metadata-threads")
402+
@ConfigDescription("Number of threads used for storing table metadata in metastore")
403+
public DeltaLakeConfig setStoreTableMetadataThreads(int storeTableMetadataThreads)
404+
{
405+
this.storeTableMetadataThreads = storeTableMetadataThreads;
406+
return this;
407+
}
408+
380409
public boolean isDeleteSchemaLocationsFallback()
381410
{
382411
return this.deleteSchemaLocationsFallback;

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 97 additions & 13 deletions
Large diffs are not rendered by default.

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.google.inject.Inject;
1717
import io.airlift.json.JsonCodec;
1818
import io.trino.filesystem.TrinoFileSystemFactory;
19+
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
1920
import io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore;
2021
import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess;
2122
import io.trino.plugin.deltalake.statistics.FileBasedTableStatisticsProvider;
@@ -56,6 +57,7 @@ public class DeltaLakeMetadataFactory
5657
private final long perTransactionMetastoreCacheMaximumSize;
5758
private final boolean deleteSchemaLocationsFallback;
5859
private final boolean useUniqueTableLocation;
60+
private final DeltaLakeTableMetadataScheduler metadataScheduler;
5961

6062
private final boolean allowManagedTableRename;
6163
private final String trinoVersion;
@@ -76,7 +78,8 @@ public DeltaLakeMetadataFactory(
7678
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
7779
CachingExtendedStatisticsAccess statisticsAccess,
7880
@AllowDeltaLakeManagedTableRename boolean allowManagedTableRename,
79-
NodeVersion nodeVersion)
81+
NodeVersion nodeVersion,
82+
DeltaLakeTableMetadataScheduler metadataScheduler)
8083
{
8184
this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null");
8285
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
@@ -98,6 +101,7 @@ public DeltaLakeMetadataFactory(
98101
this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
99102
this.allowManagedTableRename = allowManagedTableRename;
100103
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
104+
this.metadataScheduler = requireNonNull(metadataScheduler, "metadataScheduler is null");
101105
}
102106

103107
public DeltaLakeMetadata create(ConnectorIdentity identity)
@@ -135,6 +139,7 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
135139
deleteSchemaLocationsFallback,
136140
deltaLakeRedirectionsProvider,
137141
statisticsAccess,
142+
metadataScheduler,
138143
useUniqueTableLocation,
139144
allowManagedTableRename);
140145
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.trino.plugin.deltalake.cache.DeltaLakeCacheKeyProvider;
2828
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunctionProvider;
2929
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
30+
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
3031
import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
3132
import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
3233
import io.trino.plugin.deltalake.procedure.OptimizeTableProcedure;
@@ -117,6 +118,9 @@ public void setup(Binder binder)
117118
binder.bind(TransactionLogAccess.class).in(Scopes.SINGLETON);
118119
newExporter(binder).export(TransactionLogAccess.class)
119120
.as(generator -> generator.generatedNameOf(TransactionLogAccess.class, catalogName.get().toString()));
121+
binder.bind(DeltaLakeTableMetadataScheduler.class).in(Scopes.SINGLETON);
122+
newExporter(binder).export(DeltaLakeTableMetadataScheduler.class)
123+
.as(generator -> generator.generatedNameOf(DeltaLakeTableMetadataScheduler.class, catalogName.get().toString()));
120124

121125
binder.bind(TransactionLogWriterFactory.class).in(Scopes.SINGLETON);
122126
binder.bind(TransactionLogSynchronizerManager.class).in(Scopes.SINGLETON);

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public final class DeltaLakeSessionProperties
7575
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
7676
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
7777
private static final String CHECKPOINT_FILTERING_ENABLED = "checkpoint_filtering_enabled";
78+
private static final String STORE_TABLE_METADATA = "store_table_metadata";
7879

7980
private final List<PropertyMetadata<?>> sessionProperties;
8081

@@ -230,7 +231,12 @@ public DeltaLakeSessionProperties(
230231
CHECKPOINT_FILTERING_ENABLED,
231232
"Use filter in checkpoint reader",
232233
deltaLakeConfig.isCheckpointFilteringEnabled(),
233-
false));
234+
false),
235+
booleanProperty(
236+
STORE_TABLE_METADATA,
237+
"Store table metadata in metastore",
238+
deltaLakeConfig.isStoreTableMetadataEnabled(),
239+
true));
234240
}
235241

236242
@Override
@@ -348,4 +354,9 @@ public static boolean isCheckpointFilteringEnabled(ConnectorSession session)
348354
{
349355
return session.getProperty(CHECKPOINT_FILTERING_ENABLED, Boolean.class);
350356
}
357+
358+
public static boolean isStoreTableMetadataInMetastoreEnabled(ConnectorSession session)
359+
{
360+
return session.getProperty(STORE_TABLE_METADATA, Boolean.class);
361+
}
351362
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTransactionManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ public void commit(ConnectorTransactionHandle transaction)
5353
{
5454
MemoizedMetadata deltaLakeMetadata = transactions.remove(transaction);
5555
checkArgument(deltaLakeMetadata != null, "no such transaction: %s", transaction);
56+
deltaLakeMetadata.optionalGet().ifPresent(metadata -> {
57+
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
58+
metadata.commit();
59+
}
60+
});
5661
}
5762

5863
public void rollback(ConnectorTransactionHandle transaction)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import com.google.inject.BindingAnnotation;
17+
18+
import java.lang.annotation.Retention;
19+
import java.lang.annotation.Target;
20+
21+
import static java.lang.annotation.ElementType.FIELD;
22+
import static java.lang.annotation.ElementType.METHOD;
23+
import static java.lang.annotation.ElementType.PARAMETER;
24+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
25+
26+
@Retention(RUNTIME)
27+
@Target({FIELD, PARAMETER, METHOD})
28+
@BindingAnnotation
29+
public @interface MaxTableParameterLength {}

0 commit comments

Comments
 (0)