diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 359b79e6a..d82229ef1 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -92,6 +92,7 @@ set(ICEBERG_SOURCES update/snapshot_update.cc update/update_location.cc update/update_partition_spec.cc + update/update_partition_statistics.cc update/update_properties.cc update/update_schema.cc update/update_snapshot_reference.cc diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 056af671c..95883a935 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -192,6 +192,9 @@ constexpr std::string_view kActionRemoveProperties = "remove-properties"; constexpr std::string_view kActionSetLocation = "set-location"; constexpr std::string_view kActionSetStatistics = "set-statistics"; constexpr std::string_view kActionRemoveStatistics = "remove-statistics"; +constexpr std::string_view kActionSetPartitionStatistics = "set-partition-statistics"; +constexpr std::string_view kActionRemovePartitionStatistics = + "remove-partition-statistics"; // TableUpdate field constants constexpr std::string_view kUUID = "uuid"; @@ -1439,6 +1442,24 @@ nlohmann::json ToJson(const TableUpdate& update) { json[kSnapshotId] = u.snapshot_id(); break; } + case TableUpdate::Kind::kSetPartitionStatistics: { + const auto& u = + internal::checked_cast(update); + json[kAction] = kActionSetPartitionStatistics; + if (u.partition_statistics_file()) { + json[kPartitionStatistics] = ToJson(*u.partition_statistics_file()); + } else { + json[kPartitionStatistics] = nlohmann::json::value_t::null; + } + break; + } + case TableUpdate::Kind::kRemovePartitionStatistics: { + const auto& u = + internal::checked_cast(update); + json[kAction] = kActionRemovePartitionStatistics; + json[kSnapshotId] = u.snapshot_id(); + break; + } } return json; } @@ -1628,6 +1649,18 @@ Result> TableUpdateFromJson(const nlohmann::json& j ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); return std::make_unique(snapshot_id); } + if (action == kActionSetPartitionStatistics) { + ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_json, + GetJsonValue(json, kPartitionStatistics)); + ICEBERG_ASSIGN_OR_RAISE(auto partition_statistics_file, + PartitionStatisticsFileFromJson(partition_statistics_json)); + return std::make_unique( + std::move(partition_statistics_file)); + } + if (action == kActionRemovePartitionStatistics) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); + return std::make_unique(snapshot_id); + } return JsonParseError("Unknown table update action: {}", action); } diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 05cb6f8d3..651de7823 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -110,6 +110,7 @@ iceberg_sources = files( 'update/snapshot_update.cc', 'update/update_location.cc', 'update/update_partition_spec.cc', + 'update/update_partition_statistics.cc', 'update/update_properties.cc', 'update/update_schema.cc', 'update/update_snapshot_reference.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 73acafd74..b6c26ea00 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -33,6 +33,7 @@ #include "iceberg/transaction.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/update_partition_spec.h" +#include "iceberg/update/update_partition_statistics.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" #include "iceberg/update/update_statistics.h" @@ -214,6 +215,13 @@ Result> Table::NewUpdateStatistics() { return transaction->NewUpdateStatistics(); } +Result> Table::NewUpdatePartitionStatistics() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdatePartitionStatistics(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 77e9016f4..1f3135dd7 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -156,6 +156,11 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewUpdateStatistics(); + /// \brief Create a new UpdatePartitionStatistics to update partition statistics and + /// commit the changes. + virtual Result> + NewUpdatePartitionStatistics(); + /// \brief Create a new UpdateLocation to update the table location and commit the /// changes. virtual Result> NewUpdateLocation(); diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 393b438af..7dab67041 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -623,6 +623,9 @@ class TableMetadataBuilder::Impl { Status RemovePartitionSpecs(const std::vector& spec_ids); Status SetStatistics(std::shared_ptr statistics_file); Status RemoveStatistics(int64_t snapshot_id); + Status SetPartitionStatistics( + std::shared_ptr partition_statistics_file); + Status RemovePartitionStatistics(int64_t snapshot_id); Result> Build(); @@ -1208,6 +1211,41 @@ Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) { return {}; } +Status TableMetadataBuilder::Impl::SetPartitionStatistics( + std::shared_ptr partition_statistics_file) { + ICEBERG_PRECHECK(partition_statistics_file != nullptr, + "Cannot set null partition statistics file"); + + // Find and replace existing partition statistics for the same snapshot_id, or add new + // one + auto it = std::ranges::find_if( + metadata_.partition_statistics, + [snapshot_id = partition_statistics_file->snapshot_id](const auto& stat) { + return stat && stat->snapshot_id == snapshot_id; + }); + + if (it != metadata_.partition_statistics.end()) { + *it = partition_statistics_file; + } else { + metadata_.partition_statistics.push_back(partition_statistics_file); + } + + changes_.push_back(std::make_unique( + std::move(partition_statistics_file))); + return {}; +} + +Status TableMetadataBuilder::Impl::RemovePartitionStatistics(int64_t snapshot_id) { + auto removed_count = + std::erase_if(metadata_.partition_statistics, [snapshot_id](const auto& stat) { + return stat && stat->snapshot_id == snapshot_id; + }); + if (removed_count != 0) { + changes_.push_back(std::make_unique(snapshot_id)); + } + return {}; +} + std::unordered_set TableMetadataBuilder::Impl::IntermediateSnapshotIdSet( int64_t current_snapshot_id) const { std::unordered_set added_snapshot_ids; @@ -1636,12 +1674,15 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics( const std::shared_ptr& partition_statistics_file) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR( + impl_->SetPartitionStatistics(partition_statistics_file)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics( int64_t snapshot_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionStatistics(snapshot_id)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetProperties( diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 5866b61d0..946b2a6ab 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -500,4 +500,60 @@ std::unique_ptr RemoveStatistics::Clone() const { return std::make_unique(snapshot_id_); } +// SetPartitionStatistics + +int64_t SetPartitionStatistics::snapshot_id() const { + return partition_statistics_file_->snapshot_id; +} + +void SetPartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const { + builder.SetPartitionStatistics(partition_statistics_file_); +} + +void SetPartitionStatistics::GenerateRequirements(TableUpdateContext& context) const { + // SetPartitionStatistics doesn't generate any requirements +} + +bool SetPartitionStatistics::Equals(const TableUpdate& other) const { + if (other.kind() != Kind::kSetPartitionStatistics) { + return false; + } + const auto& other_set = internal::checked_cast(other); + if (!partition_statistics_file_ != !other_set.partition_statistics_file_) { + return false; + } + if (partition_statistics_file_ && + !(*partition_statistics_file_ == *other_set.partition_statistics_file_)) { + return false; + } + return true; +} + +std::unique_ptr SetPartitionStatistics::Clone() const { + return std::make_unique(partition_statistics_file_); +} + +// RemovePartitionStatistics + +void RemovePartitionStatistics::ApplyTo(TableMetadataBuilder& builder) const { + builder.RemovePartitionStatistics(snapshot_id_); +} + +void RemovePartitionStatistics::GenerateRequirements(TableUpdateContext& context) const { + // RemovePartitionStatistics doesn't generate any requirements +} + +bool RemovePartitionStatistics::Equals(const TableUpdate& other) const { + if (other.kind() != Kind::kRemovePartitionStatistics) { + return false; + } + const auto& other_remove = + internal::checked_cast(other); + return snapshot_id_ == other_remove.snapshot_id_; +} + +std::unique_ptr RemovePartitionStatistics::Clone() const { + return std::make_unique(snapshot_id_); +} + } // namespace iceberg::table diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 5bbc243ef..c75c3fa6a 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -61,6 +61,8 @@ class ICEBERG_EXPORT TableUpdate { kSetLocation, kSetStatistics, kRemoveStatistics, + kSetPartitionStatistics, + kRemovePartitionStatistics, }; virtual ~TableUpdate(); @@ -558,6 +560,54 @@ class ICEBERG_EXPORT RemoveStatistics : public TableUpdate { int64_t snapshot_id_; }; +/// \brief Represents setting partition statistics for a snapshot +class ICEBERG_EXPORT SetPartitionStatistics : public TableUpdate { + public: + explicit SetPartitionStatistics( + std::shared_ptr partition_statistics_file) + : partition_statistics_file_(std::move(partition_statistics_file)) {} + + int64_t snapshot_id() const; + + const std::shared_ptr& partition_statistics_file() const { + return partition_statistics_file_; + } + + void ApplyTo(TableMetadataBuilder& builder) const override; + + void GenerateRequirements(TableUpdateContext& context) const override; + + Kind kind() const override { return Kind::kSetPartitionStatistics; } + + bool Equals(const TableUpdate& other) const override; + + std::unique_ptr Clone() const override; + + private: + std::shared_ptr partition_statistics_file_; +}; + +/// \brief Represents removing partition statistics for a snapshot +class ICEBERG_EXPORT RemovePartitionStatistics : public TableUpdate { + public: + explicit RemovePartitionStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {} + + int64_t snapshot_id() const { return snapshot_id_; } + + void ApplyTo(TableMetadataBuilder& builder) const override; + + void GenerateRequirements(TableUpdateContext& context) const override; + + Kind kind() const override { return Kind::kRemovePartitionStatistics; } + + bool Equals(const TableUpdate& other) const override; + + std::unique_ptr Clone() const override; + + private: + int64_t snapshot_id_; +}; + } // namespace table } // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1bd2fd6ad..c46e05a95 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -178,6 +178,7 @@ if(ICEBERG_BUILD_BUNDLE) transaction_test.cc update_location_test.cc update_partition_spec_test.cc + update_partition_statistics_test.cc update_properties_test.cc update_schema_test.cc update_sort_order_test.cc diff --git a/src/iceberg/test/json_internal_test.cc b/src/iceberg/test/json_internal_test.cc index bb167ad0d..8fa24312b 100644 --- a/src/iceberg/test/json_internal_test.cc +++ b/src/iceberg/test/json_internal_test.cc @@ -613,6 +613,43 @@ TEST(JsonInternalTest, TableUpdateRemoveStatistics) { update); } +TEST(JsonInternalTest, TableUpdateSetPartitionStatistics) { + auto partition_stats_file = std::make_shared(); + partition_stats_file->snapshot_id = 123456789; + partition_stats_file->path = + "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet"; + partition_stats_file->file_size_in_bytes = 2048; + + table::SetPartitionStatistics update(partition_stats_file); + nlohmann::json expected = R"({ + "action": "set-partition-statistics", + "partition-statistics": { + "snapshot-id": 123456789, + "statistics-path": "s3://bucket/warehouse/table/metadata/partition-stats-123456789.parquet", + "file-size-in-bytes": 2048 + } + })"_json; + + EXPECT_EQ(ToJson(update), expected); + auto parsed = TableUpdateFromJson(expected); + ASSERT_THAT(parsed, IsOk()); + EXPECT_EQ(*internal::checked_cast(parsed.value().get()), + update); +} + +TEST(JsonInternalTest, TableUpdateRemovePartitionStatistics) { + table::RemovePartitionStatistics update(123456789); + nlohmann::json expected = + R"({"action":"remove-partition-statistics","snapshot-id":123456789})"_json; + + EXPECT_EQ(ToJson(update), expected); + auto parsed = TableUpdateFromJson(expected); + ASSERT_THAT(parsed, IsOk()); + EXPECT_EQ( + *internal::checked_cast(parsed.value().get()), + update); +} + TEST(JsonInternalTest, TableUpdateUnknownAction) { nlohmann::json json = R"({"action":"unknown-action"})"_json; auto result = TableUpdateFromJson(json); diff --git a/src/iceberg/test/update_partition_statistics_test.cc b/src/iceberg/test/update_partition_statistics_test.cc new file mode 100644 index 000000000..5ed84cc0a --- /dev/null +++ b/src/iceberg/test/update_partition_statistics_test.cc @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_partition_statistics.h" + +#include +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/statistics_file.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" + +namespace iceberg { + +class UpdatePartitionStatisticsTest : public UpdateTestBase { + protected: + // Helper function to create a partition statistics file + std::shared_ptr MakePartitionStatisticsFile( + int64_t snapshot_id, const std::string& path, int64_t file_size = 2048) { + auto stats_file = std::make_shared(); + stats_file->snapshot_id = snapshot_id; + stats_file->path = path; + stats_file->file_size_in_bytes = file_size; + return stats_file; + } + + // Helper to find partition statistics file by snapshot_id in the result vector + std::shared_ptr FindPartitionStatistics( + const std::vector>>& + to_set, + int64_t snapshot_id) { + auto it = std::ranges::find_if( + to_set, [snapshot_id](const auto& p) { return p.first == snapshot_id; }); + return it != to_set.end() ? it->second : nullptr; + } +}; + +TEST_F(UpdatePartitionStatisticsTest, EmptyUpdate) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_TRUE(result.to_remove.empty()); +} + +TEST_F(UpdatePartitionStatisticsTest, SetPartitionStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + auto partition_stats_file = MakePartitionStatisticsFile( + 1, "/warehouse/test_table/metadata/partition-stats-1.parquet"); + update->SetPartitionStatistics(partition_stats_file); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + + auto found = FindPartitionStatistics(result.to_set, 1); + ASSERT_NE(found, nullptr); + EXPECT_EQ(found->snapshot_id, 1); + EXPECT_EQ(found->path, "/warehouse/test_table/metadata/partition-stats-1.parquet"); + EXPECT_EQ(found->file_size_in_bytes, 2048); +} + +TEST_F(UpdatePartitionStatisticsTest, SetMultiplePartitionStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + + auto partition_stats_file1 = MakePartitionStatisticsFile( + 1, "/warehouse/test_table/metadata/partition-stats-1.parquet"); + auto partition_stats_file2 = MakePartitionStatisticsFile( + 2, "/warehouse/test_table/metadata/partition-stats-2.parquet", 4096); + + update->SetPartitionStatistics(partition_stats_file1); + update->SetPartitionStatistics(partition_stats_file2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 2); + EXPECT_TRUE(result.to_remove.empty()); + + auto found1 = FindPartitionStatistics(result.to_set, 1); + ASSERT_NE(found1, nullptr); + EXPECT_EQ(found1->snapshot_id, 1); + + auto found2 = FindPartitionStatistics(result.to_set, 2); + ASSERT_NE(found2, nullptr); + EXPECT_EQ(found2->snapshot_id, 2); + EXPECT_EQ(found2->file_size_in_bytes, 4096); +} + +TEST_F(UpdatePartitionStatisticsTest, ReplacePartitionStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + + auto partition_stats_file1 = MakePartitionStatisticsFile( + 1, "/warehouse/test_table/metadata/partition-stats-1.parquet"); + auto partition_stats_file2 = MakePartitionStatisticsFile( + 1, "/warehouse/test_table/metadata/partition-stats-1-updated.parquet", 8192); + + update->SetPartitionStatistics(partition_stats_file1); + update->SetPartitionStatistics(partition_stats_file2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + + auto found = FindPartitionStatistics(result.to_set, 1); + ASSERT_NE(found, nullptr); + EXPECT_EQ(found->path, + "/warehouse/test_table/metadata/partition-stats-1-updated.parquet"); + EXPECT_EQ(found->file_size_in_bytes, 8192); +} + +TEST_F(UpdatePartitionStatisticsTest, RemovePartitionStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + update->RemovePartitionStatistics(1); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_EQ(result.to_remove[0], 1); +} + +TEST_F(UpdatePartitionStatisticsTest, SetThenRemovePartitionStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + + auto partition_stats_file = MakePartitionStatisticsFile( + 1, "/warehouse/test_table/metadata/partition-stats-1.parquet"); + update->SetPartitionStatistics(partition_stats_file); + update->RemovePartitionStatistics(1); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_EQ(result.to_remove[0], 1); +} + +TEST_F(UpdatePartitionStatisticsTest, SetNullPartitionStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + + update->SetPartitionStatistics(nullptr); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Statistics file cannot be null")); +} + +TEST_F(UpdatePartitionStatisticsTest, SetAndRemoveMixed) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdatePartitionStatistics()); + + auto partition_stats_file1 = MakePartitionStatisticsFile( + 1, "/warehouse/test_table/metadata/partition-stats-1.parquet"); + auto partition_stats_file2 = MakePartitionStatisticsFile( + 2, "/warehouse/test_table/metadata/partition-stats-2.parquet"); + + update->SetPartitionStatistics(partition_stats_file1); + update->SetPartitionStatistics(partition_stats_file2); + update->RemovePartitionStatistics(3); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 2); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_EQ(result.to_remove[0], 3); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 04ccdfb98..b24aa0da3 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -39,6 +39,7 @@ #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_location.h" #include "iceberg/update/update_partition_spec.h" +#include "iceberg/update/update_partition_statistics.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" #include "iceberg/update/update_snapshot_reference.h" @@ -142,6 +143,10 @@ Status Transaction::Apply(PendingUpdate& update) { ICEBERG_RETURN_UNEXPECTED( ApplyUpdateStatistics(internal::checked_cast(update))); break; + case PendingUpdate::Kind::kUpdatePartitionStatistics: + ICEBERG_RETURN_UNEXPECTED(ApplyUpdatePartitionStatistics( + internal::checked_cast(update))); + break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -284,6 +289,17 @@ Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) { return {}; } +Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& update) { + ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); + for (auto&& [_, partition_stat_file] : result.to_set) { + metadata_builder_->SetPartitionStatistics(std::move(partition_stat_file)); + } + for (const auto& snapshot_id : result.to_remove) { + metadata_builder_->RemovePartitionStatistics(snapshot_id); + } + return {}; +} + Result> Transaction::Commit() { if (committed_) { return Invalid("Transaction already committed"); @@ -395,6 +411,15 @@ Result> Transaction::NewUpdateStatistics() { return update_statistics; } +Result> +Transaction::NewUpdatePartitionStatistics() { + ICEBERG_ASSIGN_OR_RAISE( + std::shared_ptr update_partition_statistics, + UpdatePartitionStatistics::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_partition_statistics)); + return update_partition_statistics; +} + Result> Transaction::NewUpdateSnapshotReference() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_ref, diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 3d5450e5c..e975be7ff 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -86,6 +86,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateStatistics(); + /// \brief Create a new UpdatePartitionStatistics to update partition statistics and + /// commit the changes. + Result> NewUpdatePartitionStatistics(); + /// \brief Create a new UpdateLocation to update the table location and commit the /// changes. Result> NewUpdateLocation(); @@ -115,6 +119,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/statistics_file.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> UpdatePartitionStatistics::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdatePartitionStatistics without a transaction"); + return std::shared_ptr( + new UpdatePartitionStatistics(std::move(transaction))); +} + +UpdatePartitionStatistics::UpdatePartitionStatistics( + std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +UpdatePartitionStatistics::~UpdatePartitionStatistics() = default; + +UpdatePartitionStatistics& UpdatePartitionStatistics::SetPartitionStatistics( + std::shared_ptr partition_statistics_file) { + ICEBERG_BUILDER_CHECK(partition_statistics_file != nullptr, + "Statistics file cannot be null"); + + partition_statistics_to_set_[partition_statistics_file->snapshot_id] = + std::move(partition_statistics_file); + return *this; +} + +UpdatePartitionStatistics& UpdatePartitionStatistics::RemovePartitionStatistics( + int64_t snapshot_id) { + partition_statistics_to_set_[snapshot_id] = nullptr; + return *this; +} + +Result UpdatePartitionStatistics::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + ApplyResult result; + for (const auto& [snapshot_id, partition_stats] : partition_statistics_to_set_) { + if (partition_stats) { + result.to_set.emplace_back(snapshot_id, partition_stats); + } else { + result.to_remove.push_back(snapshot_id); + } + } + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_partition_statistics.h b/src/iceberg/update/update_partition_statistics.h new file mode 100644 index 000000000..740fe214e --- /dev/null +++ b/src/iceberg/update/update_partition_statistics.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/update_partition_statistics.h +/// \brief Updates table partition statistics. + +namespace iceberg { + +/// \brief Updates table partition statistics. +class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdatePartitionStatistics() override; + + /// \brief Set partition statistics file for a snapshot. + /// + /// Associates a partition statistics file with a snapshot ID. If partition statistics + /// already exist for this snapshot, they will be replaced. + /// + /// \param partition_statistics_file The partition statistics file to set + /// \return Reference to this UpdatePartitionStatistics for chaining + UpdatePartitionStatistics& SetPartitionStatistics( + std::shared_ptr partition_statistics_file); + + /// \brief Remove partition statistics for a snapshot. + /// + /// Marks the partition statistics for the given snapshot ID for removal. + /// + /// \param snapshot_id The snapshot ID whose partition statistics to remove + /// \return Reference to this UpdatePartitionStatistics for chaining + UpdatePartitionStatistics& RemovePartitionStatistics(int64_t snapshot_id); + + Kind kind() const final { return Kind::kUpdatePartitionStatistics; } + + struct ApplyResult { + std::vector>> to_set; + std::vector to_remove; + }; + + Result Apply(); + + private: + explicit UpdatePartitionStatistics(std::shared_ptr transaction); + + std::unordered_map> + partition_statistics_to_set_; +}; + +} // namespace iceberg