From 231db1135059c61b5211c28e51af280dacce5820 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 26 Feb 2026 11:56:08 +0100 Subject: [PATCH 1/6] Arc stats --- .../examples/relation_planner/table_sample.rs | 6 +- .../core/tests/custom_sources_cases/mod.rs | 8 +- .../tests/custom_sources_cases/statistics.rs | 14 +-- .../physical_optimizer/join_selection.rs | 6 +- .../partition_statistics.rs | 88 +++++++++---------- .../tests/physical_optimizer/test_utils.rs | 4 +- datafusion/core/tests/sql/path_partition.rs | 4 +- datafusion/datasource/src/file_scan_config.rs | 12 +-- datafusion/datasource/src/memory.rs | 14 +-- datafusion/datasource/src/source.rs | 4 +- .../src/output_requirements.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 12 +-- datafusion/physical-plan/src/buffer.rs | 2 +- .../physical-plan/src/coalesce_batches.rs | 7 +- .../physical-plan/src/coalesce_partitions.rs | 7 +- datafusion/physical-plan/src/coop.rs | 2 +- datafusion/physical-plan/src/display.rs | 6 +- datafusion/physical-plan/src/empty.rs | 4 +- .../physical-plan/src/execution_plan.rs | 8 +- datafusion/physical-plan/src/filter.rs | 14 +-- .../physical-plan/src/joins/cross_join.rs | 8 +- .../physical-plan/src/joins/hash_join/exec.rs | 2 +- .../src/joins/nested_loop_join.rs | 10 +-- .../src/joins/sort_merge_join/exec.rs | 12 +-- datafusion/physical-plan/src/limit.rs | 14 ++- .../physical-plan/src/placeholder_row.rs | 6 +- datafusion/physical-plan/src/projection.rs | 11 +-- .../physical-plan/src/repartition/mod.rs | 8 +- .../physical-plan/src/sorts/partial_sort.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 18 ++-- .../src/sorts/sort_preserving_merge.rs | 2 +- datafusion/physical-plan/src/test.rs | 6 +- datafusion/physical-plan/src/test/exec.rs | 22 ++--- datafusion/physical-plan/src/union.rs | 34 ++++--- .../src/windows/bounded_window_agg_exec.rs | 6 +- .../src/windows/window_agg_exec.rs | 8 +- datafusion/physical-plan/src/work_table.rs | 4 +- 37 files changed, 201 insertions(+), 196 
deletions(-) diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index aa24f6cfbfa06..04e5efd9706a6 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -727,8 +727,8 @@ impl ExecutionPlan for SampleExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let mut stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let ratio = self.upper_bound - self.lower_bound; // Scale statistics by sampling ratio (inexact due to randomness) @@ -741,7 +741,7 @@ impl ExecutionPlan for SampleExec { .map(|n| (n as f64 * ratio) as usize) .to_inexact(); - Ok(stats) + Ok(Arc::new(stats)) } fn apply_expressions( diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 5f20fb1c6054e..6919d9794b29e 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -184,12 +184,12 @@ impl ExecutionPlan for CustomExecutionPlan { Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 })) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap(); - Ok(Statistics { + Ok(Arc::new(Statistics { num_rows: Precision::Exact(batch.num_rows()), total_byte_size: Precision::Absent, column_statistics: self @@ -208,7 +208,7 @@ impl ExecutionPlan for CustomExecutionPlan { ..Default::default() }) .collect(), - }) + })) } fn apply_expressions( diff --git 
a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index eb81c9f24dfd6..561c6b3b246ff 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -182,11 +182,11 @@ impl ExecutionPlan for StatisticsValidation { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - Ok(Statistics::new_unknown(&self.schema)) + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { - Ok(self.stats.clone()) + Ok(Arc::new(self.stats.clone())) } } @@ -255,7 +255,7 @@ async fn sql_basic() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); // the statistics should be those of the source - assert_eq!(stats, physical_plan.partition_statistics(None)?); + assert_eq!(stats, *physical_plan.partition_statistics(None)?); Ok(()) } @@ -295,7 +295,7 @@ async fn sql_limit() -> Result<()> { .collect(), total_byte_size: Precision::Absent }, - physical_plan.partition_statistics(None)? + *physical_plan.partition_statistics(None)? 
); let df = ctx @@ -304,7 +304,7 @@ async fn sql_limit() -> Result<()> { .unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); // when the limit is larger than the original number of lines, statistics remain unchanged - assert_eq!(stats, physical_plan.partition_statistics(None)?); + assert_eq!(stats, *physical_plan.partition_statistics(None)?); Ok(()) } @@ -324,7 +324,7 @@ async fn sql_window() -> Result<()> { let result = physical_plan.partition_statistics(None)?; assert_eq!(stats.num_rows, result.num_rows); - let col_stats = result.column_statistics; + let col_stats = &result.column_statistics; assert_eq!(2, col_stats.len()); assert_eq!(stats.column_statistics[1], col_stats[0]); diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 17156022dac44..1c94a7bd1e91c 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -1191,12 +1191,12 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { - Ok(if partition.is_some() { + fn partition_statistics(&self, partition: Option) -> Result> { + Ok(Arc::new(if partition.is_some() { Statistics::new_unknown(&self.schema) } else { self.stats.clone() - }) + })) } fn apply_expressions( diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index a03792fae826a..185c22e5a6f65 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -259,8 +259,8 @@ mod test { ); // Check the statistics of each partition assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], 
expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -292,8 +292,8 @@ mod test { create_partition_statistics(2, 8, 1, 2, None); // Check the statistics of each partition assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -326,7 +326,7 @@ mod test { Some((DATE_2025_03_01, DATE_2025_03_04)), ); assert_eq!(statistics.len(), 1); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); // Check the statistics_by_partition with real results let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)]; validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?; @@ -357,8 +357,8 @@ mod test { .map(|idx| sort_exec.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -406,7 +406,7 @@ mod test { }, ], }; - assert_eq!(full_statistics, expected_full_statistic); + assert_eq!(*full_statistics, expected_full_statistic); let statistics = (0..filter.output_partitioning().partition_count()) .map(|idx| filter.partition_statistics(Some(idx))) @@ -435,8 +435,8 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_partition_statistic); - 
assert_eq!(statistics[1], expected_partition_statistic); + assert_eq!(*statistics[0], expected_partition_statistic); + assert_eq!(*statistics[1], expected_partition_statistic); Ok(()) } @@ -467,13 +467,13 @@ mod test { Some((DATE_2025_03_03, DATE_2025_03_04)), ); // Verify first partition (from first scan) - assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[0], expected_statistic_partition_1); // Verify second partition (from first scan) - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Verify third partition (from second scan - same as first partition) - assert_eq!(statistics[2], expected_statistic_partition_1); + assert_eq!(*statistics[2], expected_statistic_partition_1); // Verify fourth partition (from second scan - same as second partition) - assert_eq!(statistics[3], expected_statistic_partition_2); + assert_eq!(*statistics[3], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -522,8 +522,8 @@ mod test { ColumnStatistics::new_unknown(), ], }; - assert_eq!(stats[0], expected_stats); - assert_eq!(stats[1], expected_stats); + assert_eq!(*stats[0], expected_stats); + assert_eq!(*stats[1], expected_stats); // Verify the execution results let partitions = execute_stream_partitioned( @@ -629,8 +629,8 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -674,7 +674,7 @@ mod test { ); expected_full_statistics.num_rows = Precision::Inexact(4); expected_full_statistics.total_byte_size = Precision::Absent; - assert_eq!(full_statistics, expected_full_statistics); + assert_eq!(*full_statistics, 
expected_full_statistics); // Test partition_statistics(Some(idx)) - returns partition-specific statistics // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02] @@ -703,8 +703,8 @@ mod test { .map(|idx| nested_loop_join.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Check the statistics_by_partition with real results let expected_stats = vec![ @@ -733,7 +733,7 @@ mod test { .map(|idx| coalesce_partitions.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 1); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); // Check the statistics_by_partition with real results let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)]; @@ -750,20 +750,20 @@ mod test { .map(|idx| local_limit.partition_statistics(Some(idx))) .collect::>>()?; assert_eq!(statistics.len(), 2); - let mut expected_0 = statistics[0].clone(); + let mut expected_0 = Statistics::clone(&statistics[0]); expected_0.column_statistics = expected_0 .column_statistics .into_iter() .map(|c| c.to_inexact()) .collect(); - let mut expected_1 = statistics[1].clone(); + let mut expected_1 = Statistics::clone(&statistics[1]); expected_1.column_statistics = expected_1 .column_statistics .into_iter() .map(|c| c.to_inexact()) .collect(); - assert_eq!(statistics[0], expected_0); - assert_eq!(statistics[1], expected_1); + assert_eq!(*statistics[0], expected_0); + assert_eq!(*statistics[1], expected_1); Ok(()) } @@ -785,7 +785,7 @@ mod test { 4, Some((DATE_2025_03_01, DATE_2025_03_02)), ); - assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(*statistics[0], expected_statistic_partition); Ok(()) } @@ -853,7 +853,7 @@ mod test 
{ ], }; - assert_eq!(&p0_statistics, &expected_p0_statistics); + assert_eq!(*p0_statistics, expected_p0_statistics); let expected_p1_statistics = Statistics { num_rows: Precision::Inexact(2), @@ -873,7 +873,7 @@ mod test { }; let p1_statistics = aggregate_exec_partial.partition_statistics(Some(1))?; - assert_eq!(&p1_statistics, &expected_p1_statistics); + assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( aggregate_exec_partial.clone(), @@ -895,10 +895,10 @@ mod test { )?); let p0_statistics = agg_final.partition_statistics(Some(0))?; - assert_eq!(&p0_statistics, &expected_p0_statistics); + assert_eq!(*p0_statistics, expected_p0_statistics); let p1_statistics = agg_final.partition_statistics(Some(1))?; - assert_eq!(&p1_statistics, &expected_p1_statistics); + assert_eq!(*p1_statistics, expected_p1_statistics); validate_statistics_with_data( agg_final.clone(), @@ -939,8 +939,8 @@ mod test { ], }; - assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?); - assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(1))?); + assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(0))?); + assert_eq!(empty_stat, *agg_partial.partition_statistics(Some(1))?); validate_statistics_with_data( agg_partial.clone(), vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty], @@ -966,8 +966,8 @@ mod test { agg_partial.schema(), )?); - assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(0))?); - assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(1))?); + assert_eq!(empty_stat, *agg_final.partition_statistics(Some(0))?); + assert_eq!(empty_stat, *agg_final.partition_statistics(Some(1))?); validate_statistics_with_data( agg_final, @@ -1003,7 +1003,7 @@ mod test { column_statistics: vec![ColumnStatistics::new_unknown()], }; - assert_eq!(&expect_stat, &agg_final.partition_statistics(Some(0))?); + assert_eq!(expect_stat, *agg_final.partition_statistics(Some(0))?); // Verify that the aggregate final result has 
exactly one partition with one row let mut partitions = execute_stream_partitioned( @@ -1037,13 +1037,13 @@ mod test { &schema, None, ); - assert_eq!(actual, expected); + assert_eq!(*actual, expected); all_batches.push(batches); } let actual = plan.partition_statistics(None)?; let expected = compute_record_batch_statistics(&all_batches, &schema, None); - assert_eq!(actual, expected); + assert_eq!(*actual, expected); Ok(()) } @@ -1074,7 +1074,7 @@ mod test { // All partitions should have the same statistics for stat in statistics.iter() { - assert_eq!(stat, &expected_stats); + assert_eq!(**stat, expected_stats); } // Verify that the result has exactly 3 partitions @@ -1139,7 +1139,7 @@ mod test { )?); let result = repartition.partition_statistics(Some(0))?; - assert_eq!(result, Statistics::new_unknown(&scan_schema)); + assert_eq!(*result, Statistics::new_unknown(&scan_schema)); // Verify that the result has exactly 0 partitions let partitions = execute_stream_partitioned( @@ -1178,8 +1178,8 @@ mod test { ColumnStatistics::new_unknown(), ], }; - assert_eq!(stats[0], expected_stats); - assert_eq!(stats[1], expected_stats); + assert_eq!(*stats[0], expected_stats); + assert_eq!(*stats[1], expected_stats); // Verify the repartition execution results let partitions = @@ -1286,8 +1286,8 @@ mod test { ], }; - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); + assert_eq!(*statistics[0], expected_statistic_partition_1); + assert_eq!(*statistics[1], expected_statistic_partition_2); // Verify the statistics match actual execution results let expected_stats = vec![ diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 7b000f69a67c2..8d9e7b68b8c96 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -958,8 +958,8 @@ impl ExecutionPlan for TestScan { 
internal_err!("TestScan is for testing optimizer only, not for execution") } - fn partition_statistics(&self, _partition: Option) -> Result { - Ok(Statistics::new_unknown(&self.schema)) + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } // This is the key method - implement sort pushdown diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index a9061849795c7..b78bfd9ff5c15 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -461,7 +461,7 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 4); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; + let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); assert_eq!(stat_cols.len(), 4); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(3)); @@ -485,7 +485,7 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 2); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; + let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); assert_eq!(stat_cols.len(), 2); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(1)); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 524e091381c4c..38c1bd61a342f 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -781,7 +781,7 @@ impl DataSource for FileScanConfig { SchedulingType::Cooperative } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let 
Some(partition) = partition { // Get statistics for a specific partition // Note: FileGroup statistics include partition columns (computed from partition_values) @@ -791,22 +791,22 @@ impl DataSource for FileScanConfig { // Project the statistics based on the projection let output_schema = self.projected_schema()?; return if let Some(projection) = self.file_source.projection() { - projection.project_statistics(stat.clone(), &output_schema) + Ok(Arc::new(projection.project_statistics(stat.clone(), &output_schema)?)) } else { - Ok(stat.clone()) + Ok(Arc::new(stat.clone())) }; } // If no statistics available for this partition, return unknown - Ok(Statistics::new_unknown(self.projected_schema()?.as_ref())) + Ok(Arc::new(Statistics::new_unknown(self.projected_schema()?.as_ref()))) } else { // Return aggregate statistics across all partitions let statistics = self.statistics(); let projection = self.file_source.projection(); let output_schema = self.projected_schema()?; if let Some(projection) = &projection { - projection.project_statistics(statistics.clone(), &output_schema) + Ok(Arc::new(projection.project_statistics(statistics.clone(), &output_schema)?)) } else { - Ok(statistics) + Ok(Arc::new(statistics)) } } } diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index e6add2a42c510..aca943ed096b9 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -197,26 +197,26 @@ impl DataSource for MemorySourceConfig { SchedulingType::Cooperative } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { // Compute statistics for a specific partition if let Some(batches) = self.partitions.get(partition) { - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( from_ref(batches), &self.schema, self.projection.clone(), - )) + ))) } else { // Invalid 
partition index - Ok(Statistics::new_unknown(&self.projected_schema)) + Ok(Arc::new(Statistics::new_unknown(&self.projected_schema))) } } else { // Compute statistics across all partitions - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &self.partitions, &self.schema, self.projection.clone(), - )) + ))) } } @@ -968,7 +968,7 @@ mod tests { let values = MemorySourceConfig::try_new_as_values(schema, data)?; assert_eq!( - values.partition_statistics(None)?, + *values.partition_statistics(None)?, Statistics { num_rows: Precision::Exact(rows), total_byte_size: Precision::Exact(8), // not important diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index deea108139530..bffd94af210a7 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -157,7 +157,7 @@ pub trait DataSource: Send + Sync + Debug { /// Returns statistics for a specific partition, or aggregate statistics /// across all partitions if `partition` is `None`. 
- fn partition_statistics(&self, partition: Option) -> Result; + fn partition_statistics(&self, partition: Option) -> Result>; /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option) -> Option>; @@ -359,7 +359,7 @@ impl ExecutionPlan for DataSourceExec { Some(self.data_source.metrics().clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.data_source.partition_statistics(partition) } diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index b2212c94bc07e..8b71fc9fbf74a 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -246,7 +246,7 @@ impl ExecutionPlan for OutputRequirementExec { unreachable!(); } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index b9a1e49ab5220..9ea2efaed8fa5 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1447,9 +1447,9 @@ impl ExecutionPlan for AggregateExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let child_statistics = self.input().partition_statistics(partition)?; - self.statistics_inner(&child_statistics) + Ok(Arc::new(self.statistics_inner(&child_statistics)?)) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -2538,16 +2538,16 @@ mod tests { Ok(Box::pin(stream)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() 
{ - return Ok(Statistics::new_unknown(self.schema().as_ref())); + return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } let (_, batches) = some_data(); - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &[batches], &self.schema(), None, - )) + ))) } } diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 8477f39a9b687..3e85fb32d2f2c 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -245,7 +245,7 @@ impl ExecutionPlan for BufferExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index d26d1f9beaa18..3e8bfc7f81724 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -227,10 +227,9 @@ impl ExecutionPlan for CoalesceBatchesExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? 
- .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 3dffdd6a99ab5..5ea3589f22b3e 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -244,10 +244,9 @@ impl ExecutionPlan for CoalescePartitionsExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { - self.input - .partition_statistics(None)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, _partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index c7a54329944ec..efe6506edd7bd 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -311,7 +311,7 @@ impl ExecutionPlan for CooperativeExec { Ok(make_cooperative(child_stream)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 76dd6f2054b5a..806bd560afa2b 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1187,14 +1187,14 @@ mod tests { todo!() } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return 
Ok(Statistics::new_unknown(self.schema().as_ref())); + return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } match self { Self::Panic => panic!("expected panic"), Self::Error => Err(internal_datafusion_err!("expected error")), - Self::Ok => Ok(Statistics::new_unknown(self.schema().as_ref())), + Self::Ok => Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))), } } } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index db3f5161a9dbf..078bc4b8d064b 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -164,7 +164,7 @@ impl ExecutionPlan for EmptyExec { )?)) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { assert_or_internal_err!( partition < self.partitions, @@ -191,7 +191,7 @@ impl ExecutionPlan for EmptyExec { }); } - Ok(stats) + Ok(Arc::new(stats)) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 0a9348b68a0d6..c1ca74cb6f3a0 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -553,7 +553,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// If statistics are not available, should return [`Statistics::new_unknown`] /// (the default), not an error. /// If `partition` is `None`, it returns statistics for the entire plan. 
- fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(idx) = partition { // Validate partition index let partition_count = self.properties().partitioning.partition_count(); @@ -564,7 +564,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { partition_count ); } - Ok(Statistics::new_unknown(&self.schema())) + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } /// Returns `true` if a limit can be safely pushed down through this @@ -1640,7 +1640,7 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics(&self, _partition: Option) -> Result> { unimplemented!() } } @@ -1710,7 +1710,7 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics(&self, _partition: Option) -> Result> { unimplemented!() } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 581e833f8c292..09d0f079858f9 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -389,7 +389,7 @@ impl FilterExec { let schema = input.schema(); let stats = Self::statistics_helper( &schema, - input.partition_statistics(None)?, + Arc::unwrap_or_clone(input.partition_statistics(None)?), predicate, default_selectivity, )?; @@ -563,15 +563,15 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. 
- fn partition_statistics(&self, partition: Option) -> Result { - let input_stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let stats = Self::statistics_helper( &self.input.schema(), input_stats, self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(Arc::new(stats.project(self.projection.as_ref()))) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -1348,7 +1348,7 @@ mod tests { ]; let _ = exp_col_stats .into_iter() - .zip(statistics.column_statistics) + .zip(statistics.column_statistics.clone()) .map(|(expected, actual)| { if let Some(val) = actual.min_value.get_value() { if val.data_type().is_floating() { @@ -1419,7 +1419,7 @@ mod tests { )), )); // Since filter predicate passes all entries, statistics after filter shouldn't change. - let expected = input.partition_statistics(None)?.column_statistics; + let expected = input.partition_statistics(None)?.column_statistics.clone(); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); let statistics = filter.partition_statistics(None)?; @@ -1602,7 +1602,7 @@ mod tests { }], }; - assert_eq!(filter_statistics, expected_filter_statistics); + assert_eq!(*filter_statistics, expected_filter_statistics); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index bdad207503585..89fed9edfaf8d 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -384,12 +384,12 @@ impl ExecutionPlan for CrossJoinExec { } } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // Get the all partitions statistics of the left - let left_stats = self.left.partition_statistics(None)?; - let right_stats = 
self.right.partition_statistics(partition)?; + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); - Ok(stats_cartesian_product(left_stats, right_stats)) + Ok(Arc::new(stats_cartesian_product(left_stats, right_stats))) } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c07ae2d65173a..c67c1dfdb0065 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1492,7 +1492,7 @@ impl ExecutionPlan for HashJoinExec { // Project statistics if there is a projection let stats = stats.project(self.projection.as_ref()); // Apply fetch limit to statistics - stats.with_fetch(self.fetch, 0, 1) + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } /// Tries to push `projection` down through `hash_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 1ba6ee23ca8a2..f84cb54dac948 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -646,7 +646,7 @@ impl ExecutionPlan for NestedLoopJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // NestedLoopJoinExec is designed for joins without equijoin keys in the // ON clause (e.g., `t1 JOIN t2 ON (t1.v1 + t2.v1) % 2 = 0`). Any join // predicates are stored in `self.filter`, but `estimate_join_statistics` @@ -660,11 +660,11 @@ impl ExecutionPlan for NestedLoopJoinExec { // so we always request overall stats with `None`. 
Right side can have // multiple partitions, so we forward the partition parameter to get // partition-specific statistics when requested. - let left_stats = self.left.partition_statistics(None)?; - let right_stats = match partition { + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); + let right_stats = Arc::unwrap_or_clone(match partition { Some(partition) => self.right.partition_statistics(Some(partition))?, None => self.right.partition_statistics(None)?, - }; + }); let stats = estimate_join_statistics( left_stats, @@ -674,7 +674,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(Arc::new(stats.project(self.projection.as_ref()))) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 7a01888c2812e..39b3962971e34 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -552,7 +552,7 @@ impl ExecutionPlan for SortMergeJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { // SortMergeJoinExec uses symmetric hash partitioning where both left and right // inputs are hash-partitioned on the join keys. This means partition `i` of the // left input is joined with partition `i` of the right input. 
@@ -564,13 +564,15 @@ impl ExecutionPlan for SortMergeJoinExec { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` - estimate_join_statistics( - self.left.partition_statistics(partition)?, - self.right.partition_statistics(partition)?, + let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(partition)?); + let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); + Ok(Arc::new(estimate_join_statistics( + left_stats, + right_stats, &self.on, &self.join_type, &self.schema, - ) + )?)) } /// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 63a722ef26a75..d135434898d8f 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -239,10 +239,9 @@ impl ExecutionPlan for GlobalLimitExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, self.skip, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(self.fetch, self.skip, 1)?)) } fn fetch(&self) -> Option { @@ -421,10 +420,9 @@ impl ExecutionPlan for LocalLimitExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - self.input - .partition_statistics(partition)? 
- .with_fetch(Some(self.fetch), 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(stats.with_fetch(Some(self.fetch), 0, 1)?)) } fn fetch(&self) -> Option { diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 68fe62f168006..eaa895c821837 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -178,7 +178,7 @@ impl ExecutionPlan for PlaceholderRowExec { Ok(Box::pin(cooperative(ms))) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let batches = self .data() .expect("Create single row placeholder RecordBatch should not fail"); @@ -189,11 +189,11 @@ impl ExecutionPlan for PlaceholderRowExec { None => vec![batches; self.partitions], }; - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &batches, &self.schema, None, - )) + ))) } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index a3d940df13da7..add99a9071ec5 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -365,12 +365,13 @@ impl ExecutionPlan for ProjectionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stats = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let output_schema = self.schema(); - self.projector - .projection() - .project_statistics(input_stats, &output_schema) + Ok(Arc::new(self.projector.projection().project_statistics( + input_stats, + &output_schema, + )?)) } fn supports_limit_pushdown(&self) -> bool { 
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 081f10d482e1e..21f5bd37299a0 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1102,11 +1102,11 @@ impl ExecutionPlan for RepartitionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition) = partition { let partition_count = self.partitioning().partition_count(); if partition_count == 0 { - return Ok(Statistics::new_unknown(&self.schema())); + return Ok(Arc::new(Statistics::new_unknown(&self.schema()))); } assert_or_internal_err!( @@ -1116,7 +1116,7 @@ impl ExecutionPlan for RepartitionExec { partition_count ); - let mut stats = self.input.partition_statistics(None)?; + let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?); // Distribute statistics across partitions stats.num_rows = stats @@ -1137,7 +1137,7 @@ impl ExecutionPlan for RepartitionExec { .map(|_| ColumnStatistics::new_unknown()) .collect(); - Ok(stats) + Ok(Arc::new(stats)) } else { self.input.partition_statistics(None) } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 6df3babb8eef2..127998601fba8 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -350,7 +350,7 @@ impl ExecutionPlan for PartialSortExec { Some(self.metrics_set.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { self.input.partition_statistics(partition) } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index fa05ad99328bd..d02ef48e761bd 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ 
b/datafusion/physical-plan/src/sorts/sort.rs @@ -1335,16 +1335,14 @@ impl ExecutionPlan for SortExec { Some(self.metrics_set.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - if !self.preserve_partitioning() { - return self - .input - .partition_statistics(None)? - .with_fetch(self.fetch, 0, 1); - } - self.input - .partition_statistics(partition)? - .with_fetch(self.fetch, 0, 1) + fn partition_statistics(&self, partition: Option) -> Result> { + let p = if !self.preserve_partitioning() { + None + } else { + partition + }; + let stats = Arc::unwrap_or_clone(self.input.partition_statistics(p)?); + Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 7cbc21f8cbfc5..b1ee5b4d5e8da 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -398,7 +398,7 @@ impl ExecutionPlan for SortPreservingMergeExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics(&self, _partition: Option) -> Result> { self.input.partition_statistics(None) } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 499c10413adeb..0630b8f174563 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -186,11 +186,11 @@ impl ExecutionPlan for TestMemoryExec { unimplemented!() } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - Ok(Statistics::new_unknown(&self.schema)) + Ok(Arc::new(Statistics::new_unknown(&self.schema))) } else { - self.statistics_inner() + Ok(Arc::new(self.statistics_inner()?)) } } diff --git 
a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 7b31855ce88a1..5458fa7ab8264 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -262,9 +262,9 @@ impl ExecutionPlan for MockExec { } // Panics if one of the batches is an error - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema)); + return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } let data: Result> = self .data @@ -277,11 +277,11 @@ impl ExecutionPlan for MockExec { let data = data?; - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &[data], &self.schema, None, - )) + ))) } } @@ -498,15 +498,15 @@ impl ExecutionPlan for BarrierExec { Ok(builder.build()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema)); + return Ok(Arc::new(Statistics::new_unknown(&self.schema))); } - Ok(common::compute_record_batch_statistics( + Ok(Arc::new(common::compute_record_batch_statistics( &self.data, &self.schema, None, - )) + ))) } } @@ -700,12 +700,12 @@ impl ExecutionPlan for StatisticsExec { unimplemented!("This plan only serves for testing statistics") } - fn partition_statistics(&self, partition: Option) -> Result { - Ok(if partition.is_some() { + fn partition_statistics(&self, partition: Option) -> Result> { + Ok(Arc::new(if partition.is_some() { Statistics::new_unknown(&self.schema) } else { self.stats.clone() - }) + })) } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 168048295d0f1..76a68b1b93744 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -331,7 +331,7 @@ impl 
ExecutionPlan for UnionExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { if let Some(partition_idx) = partition { // For a specific partition, find which input it belongs to let mut remaining_idx = partition_idx; @@ -344,19 +344,25 @@ impl ExecutionPlan for UnionExec { remaining_idx -= input_partition_count; } // If we get here, the partition index is out of bounds - Ok(Statistics::new_unknown(&self.schema())) + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } else { // Collect statistics from all inputs let stats = self .inputs .iter() - .map(|input_exec| input_exec.partition_statistics(None)) + .map(|input_exec| { + input_exec + .partition_statistics(None) + .map(Arc::unwrap_or_clone) + }) .collect::>>()?; - Ok(stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + Ok(Arc::new( + stats + .into_iter() + .reduce(stats_union) + .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), + )) } } @@ -670,17 +676,19 @@ impl ExecutionPlan for InterleaveExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let stats = self .inputs .iter() - .map(|stat| stat.partition_statistics(partition)) + .map(|stat| stat.partition_statistics(partition).map(Arc::unwrap_or_clone)) .collect::>>()?; - Ok(stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + Ok(Arc::new( + stats + .into_iter() + .reduce(stats_union) + .unwrap_or_else(|| Statistics::new_unknown(&self.schema())), + )) } fn benefits_from_input_partitioning(&self) -> Vec { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index f589b4d748a53..b6ee5d91d8cc5 100644 --- 
a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -395,9 +395,9 @@ impl ExecutionPlan for BoundedWindowAggExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stat = self.input.partition_statistics(partition)?; - self.statistics_helper(input_stat) + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + Ok(Arc::new(self.statistics_helper(input_stat)?)) } fn cardinality_effect(&self) -> CardinalityEffect { diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index f71f1cbfe6221..f641ec84fd5f9 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -298,8 +298,8 @@ impl ExecutionPlan for WindowAggExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { - let input_stat = self.input.partition_statistics(partition)?; + fn partition_statistics(&self, partition: Option) -> Result> { + let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let win_cols = self.window_expr.len(); let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max... 
@@ -309,11 +309,11 @@ impl ExecutionPlan for WindowAggExec { for _ in 0..win_cols { column_statistics.push(ColumnStatistics::new_unknown()) } - Ok(Statistics { + Ok(Arc::new(Statistics { num_rows: input_stat.num_rows, column_statistics, total_byte_size: Precision::Absent, - }) + })) } fn cardinality_effect(&self) -> CardinalityEffect { diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 4e8cfeea39196..c2ef6bf071c43 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -239,8 +239,8 @@ impl ExecutionPlan for WorkTableExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, _partition: Option) -> Result { - Ok(Statistics::new_unknown(&self.schema())) + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } /// Injects run-time state into this `WorkTableExec`. From ef1a206a38d7c8fc4187da226017cafa55fc1832 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Sun, 1 Mar 2026 20:57:27 +0100 Subject: [PATCH 2/6] fi conflicts --- datafusion/core/tests/sql/path_partition.rs | 10 ++++++++-- datafusion/datasource/src/file_scan_config.rs | 12 +++++++++--- datafusion/physical-plan/src/aggregates/mod.rs | 5 ++++- datafusion/physical-plan/src/display.rs | 5 ++++- datafusion/physical-plan/src/execution_plan.rs | 10 ++++++++-- datafusion/physical-plan/src/filter.rs | 3 ++- datafusion/physical-plan/src/joins/cross_join.rs | 3 ++- .../physical-plan/src/joins/sort_merge_join/exec.rs | 3 ++- datafusion/physical-plan/src/projection.rs | 12 +++++++----- datafusion/physical-plan/src/union.rs | 5 ++++- .../src/windows/bounded_window_agg_exec.rs | 3 ++- .../physical-plan/src/windows/window_agg_exec.rs | 3 ++- 12 files changed, 54 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index b78bfd9ff5c15..c23f53b8db6b3 100644 
--- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -461,7 +461,10 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 4); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); + let stat_cols = physical_plan + .partition_statistics(None)? + .column_statistics + .clone(); assert_eq!(stat_cols.len(), 4); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(3)); @@ -485,7 +488,10 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 2); - let stat_cols = physical_plan.partition_statistics(None)?.column_statistics.clone(); + let stat_cols = physical_plan + .partition_statistics(None)? + .column_statistics + .clone(); assert_eq!(stat_cols.len(), 2); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(1)); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 38c1bd61a342f..82a986a688ed7 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -791,20 +791,26 @@ impl DataSource for FileScanConfig { // Project the statistics based on the projection let output_schema = self.projected_schema()?; return if let Some(projection) = self.file_source.projection() { - Ok(Arc::new(projection.project_statistics(stat.clone(), &output_schema)?)) + Ok(Arc::new( + projection.project_statistics(stat.clone(), &output_schema)?, + )) } else { Ok(Arc::new(stat.clone())) }; } // If no statistics available for this partition, return unknown - Ok(Arc::new(Statistics::new_unknown(self.projected_schema()?.as_ref()))) + Ok(Arc::new(Statistics::new_unknown( + self.projected_schema()?.as_ref(), + ))) } else { // Return aggregate statistics across all 
partitions let statistics = self.statistics(); let projection = self.file_source.projection(); let output_schema = self.projected_schema()?; if let Some(projection) = &projection { - Ok(Arc::new(projection.project_statistics(statistics.clone(), &output_schema)?)) + Ok(Arc::new( + projection.project_statistics(statistics.clone(), &output_schema)?, + )) } else { Ok(Arc::new(statistics)) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 9ea2efaed8fa5..e573a7f85ddf9 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -2538,7 +2538,10 @@ mod tests { Ok(Box::pin(stream)) } - fn partition_statistics(&self, partition: Option) -> Result> { + fn partition_statistics( + &self, + partition: Option, + ) -> Result> { if partition.is_some() { return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 806bd560afa2b..aaf83345d99b8 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1187,7 +1187,10 @@ mod tests { todo!() } - fn partition_statistics(&self, partition: Option) -> Result> { + fn partition_statistics( + &self, + partition: Option, + ) -> Result> { if partition.is_some() { return Ok(Arc::new(Statistics::new_unknown(self.schema().as_ref()))); } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index c1ca74cb6f3a0..5dde4144aa86a 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -1640,7 +1640,10 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result> { + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { unimplemented!() } } @@ -1710,7 +1713,10 @@ mod tests { unimplemented!() } - fn 
partition_statistics(&self, _partition: Option) -> Result> { + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { unimplemented!() } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 09d0f079858f9..6f8cb80993bd3 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -564,7 +564,8 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. fn partition_statistics(&self, partition: Option) -> Result> { - let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stats = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let stats = Self::statistics_helper( &self.input.schema(), input_stats, diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 89fed9edfaf8d..a895f69dc5138 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -387,7 +387,8 @@ impl ExecutionPlan for CrossJoinExec { fn partition_statistics(&self, partition: Option) -> Result> { // Get the all partitions statistics of the left let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(None)?); - let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); + let right_stats = + Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); Ok(Arc::new(stats_cartesian_product(left_stats, right_stats))) } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 39b3962971e34..ac077792f592c 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -565,7 +565,8 @@ impl ExecutionPlan 
for SortMergeJoinExec { // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` let left_stats = Arc::unwrap_or_clone(self.left.partition_statistics(partition)?); - let right_stats = Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); + let right_stats = + Arc::unwrap_or_clone(self.right.partition_statistics(partition)?); Ok(Arc::new(estimate_join_statistics( left_stats, right_stats, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index add99a9071ec5..a4cce0436b10e 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -366,12 +366,14 @@ impl ExecutionPlan for ProjectionExec { } fn partition_statistics(&self, partition: Option) -> Result> { - let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stats = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let output_schema = self.schema(); - Ok(Arc::new(self.projector.projection().project_statistics( - input_stats, - &output_schema, - )?)) + Ok(Arc::new( + self.projector + .projection() + .project_statistics(input_stats, &output_schema)?, + )) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 76a68b1b93744..384a715820885 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -680,7 +680,10 @@ impl ExecutionPlan for InterleaveExec { let stats = self .inputs .iter() - .map(|stat| stat.partition_statistics(partition).map(Arc::unwrap_or_clone)) + .map(|stat| { + stat.partition_statistics(partition) + .map(Arc::unwrap_or_clone) + }) .collect::>>()?; Ok(Arc::new( diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index b6ee5d91d8cc5..33570c2a21cdc 
100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -396,7 +396,8 @@ impl ExecutionPlan for BoundedWindowAggExec { } fn partition_statistics(&self, partition: Option) -> Result> { - let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stat = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); Ok(Arc::new(self.statistics_helper(input_stat)?)) } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index f641ec84fd5f9..c9958c875c6b6 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -299,7 +299,8 @@ impl ExecutionPlan for WindowAggExec { } fn partition_statistics(&self, partition: Option) -> Result> { - let input_stat = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); + let input_stat = + Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); let win_cols = self.window_expr.len(); let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max... 
From 2c61cf690c357b4ee0dd8316b1a88ed465d3928e Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 9 Mar 2026 16:25:18 +0800 Subject: [PATCH 3/6] add upgrade doc --- .../library-user-guide/upgrading/54.0.0.md | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 85cb8e9dd01fe..616f79db30b01 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -90,4 +90,34 @@ fn apply_expressions( } Ok(tnr) } +### `ExecutionPlan::partition_statistics` now returns `Arc` + +`ExecutionPlan::partition_statistics` now returns `Result>` instead of `Result`. This avoids cloning `Statistics` when it is shared across multiple consumers. + +**Before:** + +```rust,ignore +fn partition_statistics(&self, partition: Option) -> Result { + Ok(Statistics::new_unknown(&self.schema())) +} +``` + +**After:** + +```rust,ignore +fn partition_statistics(&self, partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown(&self.schema()))) +} +``` + +If you need an owned `Statistics` value (e.g. 
to mutate it), use `Arc::unwrap_or_clone`: + +```rust,ignore +// If you previously consumed the Statistics directly: +let stats = plan.partition_statistics(None)?; +stats.column_statistics[0].min_value = ...; + +// Now unwrap the Arc first: +let mut stats = Arc::unwrap_or_clone(plan.partition_statistics(None)?); +stats.column_statistics[0].min_value = ...; ``` From c14975f416bcf01a360f341808d14d9ab8468a6c Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 9 Mar 2026 16:27:14 +0800 Subject: [PATCH 4/6] rebase --- docs/source/library-user-guide/upgrading/54.0.0.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 616f79db30b01..a2de4a7b1076a 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -77,7 +77,7 @@ fn apply_expressions( **Node whose only expressions are in `output_ordering()` (e.g. 
a synthetic test node with no owned expression fields):** -```rust,ignore +````rust,ignore fn apply_expressions( &self, f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, @@ -100,7 +100,7 @@ fn apply_expressions( fn partition_statistics(&self, partition: Option) -> Result { Ok(Statistics::new_unknown(&self.schema())) } -``` +```` **After:** From 44c3089fa0f2ecd27f1b002325f843aa85bb18c8 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 9 Mar 2026 17:22:01 +0800 Subject: [PATCH 5/6] update --- .../physical_optimizer/partition_statistics.rs | 6 +++--- datafusion/physical-plan/src/execution_plan.rs | 5 ++++- .../physical-plan/src/joins/hash_join/exec.rs | 16 +++++++++------- .../library-user-guide/upgrading/54.0.0.md | 5 +++-- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 185c22e5a6f65..e4b1f1b261510 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -1452,7 +1452,7 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_p0_statistics); + assert_eq!(*statistics[0], expected_p0_statistics); // Test Partitioned mode let partitioned_join = Arc::new(HashJoinExec::try_new( @@ -1526,7 +1526,7 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_p0_statistics); + assert_eq!(*statistics[0], expected_p0_statistics); // Test Auto mode - should fall back to getting all partition statistics let auto_join = Arc::new(HashJoinExec::try_new( @@ -1600,7 +1600,7 @@ mod test { }, ], }; - assert_eq!(statistics[0], expected_p0_statistics); + assert_eq!(*statistics[0], expected_p0_statistics); Ok(()) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 5dde4144aa86a..d1e0978cfe1c0 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ 
b/datafusion/physical-plan/src/execution_plan.rs @@ -1781,7 +1781,10 @@ mod tests { unimplemented!() } - fn partition_statistics(&self, _partition: Option) -> Result { + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { unimplemented!() } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c67c1dfdb0065..a44ddf41b0b46 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1441,7 +1441,7 @@ impl ExecutionPlan for HashJoinExec { Some(self.metrics.clone_inner()) } - fn partition_statistics(&self, partition: Option) -> Result { + fn partition_statistics(&self, partition: Option) -> Result> { let stats = match (partition, self.mode) { // For CollectLeft mode, the left side is collected into a single partition, // so all left partitions are available to each output partition. @@ -1451,8 +1451,8 @@ impl ExecutionPlan for HashJoinExec { let right_stats = self.right.partition_statistics(Some(partition))?; estimate_join_statistics( - left_stats, - right_stats, + (*left_stats).clone(), + (*right_stats).clone(), &self.on, &self.join_type, &self.join_schema, @@ -1466,8 +1466,8 @@ impl ExecutionPlan for HashJoinExec { let right_stats = self.right.partition_statistics(Some(partition))?; estimate_join_statistics( - left_stats, - right_stats, + (*left_stats).clone(), + (*right_stats).clone(), &self.on, &self.join_type, &self.join_schema, @@ -1480,9 +1480,11 @@ impl ExecutionPlan for HashJoinExec { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` + let left_stats = self.left.partition_statistics(None)?; + let right_stats = self.right.partition_statistics(None)?; estimate_join_statistics( - self.left.partition_statistics(None)?, - 
self.right.partition_statistics(None)?, + (*left_stats).clone(), + (*right_stats).clone(), &self.on, &self.join_type, &self.join_schema, diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index a2de4a7b1076a..ad3ea1e4286dd 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -77,7 +77,7 @@ fn apply_expressions( **Node whose only expressions are in `output_ordering()` (e.g. a synthetic test node with no owned expression fields):** -````rust,ignore +```rust,ignore fn apply_expressions( &self, f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>, @@ -90,6 +90,7 @@ fn apply_expressions( } Ok(tnr) } + ### `ExecutionPlan::partition_statistics` now returns `Arc<Statistics>` `ExecutionPlan::partition_statistics` now returns `Result<Arc<Statistics>>` instead of `Result<Statistics>`. This avoids cloning `Statistics` when it is shared across multiple consumers. @@ -100,7 +101,7 @@ fn apply_expressions( fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { Ok(Statistics::new_unknown(&self.schema())) } -```` +``` **After:** From 75d40fa531fd890c8a252996a720abcb906b8ac1 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 9 Mar 2026 17:51:15 +0800 Subject: [PATCH 6/6] fix prettier --- docs/source/library-user-guide/upgrading/54.0.0.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index ad3ea1e4286dd..77b4fb6f71a35 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -77,7 +77,7 @@ fn apply_expressions( **Node whose only expressions are in `output_ordering()` (e.g. 
a synthetic test node with no owned expression fields):** -```rust,ignore +````rust,ignore fn apply_expressions( &self, f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>, @@ -101,7 +101,7 @@ fn apply_expressions( fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { Ok(Statistics::new_unknown(&self.schema())) } -``` +```` **After:**