diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index e4ba71e45c661..4441fe90357cc 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -61,6 +61,10 @@ name = "with_hashes" harness = false name = "scalar_to_array" +[[bench]] +harness = false +name = "stats_merge" + [dependencies] ahash = { workspace = true } apache-avro = { workspace = true, features = [ diff --git a/datafusion/common/benches/stats_merge.rs b/datafusion/common/benches/stats_merge.rs new file mode 100644 index 0000000000000..73229b6379360 --- /dev/null +++ b/datafusion/common/benches/stats_merge.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark for `Statistics::try_merge_iter`. 
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, Schema};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+
+/// Build `n` per-partition [`Statistics`], each with `num_cols` exact Int64 columns
+fn make_stats(n: usize, num_cols: usize) -> Vec<Statistics> {
+    (0..n)
+        .map(|i| {
+            let mut stats = Statistics::default()
+                .with_num_rows(Precision::Exact(100 + i))
+                .with_total_byte_size(Precision::Exact(8000 + i * 80));
+            for c in 0..num_cols {
+                let base = (i * num_cols + c) as i64;
+                stats = stats.add_column_statistics(
+                    ColumnStatistics::new_unknown()
+                        .with_null_count(Precision::Exact(i))
+                        .with_min_value(Precision::Exact(ScalarValue::Int64(Some(base))))
+                        .with_max_value(Precision::Exact(ScalarValue::Int64(Some(
+                            base + 1000,
+                        ))))
+                        .with_sum_value(Precision::Exact(ScalarValue::Int64(Some(
+                            base * 100,
+                        )))),
+                );
+            }
+            stats
+        })
+        .collect()
+}
+
+fn bench_stats_merge(c: &mut Criterion) {
+    let mut group = c.benchmark_group("stats_merge");
+
+    for &num_partitions in &[10, 100, 500] {
+        for &num_cols in &[1, 5, 20] {
+            let items = make_stats(num_partitions, num_cols);
+            let schema = Arc::new(Schema::new(
+                (0..num_cols)
+                    .map(|i| Field::new(format!("col{i}"), DataType::Int64, true))
+                    .collect::<Vec<_>>(),
+            ));
+
+            let param = format!("{num_partitions}parts_{num_cols}cols");
+
+            group.bench_with_input(
+                BenchmarkId::new("try_merge_iter", &param),
+                &(&items, &schema),
+                |b, (items, schema)| {
+                    b.iter(|| {
+                        std::hint::black_box(
+                            Statistics::try_merge_iter(*items, schema).unwrap(),
+                        );
+                    });
+                },
+            );
+        }
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, bench_stats_merge);
+criterion_main!(benches);
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index 759ebfe67a812..a52ae8d9d5980 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -22,6 +22,7 @@ use std::fmt::{self, Debug, Display};
use crate::{Result, ScalarValue};
 
 use crate::error::_plan_err;
+use crate::utils::aggregate::precision_add;
 use arrow::datatypes::{DataType, Schema};
 
 /// Represents a value with a degree of certainty. `Precision` is used to
@@ -205,15 +206,22 @@ impl Precision<ScalarValue> {
     /// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
     /// conservatively propagating exactness information. If one of the input
     /// values is [`Precision::Absent`], the result is `Absent` too.
+    ///
+    /// Uses [`ScalarValue::add_checked`] so that integer overflow returns
+    /// an error (mapped to `Absent`) instead of silently wrapping.
+    ///
+    /// Crate-internal hot paths use `utils::aggregate::precision_add`,
+    /// which avoids the Arrow array round-trip.
     pub fn add(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
         match (self, other) {
-            (Precision::Exact(a), Precision::Exact(b)) => {
-                a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent)
-            }
+            (Precision::Exact(a), Precision::Exact(b)) => a
+                .add_checked(b)
+                .map(Precision::Exact)
+                .unwrap_or(Precision::Absent),
             (Precision::Inexact(a), Precision::Exact(b))
             | (Precision::Exact(a), Precision::Inexact(b))
             | (Precision::Inexact(a), Precision::Inexact(b)) => a
-                .add(b)
+                .add_checked(b)
                 .map(Precision::Inexact)
                 .unwrap_or(Precision::Absent),
             (_, _) => Precision::Absent,
@@ -576,23 +584,6 @@ impl Statistics {
     /// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent.
     ///
     /// Returns an error if the statistics do not match the specified schemas.
-    pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
-    where
-        I: IntoIterator<Item = &'a Statistics>,
-    {
-        let mut items = items.into_iter();
-
-        let Some(init) = items.next() else {
-            return Ok(Statistics::new_unknown(schema));
-        };
-        items.try_fold(init.clone(), |acc: Statistics, item_stats: &Statistics| {
-            acc.try_merge(item_stats)
-        })
-    }
-
-    /// Merge this Statistics value with another Statistics value.
- /// - /// Returns an error if the statistics do not match (different schemas). /// /// # Example /// ``` @@ -600,68 +591,100 @@ impl Statistics { /// # use arrow::datatypes::{Field, Schema, DataType}; /// # use datafusion_common::stats::Precision; /// let stats1 = Statistics::default() - /// .with_num_rows(Precision::Exact(1)) - /// .with_total_byte_size(Precision::Exact(2)) + /// .with_num_rows(Precision::Exact(10)) /// .add_column_statistics( /// ColumnStatistics::new_unknown() - /// .with_null_count(Precision::Exact(3)) - /// .with_min_value(Precision::Exact(ScalarValue::from(4))) - /// .with_max_value(Precision::Exact(ScalarValue::from(5))), + /// .with_min_value(Precision::Exact(ScalarValue::from(1))) + /// .with_max_value(Precision::Exact(ScalarValue::from(100))) + /// .with_sum_value(Precision::Exact(ScalarValue::from(500))), /// ); /// /// let stats2 = Statistics::default() - /// .with_num_rows(Precision::Exact(10)) - /// .with_total_byte_size(Precision::Inexact(20)) + /// .with_num_rows(Precision::Exact(20)) /// .add_column_statistics( /// ColumnStatistics::new_unknown() - /// // absent null count - /// .with_min_value(Precision::Exact(ScalarValue::from(40))) - /// .with_max_value(Precision::Exact(ScalarValue::from(50))), + /// .with_min_value(Precision::Exact(ScalarValue::from(5))) + /// .with_max_value(Precision::Exact(ScalarValue::from(200))) + /// .with_sum_value(Precision::Exact(ScalarValue::from(1000))), /// ); /// - /// let merged_stats = stats1.try_merge(&stats2).unwrap(); - /// let expected_stats = Statistics::default() - /// .with_num_rows(Precision::Exact(11)) - /// .with_total_byte_size(Precision::Inexact(22)) // inexact in stats2 --> inexact - /// .add_column_statistics( - /// ColumnStatistics::new_unknown() - /// .with_null_count(Precision::Absent) // missing from stats2 --> absent - /// .with_min_value(Precision::Exact(ScalarValue::from(4))) - /// .with_max_value(Precision::Exact(ScalarValue::from(50))), - /// ); + /// let schema = 
Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+    /// let merged = Statistics::try_merge_iter(
+    ///     &[stats1, stats2],
+    ///     &schema,
+    /// ).unwrap();
     ///
-    /// assert_eq!(merged_stats, expected_stats)
+    /// assert_eq!(merged.num_rows, Precision::Exact(30));
+    /// assert_eq!(merged.column_statistics[0].min_value,
+    ///     Precision::Exact(ScalarValue::from(1)));
+    /// assert_eq!(merged.column_statistics[0].max_value,
+    ///     Precision::Exact(ScalarValue::from(200)));
+    /// assert_eq!(merged.column_statistics[0].sum_value,
+    ///     Precision::Exact(ScalarValue::from(1500)));
     /// ```
-    pub fn try_merge(self, other: &Statistics) -> Result<Self> {
-        let Self {
-            mut num_rows,
-            mut total_byte_size,
-            mut column_statistics,
-        } = self;
-
-        // Accumulate statistics for subsequent items
-        num_rows = num_rows.add(&other.num_rows);
-        total_byte_size = total_byte_size.add(&other.total_byte_size);
-
-        if column_statistics.len() != other.column_statistics.len() {
-            return _plan_err!(
-                "Cannot merge statistics with different number of columns: {} vs {}",
-                column_statistics.len(),
-                other.column_statistics.len()
-            );
+    pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
+    where
+        I: IntoIterator<Item = &'a Statistics>,
+    {
+        let items: Vec<&Statistics> = items.into_iter().collect();
+
+        if items.is_empty() {
+            return Ok(Statistics::new_unknown(schema));
+        }
+        if items.len() == 1 {
+            return Ok(items[0].clone());
         }
 
-        for (item_col_stats, col_stats) in other
+        let num_cols = items[0].column_statistics.len();
+        // Validate all items have the same number of columns
+        for (i, stat) in items.iter().enumerate().skip(1) {
+            if stat.column_statistics.len() != num_cols {
+                return _plan_err!(
+                    "Cannot merge statistics with different number of columns: {} vs {} (item {})",
+                    num_cols,
+                    stat.column_statistics.len(),
+                    i
+                );
+            }
+        }
+
+        // Aggregate usize fields (cheap arithmetic)
+        let mut num_rows = Precision::Exact(0usize);
+        let mut total_byte_size = Precision::Exact(0usize);
+        for stat in &items {
+            num_rows = num_rows.add(&stat.num_rows);
+            total_byte_size = total_byte_size.add(&stat.total_byte_size);
+        }
+
+        let first = items[0];
+        let mut column_statistics: Vec<ColumnStatistics> = first
             .column_statistics
             .iter()
-            .zip(column_statistics.iter_mut())
-        {
-            col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);
-            col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
-            col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
-            col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value);
-            col_stats.distinct_count = Precision::Absent;
-            col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size);
+            .map(|cs| ColumnStatistics {
+                null_count: cs.null_count,
+                max_value: cs.max_value.clone(),
+                min_value: cs.min_value.clone(),
+                sum_value: cs.sum_value.clone(),
+                distinct_count: Precision::Absent,
+                byte_size: cs.byte_size,
+            })
+            .collect();
+
+        // Accumulate all statistics in a single pass.
+        // Uses precision_add for sum (avoids the expensive
+        // ScalarValue::add round-trip through Arrow arrays), and
+        // Precision::min/max which use cheap PartialOrd comparison.
+ for stat in items.iter().skip(1) { + for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { + let item_cs = &stat.column_statistics[col_idx]; + + col_stats.null_count = col_stats.null_count.add(&item_cs.null_count); + col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size); + col_stats.sum_value = + precision_add(&col_stats.sum_value, &item_cs.sum_value); + col_stats.min_value = col_stats.min_value.min(&item_cs.min_value); + col_stats.max_value = col_stats.max_value.max(&item_cs.max_value); + } } Ok(Statistics { @@ -1141,7 +1164,7 @@ mod tests { } #[test] - fn test_try_merge_basic() { + fn test_try_merge() { // Create a schema with two columns let schema = Arc::new(Schema::new(vec![ Field::new("col1", DataType::Int32, false), @@ -1338,52 +1361,6 @@ mod tests { ); } - #[test] - fn test_try_merge_distinct_count_absent() { - // Create statistics with known distinct counts - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .with_total_byte_size(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_null_count(Precision::Exact(0)) - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(1)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) - .with_distinct_count(Precision::Exact(5)), - ); - - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(15)) - .with_total_byte_size(Precision::Exact(150)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_null_count(Precision::Exact(0)) - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(20)))) - .with_distinct_count(Precision::Exact(7)), - ); - - // Merge statistics - let merged_stats = stats1.try_merge(&stats2).unwrap(); - - // Verify the results - assert_eq!(merged_stats.num_rows, Precision::Exact(25)); - assert_eq!(merged_stats.total_byte_size, Precision::Exact(250)); - - let col_stats = 
&merged_stats.column_statistics[0]; - assert_eq!(col_stats.null_count, Precision::Exact(0)); - assert_eq!( - col_stats.min_value, - Precision::Exact(ScalarValue::Int32(Some(1))) - ); - assert_eq!( - col_stats.max_value, - Precision::Exact(ScalarValue::Int32(Some(20))) - ); - // Distinct count should be Absent after merge - assert_eq!(col_stats.distinct_count, Precision::Absent); - } - #[test] fn test_with_fetch_basic_preservation() { // Test that column statistics and byte size are preserved (as inexact) when applying fetch @@ -1650,44 +1627,6 @@ mod tests { assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789)); } - #[test] - fn test_byte_size_try_merge() { - // Test that byte_size is summed correctly in try_merge - let col_stats1 = ColumnStatistics { - null_count: Precision::Exact(10), - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Exact(1000), - }; - let col_stats2 = ColumnStatistics { - null_count: Precision::Exact(20), - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - byte_size: Precision::Exact(2000), - }; - - let stats1 = Statistics { - num_rows: Precision::Exact(50), - total_byte_size: Precision::Exact(1000), - column_statistics: vec![col_stats1], - }; - let stats2 = Statistics { - num_rows: Precision::Exact(100), - total_byte_size: Precision::Exact(2000), - column_statistics: vec![col_stats2], - }; - - let merged = stats1.try_merge(&stats2).unwrap(); - assert_eq!( - merged.column_statistics[0].byte_size, - Precision::Exact(3000) // 1000 + 2000 - ); - } - #[test] fn test_byte_size_to_inexact() { let col_stats = ColumnStatistics { @@ -1785,4 +1724,442 @@ mod tests { // total_byte_size should fall back to scaling: 8000 * 0.1 = 800 assert_eq!(result.total_byte_size, Precision::Inexact(800)); } + + #[test] + fn test_try_merge_iter_basic() { + let schema = 
Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Int32, false), + ])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(40), + }, + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(200))), + min_value: Precision::Exact(ScalarValue::Int32(Some(10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(40), + }, + ], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(600))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(60), + }, + ColumnStatistics { + null_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::Int32(Some(180))), + min_value: Precision::Exact(ScalarValue::Int32(Some(5))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(60), + }, + ], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Exact(25)); + assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); + + let col1_stats = &summary_stats.column_statistics[0]; + 
assert_eq!(col1_stats.null_count, Precision::Exact(3)); + assert_eq!( + col1_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col1_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(-10))) + ); + assert_eq!( + col1_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(1100))) + ); + + let col2_stats = &summary_stats.column_statistics[1]; + assert_eq!(col2_stats.null_count, Precision::Exact(5)); + assert_eq!( + col2_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(200))) + ); + assert_eq!( + col2_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(5))) + ); + assert_eq!( + col2_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(2200))) + ); + } + + #[test] + fn test_try_merge_iter_mixed_precision() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(40), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Inexact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(2), + max_value: Precision::Inexact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Inexact(60), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); + 
assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Inexact(3)); + assert_eq!( + col_stats.max_value, + Precision::Inexact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col_stats.min_value, + Precision::Inexact(ScalarValue::Int32(Some(-10))) + ); + // sum_value becomes Absent because stats2 has Absent sum + assert_eq!(col_stats.sum_value, Precision::Absent); + } + + #[test] + fn test_try_merge_iter_empty() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let items: Vec<&Statistics> = vec![]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Absent); + assert_eq!(summary_stats.total_byte_size, Precision::Absent); + assert_eq!(summary_stats.column_statistics.len(), 1); + assert_eq!( + summary_stats.column_statistics[0].null_count, + Precision::Absent + ); + } + + #[test] + fn test_try_merge_iter_single_item() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Exact(10), + byte_size: Precision::Exact(40), + }], + }; + + let items = vec![&stats]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats, stats); + } + + #[test] + fn test_try_merge_iter_mismatched_columns() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats1 = Statistics::default(); + let stats2 = + 
Statistics::default().add_column_statistics(ColumnStatistics::new_unknown()); + + let items = vec![&stats1, &stats2]; + let e = Statistics::try_merge_iter(items, &schema).unwrap_err(); + assert_contains!( + e.to_string(), + "Cannot merge statistics with different number of columns: 0 vs 1" + ); + } + + #[test] + fn test_try_merge_iter_three_items() { + // Verify that merging three items works correctly + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int64, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int64(Some(100))), + min_value: Precision::Exact(ScalarValue::Int64(Some(10))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(500))), + distinct_count: Precision::Exact(8), + byte_size: Precision::Exact(80), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(20), + total_byte_size: Precision::Exact(200), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int64(Some(200))), + min_value: Precision::Exact(ScalarValue::Int64(Some(5))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(1000))), + distinct_count: Precision::Exact(15), + byte_size: Precision::Exact(160), + }], + }; + + let stats3 = Statistics { + num_rows: Precision::Exact(30), + total_byte_size: Precision::Exact(300), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::Int64(Some(150))), + min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(2000))), + distinct_count: Precision::Exact(25), + byte_size: Precision::Exact(240), + }], + }; + + let items = vec![&stats1, &stats2, &stats3]; + let summary_stats = Statistics::try_merge_iter(items, 
&schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Exact(60)); + assert_eq!(summary_stats.total_byte_size, Precision::Exact(600)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Exact(6)); + assert_eq!( + col_stats.max_value, + Precision::Exact(ScalarValue::Int64(Some(200))) + ); + assert_eq!( + col_stats.min_value, + Precision::Exact(ScalarValue::Int64(Some(1))) + ); + assert_eq!( + col_stats.sum_value, + Precision::Exact(ScalarValue::Int64(Some(3500))) + ); + assert_eq!(col_stats.byte_size, Precision::Exact(480)); + // distinct_count is always Absent after merge (can't accurately merge NDV) + assert_eq!(col_stats.distinct_count, Precision::Absent); + } + + #[test] + fn test_try_merge_iter_float_types() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Float64, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(80), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Float64(Some(99.9))), + min_value: Precision::Exact(ScalarValue::Float64(Some(1.1))), + sum_value: Precision::Exact(ScalarValue::Float64(Some(500.5))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(80), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(80), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Float64(Some(200.0))), + min_value: Precision::Exact(ScalarValue::Float64(Some(0.5))), + sum_value: Precision::Exact(ScalarValue::Float64(Some(1000.0))), + distinct_count: Precision::Absent, + byte_size: Precision::Exact(80), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + let col_stats = &summary_stats.column_statistics[0]; + 
assert_eq!( + col_stats.max_value, + Precision::Exact(ScalarValue::Float64(Some(200.0))) + ); + assert_eq!( + col_stats.min_value, + Precision::Exact(ScalarValue::Float64(Some(0.5))) + ); + assert_eq!( + col_stats.sum_value, + Precision::Exact(ScalarValue::Float64(Some(1500.5))) + ); + } + + #[test] + fn test_try_merge_iter_string_types() { + let schema = + Arc::new(Schema::new(vec![Field::new("col1", DataType::Utf8, false)])); + + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Utf8(Some("dog".to_string()))), + min_value: Precision::Exact(ScalarValue::Utf8(Some("ant".to_string()))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(100), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Utf8(Some("zebra".to_string()))), + min_value: Precision::Exact(ScalarValue::Utf8(Some("bat".to_string()))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(100), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!( + col_stats.max_value, + Precision::Exact(ScalarValue::Utf8(Some("zebra".to_string()))) + ); + assert_eq!( + col_stats.min_value, + Precision::Exact(ScalarValue::Utf8(Some("ant".to_string()))) + ); + assert_eq!(col_stats.sum_value, Precision::Absent); + } + + #[test] + fn test_try_merge_iter_all_inexact() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + let stats1 = Statistics { + num_rows: Precision::Inexact(10), + 
total_byte_size: Precision::Inexact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(1), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Inexact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + byte_size: Precision::Inexact(40), + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Inexact(20), + total_byte_size: Precision::Inexact(200), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(2), + max_value: Precision::Inexact(ScalarValue::Int32(Some(200))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(-5))), + sum_value: Precision::Inexact(ScalarValue::Int32(Some(1000))), + distinct_count: Precision::Absent, + byte_size: Precision::Inexact(60), + }], + }; + + let items = vec![&stats1, &stats2]; + let summary_stats = Statistics::try_merge_iter(items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Inexact(30)); + assert_eq!(summary_stats.total_byte_size, Precision::Inexact(300)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Inexact(3)); + assert_eq!( + col_stats.max_value, + Precision::Inexact(ScalarValue::Int32(Some(200))) + ); + assert_eq!( + col_stats.min_value, + Precision::Inexact(ScalarValue::Int32(Some(-5))) + ); + assert_eq!( + col_stats.sum_value, + Precision::Inexact(ScalarValue::Int32(Some(1500))) + ); + } } diff --git a/datafusion/common/src/utils/aggregate.rs b/datafusion/common/src/utils/aggregate.rs new file mode 100644 index 0000000000000..43bc0676b2d3c --- /dev/null +++ b/datafusion/common/src/utils/aggregate.rs @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Scalar-level aggregation utilities for statistics merging.
+//!
+//! Provides a cheap pairwise [`ScalarValue`] addition that directly
+//! extracts inner primitive values, avoiding the expensive
+//! `ScalarValue::add` path (which round-trips through Arrow arrays).
+
+use crate::stats::Precision;
+use crate::{Result, ScalarValue};
+
+/// Add two [`ScalarValue`]s by directly extracting and adding their
+/// inner primitive values.
+///
+/// This avoids `ScalarValue::add` which converts both operands to
+/// single-element Arrow arrays, runs the `add_wrapping` kernel, and
+/// converts the result back — 3 heap allocations per call.
+///
+/// Fast path (both operands are the same variant):
+/// - `Some + Some`: integers and decimals use `checked_add`, floats use
+///   plain `+`. Decimals must also agree on precision and scale, since
+///   their raw values are only addable in that case.
+/// - `None + x` / `x + None`: a clone of `x` is returned, i.e. a NULL
+///   sum contributes nothing to the total.
+///
+/// Everything else (integer/decimal overflow, operands of different
+/// variants, mismatched decimal precision/scale, non-primitive types)
+/// is delegated to [`ScalarValue::add_checked`]: overflow then surfaces
+/// as an error instead of a saturated value, and nothing panics here.
+///
+/// # Errors
+///
+/// Returns whatever error [`ScalarValue::add_checked`] produces when the
+/// fast path does not apply.
+pub(crate) fn scalar_add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+    // Integers: native checked addition; overflow takes the slow path.
+    macro_rules! add_int {
+        ($lhs:expr, $rhs:expr, $VARIANT:ident) => {
+            match ($lhs, $rhs) {
+                (ScalarValue::$VARIANT(Some(a)), ScalarValue::$VARIANT(Some(b))) => {
+                    match a.checked_add(*b) {
+                        Some(sum) => Ok(ScalarValue::$VARIANT(Some(sum))),
+                        None => $lhs.add_checked($rhs),
+                    }
+                }
+                (ScalarValue::$VARIANT(None), other @ ScalarValue::$VARIANT(_))
+                | (other @ ScalarValue::$VARIANT(_), ScalarValue::$VARIANT(None)) => {
+                    Ok(other.clone())
+                }
+                // `rhs` is not the same variant as `lhs`.
+                _ => $lhs.add_checked($rhs),
+            }
+        };
+    }
+
+    // Decimals: like integers, but only when precision and scale agree.
+    macro_rules! add_decimal {
+        ($lhs:expr, $rhs:expr, $VARIANT:ident) => {
+            match ($lhs, $rhs) {
+                (
+                    ScalarValue::$VARIANT(Some(a), p, s),
+                    ScalarValue::$VARIANT(Some(b), rhs_p, rhs_s),
+                ) if p == rhs_p && s == rhs_s => match a.checked_add(*b) {
+                    Some(sum) => Ok(ScalarValue::$VARIANT(Some(sum), *p, *s)),
+                    None => $lhs.add_checked($rhs),
+                },
+                (ScalarValue::$VARIANT(None, _, _), other @ ScalarValue::$VARIANT(..))
+                | (other @ ScalarValue::$VARIANT(..), ScalarValue::$VARIANT(None, _, _)) => {
+                    Ok(other.clone())
+                }
+                _ => $lhs.add_checked($rhs),
+            }
+        };
+    }
+
+    // Floats: plain `+`; there is no checked float addition to use.
+    macro_rules! add_float {
+        ($lhs:expr, $rhs:expr, $VARIANT:ident) => {
+            match ($lhs, $rhs) {
+                (ScalarValue::$VARIANT(Some(a)), ScalarValue::$VARIANT(Some(b))) => {
+                    Ok(ScalarValue::$VARIANT(Some(*a + *b)))
+                }
+                (ScalarValue::$VARIANT(None), other @ ScalarValue::$VARIANT(_))
+                | (other @ ScalarValue::$VARIANT(_), ScalarValue::$VARIANT(None)) => {
+                    Ok(other.clone())
+                }
+                _ => $lhs.add_checked($rhs),
+            }
+        };
+    }
+
+    match lhs {
+        ScalarValue::Int8(_) => add_int!(lhs, rhs, Int8),
+        ScalarValue::Int16(_) => add_int!(lhs, rhs, Int16),
+        ScalarValue::Int32(_) => add_int!(lhs, rhs, Int32),
+        ScalarValue::Int64(_) => add_int!(lhs, rhs, Int64),
+        ScalarValue::UInt8(_) => add_int!(lhs, rhs, UInt8),
+        ScalarValue::UInt16(_) => add_int!(lhs, rhs, UInt16),
+        ScalarValue::UInt32(_) => add_int!(lhs, rhs, UInt32),
+        ScalarValue::UInt64(_) => add_int!(lhs, rhs, UInt64),
+        ScalarValue::Float16(_) => add_float!(lhs, rhs, Float16),
+        ScalarValue::Float32(_) => add_float!(lhs, rhs, Float32),
+        ScalarValue::Float64(_) => add_float!(lhs, rhs, Float64),
+        ScalarValue::Decimal32(_, _, _) => add_decimal!(lhs, rhs, Decimal32),
+        ScalarValue::Decimal64(_, _, _) => add_decimal!(lhs, rhs, Decimal64),
+        ScalarValue::Decimal128(_, _, _) => add_decimal!(lhs, rhs, Decimal128),
+        ScalarValue::Decimal256(_, _, _) => add_decimal!(lhs, rhs, Decimal256),
+        // Non-primitive types take the general checked path.
+        _ => lhs.add_checked(rhs),
+    }
+}
+
+/// [`Precision`]-aware sum of two [`ScalarValue`] precisions using
+/// cheap direct addition via [`scalar_add`].
+///
+/// Exactness is propagated as in `Precision::add`: the result is `Exact`
+/// only if both inputs are, and `Absent` if either input is `Absent` or
+/// the addition fails (e.g. on overflow).
+pub(crate) fn precision_add(
+    lhs: &Precision<ScalarValue>,
+    rhs: &Precision<ScalarValue>,
+) -> Precision<ScalarValue> {
+    match (lhs, rhs) {
+        (Precision::Exact(a), Precision::Exact(b)) => scalar_add(a, b)
+            .map(Precision::Exact)
+            .unwrap_or(Precision::Absent),
+        (Precision::Inexact(a), Precision::Exact(b))
+        | (Precision::Exact(a), Precision::Inexact(b))
+        | (Precision::Inexact(a), Precision::Inexact(b)) => scalar_add(a, b)
+            .map(Precision::Inexact)
+            .unwrap_or(Precision::Absent),
+        (_, _) => Precision::Absent,
+    }
+}
diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs
index 7f2d78d57970e..075a189c371dc 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -17,6 +17,7 @@
 
 //! This module provides the bisect function, which implements binary search.
 
+pub(crate) mod aggregate;
 pub mod expr;
 pub mod memory;
 pub mod proxy;