Skip to content

feat: metadata access support for table #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(ICEBERG_SOURCES
sort_field.cc
sort_order.cc
statistics_file.cc
table.cc
table_metadata.cc
transform.cc
transform_function.cc
Expand Down
110 changes: 110 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/table.h"

#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_metadata.h"

namespace iceberg {

const std::string& Table::uuid() const { return metadata_->table_uuid; }

Result<std::shared_ptr<Schema>> Table::schema() const { return metadata_->Schema(); }

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>&
Table::schemas() const {
if (!schemas_map_) {
schemas_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<Schema>>>();
for (const auto& schema : metadata_->schemas) {
if (schema->schema_id()) {
schemas_map_->emplace(schema->schema_id().value(), schema);
}
}
}
return schemas_map_;
}

Result<std::shared_ptr<PartitionSpec>> Table::spec() const {
return metadata_->PartitionSpec();
}

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
Table::specs() const {
if (!partition_spec_map_) {
partition_spec_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>();
for (const auto& spec : metadata_->partition_specs) {
partition_spec_map_->emplace(spec->spec_id(), spec);
}
}
return partition_spec_map_;
}

Result<std::shared_ptr<SortOrder>> Table::sort_order() const {
return metadata_->SortOrder();
}

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
Table::sort_orders() const {
if (!sort_orders_map_) {
sort_orders_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>();
for (const auto& order : metadata_->sort_orders) {
sort_orders_map_->emplace(order->order_id(), order);
}
}
return sort_orders_map_;
}

const std::unordered_map<std::string, std::string>& Table::properties() const {
return metadata_->properties;
}

const std::string& Table::location() const { return metadata_->location; }

Result<std::shared_ptr<Snapshot>> Table::current_snapshot() const {
return metadata_->Snapshot();
}

Result<std::shared_ptr<Snapshot>> Table::SnapshotById(int64_t snapshot_id) const {
auto iter = std::ranges::find_if(metadata_->snapshots,
[this, &snapshot_id](const auto& snapshot) {
return snapshot->snapshot_id == snapshot_id;
});
if (iter == metadata_->snapshots.end()) {
return NotFound("Snapshot with ID {} is not found", snapshot_id);
}
return *iter;
}

const std::vector<std::shared_ptr<Snapshot>>& Table::snapshots() const {
return metadata_->snapshots;
}

const std::vector<SnapshotLogEntry>& Table::history() const {
return metadata_->snapshot_log;
}

const std::shared_ptr<FileIO>& Table::io() const { return io_; }

} // namespace iceberg
108 changes: 60 additions & 48 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

#pragma once

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_identifier.h"
#include "iceberg/type_fwd.h"

namespace iceberg {
Expand All @@ -35,77 +36,88 @@ class ICEBERG_EXPORT Table {
public:
virtual ~Table() = default;

/// \brief Return the full name for this table
virtual const std::string& name() const = 0;
/// \brief Construct a table.
/// \param[in] identifier The identifier of the table.
/// \param[in] metadata The metadata for the table.
/// \param[in] metadata_location The location of the table metadata file.
/// \param[in] io The FileIO to read and write table data and metadata files.
/// \param[in] catalog The catalog that this table belongs to. If null, the table will
/// be read-only.
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
std::shared_ptr<Catalog> catalog)
: identifier_(std::move(identifier)),
metadata_(std::move(metadata)),
metadata_location_(std::move(metadata_location)),
io_(std::move(io)),
catalog_(std::move(catalog)) {};

/// \brief Return the identifier of this table
const TableIdentifier& name() const { return identifier_; }

/// \brief Returns the UUID of the table
virtual const std::string& uuid() const = 0;
const std::string& uuid() const;

/// \brief Refresh the current table metadata
virtual Status Refresh() = 0;

/// \brief Return the schema for this table
virtual const std::shared_ptr<Schema>& schema() const = 0;
/// \brief Return the schema for this table, return NotFoundError if not found
Result<std::shared_ptr<Schema>> schema() const;

/// \brief Return a map of schema for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<Schema>>& schemas() const = 0;
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>& schemas()
const;

/// \brief Return the partition spec for this table
virtual const std::shared_ptr<PartitionSpec>& spec() const = 0;
/// \brief Return the partition spec for this table, return NotFoundError if not found
Result<std::shared_ptr<PartitionSpec>> spec() const;

/// \brief Return a map of partition specs for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs()
const = 0;
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
specs() const;

/// \brief Return the sort order for this table
virtual const std::shared_ptr<SortOrder>& sort_order() const = 0;
/// \brief Return the sort order for this table, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> sort_order() const;

/// \brief Return a map of sort order IDs to sort orders for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>& sort_orders()
const = 0;
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
sort_orders() const;

/// \brief Return a map of string properties for this table
virtual const std::unordered_map<std::string, std::string>& properties() const = 0;
const std::unordered_map<std::string, std::string>& properties() const;

/// \brief Return the table's base location
virtual const std::string& location() const = 0;
const std::string& location() const;

/// \brief Return the table's current snapshot
virtual const std::shared_ptr<Snapshot>& current_snapshot() const = 0;
/// \brief Return the table's current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> current_snapshot() const;

/// \brief Get the snapshot of this table with the given id, or null if there is no
/// matching snapshot
/// \brief Get the snapshot of this table with the given id
///
/// \param snapshot_id the ID of the snapshot to get
/// \return the Snapshot with the given id
virtual Result<std::shared_ptr<Snapshot>> snapshot(int64_t snapshot_id) const = 0;
/// \return the Snapshot with the given id, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> SnapshotById(int64_t snapshot_id) const;

/// \brief Get the snapshots of this table
virtual const std::vector<std::shared_ptr<Snapshot>>& snapshots() const = 0;
const std::vector<std::shared_ptr<Snapshot>>& snapshots() const;

/// \brief Get the snapshot history of this table
///
/// \return a vector of history entries
virtual const std::vector<std::shared_ptr<HistoryEntry>>& history() const = 0;

/// \brief Create a new table scan for this table
///
/// Once a table scan is created, it can be refined to project columns and filter data.
virtual std::unique_ptr<TableScan> NewScan() const = 0;

/// \brief Create a new append API to add files to this table and commit
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;

/// \brief Create a new transaction API to commit multiple table operations at once
virtual std::unique_ptr<Transaction> NewTransaction() = 0;

/// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an
/// IO-less design in the core library.
// /// \brief Returns a FileIO to read and write table data and metadata files
// virtual std::shared_ptr<FileIO> io() const = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep io()?


/// \brief Returns a LocationProvider to provide locations for new data files
virtual std::unique_ptr<LocationProvider> location_provider() const = 0;
const std::vector<SnapshotLogEntry>& history() const;

/// \brief Returns a FileIO to read and write table data and metadata files
const std::shared_ptr<FileIO>& io() const;

private:
const TableIdentifier identifier_;
const std::shared_ptr<TableMetadata> metadata_;
const std::string metadata_location_;
std::shared_ptr<FileIO> io_;
std::shared_ptr<Catalog> catalog_;

mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>
schemas_map_;
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>
partition_spec_map_;
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>
sort_orders_map_;
};

} // namespace iceberg
10 changes: 10 additions & 0 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
return *iter;
}

Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) {
return snapshot->snapshot_id == current_snapshot_id;
});
if (iter == snapshots.end()) {
return NotFound("Current snapshot with ID {} is not found", current_snapshot_id);
}
return *iter;
}

namespace {

template <typename T>
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
/// Table metadata for Iceberg tables.

#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/snapshot.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/timepoint.h"

Expand Down Expand Up @@ -135,6 +137,8 @@ struct ICEBERG_EXPORT TableMetadata {
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
/// \brief Get the current sort order, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> SortOrder() const;
/// \brief Get the current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> Snapshot() const;

friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);

Expand Down
5 changes: 4 additions & 1 deletion src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ class TransformFunction;
struct PartitionStatisticsFile;
struct Snapshot;
struct SnapshotRef;

struct MetadataLogEntry;
struct SnapshotLogEntry;

struct StatisticsFile;
struct TableMetadata;

Expand All @@ -113,7 +117,6 @@ enum class TransformType;
/// TODO: Forward declarations below are not added yet.
/// ----------------------------------------------------------------------------

class HistoryEntry;
class StructLike;

class MetadataUpdate;
Expand Down
11 changes: 9 additions & 2 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ target_sources(catalog_test PRIVATE in_memory_catalog_test.cc)
target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME catalog_test COMMAND catalog_test)

add_executable(table_test)
target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}")
target_sources(table_test PRIVATE test_common.cc json_internal_test.cc table_test.cc
schema_json_test.cc)
target_link_libraries(table_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME table_test COMMAND table_test)

add_executable(expression_test)
target_sources(expression_test PRIVATE expression_test.cc)
target_link_libraries(expression_test PRIVATE iceberg_static GTest::gtest_main
Expand All @@ -57,8 +64,8 @@ add_test(NAME expression_test COMMAND expression_test)

add_executable(json_serde_test)
target_include_directories(json_serde_test PRIVATE "${CMAKE_BINARY_DIR}")
target_sources(json_serde_test PRIVATE json_internal_test.cc metadata_serde_test.cc
schema_json_test.cc)
target_sources(json_serde_test PRIVATE test_common.cc json_internal_test.cc
metadata_serde_test.cc schema_json_test.cc)
target_link_libraries(json_serde_test PRIVATE iceberg_static GTest::gtest_main
GTest::gmock)
add_test(NAME json_serde_test COMMAND json_serde_test)
Expand Down
Loading
Loading