Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ impl TurbopackFormat {
}
}
}
TraceRow::MemorySample { ts, memory } => {
let ts = Timestamp::from_micros(ts);
store.add_memory_sample(ts, memory);
}
TraceRow::AllocationCounters {
ts: _,
thread_id,
Expand Down
5 changes: 5 additions & 0 deletions turbopack/crates/turbopack-trace-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum ServerToClientMessage {
persistent_allocations: u64,
args: Vec<(String, String)>,
path: Vec<String>,
memory_samples: Vec<u64>,
},
}

Expand Down Expand Up @@ -286,6 +287,8 @@ fn handle_connection(
current = parent;
}
path.reverse();
let memory_samples =
store.memory_samples_for_range(span.start(), span.end());
ServerToClientMessage::QueryResult {
id,
is_graph,
Expand All @@ -299,6 +302,7 @@ fn handle_connection(
persistent_allocations,
args,
path,
memory_samples,
}
} else {
ServerToClientMessage::QueryResult {
Expand All @@ -314,6 +318,7 @@ fn handle_connection(
persistent_allocations: 0,
args: Vec::new(),
path: Vec::new(),
memory_samples: Vec::new(),
}
}
};
Expand Down
50 changes: 50 additions & 0 deletions turbopack/crates/turbopack-trace-server/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ pub type SpanId = NonZeroUsize;
/// at the cut-off depth (Flattening).
const CUT_OFF_DEPTH: u32 = 80;

/// A single memory usage sample: (timestamp, memory_bytes).
/// The store keeps these sorted by timestamp (see
/// `Store::add_memory_sample`).
type MemorySample = (Timestamp, u64);

/// Maximum number of memory samples returned in a query result.
/// When a time range contains more samples than this, consecutive
/// samples are max-pooled down to at most this many — see
/// `Store::memory_samples_for_range`.
const MAX_MEMORY_SAMPLES: usize = 200;

/// In-memory representation of a parsed trace: the span list plus
/// auxiliary data used to answer viewer queries.
pub struct Store {
    /// All spans of the trace.
    pub(crate) spans: Vec<Span>,
    /// Optional index structure for self-time queries.
    pub(crate) self_time_tree: Option<SelfTimeTree<SpanIndex>>,
    // NOTE(review): presumably the high-water mark of timestamps already
    // consulted for self-time lookups — confirm against the lookup code.
    max_self_time_lookup_time: AtomicU64,
    /// Global sorted list of memory samples (timestamp, memory_bytes).
    memory_samples: Vec<MemorySample>,
}

fn new_root_span() -> Span {
Expand Down Expand Up @@ -63,6 +72,7 @@ impl Store {
.is_none()
.then(SelfTimeTree::new),
max_self_time_lookup_time: AtomicU64::new(0),
memory_samples: Vec::new(),
}
}

Expand All @@ -73,6 +83,7 @@ impl Store {
*tree = SelfTimeTree::new();
}
*self.max_self_time_lookup_time.get_mut() = 0;
self.memory_samples.clear();
}

pub fn has_time_info(&self) -> bool {
Expand Down Expand Up @@ -323,6 +334,45 @@ impl Store {
span.self_deallocation_count += count;
}

/// Records one process-memory sample, keeping `memory_samples` sorted by
/// timestamp.
///
/// Samples arrive nearly chronological from the trace writer, so a
/// single insertion-sort pass is cheap: append the new entry, then
/// bubble it backwards until the timestamp ordering holds again.
pub fn add_memory_sample(&mut self, ts: Timestamp, memory: u64) {
    self.memory_samples.push((ts, memory));
    for idx in (1..self.memory_samples.len()).rev() {
        if self.memory_samples[idx - 1].0 <= ts {
            // Everything before `idx` is already in order.
            break;
        }
        self.memory_samples.swap(idx, idx - 1);
    }
}

/// Returns up to `MAX_MEMORY_SAMPLES` memory values for samples whose
/// timestamps fall in the inclusive range `[start, end]`.
///
/// When the range holds more samples than the cap, groups of `n`
/// consecutive samples are merged by taking the maximum memory value of
/// each group, so peaks survive the downsampling.
pub fn memory_samples_for_range(&self, start: Timestamp, end: Timestamp) -> Vec<u64> {
    // The list is sorted by timestamp, so two binary searches delimit
    // the contiguous run of in-range samples.
    let first = self.memory_samples.partition_point(|&(ts, _)| ts < start);
    let past_end = self.memory_samples.partition_point(|&(ts, _)| ts <= end);
    let in_range = &self.memory_samples[first..past_end];

    match in_range.len() {
        0 => Vec::new(),
        len if len <= MAX_MEMORY_SAMPLES => in_range.iter().map(|&(_, mem)| mem).collect(),
        len => {
            // Pick the smallest group size that brings the output down
            // to at most MAX_MEMORY_SAMPLES entries.
            let group_size = len.div_ceil(MAX_MEMORY_SAMPLES);
            in_range
                .chunks(group_size)
                .map(|group| {
                    group
                        .iter()
                        .map(|&(_, mem)| mem)
                        .max()
                        .expect("chunks() never yields an empty slice")
                })
                .collect()
        }
    }
}

pub fn complete_span(&mut self, span_index: SpanIndex) {
let span = &mut self.spans[span_index.get()];
span.is_complete = true;
Expand Down
54 changes: 53 additions & 1 deletion turbopack/crates/turbopack-trace-utils/src/raw_trace.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use std::{
borrow::Cow, fmt::Write, marker::PhantomData, sync::atomic::AtomicU64, thread, time::Instant,
borrow::Cow,
cell::Cell,
fmt::Write,
marker::PhantomData,
sync::atomic::{AtomicU64, Ordering},
thread,
time::Instant,
};

use tracing::{
Expand All @@ -16,6 +22,15 @@ use crate::{
tracing::{TraceRow, TraceValue},
};

/// Minimum time between two emitted memory samples: 10ms, expressed in
/// microseconds (timestamps here are microseconds since trace start).
const MEMORY_SAMPLE_INTERVAL_US: u64 = 10_000;

/// Timestamp of the most recent memory sample emitted by any thread.
/// Ensures at most one sample per interval process-wide.
static GLOBAL_LAST_MEMORY_SAMPLE: AtomicU64 = AtomicU64::new(0);

thread_local! {
    /// Per-thread cache of the last known sample timestamp. Checked
    /// first so the common case avoids touching the global atomic.
    static THREAD_LOCAL_LAST_MEMORY_SAMPLE: Cell<u64> = const { Cell::new(0) };
}

pub struct RawTraceLayerOptions {}

struct RawTraceLayerExtension {
Expand Down Expand Up @@ -60,6 +75,42 @@ impl<S: Subscriber + for<'a> LookupSpan<'a>> RawTraceLayer<S> {
TurboMalloc::reset_allocation_counters(start);
}

/// Emits at most one `MemorySample` trace row per
/// `MEMORY_SAMPLE_INTERVAL_US`, process-wide.
///
/// `ts` is the current timestamp in microseconds since trace start.
/// This runs on every span enter, so the common "no sample due" case is
/// kept cheap: a thread-local read first, then one relaxed atomic load,
/// and only as a last step a compare-exchange.
fn maybe_report_memory_sample(&self, ts: u64) {
    // Fast thread-local check: if this thread already knows a sample
    // happened within the interval, bail without any atomic access.
    let skip = THREAD_LOCAL_LAST_MEMORY_SAMPLE
        .with(|tl| ts.saturating_sub(tl.get()) < MEMORY_SAMPLE_INTERVAL_US);
    if skip {
        return;
    }

    // Check global atomic for the process-wide last sample time.
    let global_last = GLOBAL_LAST_MEMORY_SAMPLE.load(Ordering::Relaxed);
    if ts.saturating_sub(global_last) < MEMORY_SAMPLE_INTERVAL_US {
        // Another thread sampled recently; update thread-local cache so
        // subsequent calls on this thread take the fast path.
        THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(global_last));
        return;
    }

    // Try to atomically claim the sample. Relaxed ordering suffices:
    // the atomic only throttles sampling frequency; it does not
    // synchronize access to any other data.
    match GLOBAL_LAST_MEMORY_SAMPLE.compare_exchange(
        global_last,
        ts,
        Ordering::Relaxed,
        Ordering::Relaxed,
    ) {
        Ok(_) => {
            // We won the race — record locally and write the sample.
            THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(ts));
            let memory = TurboMalloc::memory_usage() as u64;
            self.write(TraceRow::MemorySample { ts, memory });
        }
        Err(actual) => {
            // Lost the race; adopt the winner's timestamp for this
            // thread's fast-path check.
            THREAD_LOCAL_LAST_MEMORY_SAMPLE.with(|tl| tl.set(actual));
        }
    }
}

fn report_allocations(&self, ts: u64, thread_id: u64) {
let allocation_counters = turbo_tasks_malloc::TurboMalloc::allocation_counters();
self.write(TraceRow::AllocationCounters {
Expand Down Expand Up @@ -115,6 +166,7 @@ impl<S: Subscriber + for<'a> LookupSpan<'a>> Layer<S> for RawTraceLayer<S> {
fn on_enter(&self, id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
let ts = self.start.elapsed().as_micros() as u64;
let thread_id = thread::current().id().as_u64().into();
self.maybe_report_memory_sample(ts);
self.report_allocations(ts, thread_id);
self.write(TraceRow::Enter {
ts,
Expand Down
7 changes: 7 additions & 0 deletions turbopack/crates/turbopack-trace-utils/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ pub enum TraceRow<'a> {
/// Deallocation count
deallocation_count: u64,
},
/// A snapshot of the process memory usage at a point in time.
MemorySample {
/// Timestamp
ts: u64,
/// Memory usage in bytes (from TurboMalloc::memory_usage())
memory: u64,
},
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
Loading