386 changes: 201 additions & 185 deletions native/Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions native/Cargo.toml
@@ -34,14 +34,14 @@ edition = "2021"
rust-version = "1.88"

[workspace.dependencies]
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow = { version = "58.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "57.3.0", default-features = false, features = ["experimental"] }
datafusion = { version = "52.2.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "52.2.0" }
datafusion-physical-expr-adapter = { version = "52.2.0" }
datafusion-spark = { version = "52.2.0" }
parquet = { version = "58.0.0", default-features = false, features = ["experimental"] }
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-53", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
datafusion-spark = { git = "https://github.com/apache/datafusion", branch = "branch-53", features = ["core"] }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-proto = { path = "proto" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
@@ -51,11 +51,11 @@ num = "0.4"
rand = "0.10"
regex = "1.12.3"
thiserror = "2"
object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"] }
object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] }
url = "2.2"
aws-config = "1.8.14"
aws-credential-types = "1.2.13"
iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "b24ab63" }
iceberg = { git = "https://github.com/mbutrovich/iceberg-rust", branch = "df53-upgrade" }

[profile.release]
debug = true
4 changes: 2 additions & 2 deletions native/core/Cargo.toml
@@ -96,11 +96,11 @@ jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
datafusion-functions-nested = { version = "52.2.0" }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", branch = "branch-53" }

[features]
backtrace = ["datafusion/backtrace"]
default = ["hdfs-opendal"]
default = []
hdfs = ["datafusion-comet-objectstore-hdfs"]
hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
8 changes: 4 additions & 4 deletions native/core/src/execution/operators/expand.rs
@@ -42,7 +42,7 @@ pub struct ExpandExec {
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
-cache: PlanProperties,
+cache: Arc<PlanProperties>,
}

impl ExpandExec {
@@ -52,12 +52,12 @@ impl ExpandExec {
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> Self {
-let cache = PlanProperties::new(
+let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
-);
+));

Self {
projections,
@@ -129,7 +129,7 @@ impl ExecutionPlan for ExpandExec {
Ok(Box::pin(expand_stream))
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

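Note: the change repeated across expand.rs, iceberg_scan.rs, parquet_writer.rs, and scan.rs below is mechanical: with the DataFusion revision pinned here (branch-53), ExecutionPlan::properties returns &Arc<PlanProperties>, so each operator caches Arc<PlanProperties> instead of the bare struct and callers can clone the handle cheaply. A minimal standalone sketch of the pattern follows; SketchExec is illustrative and not part of this PR.

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_expr::EquivalenceProperties;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::{Partitioning, PlanProperties};

    // Illustrative operator caching its plan properties behind an Arc,
    // mirroring the ExpandExec/ScanExec edits in this diff.
    struct SketchExec {
        cache: Arc<PlanProperties>,
    }

    impl SketchExec {
        fn new(schema: Arc<Schema>) -> Self {
            // Constructor arguments are unchanged; only the Arc wrapper is new.
            let cache = Arc::new(PlanProperties::new(
                EquivalenceProperties::new(Arc::clone(&schema)),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Final,
                Boundedness::Bounded,
            ));
            Self { cache }
        }

        // branch-53 signature: hand out the Arc by reference so callers can
        // Arc::clone it instead of copying the whole PlanProperties struct.
        fn properties(&self) -> &Arc<PlanProperties> {
            &self.cache
        }
    }

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let exec = SketchExec::new(schema);
        let shared = Arc::clone(exec.properties());
        println!("partitioning: {:?}", shared.partitioning);
    }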
36 changes: 24 additions & 12 deletions native/core/src/execution/operators/iceberg_scan.rs
@@ -38,7 +38,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use futures::{Stream, StreamExt, TryStreamExt};
-use iceberg::io::FileIO;
+use iceberg::io::{FileIO, FileIOBuilder, OpenDalStorageFactory, StorageFactory};

use crate::execution::operators::ExecutionError;
use crate::parquet::parquet_support::SparkParquetOptions;
@@ -57,7 +57,7 @@ pub struct IcebergScanExec {
/// Output schema after projection
output_schema: SchemaRef,
/// Cached execution plan properties
-plan_properties: PlanProperties,
+plan_properties: Arc<PlanProperties>,
/// Catalog-specific configuration for FileIO
catalog_properties: HashMap<String, String>,
/// Pre-planned file scan tasks
@@ -92,13 +92,13 @@ impl IcebergScanExec {
})
}

-fn compute_properties(schema: SchemaRef, num_partitions: usize) -> PlanProperties {
-PlanProperties::new(
+fn compute_properties(schema: SchemaRef, num_partitions: usize) -> Arc<PlanProperties> {
+Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(num_partitions),
EmissionType::Incremental,
Boundedness::Bounded,
-)
+))
}
}

@@ -115,7 +115,7 @@ impl ExecutionPlan for IcebergScanExec {
Arc::clone(&self.output_schema)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}

@@ -191,20 +191,32 @@ impl IcebergScanExec {
Ok(Box::pin(wrapped_stream))
}

fn storage_factory_for(path: &str) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
let scheme = path.split("://").next().unwrap_or("file");
match scheme {
"file" | "" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
"s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 {
configured_scheme: scheme.to_string(),
customized_credential_load: None,
})),
_ => Err(DataFusionError::Execution(format!(
"Unsupported storage scheme: {scheme}"
))),
}
}

fn load_file_io(
catalog_properties: &HashMap<String, String>,
metadata_location: &str,
) -> Result<FileIO, DataFusionError> {
-let mut file_io_builder = FileIO::from_path(metadata_location)
-.map_err(|e| DataFusionError::Execution(format!("Failed to create FileIO: {}", e)))?;
+let factory = Self::storage_factory_for(metadata_location)?;
+let mut file_io_builder = FileIOBuilder::new(factory);

for (key, value) in catalog_properties {
file_io_builder = file_io_builder.with_prop(key, value);
}

-file_io_builder
-.build()
-.map_err(|e| DataFusionError::Execution(format!("Failed to build FileIO: {}", e)))
+Ok(file_io_builder.build())
}
}

@@ -269,7 +281,7 @@ where
_ => {
let adapter = self
.adapter_factory
.create(Arc::clone(&self.schema), Arc::clone(&file_schema));
.create(Arc::clone(&self.schema), Arc::clone(&file_schema))?;
let exprs =
build_projection_expressions(&self.schema, &adapter).map_err(|e| {
DataFusionError::Execution(format!(
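Note: load_file_io now builds its FileIO from an explicit StorageFactory instead of FileIO::from_path, and build() no longer returns a Result. The sketch below restates that construction path in isolation; the FileIOBuilder/OpenDalStorageFactory API shown is the one pinned from the df53-upgrade branch of the iceberg-rust fork above, so treat the names and signatures as provisional until an upstream release.

    use std::collections::HashMap;
    use std::sync::Arc;

    use iceberg::io::{FileIO, FileIOBuilder, OpenDalStorageFactory, StorageFactory};

    // Pick a storage backend from the path scheme, then build a FileIO that
    // carries the catalog properties (endpoints, credentials, regions, ...).
    fn file_io_for(path: &str, props: &HashMap<String, String>) -> Result<FileIO, String> {
        let scheme = path.split("://").next().unwrap_or("file");
        let factory: Arc<dyn StorageFactory> = match scheme {
            "file" | "" => Arc::new(OpenDalStorageFactory::Fs),
            "s3" | "s3a" => Arc::new(OpenDalStorageFactory::S3 {
                configured_scheme: scheme.to_string(),
                customized_credential_load: None,
            }),
            other => return Err(format!("unsupported storage scheme: {other}")),
        };
        let mut builder = FileIOBuilder::new(factory);
        for (key, value) in props {
            builder = builder.with_prop(key, value);
        }
        // build() is infallible in this API revision, unlike the old
        // FileIO::from_path(...).build() chain it replaces.
        Ok(builder.build())
    }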
39 changes: 23 additions & 16 deletions native/core/src/execution/operators/parquet_writer.rs
@@ -23,16 +23,18 @@ use std::{
fmt,
fmt::{Debug, Formatter},
fs::File,
-io::Cursor,
sync::Arc,
};

#[cfg(feature = "hdfs-opendal")]
use opendal::Operator;
#[cfg(feature = "hdfs-opendal")]
use std::io::Cursor;

use crate::execution::shuffle::CompressionCodec;
-use crate::parquet::parquet_support::{
-create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
-};
+use crate::parquet::parquet_support::is_hdfs_scheme;
+#[cfg(feature = "hdfs-opendal")]
+use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -45,7 +47,7 @@ use datafusion::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
-SendableRecordBatchStream, Statistics,
+SendableRecordBatchStream,
},
};
use futures::TryStreamExt;
@@ -64,6 +66,7 @@ enum ParquetWriter {
/// Contains the arrow writer, HDFS operator, and destination path
/// The Arrow writer converts batch data to Parquet format in an in-memory buffer
/// The opendal::Writer is created lazily on first write
#[cfg(feature = "hdfs-opendal")]
Remote(
ArrowWriter<Cursor<Vec<u8>>>,
Option<opendal::Writer>,
@@ -80,6 +83,7 @@ impl ParquetWriter {
) -> std::result::Result<(), parquet::errors::ParquetError> {
match self {
ParquetWriter::LocalFile(writer) => writer.write(batch),
#[cfg(feature = "hdfs-opendal")]
ParquetWriter::Remote(
arrow_parquet_buffer_writer,
hdfs_writer_opt,
@@ -134,6 +138,7 @@ impl ParquetWriter {
writer.close()?;
Ok(())
}
#[cfg(feature = "hdfs-opendal")]
ParquetWriter::Remote(
arrow_parquet_buffer_writer,
mut hdfs_writer_opt,
@@ -208,7 +213,7 @@ pub struct ParquetWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for plan properties
-cache: PlanProperties,
+cache: Arc<PlanProperties>,
}

impl ParquetWriterExec {
@@ -228,12 +233,12 @@ impl ParquetWriterExec {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();

-let cache = PlanProperties::new(
+let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
input_partitioning,
EmissionType::Final,
Boundedness::Bounded,
-);
+));

Ok(ParquetWriterExec {
input,
@@ -275,7 +280,7 @@ impl ParquetWriterExec {
output_file_path: &str,
schema: SchemaRef,
props: WriterProperties,
runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
_runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
object_store_options: &HashMap<String, String>,
) -> Result<ParquetWriter> {
// Parse URL and match on storage scheme directly
@@ -284,11 +289,11 @@
})?;

if is_hdfs_scheme(&url, object_store_options) {
-// HDFS storage
+#[cfg(feature = "hdfs-opendal")]
+{
// Use prepare_object_store_with_configs to create and register the object store
let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
-runtime_env,
+_runtime_env,
output_file_path.to_string(),
object_store_options,
)
@@ -324,6 +329,12 @@ impl ParquetWriterExec {
object_store_path.to_string(),
))
}
+#[cfg(not(feature = "hdfs-opendal"))]
+{
+Err(DataFusionError::Execution(
+"HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(),
+))
+}
} else if output_file_path.starts_with("file://")
|| output_file_path.starts_with("file:")
|| !output_file_path.contains("://")
@@ -405,11 +416,7 @@ impl ExecutionPlan for ParquetWriterExec {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

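Note: the writer changes above all follow one cfg-gating recipe: every opendal-backed item (imports, the Remote enum variant, its match arms) is compiled only under the hdfs-opendal feature, and the non-feature build keeps compiling by returning a runtime error from the HDFS branch. Gating the variant itself, rather than branching at runtime, means the disabled backend costs nothing in the default build. A self-contained sketch of that recipe, with toy types and assuming a crate that declares an hdfs-opendal feature in its Cargo.toml:

    // Compile the remote variant only when the feature is enabled; the real
    // code gates the opendal writer variant and its imports the same way.
    #[allow(dead_code)]
    enum Writer {
        Local(String),
        #[cfg(feature = "hdfs-opendal")]
        Remote(String),
    }

    fn open(path: &str) -> Result<Writer, String> {
        if path.starts_with("hdfs://") {
            #[cfg(feature = "hdfs-opendal")]
            {
                return Ok(Writer::Remote(path.to_string()));
            }
            // Without the feature, surface a clear error instead of failing
            // to compile, matching the diff's DataFusionError message.
            #[cfg(not(feature = "hdfs-opendal"))]
            {
                return Err(
                    "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature."
                        .to_string(),
                );
            }
        }
        Ok(Writer::Local(path.to_string()))
    }

    fn main() {
        match open("hdfs://namenode:8020/tmp/out.parquet") {
            Ok(_) => println!("writer ready"),
            Err(e) => eprintln!("{e}"),
        }
    }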
8 changes: 4 additions & 4 deletions native/core/src/execution/operators/scan.rs
@@ -72,7 +72,7 @@ pub struct ScanExec {
/// It is also used in unit test to mock the input data from JVM.
pub batch: Arc<Mutex<Option<InputBatch>>>,
/// Cache of expensive-to-compute plan properties
-cache: PlanProperties,
+cache: Arc<PlanProperties>,
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
@@ -95,14 +95,14 @@ impl ScanExec {
// Build schema directly from data types since get_next now always unpacks dictionaries
let schema = schema_from_data_types(&data_types);

-let cache = PlanProperties::new(
+let cache = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
// The partitioning is not important because we are not using DataFusion's
// query planner or optimizer
Partitioning::UnknownPartitioning(1),
EmissionType::Final,
Boundedness::Bounded,
-);
+));

Ok(Self {
exec_context_id,
@@ -417,7 +417,7 @@ impl ExecutionPlan for ScanExec {
)))
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
