3 changes: 2 additions & 1 deletion crates/iceberg/src/arrow/delete_file_loader.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use futures::{StreamExt, TryStreamExt};

use crate::arrow::ArrowReader;
use crate::arrow::reader::ParquetReadOptions;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
@@ -65,8 +66,8 @@ impl BasicDeleteFileLoader {
self.file_io.clone(),
false,
None,
None,
file_size_in_bytes,
ParquetReadOptions::default(),
)
.await?
.build()?
202 changes: 187 additions & 15 deletions crates/iceberg/src/arrow/reader.rs
@@ -60,14 +60,36 @@ use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Options for tuning Parquet file I/O.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ParquetReadOptions {
/// Size hint for prefetching the Parquet metadata footer.
pub metadata_size_hint: Option<usize>,
/// Gap threshold for merging nearby byte ranges into a single request.
pub range_coalesce_bytes: u64,
/// Maximum number of merged byte ranges to fetch concurrently.
pub range_fetch_concurrency: usize,
}

impl Default for ParquetReadOptions {
/// Defaults match object_store's OBJECT_STORE_COALESCE_DEFAULT and
/// OBJECT_STORE_COALESCE_PARALLEL.
fn default() -> Self {
Self {
metadata_size_hint: None,
range_coalesce_bytes: 1024 * 1024,
range_fetch_concurrency: 10,
}
}
}

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_size_hint: Option<usize>,
parquet_read_options: ParquetReadOptions,
}

impl ArrowReaderBuilder {
@@ -81,7 +103,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
metadata_size_hint: None,
parquet_read_options: ParquetReadOptions::default(),
}
}

@@ -115,7 +137,24 @@ impl ArrowReaderBuilder {
/// This hint can help reduce the number of fetch requests. For more details see the
/// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
self.metadata_size_hint = Some(metadata_size_hint);
self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
self
}

/// Sets the gap threshold for merging nearby byte ranges into a single request.
/// Ranges separated by a gap of at most this many bytes will be coalesced.
///
/// Defaults to 1 MiB, matching object_store's OBJECT_STORE_COALESCE_DEFAULT.
pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
self
}

/// Sets the maximum number of merged byte ranges to fetch concurrently.
///
/// Defaults to 10, matching object_store's OBJECT_STORE_COALESCE_PARALLEL.
pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
self
}
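
// A usage sketch of the new tuning knobs (assumes access to the
// crate-internal builder, which callers normally reach through a table
// scan; the values below are illustrative, not recommendations):
#[allow(dead_code)]
fn example_tuned_reader(file_io: FileIO) -> ArrowReader {
    ArrowReaderBuilder::new(file_io)
        .with_range_coalesce_bytes(2 * 1024 * 1024) // merge ranges with gaps of up to 2 MiB
        .with_range_fetch_concurrency(16) // fetch up to 16 merged ranges concurrently
        .build()
}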

@@ -131,7 +170,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
metadata_size_hint: self.metadata_size_hint,
parquet_read_options: self.parquet_read_options,
}
}
}
@@ -148,7 +187,7 @@ pub struct ArrowReader {

row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_size_hint: Option<usize>,
parquet_read_options: ParquetReadOptions,
}

impl ArrowReader {
@@ -160,7 +199,7 @@ impl ArrowReader {
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;
let metadata_size_hint = self.metadata_size_hint;
let parquet_read_options = self.parquet_read_options;

// Fast-path for single concurrency to avoid overhead of try_flatten_unordered
let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
@@ -176,7 +215,7 @@
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
metadata_size_hint,
parquet_read_options,
)
})
.map_err(|err| {
@@ -198,7 +237,7 @@
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
metadata_size_hint,
parquet_read_options,
)
})
.map_err(|err| {
@@ -213,15 +252,14 @@
Ok(stream)
}

#[allow(clippy::too_many_arguments)]
async fn process_file_scan_task(
task: FileScanTask,
batch_size: Option<usize>,
file_io: FileIO,
delete_file_loader: CachingDeleteFileLoader,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_size_hint: Option<usize>,
parquet_read_options: ParquetReadOptions,
) -> Result<ArrowRecordBatchStream> {
let should_load_page_index =
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
@@ -236,8 +274,8 @@ impl ArrowReader {
file_io.clone(),
should_load_page_index,
None,
metadata_size_hint,
task.file_size_in_bytes,
parquet_read_options,
)
.await?;

@@ -290,8 +328,8 @@ impl ArrowReader {
file_io.clone(),
should_load_page_index,
Some(options),
metadata_size_hint,
task.file_size_in_bytes,
parquet_read_options,
)
.await?
} else {
@@ -495,8 +533,8 @@ impl ArrowReader {
file_io: FileIO,
should_load_page_index: bool,
arrow_reader_options: Option<ArrowReaderOptions>,
metadata_size_hint: Option<usize>,
file_size_in_bytes: u64,
parquet_read_options: ParquetReadOptions,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
@@ -510,9 +548,11 @@
)
.with_preload_column_index(true)
.with_preload_offset_index(true)
.with_preload_page_index(should_load_page_index);
.with_preload_page_index(should_load_page_index)
.with_range_coalesce_bytes(parquet_read_options.range_coalesce_bytes)
.with_range_fetch_concurrency(parquet_read_options.range_fetch_concurrency);

if let Some(hint) = metadata_size_hint {
if let Some(hint) = parquet_read_options.metadata_size_hint {
parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
}

@@ -1710,18 +1750,23 @@ pub struct ArrowFileReader {
preload_offset_index: bool,
preload_page_index: bool,
metadata_size_hint: Option<usize>,
range_coalesce_bytes: u64,
range_fetch_concurrency: usize,
r: Box<dyn FileRead>,
}

impl ArrowFileReader {
/// Create a new ArrowFileReader
pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
let defaults = ParquetReadOptions::default();
Self {
meta,
preload_column_index: false,
preload_offset_index: false,
preload_page_index: false,
metadata_size_hint: None,
range_coalesce_bytes: defaults.range_coalesce_bytes,
range_fetch_concurrency: defaults.range_fetch_concurrency,
r,
}
}
@@ -1752,6 +1797,18 @@ impl ArrowFileReader {
self.metadata_size_hint = Some(hint);
self
}

/// Sets the gap threshold for merging nearby byte ranges into a single request.
pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
self.range_coalesce_bytes = range_coalesce_bytes;
self
}

/// Sets the maximum number of merged byte ranges to fetch concurrently.
pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
self.range_fetch_concurrency = range_fetch_concurrency;
self
}
}
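
// A similar sketch at the file-reader level (`meta` and `input` would come
// from `FileIO` in real use; the values below are illustrative):
#[allow(dead_code)]
fn example_file_reader(meta: FileMetadata, input: Box<dyn FileRead>) -> ArrowFileReader {
    ArrowFileReader::new(meta, input)
        .with_range_coalesce_bytes(512 * 1024) // coalesce gaps of up to 512 KiB
        .with_range_fetch_concurrency(4) // at most 4 concurrent merged fetches
}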

impl AsyncFileReader for ArrowFileReader {
@@ -1763,6 +1820,49 @@ impl AsyncFileReader for ArrowFileReader {
)
}

/// Override the default `get_byte_ranges`, which calls `get_bytes` sequentially.
/// The parquet reader calls this to fetch column chunks for a row group, so
/// without this override each column chunk is a serial round-trip to object storage.
/// Adapted from object_store's `coalesce_ranges` in `util.rs`.
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
let coalesce_bytes = self.range_coalesce_bytes;
let concurrency = self.range_fetch_concurrency;

async move {
// Merge nearby ranges to reduce the number of object store requests.
let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
let r = &self.r;

// Fetch merged ranges concurrently.
let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
.map(|range| async move {
r.read(range)
.await
.map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
})
.buffered(concurrency)
.try_collect()
.await?;

// Slice the fetched data back into the originally requested ranges.
Ok(ranges
.iter()
.map(|range| {
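// `partition_point` counts the merged ranges whose start is at or
// before `range.start`; subtracting one selects the unique merged
// range covering this request, since the merged ranges are sorted
// and disjoint.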
let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
let fetch_range = &fetch_ranges[idx];
let fetch_bytes = &fetched[idx];
let start = (range.start - fetch_range.start) as usize;
let end = (range.end - fetch_range.start) as usize;
fetch_bytes.slice(start..end.min(fetch_bytes.len()))
})
.collect())
}
.boxed()
}

// TODO: currently we don't respect `ArrowReaderOptions` because it doesn't expose any method to access the option field;
// we will fix this after `v55.1.0` is released, per https://github.com/apache/arrow-rs/issues/7393
fn get_metadata(
@@ -1785,6 +1885,42 @@
}
}

/// Merge overlapping or nearby byte ranges, combining ranges with gaps <= `coalesce` bytes.
/// Adapted from object_store's `merge_ranges` in `util.rs`.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
if ranges.is_empty() {
return vec![];
}

let mut ranges = ranges.to_vec();
ranges.sort_unstable_by_key(|r| r.start);

let mut merged = Vec::with_capacity(ranges.len());
let mut start_idx = 0;
let mut end_idx = 1;

while start_idx != ranges.len() {
let mut range_end = ranges[start_idx].end;

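// `checked_sub` yields None when the next range starts before `range_end`
// (i.e. the ranges overlap), in which case the ranges are always merged.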
while end_idx != ranges.len()
&& ranges[end_idx]
.start
.checked_sub(range_end)
.map(|delta| delta <= coalesce)
.unwrap_or(true)
{
range_end = range_end.max(ranges[end_idx].end);
end_idx += 1;
}

merged.push(ranges[start_idx].start..range_end);
start_idx = end_idx;
end_idx += 1;
}

merged
}

/// The Arrow type of an array that the Parquet reader reads may not match the exact Arrow type
/// that Iceberg uses for literals - but they are effectively the same logical type,
/// i.e. LargeUtf8 and Utf8 or Utf8View and Utf8 or Utf8View and LargeUtf8.
@@ -1810,6 +1946,7 @@
mod tests {
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::sync::Arc;

use arrow_array::cast::AsArray;
@@ -4317,4 +4454,39 @@ message schema {
assert_eq!(name_col.value(2), "Charlie");
assert_eq!(name_col.value(3), "Dave");
}

#[test]
fn test_merge_ranges_empty() {
assert_eq!(super::merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
}

#[test]
fn test_merge_ranges_no_coalesce() {
// Ranges far apart should not be merged
let ranges = vec![0..100, 1_000_000..1_000_100];
let merged = super::merge_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
}

#[test]
fn test_merge_ranges_coalesce() {
// Ranges within the gap threshold should be merged
let ranges = vec![0..100, 200..300, 500..600];
let merged = super::merge_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..600]);
}

#[test]
fn test_merge_ranges_overlapping() {
let ranges = vec![0..200, 100..300];
let merged = super::merge_ranges(&ranges, 0);
assert_eq!(merged, vec![0..300]);
}

#[test]
fn test_merge_ranges_unsorted() {
let ranges = vec![500..600, 0..100, 200..300];
let merged = super::merge_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..600]);
}
}