diff --git a/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs b/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs index 53632497c875f6..4a025a2b0f38e0 100644 --- a/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs +++ b/turbopack/crates/turbopack-trace-server/src/reader/turbopack.rs @@ -288,6 +288,10 @@ impl TurbopackFormat { } } } + TraceRow::MemorySample { ts, memory } => { + let ts = Timestamp::from_micros(ts); + store.add_memory_sample(ts, memory); + } TraceRow::AllocationCounters { ts: _, thread_id, diff --git a/turbopack/crates/turbopack-trace-server/src/server.rs b/turbopack/crates/turbopack-trace-server/src/server.rs index 2526db4b6cd421..f4c43d24ef0ebc 100644 --- a/turbopack/crates/turbopack-trace-server/src/server.rs +++ b/turbopack/crates/turbopack-trace-server/src/server.rs @@ -43,6 +43,7 @@ pub enum ServerToClientMessage { persistent_allocations: u64, args: Vec<(String, String)>, path: Vec<String>, + memory_samples: Vec<u64>, }, } @@ -286,6 +287,8 @@ fn handle_connection( current = parent; } path.reverse(); + let memory_samples = + store.memory_samples_for_range(span.start(), span.end()); ServerToClientMessage::QueryResult { id, is_graph, @@ -299,6 +302,7 @@ fn handle_connection( persistent_allocations, args, path, + memory_samples, } } else { ServerToClientMessage::QueryResult { @@ -314,6 +318,7 @@ fn handle_connection( persistent_allocations: 0, args: Vec::new(), path: Vec::new(), + memory_samples: Vec::new(), } } }; diff --git a/turbopack/crates/turbopack-trace-server/src/store.rs b/turbopack/crates/turbopack-trace-server/src/store.rs index 9397524dde8fe8..102f04f7d59e96 100644 --- a/turbopack/crates/turbopack-trace-server/src/store.rs +++ b/turbopack/crates/turbopack-trace-server/src/store.rs @@ -22,10 +22,19 @@ pub type SpanId = NonZeroUsize; /// at the cut-off depth (Flattening). const CUT_OFF_DEPTH: u32 = 80; +/// A single memory usage sample: (timestamp, memory_bytes). /// Sorted by timestamp. 
+type MemorySample = (Timestamp, u64); + +/// Maximum number of memory samples returned in a query result. +const MAX_MEMORY_SAMPLES: usize = 200; + pub struct Store { pub(crate) spans: Vec<Span>, pub(crate) self_time_tree: Option<SelfTimeTree<SpanIndex>>, max_self_time_lookup_time: AtomicU64, + /// Global sorted list of memory samples (timestamp, memory_bytes). + memory_samples: Vec<MemorySample>, } fn new_root_span() -> Span { @@ -63,6 +72,7 @@ impl Store { .is_none() .then(SelfTimeTree::new), max_self_time_lookup_time: AtomicU64::new(0), + memory_samples: Vec::new(), } } @@ -73,6 +83,7 @@ impl Store { *tree = SelfTimeTree::new(); } *self.max_self_time_lookup_time.get_mut() = 0; + self.memory_samples.clear(); } pub fn has_time_info(&self) -> bool { @@ -323,6 +334,45 @@ impl Store { span.self_deallocation_count += count; } + pub fn add_memory_sample(&mut self, ts: Timestamp, memory: u64) { + // Samples arrive nearly sorted (roughly chronological from the trace + // writer), so an insertion-sort step is efficient: push to the end + // then swap backward until the timestamp ordering is restored. + self.memory_samples.push((ts, memory)); + let mut i = self.memory_samples.len() - 1; + while i > 0 && self.memory_samples[i - 1].0 > ts { + self.memory_samples.swap(i, i - 1); + i -= 1; + } + } + + /// Returns up to `MAX_MEMORY_SAMPLES` memory samples in the range + /// `[start, end]`. When more samples exist, groups of N consecutive + /// samples are merged by taking the maximum memory value in each group. 
+ pub fn memory_samples_for_range(&self, start: Timestamp, end: Timestamp) -> Vec<u64> { + // Binary search for the first sample >= start + let lo = self.memory_samples.partition_point(|(ts, _)| *ts < start); + // Binary search for the first sample > end + let hi = self.memory_samples.partition_point(|(ts, _)| *ts <= end); + + let slice = &self.memory_samples[lo..hi]; + let count = slice.len(); + if count == 0 { + return Vec::new(); + } + + if count <= MAX_MEMORY_SAMPLES { + return slice.iter().map(|(_, mem)| *mem).collect(); + } + + // Merge groups of N samples, taking the max memory in each group. + let n = count.div_ceil(MAX_MEMORY_SAMPLES); + slice + .chunks(n) + .map(|chunk| chunk.iter().map(|(_, mem)| *mem).max().unwrap()) + .collect() + } + pub fn complete_span(&mut self, span_index: SpanIndex) { let span = &mut self.spans[span_index.get()]; span.is_complete = true; diff --git a/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs b/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs index d5ae223c7548f5..923d9677cbd83d 100644 --- a/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs +++ b/turbopack/crates/turbopack-trace-utils/src/raw_trace.rs @@ -1,5 +1,11 @@ use std::{ - borrow::Cow, fmt::Write, marker::PhantomData, sync::atomic::AtomicU64, thread, time::Instant, + borrow::Cow, + cell::Cell, + fmt::Write, + marker::PhantomData, + sync::atomic::{AtomicU64, Ordering}, + thread, + time::Instant, }; use tracing::{ @@ -16,6 +22,15 @@ use crate::{ tracing::{TraceRow, TraceValue}, }; +/// 10ms in microseconds +const MEMORY_SAMPLE_INTERVAL_US: u64 = 10_000; + +static GLOBAL_LAST_MEMORY_SAMPLE: AtomicU64 = AtomicU64::new(0); + +thread_local! 
{ + static THREAD_LOCAL_LAST_MEMORY_SAMPLE: Cell<u64> = const { Cell::new(0) }; +} + pub struct RawTraceLayerOptions {} struct RawTraceLayerExtension { @@ -60,6 +75,42 @@ impl<S: Subscriber + for<'a> LookupSpan<'a>> RawTraceLayer<S> { TurboMalloc::reset_allocation_counters(start); } + fn maybe_report_memory_sample(&self, ts: u64) { + // Fast thread-local check + let skip = THREAD_LOCAL_LAST_MEMORY_SAMPLE + .with(|tl| ts.saturating_sub(tl.get()) < MEMORY_SAMPLE_INTERVAL_US); + if skip { + return; + } + + // Check global atomic + let global_last = GLOBAL_LAST_MEMORY_SAMPLE.load(Ordering::Relaxed); + if ts.saturating_sub(global_last) < MEMORY_SAMPLE_INTERVAL_US { + // Another thread sampled recently; update thread-local cache + THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(global_last)); + return; + } + + // Try to atomically claim the sample + match GLOBAL_LAST_MEMORY_SAMPLE.compare_exchange( + global_last, + ts, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => { + // We won the race — write the sample + THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(ts)); + let memory = TurboMalloc::memory_usage() as u64; + self.write(TraceRow::MemorySample { ts, memory }); + } + Err(actual) => { + // Lost the race; update thread-local with the winner's timestamp + THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(actual)); + } + } + } + fn report_allocations(&self, ts: u64, thread_id: u64) { let allocation_counters = turbo_tasks_malloc::TurboMalloc::allocation_counters(); self.write(TraceRow::AllocationCounters { ts, @@ -115,6 +166,7 @@ impl<S: Subscriber + for<'a> LookupSpan<'a>> Layer<S> for RawTraceLayer<S> { fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) { let ts = self.start.elapsed().as_micros() as u64; let thread_id = thread::current().id().as_u64().into(); + self.maybe_report_memory_sample(ts); self.report_allocations(ts, thread_id); self.write(TraceRow::Enter { ts, diff --git a/turbopack/crates/turbopack-trace-utils/src/tracing.rs b/turbopack/crates/turbopack-trace-utils/src/tracing.rs 
index 3c9fe3130f062b..175b85db0a5ded 100644 --- a/turbopack/crates/turbopack-trace-utils/src/tracing.rs +++ b/turbopack/crates/turbopack-trace-utils/src/tracing.rs @@ -100,6 +100,13 @@ pub enum TraceRow<'a> { /// Deallocation count deallocation_count: u64, }, + /// A snapshot of the process memory usage at a point in time. + MemorySample { + /// Timestamp + ts: u64, + /// Memory usage in bytes (from TurboMalloc::memory_usage()) + memory: u64, + }, } #[derive(Debug, Serialize, Deserialize)]