From 0b3158b5cef6caf113ee3eea9639c4904f950081 Mon Sep 17 00:00:00 2001 From: glitchy Date: Thu, 26 Feb 2026 14:38:44 -0500 Subject: [PATCH] add OverwriteAction with CoW delete support --- crates/iceberg/src/spec/manifest/writer.rs | 8 + crates/iceberg/src/transaction/append.rs | 5 +- crates/iceberg/src/transaction/mod.rs | 7 + crates/iceberg/src/transaction/overwrite.rs | 530 ++++++++++++++++++++ crates/iceberg/src/transaction/snapshot.rs | 15 + 5 files changed, 564 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/transaction/overwrite.rs diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 0669651603..5316150322 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -341,6 +341,14 @@ impl ManifestWriter { Ok(()) } + /// Add a deleted manifest entry, preserving the original sequence numbers. + pub(crate) fn add_deleted_entry(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_data_file(&entry.data_file)?; + entry.status = ManifestStatus::Deleted; + self.add_entry_inner(entry)?; + Ok(()) + } + /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID, /// which were assigned at commit, must be preserved when adding an existing entry. pub fn add_existing_file( diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..cbe9b906be 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -90,6 +90,7 @@ impl TransactionAction for FastAppendAction { self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + vec![], ); // validate added files @@ -138,7 +139,9 @@ impl SnapshotProduceOperation for FastAppendOperation { Ok(manifest_list .entries() .iter() - .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .filter(|entry| { + entry.has_added_files() || entry.has_existing_files() || entry.has_deleted_files() + }) .cloned() .collect()) } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 074c7fefe4..a4aab8deef 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,7 @@ mod action; pub use action::*; mod append; +mod overwrite; mod snapshot; mod sort_order; mod update_location; @@ -71,6 +72,7 @@ use crate::spec::TableProperties; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::overwrite::OverwriteAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -141,6 +143,11 @@ impl Transaction { FastAppendAction::new() } + /// Creates an overwrite action. + pub fn overwrite(&self) -> OverwriteAction { + OverwriteAction::new() + } + /// Creates replace sort order action. pub fn replace_sort_order(&self) -> ReplaceSortOrderAction { ReplaceSortOrderAction::new() diff --git a/crates/iceberg/src/transaction/overwrite.rs b/crates/iceberg/src/transaction/overwrite.rs new file mode 100644 index 0000000000..76b2e299f7 --- /dev/null +++ b/crates/iceberg/src/transaction/overwrite.rs @@ -0,0 +1,530 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{ + DataFile, FormatVersion, ManifestContentType, ManifestEntry, ManifestFile, + ManifestWriterBuilder, Operation, +}; +use crate::table::Table; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, +}; +use crate::transaction::{ActionCommit, TransactionAction}; + +/// OverwriteAction is a transaction action for overwriting data files in the table. +/// +/// Creates a snapshot with `Operation::Overwrite` semantics — adds new data files and +/// optionally removes existing data files by rewriting affected manifests with those +/// entries marked as `ManifestStatus::Deleted`. +pub struct OverwriteAction { + check_duplicate: bool, + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + added_data_files: Vec, + deleted_data_files: Vec, +} + +impl OverwriteAction { + pub(crate) fn new() -> Self { + Self { + check_duplicate: true, + commit_uuid: None, + key_metadata: None, + snapshot_properties: HashMap::default(), + added_data_files: vec![], + deleted_data_files: vec![], + } + } + + /// Set whether to check duplicate files. + pub fn with_check_duplicate(mut self, v: bool) -> Self { + self.check_duplicate = v; + self + } + + /// Add data files to the snapshot. + pub fn add_data_files(mut self, data_files: impl IntoIterator) -> Self { + self.added_data_files.extend(data_files); + self + } + + /// Specify data files to be removed from the table in this overwrite. + pub fn delete_data_files(mut self, data_files: impl IntoIterator) -> Self { + self.deleted_data_files.extend(data_files); + self + } + + /// Set commit UUID for the snapshot. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } + + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { + self.snapshot_properties = snapshot_properties; + self + } +} + +#[async_trait] +impl TransactionAction for OverwriteAction { + async fn commit(self: Arc, table: &Table) -> Result { + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.added_data_files.clone(), + self.deleted_data_files.clone(), + ); + + snapshot_producer.validate_added_data_files()?; + + if self.check_duplicate { + snapshot_producer.validate_duplicate_files().await?; + } + + let deleted_file_paths: HashSet = self + .deleted_data_files + .iter() + .map(|f| f.file_path.clone()) + .collect(); + + let snapshot_id = snapshot_producer.snapshot_id(); + snapshot_producer + .commit( + OverwriteOperation { + deleted_file_paths, + snapshot_id, + }, + DefaultManifestProcess, + ) + .await + } +} + +struct OverwriteOperation { + deleted_file_paths: HashSet, + snapshot_id: i64, +} + +impl SnapshotProduceOperation for OverwriteOperation { + fn operation(&self) -> Operation { + Operation::Overwrite + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.table.file_io(), + &snapshot_produce.table.metadata_ref(), + ) + .await?; + + if self.deleted_file_paths.is_empty() { + return Ok(manifest_list + .entries() + .iter() + .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .cloned() + .collect()); + } + + let mut result = Vec::new(); + + for manifest_file in manifest_list.entries() { + if !manifest_file.has_added_files() && !manifest_file.has_existing_files() { + continue; + } + + let manifest = manifest_file + .load_manifest(snapshot_produce.table.file_io()) + .await?; + + let has_deletes = manifest.entries().iter().any(|entry| { + entry.is_alive() && self.deleted_file_paths.contains(entry.file_path()) + }); + + if has_deletes { + let rewritten = self + .rewrite_manifest(snapshot_produce, manifest_file, &manifest) + .await?; + result.push(rewritten); + } else { + result.push(manifest_file.clone()); + } + } + + Ok(result) + } +} + +impl OverwriteOperation { + /// Rewrite a manifest, marking entries whose file paths are in `deleted_file_paths` + /// as `ManifestStatus::Deleted`. + async fn rewrite_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + manifest_file: &ManifestFile, + manifest: &crate::spec::Manifest, + ) -> Result { + let table = snapshot_produce.table; + + let new_manifest_path = format!( + "{}/metadata/{}-m-overwrite.avro", + table.metadata().location(), + Uuid::now_v7(), + ); + let output_file = table.file_io().new_output(&new_manifest_path)?; + let builder = ManifestWriterBuilder::new( + output_file, + Some(self.snapshot_id), + manifest_file.key_metadata.clone(), + table.metadata().current_schema().clone(), + table.metadata().default_partition_spec().as_ref().clone(), + ); + + let mut writer = match table.metadata().format_version() { + FormatVersion::V1 => builder.build_v1(), + FormatVersion::V2 => match manifest_file.content { + ManifestContentType::Data => builder.build_v2_data(), + ManifestContentType::Deletes => builder.build_v2_deletes(), + }, + FormatVersion::V3 => match manifest_file.content { + ManifestContentType::Data => builder.build_v3_data(), + ManifestContentType::Deletes => builder.build_v3_deletes(), + }, + }; + + for entry in manifest.entries() { + if entry.is_alive() && self.deleted_file_paths.contains(entry.file_path()) { + let mut deleted: ManifestEntry = (**entry).clone(); + deleted.snapshot_id = Some(self.snapshot_id); + writer.add_deleted_entry(deleted)?; + } else { + let cloned: ManifestEntry = (**entry).clone(); + writer.add_existing_entry(cloned)?; + } + } + + writer.write_manifest_file().await + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestStatus, + Operation, Struct, + }; + use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; + use crate::{TableRequirement, TableUpdate}; + + fn test_data_file(path: &str, partition_spec_id: i32) -> crate::spec::DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(partition_spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_empty_data_overwrite_action() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let action = tx.overwrite().add_data_files(vec![]); + assert!(Arc::new(action).commit(&table).await.is_err()); + } + + #[tokio::test] + async fn test_overwrite_snapshot_properties() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let mut snapshot_properties = HashMap::new(); + snapshot_properties.insert("key".to_string(), "val".to_string()); + + let data_file = test_data_file( + "test/1.parquet", + table.metadata().default_partition_spec_id(), + ); + + let action = tx + .overwrite() + .set_snapshot_properties(snapshot_properties) + .add_data_files(vec![data_file]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + assert_eq!( + new_snapshot + .summary() + .additional_properties + .get("key") + .unwrap(), + "val" + ); + } + + #[tokio::test] + async fn test_overwrite_incompatible_partition_value() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::string("test"))])) + .build() + .unwrap(); + + let action = tx.overwrite().add_data_files(vec![data_file]); + assert!(Arc::new(action).commit(&table).await.is_err()); + } + + #[tokio::test] + async fn test_overwrite_basic() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = test_data_file( + "test/3.parquet", + table.metadata().default_partition_spec_id(), + ); + + let action = tx.overwrite().add_data_files(vec![data_file.clone()]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); + + assert!( + matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + ); + + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: table.metadata().current_snapshot_id + } + ], + requirements + ); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + assert_eq!(new_snapshot.summary().operation, Operation::Overwrite); + + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + assert_eq!( + manifest_list.entries()[0].sequence_number, + new_snapshot.sequence_number() + ); + + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(1, manifest.entries().len()); + assert_eq!( + new_snapshot.sequence_number(), + manifest.entries()[0] + .sequence_number() + .expect("Inherit sequence number by load manifest") + ); + assert_eq!( + new_snapshot.snapshot_id(), + manifest.entries()[0].snapshot_id().unwrap() + ); + assert_eq!(data_file, *manifest.entries()[0].data_file()); + } + + #[tokio::test] + async fn test_overwrite_with_deleted_files() { + use crate::memory::tests::new_memory_catalog; + use crate::transaction::ApplyTransactionAction; + use crate::transaction::tests::make_v3_minimal_table_in_catalog; + + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + let spec_id = table.metadata().default_partition_spec_id(); + + let original_file = test_data_file("test/original.parquet", spec_id); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![original_file.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + + let replacement_file = test_data_file("test/replacement.parquet", spec_id); + let tx = Transaction::new(&table); + let action = tx + .overwrite() + .add_data_files(vec![replacement_file.clone()]) + .delete_data_files(vec![original_file.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot = table.metadata().current_snapshot().unwrap(); + assert_eq!(snapshot.summary().operation, Operation::Overwrite); + + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + assert_eq!(2, manifest_list.entries().len()); + + let mut all_entries = vec![]; + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + for entry in manifest.entries() { + all_entries.push((entry.status(), entry.file_path().to_string())); + } + } + + assert!( + all_entries + .iter() + .any(|(status, path)| *status == ManifestStatus::Deleted + && path == "test/original.parquet"), + "Original file should be marked as Deleted, entries: {all_entries:?}", + ); + + assert!( + all_entries + .iter() + .any(|(status, path)| *status == ManifestStatus::Added + && path == "test/replacement.parquet"), + "Replacement file should be marked as Added, entries: {all_entries:?}", + ); + + // Verify snapshot summary reports the deleted file. + assert_eq!( + snapshot + .summary() + .additional_properties + .get("deleted-data-files") + .map(|s| s.as_str()), + Some("1") + ); + assert_eq!( + snapshot + .summary() + .additional_properties + .get("deleted-records") + .map(|s| s.as_str()), + Some("1") + ); + + // Step 3: Fast append after overwrite — delete-only manifest must survive. + let appended_file = test_data_file("test/appended.parquet", spec_id); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![appended_file.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + // 3 manifests: rewritten (deleted entry), overwrite added, fast_append added. + assert_eq!(3, manifest_list.entries().len()); + + let mut all_entries = vec![]; + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + for entry in manifest.entries() { + all_entries.push((entry.status(), entry.file_path().to_string())); + } + } + + // The deleted entry must still be present after fast_append. + assert!( + all_entries + .iter() + .any(|(status, path)| *status == ManifestStatus::Deleted + && path == "test/original.parquet"), + "Deleted entry should survive fast_append, entries: {all_entries:?}", + ); + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..c9d4da47ee 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -114,6 +114,7 @@ pub(crate) struct SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + deleted_data_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -121,12 +122,17 @@ pub(crate) struct SnapshotProducer<'a> { } impl<'a> SnapshotProducer<'a> { + pub(crate) fn snapshot_id(&self) -> i64 { + self.snapshot_id + } + pub(crate) fn new( table: &'a Table, commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + deleted_data_files: Vec, ) -> Self { Self { table, @@ -135,6 +141,7 @@ impl<'a> SnapshotProducer<'a> { key_metadata, snapshot_properties, added_data_files, + deleted_data_files, manifest_counter: (0..), } } @@ -383,6 +390,14 @@ impl<'a> SnapshotProducer<'a> { ); } + for data_file in &self.deleted_data_files { + summary_collector.remove_file( + data_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); + } + let previous_snapshot = table_metadata .snapshot_by_id(self.snapshot_id) .and_then(|snapshot| snapshot.parent_snapshot_id())