From 551f0de116370c82afae1546f638ecd2c74830e7 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 27 Feb 2026 11:17:06 +0000 Subject: [PATCH 1/2] gpu scan bench Signed-off-by: Onur Satici --- Cargo.lock | 17 ++ Cargo.toml | 1 + vortex-cuda/gpu-scan-bench/Cargo.toml | 29 ++++ vortex-cuda/gpu-scan-bench/src/main.rs | 208 +++++++++++++++++++++++++ 4 files changed, 255 insertions(+) create mode 100644 vortex-cuda/gpu-scan-bench/Cargo.toml create mode 100644 vortex-cuda/gpu-scan-bench/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 1aabee2626a..42c27da14dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4048,6 +4048,23 @@ dependencies = [ "yansi", ] +[[package]] +name = "gpu-scan-bench" +version = "0.1.0" +dependencies = [ + "clap", + "futures", + "object_store", + "tokio", + "tracing", + "tracing-perfetto", + "tracing-subscriber", + "url", + "vortex", + "vortex-cuda", + "vortex-cuda-macros", +] + [[package]] name = "gpu-scan-cli" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5bbc2ebab02..cc3c024e5c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "vortex-duckdb", "vortex-cuda", "vortex-cuda/cub", + "vortex-cuda/gpu-scan-bench", "vortex-cuda/gpu-scan-cli", "vortex-cuda/macros", "vortex-cuda/nvcomp", diff --git a/vortex-cuda/gpu-scan-bench/Cargo.toml b/vortex-cuda/gpu-scan-bench/Cargo.toml new file mode 100644 index 00000000000..ec4a736fe7a --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "gpu-scan-bench" +authors = { workspace = true } +description = "CUDA GPU scan benchmarks for S3/NVMe" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +publish = false +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +clap = { workspace = true, features = ["derive"] } +futures = { workspace = true, features = ["executor"] } +object_store = { workspace = true, features = ["aws", "fs"] } +tokio = { workspace = true, features = ["macros", "full"] } +tracing = { workspace = true, features = ["std", "attributes"] } +tracing-perfetto = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +url = { workspace = true } +vortex = { workspace = true } +vortex-cuda = { workspace = true, features = ["_test-harness"] } +vortex-cuda-macros = { workspace = true } diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs new file mode 100644 index 00000000000..a1418161e12 --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -0,0 +1,208 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow(unused_imports)] + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use clap::Parser; +use futures::StreamExt; +use object_store::aws::AmazonS3Builder; +use object_store::path::Path as ObjectPath; +use tracing::Instrument; +use tracing_perfetto::PerfettoLayer; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::Layer; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use url::Url; +use vortex::VortexSessionDefault; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::file::OpenOptionsSessionExt; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; +use vortex_cuda::CudaSession; +use vortex_cuda::PinnedByteBufferPool; +use vortex_cuda::PooledFileReadAt; +use vortex_cuda::PooledObjectStoreReadAt; +use vortex_cuda::TracingLaunchStrategy; +use vortex_cuda::VortexCudaStreamPool; +use vortex_cuda::executor::CudaArrayExt; +use vortex_cuda::layout::register_cuda_layout; +use vortex_cuda_macros::cuda_available; +use vortex_cuda_macros::cuda_not_available; + +#[derive(Parser)] +#[command( + name = "gpu-scan-bench", + about = "Benchmark GPU scans of CUDA-compatible Vortex files from S3 or local storage" +)] +struct Cli { + /// S3 URI (s3://bucket/path) or local path to a CUDA-compatible .vortex file. + source: String, + + /// Number of scan iterations. + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Path to write Perfetto trace output. If omitted, no trace file is written. + #[arg(long)] + perfetto: Option, + + /// Output logs as JSON. + #[arg(long)] + json: bool, +} + +#[cuda_not_available] +fn main() {} + +#[cuda_available] +#[tokio::main] +async fn main() -> VortexResult<()> { + let cli = Cli::parse(); + + // Setup tracing + let perfetto_guard = if let Some(ref perfetto_path) = cli.perfetto { + let perfetto_file = File::create(perfetto_path)?; + Some(PerfettoLayer::new(perfetto_file).with_debug_annotations(true)) + } else { + None + }; + + if cli.json { + let log_layer = tracing_subscriber::fmt::layer() + .json() + .with_span_events(FmtSpan::NONE) + .with_ansi(false); + + let mut registry = tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())); + + if let Some(perfetto) = perfetto_guard { + registry.with(perfetto).init(); + } else { + registry.init(); + } + } else { + let log_layer = tracing_subscriber::fmt::layer() + .pretty() + .with_span_events(FmtSpan::NONE) + .with_ansi(false) + .event_format(tracing_subscriber::fmt::format().with_target(true)); + + let mut registry = tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())); + + if let Some(perfetto) = perfetto_guard { + registry.with(perfetto).init(); + } else { + registry.init(); + } + } + + let session = VortexSession::default(); + register_cuda_layout(&session); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? + .with_launch_strategy(Arc::new(TracingLaunchStrategy)); + + let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone( + cuda_ctx.stream().context(), + ))); + let cuda_stream = + VortexCudaStreamPool::new(Arc::clone(cuda_ctx.stream().context()), 1).get_stream()?; + let handle = session.handle(); + + // Parse source and create reader + let reader: Arc = if cli.source.starts_with("s3://") { + let url = Url::parse(&cli.source)?; + let bucket = url + .host_str() + .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?; + let path = ObjectPath::from(url.path()); + let store: Arc = Arc::new( + AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .build()?, + ); + Arc::new(PooledObjectStoreReadAt::new( + store, + path, + handle, + Arc::clone(&pool), + cuda_stream, + )) + } else { + let path = PathBuf::from(&cli.source); + Arc::new(PooledFileReadAt::open( + &path, + handle, + Arc::clone(&pool), + cuda_stream, + )?) + }; + + // Run benchmark iterations + let mut iteration_times = Vec::with_capacity(cli.iterations); + + for iteration in 0..cli.iterations { + let start = Instant::now(); + + let gpu_file = session.open_options().open(Arc::clone(&reader)).await?; + + let mut batches = gpu_file.scan()?.into_array_stream()?; + + let mut chunk = 0; + while let Some(next) = batches.next().await.transpose()? { + let len = next.len(); + let span = tracing::info_span!( + "batch execution", + iteration = iteration, + chunk = chunk, + len = len, + ); + + async { + next.execute_cuda(&mut cuda_ctx).await?; + VortexResult::Ok(()) + } + .instrument(span) + .await?; + + chunk += 1; + } + + let elapsed = start.elapsed(); + iteration_times.push(elapsed); + tracing::info!( + "Iteration {}/{}: {:.3}s", + iteration + 1, + cli.iterations, + elapsed.as_secs_f64() + ); + } + + // Print summary + let total: std::time::Duration = iteration_times.iter().sum(); + let avg = total / iteration_times.len() as u32; + + // Get file size for throughput + let file_size = reader.size().await?; + let throughput_mbs = (file_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64(); + + eprintln!(); + eprintln!("=== Benchmark Results ==="); + eprintln!("Source: {}", cli.source); + eprintln!("Iterations: {}", cli.iterations); + eprintln!("Avg time: {:.3}s", avg.as_secs_f64()); + eprintln!("File size: {:.2} MB", file_size as f64 / (1024.0 * 1024.0)); + eprintln!("Throughput: {throughput_mbs:.2} MB/s"); + + Ok(()) +} From eed86273ff40ce175d3c08fd75a902c2eaabb1d5 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Wed, 4 Mar 2026 14:27:10 +0000 Subject: [PATCH 2/2] comments Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/src/main.rs | 358 ++++++++++++------------- 1 file changed, 178 insertions(+), 180 deletions(-) diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs index a1418161e12..19213f83338 100644 --- a/vortex-cuda/gpu-scan-bench/src/main.rs +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -1,208 +1,206 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -#![allow(unused_imports)] - -use std::fs::File; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Instant; - -use clap::Parser; -use futures::StreamExt; -use object_store::aws::AmazonS3Builder; -use object_store::path::Path as ObjectPath; -use tracing::Instrument; -use tracing_perfetto::PerfettoLayer; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::Layer; -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; -use url::Url; -use vortex::VortexSessionDefault; -use vortex::error::VortexResult; -use vortex::error::vortex_bail; -use vortex::file::OpenOptionsSessionExt; -use vortex::io::session::RuntimeSessionExt; -use vortex::session::VortexSession; -use vortex_cuda::CudaSession; -use vortex_cuda::PinnedByteBufferPool; -use vortex_cuda::PooledFileReadAt; -use vortex_cuda::PooledObjectStoreReadAt; -use vortex_cuda::TracingLaunchStrategy; -use vortex_cuda::VortexCudaStreamPool; -use vortex_cuda::executor::CudaArrayExt; -use vortex_cuda::layout::register_cuda_layout; use vortex_cuda_macros::cuda_available; use vortex_cuda_macros::cuda_not_available; -#[derive(Parser)] -#[command( - name = "gpu-scan-bench", - about = "Benchmark GPU scans of CUDA-compatible Vortex files from S3 or local storage" -)] -struct Cli { - /// S3 URI (s3://bucket/path) or local path to a CUDA-compatible .vortex file. - source: String, - - /// Number of scan iterations. - #[arg(long, default_value_t = 1)] - iterations: usize, - - /// Path to write Perfetto trace output. If omitted, no trace file is written. - #[arg(long)] - perfetto: Option, - - /// Output logs as JSON. - #[arg(long)] - json: bool, -} - #[cuda_not_available] fn main() {} #[cuda_available] -#[tokio::main] -async fn main() -> VortexResult<()> { - let cli = Cli::parse(); - - // Setup tracing - let perfetto_guard = if let Some(ref perfetto_path) = cli.perfetto { - let perfetto_file = File::create(perfetto_path)?; - Some(PerfettoLayer::new(perfetto_file).with_debug_annotations(true)) - } else { - None - }; - - if cli.json { - let log_layer = tracing_subscriber::fmt::layer() - .json() - .with_span_events(FmtSpan::NONE) - .with_ansi(false); +fn main() -> vortex::error::VortexResult<()> { + cuda_main::main() +} - let mut registry = tracing_subscriber::registry() - .with(log_layer.with_filter(EnvFilter::from_default_env())); +#[cuda_available] +mod cuda_main { + use std::fs::File; + use std::path::Path; + use std::path::PathBuf; + use std::sync::Arc; + use std::time::Instant; + + use clap::Parser; + use futures::StreamExt; + use object_store::aws::AmazonS3Builder; + use object_store::path::Path as ObjectPath; + use tracing::Instrument; + use tracing_perfetto::PerfettoLayer; + use tracing_subscriber::EnvFilter; + use tracing_subscriber::Layer; + use tracing_subscriber::fmt::format::FmtSpan; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + use url::Url; + use vortex::VortexSessionDefault; + use vortex::error::VortexResult; + use vortex::file::OpenOptionsSessionExt; + use vortex::io::session::RuntimeSessionExt; + use vortex::session::VortexSession; + use vortex_cuda::CudaSession; + use vortex_cuda::PinnedByteBufferPool; + use vortex_cuda::PooledFileReadAt; + use vortex_cuda::PooledObjectStoreReadAt; + use vortex_cuda::TracingLaunchStrategy; + use vortex_cuda::VortexCudaStreamPool; + use vortex_cuda::executor::CudaArrayExt; + use vortex_cuda::layout::register_cuda_layout; + + #[derive(Parser)] + #[command( + name = "gpu-scan-bench", + about = "Benchmark GPU scans of CUDA-compatible Vortex files from S3 or local storage" + )] + struct Cli { + /// S3 URI (s3://bucket/path) or local path to a CUDA-compatible .vortex file. + source: String, + + /// Number of scan iterations. + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Path to write Perfetto trace output. If omitted, no trace file is written. + #[arg(long)] + perfetto: Option, + + /// Output logs as JSON. + #[arg(long)] + json: bool, + } - if let Some(perfetto) = perfetto_guard { - registry.with(perfetto).init(); + fn init_tracing(json: bool, perfetto_path: Option<&Path>) -> VortexResult<()> { + let perfetto_layer = if let Some(perfetto_path) = perfetto_path { + let perfetto_file = File::create(perfetto_path)?; + Some(PerfettoLayer::new(perfetto_file).with_debug_annotations(true)) } else { - registry.init(); - } - } else { - let log_layer = tracing_subscriber::fmt::layer() - .pretty() - .with_span_events(FmtSpan::NONE) - .with_ansi(false) - .event_format(tracing_subscriber::fmt::format().with_target(true)); + None + }; - let mut registry = tracing_subscriber::registry() - .with(log_layer.with_filter(EnvFilter::from_default_env())); + let base_log_layer = tracing_subscriber::fmt::layer() + .with_span_events(FmtSpan::NONE) + .with_ansi(false); - if let Some(perfetto) = perfetto_guard { - registry.with(perfetto).init(); + if json { + let log_layer = base_log_layer.json(); + tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())) + .with(perfetto_layer) + .init(); } else { - registry.init(); + let log_layer = base_log_layer + .pretty() + .event_format(tracing_subscriber::fmt::format().with_target(true)); + tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())) + .with(perfetto_layer) + .init(); } + + Ok(()) } - let session = VortexSession::default(); - register_cuda_layout(&session); - - let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? - .with_launch_strategy(Arc::new(TracingLaunchStrategy)); - - let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone( - cuda_ctx.stream().context(), - ))); - let cuda_stream = - VortexCudaStreamPool::new(Arc::clone(cuda_ctx.stream().context()), 1).get_stream()?; - let handle = session.handle(); - - // Parse source and create reader - let reader: Arc = if cli.source.starts_with("s3://") { - let url = Url::parse(&cli.source)?; - let bucket = url - .host_str() - .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?; - let path = ObjectPath::from(url.path()); - let store: Arc = Arc::new( - AmazonS3Builder::from_env() - .with_bucket_name(bucket) - .build()?, - ); - Arc::new(PooledObjectStoreReadAt::new( - store, - path, - handle, - Arc::clone(&pool), - cuda_stream, - )) - } else { - let path = PathBuf::from(&cli.source); - Arc::new(PooledFileReadAt::open( - &path, - handle, - Arc::clone(&pool), - cuda_stream, - )?) - }; - - // Run benchmark iterations - let mut iteration_times = Vec::with_capacity(cli.iterations); - - for iteration in 0..cli.iterations { - let start = Instant::now(); - - let gpu_file = session.open_options().open(Arc::clone(&reader)).await?; - - let mut batches = gpu_file.scan()?.into_array_stream()?; - - let mut chunk = 0; - while let Some(next) = batches.next().await.transpose()? { - let len = next.len(); - let span = tracing::info_span!( - "batch execution", - iteration = iteration, - chunk = chunk, - len = len, + #[tokio::main] + pub async fn main() -> VortexResult<()> { + let cli = Cli::parse(); + init_tracing(cli.json, cli.perfetto.as_deref())?; + + let session = VortexSession::default(); + register_cuda_layout(&session); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? + .with_launch_strategy(Arc::new(TracingLaunchStrategy)); + + let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone( + cuda_ctx.stream().context(), + ))); + let cuda_stream = + VortexCudaStreamPool::new(Arc::clone(cuda_ctx.stream().context()), 1).stream()?; + let handle = session.handle(); + + // Parse source and create reader + let reader: Arc = if cli.source.starts_with("s3://") { + let url = Url::parse(&cli.source)?; + let bucket = url + .host_str() + .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?; + let path = ObjectPath::from(url.path()); + let store: Arc = Arc::new( + AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .build()?, ); - - async { - next.execute_cuda(&mut cuda_ctx).await?; - VortexResult::Ok(()) + Arc::new(PooledObjectStoreReadAt::new( + store, + path, + handle, + Arc::clone(&pool), + cuda_stream, + )) + } else { + let path = PathBuf::from(&cli.source); + Arc::new(PooledFileReadAt::open( + &path, + handle, + Arc::clone(&pool), + cuda_stream, + )?) + }; + + // Run benchmark iterations + let mut iteration_times = Vec::with_capacity(cli.iterations); + + for iteration in 0..cli.iterations { + let start = Instant::now(); + + let gpu_file = session.open_options().open(Arc::clone(&reader)).await?; + let mut batches = gpu_file.scan()?.into_array_stream()?; + + let mut chunk = 0; + while let Some(next) = batches.next().await.transpose()? { + let len = next.len(); + let span = tracing::info_span!( + "batch execution", + iteration = iteration, + chunk = chunk, + len = len, + ); + + async { + next.execute_cuda(&mut cuda_ctx).await?; + VortexResult::Ok(()) + } + .instrument(span) + .await?; + + chunk += 1; } - .instrument(span) - .await?; - chunk += 1; + let elapsed = start.elapsed(); + iteration_times.push(elapsed); + tracing::info!( + "Iteration {}/{}: {:.3}s", + iteration + 1, + cli.iterations, + elapsed.as_secs_f64() + ); } - let elapsed = start.elapsed(); - iteration_times.push(elapsed); - tracing::info!( - "Iteration {}/{}: {:.3}s", - iteration + 1, - cli.iterations, - elapsed.as_secs_f64() - ); - } - - // Print summary - let total: std::time::Duration = iteration_times.iter().sum(); - let avg = total / iteration_times.len() as u32; + // Print summary + let total: std::time::Duration = iteration_times.iter().sum(); + let avg = total / iteration_times.len() as u32; - // Get file size for throughput - let file_size = reader.size().await?; - let throughput_mbs = (file_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64(); + // Get file size for throughput + let file_size = reader.size().await?; + let throughput_mbs = (file_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64(); - eprintln!(); - eprintln!("=== Benchmark Results ==="); - eprintln!("Source: {}", cli.source); - eprintln!("Iterations: {}", cli.iterations); - eprintln!("Avg time: {:.3}s", avg.as_secs_f64()); - eprintln!("File size: {:.2} MB", file_size as f64 / (1024.0 * 1024.0)); - eprintln!("Throughput: {throughput_mbs:.2} MB/s"); + eprintln!(); + eprintln!("=== Benchmark Results ==="); + eprintln!("Source: {}", cli.source); + eprintln!("Iterations: {}", cli.iterations); + eprintln!("Avg time: {:.3}s", avg.as_secs_f64()); + eprintln!("File size: {:.2} MB", file_size as f64 / (1024.0 * 1024.0)); + eprintln!("Throughput: {throughput_mbs:.2} MB/s"); - Ok(()) + Ok(()) + } }