|
20 | 20 | use std::collections::HashMap;
|
21 | 21 |
|
22 | 22 | use async_trait::async_trait;
|
23 |
| -use futures::lock::Mutex; |
| 23 | +use futures::lock::{Mutex, MutexGuard}; |
24 | 24 | use iceberg::io::FileIO;
|
25 | 25 | use iceberg::spec::{TableMetadata, TableMetadataBuilder};
|
26 | 26 | use iceberg::table::Table;
|
@@ -53,6 +53,23 @@ impl MemoryCatalog {
|
53 | 53 | warehouse_location,
|
54 | 54 | }
|
55 | 55 | }
|
| 56 | + |
| 57 | + /// Loads a table from the locked namespace state. |
| 58 | + async fn load_table_from_locked_namespace_state( |
| 59 | + &self, |
| 60 | + table_ident: &TableIdent, |
| 61 | + root_namespace_state: &MutexGuard<'_, NamespaceState>, |
| 62 | + ) -> Result<Table> { |
| 63 | + let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; |
| 64 | + let metadata = self.read_metadata(&metadata_location).await?; |
| 65 | + |
| 66 | + Table::builder() |
| 67 | + .identifier(table_ident.clone()) |
| 68 | + .metadata(metadata) |
| 69 | + .metadata_location(metadata_location.to_string()) |
| 70 | + .file_io(self.file_io.clone()) |
| 71 | + .build() |
| 72 | + } |
56 | 73 | }
|
57 | 74 |
|
58 | 75 | #[async_trait]
|
@@ -223,17 +240,8 @@ impl Catalog for MemoryCatalog {
|
223 | 240 | async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
|
224 | 241 | let root_namespace_state = self.root_namespace_state.lock().await;
|
225 | 242 |
|
226 |
| - let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; |
227 |
| - let input_file = self.file_io.new_input(metadata_location)?; |
228 |
| - let metadata_content = input_file.read().await?; |
229 |
| - let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; |
230 |
| - |
231 |
| - Table::builder() |
232 |
| - .file_io(self.file_io.clone()) |
233 |
| - .metadata_location(metadata_location.clone()) |
234 |
| - .metadata(metadata) |
235 |
| - .identifier(table_ident.clone()) |
236 |
| - .build() |
| 243 | + self.load_table_from_locked_namespace_state(table_ident, &root_namespace_state) |
| 244 | + .await |
237 | 245 | }
|
238 | 246 |
|
239 | 247 | /// Drop a table from the catalog.
|
|
0 commit comments