|
| 1 | +//! Basic support for emitting a `*.data` file which contains samples of pulley |
| 2 | +//! bytecode. |
| 3 | +//! |
| 4 | +//! Pulley is Wasmtime's interpreter and native profilers are not good at |
| 5 | +//! profiling bytecode interpreters because they show hot bytecode instructions |
| 6 | +//! but we're instead often interested in the shape of the bytecode itself |
| 7 | +//! around the hot instruction, for example to identify new macro-instructions |
| 8 | +//! to add to Pulley. This module serves as a means of collecting data from |
| 9 | +//! Pulley being executed in-process and serializing it to a file. |
| 10 | +//! |
| 11 | +//! The file collected here is populated by a sampling thread in-process. This |
| 12 | +//! sampling thread only collects the current program counter of any interpreters |
| 13 | +//! in the process. This does not collect stack traces at all. That means that |
| 14 | +//! this profiler is only suitable for looking at "self time" and is not |
| 15 | +//! suitable for getting a broader picture of what's going on (e.g. why |
| 16 | +//! something was called in the first place). |
| 17 | +//! |
| 18 | +//! The general design of this profiler is: |
| 19 | +//! |
| 20 | +//! * Support for this all requires a `pulley-profile` feature at compile-time |
| 21 | +//! as it's generally a perf hit to the interpreter loop. |
| 22 | +//! * Each Pulley interpreter updates an `AtomicUsize` before all instructions |
| 23 | +//! with the current PC that it's executing. |
| 24 | +//! * This module spawns a "sampling thread" which will, at some frequency, |
| 25 | +//! collect all the PCs of all interpreters in the process. |
| 26 | +//! * Once enough samples have been collected they're flushed out to a data file |
| 27 | +//! on a second thread, the "recording thread". |
| 28 | +//! |
| 29 | +//! The hope is that the sampling thread stays as steady as possible in its |
| 30 | +//! sampling rate while not hitting OOM conditions in the process or anything |
| 31 | +//! like that. The `*.data` file that's emitted is intended to be processed by |
| 32 | +//! example code in the `pulley-interpreter` crate or `pulley/examples/*.rs` in |
| 33 | +//! the Wasmtime repository. |
| 34 | +
|
| 35 | +use crate::prelude::*; |
| 36 | +use crate::profiling_agent::ProfilingAgent; |
| 37 | +use crate::vm::Interpreter; |
| 38 | +use pulley_interpreter::profile::{ExecutingPc, Recorder, Samples}; |
| 39 | +use std::mem; |
| 40 | +use std::sync::mpsc; |
| 41 | +use std::sync::{Arc, Condvar, Mutex}; |
| 42 | +use std::thread::{self, JoinHandle}; |
| 43 | +use std::time::{Duration, Instant}; |
| 44 | + |
| 45 | +/// Implementation of `ProfilingAgent` from the Wasmtime crate. |
| 46 | +struct PulleyAgent { |
| 47 | + state: Arc<State>, |
| 48 | + |
| 49 | + /// Handle to the thread performing periodic sampling. This is joined on |
| 50 | + /// `Drop` of this structure so it's not a daemon thread permanently. |
| 51 | + sampling_thread: Option<JoinHandle<()>>, |
| 52 | + |
| 53 | + /// Same as the sampling thread above, but for recording data to the |
| 54 | + /// filesystem. |
| 55 | + recording_thread: Option<JoinHandle<()>>, |
| 56 | +} |
| 57 | + |
| 58 | +struct State { |
| 59 | + /// Protected state about the recorder, or the file being created. This is |
| 60 | + /// accessed both from the "recording thread" as well as `Engine` threads to |
| 61 | + /// register new pulley bytecode. |
| 62 | + recorder: Mutex<Recorder>, |
| 63 | + |
| 64 | + /// Protected state about sampling interpreters. This is accessed both from |
| 65 | + /// the "sampling thread" primarily but is additionally accessed from |
| 66 | + /// `Engine` threads to register new interpreters coming online. |
| 67 | + sampling: Mutex<SamplingState>, |
| 68 | + |
| 69 | + /// Condition variable which is signaled when sampling should cease and |
| 70 | + /// exit. This is coupled with `Drop for PulleyAgent`. |
| 71 | + sampling_done: Condvar, |
| 72 | + |
| 73 | + /// The frequency at which samples are collected. Defaults to 1000 but can |
| 74 | + /// be configured with the `PULLEY_SAMPLING_FREQ` environment variable. |
| 75 | + sampling_freq: u32, |
| 76 | + |
| 77 | + /// Number of samples to buffer before flushing them to a file. Defaults to |
| 78 | + /// 20000 but can be configured with the `PULLEY_SAMPLING_FLUSH_AMT` |
| 79 | + /// environment variable. |
| 80 | + sampling_flush_amt: u32, |
| 81 | +} |
| 82 | + |
| 83 | +/// State protected by a mutex in `State` above related to sampling. |
| 84 | +#[derive(Default)] |
| 85 | +struct SamplingState { |
| 86 | + /// All interpreters known to be executing. This is a list of |
| 87 | + /// pointers-to-the-current-PC which is updated whenever the interpreter |
| 88 | + /// executes an instruction. |
| 89 | + interpreters: Vec<ExecutingPc>, |
| 90 | + |
| 91 | + /// Current list of samples that have been collected. |
| 92 | + samples: Samples, |
| 93 | +} |
| 94 | + |
| 95 | +pub fn new() -> Result<Box<dyn ProfilingAgent>> { |
| 96 | + let pid = std::process::id(); |
| 97 | + let filename = format!("./pulley-{pid}.data"); |
| 98 | + let mut agent = PulleyAgent { |
| 99 | + state: Arc::new(State { |
| 100 | + recorder: Mutex::new(Recorder::new(&filename)?), |
| 101 | + sampling: Default::default(), |
| 102 | + sampling_done: Condvar::new(), |
| 103 | + sampling_freq: std::env::var("PULLEY_SAMPLING_FREQ") |
| 104 | + .ok() |
| 105 | + .and_then(|s| s.parse::<u32>().ok()) |
| 106 | + .unwrap_or(1_000), |
| 107 | + sampling_flush_amt: std::env::var("PULLEY_SAMPLING_FLUSH_AMT") |
| 108 | + .ok() |
| 109 | + .and_then(|s| s.parse::<u32>().ok()) |
| 110 | + .unwrap_or(20_000), |
| 111 | + }), |
| 112 | + sampling_thread: None, |
| 113 | + recording_thread: None, |
| 114 | + }; |
| 115 | + |
| 116 | + let (tx, rx) = mpsc::channel(); |
| 117 | + let state = agent.state.clone(); |
| 118 | + agent.sampling_thread = Some(thread::spawn(move || sampling_thread(&state, tx))); |
| 119 | + let state = agent.state.clone(); |
| 120 | + agent.recording_thread = Some(thread::spawn(move || recording_thread(&state, rx))); |
| 121 | + |
| 122 | + Ok(Box::new(agent)) |
| 123 | +} |
| 124 | + |
| 125 | +impl ProfilingAgent for PulleyAgent { |
| 126 | + /// New functions are registered with `Recorder` to record the exact |
| 127 | + /// bytecode so disassembly is available during profile analysis. |
| 128 | + /// |
| 129 | + /// Note that this also provides the native address that code is loaded at |
| 130 | + /// so samples know what code it's within. |
| 131 | + fn register_function(&self, name: &str, code: &[u8]) { |
| 132 | + self.state |
| 133 | + .recorder |
| 134 | + .lock() |
| 135 | + .unwrap() |
| 136 | + .add_function(name, code) |
| 137 | + .expect("failed to register pulley function"); |
| 138 | + } |
| 139 | + |
| 140 | + /// Registers a new interpreter coming online. Interpreters, with |
| 141 | + /// `pulley-profile` enabled, store a shadow program counter updated on each |
| 142 | + /// instruction which we can read from a different thread. |
| 143 | + fn register_interpreter(&self, interpreter: &Interpreter) { |
| 144 | + let pc = interpreter.pulley().executing_pc(); |
| 145 | + self.state |
| 146 | + .sampling |
| 147 | + .lock() |
| 148 | + .unwrap() |
| 149 | + .interpreters |
| 150 | + .push(pc.clone()); |
| 151 | + } |
| 152 | +} |
| 153 | + |
| 154 | +/// Execution of the thread responsible for sampling interpreters. |
| 155 | +/// |
| 156 | +/// This thread has a few tasks: |
| 157 | +/// |
| 158 | +/// * Needs to sample, at `state.sampling_freq`, the state of all known |
| 159 | +/// interpreters. Ideally this sampling is as steady as possible. |
| 160 | +/// * Needs to clean up interpeters which have been destroyed as there's |
| 161 | +/// otherwise no hook for doing so. |
| 162 | +/// * Needs to send batches of samples to the recording thread to get written to |
| 163 | +/// the filesystem. |
| 164 | +fn sampling_thread(state: &State, to_record: mpsc::Sender<Samples>) { |
| 165 | + // Calculate the `Duration` between each sample which will be in |
| 166 | + // nanoseconds. This duration is then used to create an `Instant` in time |
| 167 | + // where we'll be collecting the next sample. |
| 168 | + let between_ticks = Duration::new(0, 1_000_000_000 / state.sampling_freq); |
| 169 | + let start = Instant::now(); |
| 170 | + let mut next_sample = start + between_ticks; |
| 171 | + |
| 172 | + // Helper closure to send off a batch of samples to the recording thread. |
| 173 | + // Note that recording is done off-thread to ensure that the filesystem I/O |
| 174 | + // interferes as little as possible with the sampling rate here. |
| 175 | + let record = |sampling: &mut SamplingState| { |
| 176 | + if sampling.samples.num_samples() == 0 { |
| 177 | + return; |
| 178 | + } |
| 179 | + let samples = mem::take(&mut sampling.samples); |
| 180 | + to_record.send(samples).unwrap(); |
| 181 | + }; |
| 182 | + |
| 183 | + let mut sampling = state.sampling.lock().unwrap(); |
| 184 | + |
| 185 | + loop { |
| 186 | + // Calculate the duration, from this current moment in time, to when the |
| 187 | + // next sample is supposed to be taken. If the next sampling time is in |
| 188 | + // the past then this won't sleep but will still check the condvar. |
| 189 | + let dur = next_sample |
| 190 | + .checked_duration_since(Instant::now()) |
| 191 | + .unwrap_or(Duration::new(0, 0)); |
| 192 | + |
| 193 | + // Wait on `state.sampling_done`, but with the timeout we've calculated. |
| 194 | + // If this times out that means that the next sample can proceed. |
| 195 | + // Otherwise if this did not time out then it means that sampling should |
| 196 | + // cease as the profiler is being destroyed. |
| 197 | + let (guard, result) = state.sampling_done.wait_timeout(sampling, dur).unwrap(); |
| 198 | + sampling = guard; |
| 199 | + if !result.timed_out() { |
| 200 | + break; |
| 201 | + } |
| 202 | + |
| 203 | + // Now that we've decided to take a sample increment the next sample |
| 204 | + // time by our interval. Once we're done sampling below we'll then sleep |
| 205 | + // again up to this time. |
| 206 | + next_sample += between_ticks; |
| 207 | + |
| 208 | + // Sample the state of all interpreters known. This first starts by |
| 209 | + // discarding any interpreters that are offline. Samples without a PC |
| 210 | + // are additionally discarded as it means the interpreter is inactive. |
| 211 | + // |
| 212 | + // Once enough samples have been collected they're flushed to the |
| 213 | + // recording thread. |
| 214 | + let SamplingState { |
| 215 | + interpreters, |
| 216 | + samples, |
| 217 | + } = &mut *sampling; |
| 218 | + interpreters.retain(|a| !a.is_done()); |
| 219 | + for interpreter in interpreters.iter() { |
| 220 | + if let Some(pc) = interpreter.get() { |
| 221 | + samples.append(pc); |
| 222 | + } |
| 223 | + } |
| 224 | + if samples.num_samples() > state.sampling_flush_amt { |
| 225 | + record(&mut sampling); |
| 226 | + } |
| 227 | + } |
| 228 | + |
| 229 | + // Send any final samples to the recording thread after the loop has exited. |
| 230 | + record(&mut sampling); |
| 231 | +} |
| 232 | + |
| 233 | +/// Helper thread responsible for writing samples to the filesystem. |
| 234 | +/// |
| 235 | +/// This receives samples over `to_record` and then performs the filesystem I/O |
| 236 | +/// necessary to write them out. This thread completes once `to_record` is |
| 237 | +/// closed, or when the sampling thread completes. At that time all data in the |
| 238 | +/// recorder is flushed out as well. |
| 239 | +fn recording_thread(state: &State, to_record: mpsc::Receiver<Samples>) { |
| 240 | + for mut samples in to_record { |
| 241 | + state |
| 242 | + .recorder |
| 243 | + .lock() |
| 244 | + .unwrap() |
| 245 | + .add_samples(&mut samples) |
| 246 | + .expect("failed to write samples"); |
| 247 | + } |
| 248 | + |
| 249 | + state.recorder.lock().unwrap().flush().unwrap(); |
| 250 | +} |
| 251 | + |
| 252 | +impl Drop for PulleyAgent { |
| 253 | + fn drop(&mut self) { |
| 254 | + // First notify the sampling thread that it's time to shut down and |
| 255 | + // wait for it to exit. |
| 256 | + self.state.sampling_done.notify_one(); |
| 257 | + self.sampling_thread.take().unwrap().join().unwrap(); |
| 258 | + |
| 259 | + // Wait on the recording thread as well which should terminate once |
| 260 | + // `sampling_thread` has terminated as well. |
| 261 | + self.recording_thread.take().unwrap().join().unwrap(); |
| 262 | + } |
| 263 | +} |
0 commit comments