diff --git a/Cargo.lock b/Cargo.lock index 1aabee2626a..42c27da14dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4048,6 +4048,23 @@ dependencies = [ "yansi", ] +[[package]] +name = "gpu-scan-bench" +version = "0.1.0" +dependencies = [ + "clap", + "futures", + "object_store", + "tokio", + "tracing", + "tracing-perfetto", + "tracing-subscriber", + "url", + "vortex", + "vortex-cuda", + "vortex-cuda-macros", +] + [[package]] name = "gpu-scan-cli" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5bbc2ebab02..cc3c024e5c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "vortex-duckdb", "vortex-cuda", "vortex-cuda/cub", + "vortex-cuda/gpu-scan-bench", "vortex-cuda/gpu-scan-cli", "vortex-cuda/macros", "vortex-cuda/nvcomp", diff --git a/vortex-cuda/gpu-scan-bench/Cargo.toml b/vortex-cuda/gpu-scan-bench/Cargo.toml new file mode 100644 index 00000000000..ec4a736fe7a --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "gpu-scan-bench" +authors = { workspace = true } +description = "CUDA GPU scan benchmarks for S3/NVMe" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +publish = false +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +clap = { workspace = true, features = ["derive"] } +futures = { workspace = true, features = ["executor"] } +object_store = { workspace = true, features = ["aws", "fs"] } +tokio = { workspace = true, features = ["macros", "full"] } +tracing = { workspace = true, features = ["std", "attributes"] } +tracing-perfetto = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +url = { workspace = true } +vortex = { workspace = true } +vortex-cuda = { workspace = true, features = ["_test-harness"] } +vortex-cuda-macros = { workspace = true } diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs new file mode 100644 index 00000000000..19213f83338 --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -0,0 +1,206 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_cuda_macros::cuda_available; +use vortex_cuda_macros::cuda_not_available; + +#[cuda_not_available] +fn main() {} + +#[cuda_available] +fn main() -> vortex::error::VortexResult<()> { + cuda_main::main() +} + +#[cuda_available] +mod cuda_main { + use std::fs::File; + use std::path::Path; + use std::path::PathBuf; + use std::sync::Arc; + use std::time::Instant; + + use clap::Parser; + use futures::StreamExt; + use object_store::aws::AmazonS3Builder; + use object_store::path::Path as ObjectPath; + use tracing::Instrument; + use tracing_perfetto::PerfettoLayer; + use tracing_subscriber::EnvFilter; + use tracing_subscriber::Layer; + use tracing_subscriber::fmt::format::FmtSpan; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + use url::Url; + use vortex::VortexSessionDefault; + use vortex::error::VortexResult; + use vortex::file::OpenOptionsSessionExt; + use vortex::io::session::RuntimeSessionExt; + use vortex::session::VortexSession; + use vortex_cuda::CudaSession; + use vortex_cuda::PinnedByteBufferPool; + use vortex_cuda::PooledFileReadAt; + use vortex_cuda::PooledObjectStoreReadAt; + use vortex_cuda::TracingLaunchStrategy; + use vortex_cuda::VortexCudaStreamPool; + use vortex_cuda::executor::CudaArrayExt; + use vortex_cuda::layout::register_cuda_layout; + + #[derive(Parser)] + #[command( + name = "gpu-scan-bench", + about = "Benchmark GPU scans of CUDA-compatible Vortex files from S3 or local storage" + )] + struct Cli { + /// S3 URI (s3://bucket/path) or local path to a CUDA-compatible .vortex file. + source: String, + + /// Number of scan iterations. + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Path to write Perfetto trace output. If omitted, no trace file is written. + #[arg(long)] + perfetto: Option, + + /// Output logs as JSON. + #[arg(long)] + json: bool, + } + + fn init_tracing(json: bool, perfetto_path: Option<&Path>) -> VortexResult<()> { + let perfetto_layer = if let Some(perfetto_path) = perfetto_path { + let perfetto_file = File::create(perfetto_path)?; + Some(PerfettoLayer::new(perfetto_file).with_debug_annotations(true)) + } else { + None + }; + + let base_log_layer = tracing_subscriber::fmt::layer() + .with_span_events(FmtSpan::NONE) + .with_ansi(false); + + if json { + let log_layer = base_log_layer.json(); + tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())) + .with(perfetto_layer) + .init(); + } else { + let log_layer = base_log_layer + .pretty() + .event_format(tracing_subscriber::fmt::format().with_target(true)); + tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())) + .with(perfetto_layer) + .init(); + } + + Ok(()) + } + + #[tokio::main] + pub async fn main() -> VortexResult<()> { + let cli = Cli::parse(); + init_tracing(cli.json, cli.perfetto.as_deref())?; + + let session = VortexSession::default(); + register_cuda_layout(&session); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? + .with_launch_strategy(Arc::new(TracingLaunchStrategy)); + + let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone( + cuda_ctx.stream().context(), + ))); + let cuda_stream = + VortexCudaStreamPool::new(Arc::clone(cuda_ctx.stream().context()), 1).stream()?; + let handle = session.handle(); + + // Parse source and create reader + let reader: Arc = if cli.source.starts_with("s3://") { + let url = Url::parse(&cli.source)?; + let bucket = url + .host_str() + .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?; + let path = ObjectPath::from(url.path()); + let store: Arc = Arc::new( + AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .build()?, + ); + Arc::new(PooledObjectStoreReadAt::new( + store, + path, + handle, + Arc::clone(&pool), + cuda_stream, + )) + } else { + let path = PathBuf::from(&cli.source); + Arc::new(PooledFileReadAt::open( + &path, + handle, + Arc::clone(&pool), + cuda_stream, + )?) + }; + + // Run benchmark iterations + let mut iteration_times = Vec::with_capacity(cli.iterations); + + for iteration in 0..cli.iterations { + let start = Instant::now(); + + let gpu_file = session.open_options().open(Arc::clone(&reader)).await?; + let mut batches = gpu_file.scan()?.into_array_stream()?; + + let mut chunk = 0; + while let Some(next) = batches.next().await.transpose()? { + let len = next.len(); + let span = tracing::info_span!( + "batch execution", + iteration = iteration, + chunk = chunk, + len = len, + ); + + async { + next.execute_cuda(&mut cuda_ctx).await?; + VortexResult::Ok(()) + } + .instrument(span) + .await?; + + chunk += 1; + } + + let elapsed = start.elapsed(); + iteration_times.push(elapsed); + tracing::info!( + "Iteration {}/{}: {:.3}s", + iteration + 1, + cli.iterations, + elapsed.as_secs_f64() + ); + } + + // Print summary + let total: std::time::Duration = iteration_times.iter().sum(); + let avg = total / iteration_times.len() as u32; + + // Get file size for throughput + let file_size = reader.size().await?; + let throughput_mbs = (file_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64(); + + eprintln!(); + eprintln!("=== Benchmark Results ==="); + eprintln!("Source: {}", cli.source); + eprintln!("Iterations: {}", cli.iterations); + eprintln!("Avg time: {:.3}s", avg.as_secs_f64()); + eprintln!("File size: {:.2} MB", file_size as f64 / (1024.0 * 1024.0)); + eprintln!("Throughput: {throughput_mbs:.2} MB/s"); + + Ok(()) + } +}