Set an upper bound for the number of timeline events gathered #6089

Open · wants to merge 5 commits into base: master
21 changes: 21 additions & 0 deletions integration-tests/profiler/eventlimits.js
@@ -0,0 +1,21 @@
'use strict'

const performance = require('perf_hooks').performance

const EVENT_COUNT = parseInt(process.argv[2])
require('dd-trace').init().profilerStarted().then(() => {
const EventSource = require('dd-trace/packages/dd-trace/src/profiling/profilers/events.js')
const template = {
entryType: 'dns',
duration: 10,
name: 'lookup',
_ddSpanId: '1234567890abcdef',
_ddRootSpanId: 'abcdef1234567890',
detail: {
hostname: 'example.com'
}
}
for (let i = 0; i < EVENT_COUNT; i++) {
EventSource.emitTestEvent({ startTime: performance.now(), ...template })
}
})
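
The script above leans on the test-only event source added to events.js later in this diff: when TESTING_PROFILING_EVENTS is set, the profiler creates a diagnostics channel named 'dd-trace:profiling:events-test', EventsProfiler.emitTestEvent publishes synthetic performance entries onto it, and TestEventSource feeds them into the serializer. A minimal standalone sketch of that publish/subscribe round trip, assuming only the dc-polyfill channel API (the handler and event literal here are illustrative, not part of the PR):

'use strict'

const dc = require('dc-polyfill')

// Same channel name the profiler's test event source uses.
const channel = dc.channel('dd-trace:profiling:events-test')

const received = []
const handler = event => received.push(event)

channel.subscribe(handler) // what TestEventSource.start() does
channel.publish({ entryType: 'dns', name: 'lookup', duration: 10, startTime: 0 }) // what emitTestEvent() does
channel.unsubscribe(handler) // what TestEventSource.stop() does

console.log(received.length) // 1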
27 changes: 27 additions & 0 deletions integration-tests/profiler/profiler.spec.js
@@ -454,6 +454,33 @@ describe('profiler', () => {
assert.equal(endpoints.size, 3, encoded)
})

it('number of events is limited', async () => {
const threads = 1
const uploadPeriodSec = 10
const samplingPeriodMs = 1000 / 99
// Recreates getMaxSamples from packages/dd-trace/src/profiling/profilers/events.js
const maxEvents = Math.floor((threads + 2) * uploadPeriodSec * 1000 / samplingPeriodMs)

// The test will try to emit maxEvents + 10 events; we assert that only maxEvents are captured.
const proc = fork(path.join(cwd, 'profiler/eventlimits.js'), [String(maxEvents + 10)], {
cwd,
env: {
DD_PROFILING_EXPORTERS: 'file',
DD_PROFILING_ENABLED: 1,
DD_INTERNAL_PROFILING_TIMELINE_SAMPLING_ENABLED: 0, // capture all events
DD_TRACE_AGENT_PORT: agent.port,
TESTING_PROFILING_EVENTS: 'true', // enable test event source
DD_PROFILING_UPLOAD_PERIOD: String(uploadPeriodSec),
UV_THREADPOOL_SIZE: String(threads)
}
})

await processExitPromise(proc, TIMEOUT)

const { profile, encoded } = await getLatestProfile(cwd, /^events_.+\.pprof$/)
assert.equal(profile.sample.length, maxEvents, encoded)
})

it('fs timeline events work', async () => {
const fsEvents = await gatherFilesystemTimelineEvents(cwd, 'profiler/fstest.js', agent.port)
assert.equal(fsEvents.length, 6)
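For reference, plugging the test's parameters into the recreated formula gives the expected cap (a worked example, not part of the diff):

maxEvents = Math.floor((threads + 2) * uploadPeriodSec * 1000 / samplingPeriodMs)
          = Math.floor((1 + 2) * 10 * 1000 / (1000 / 99))
          = Math.floor(30000 * 99 / 1000)
          = 2970

so eventlimits.js is asked to emit 2980 events and the assertion expects exactly 2970 samples in the resulting events_*.pprof profile.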
2 changes: 2 additions & 0 deletions packages/dd-trace/src/profiling/config.js
@@ -171,6 +171,8 @@ class Config {
samplingContextsAvailable))
checkOptionWithSamplingContextAllowed(this.cpuProfilingEnabled, 'CPU profiling')

this.samplingInterval = coalesce(options.samplingInterval, 1e3 / 99) // 99hz in millis

this.heapSamplingInterval = coalesce(options.heapSamplingInterval,
Number(DD_PROFILING_HEAP_SAMPLING_INTERVAL))
const uploadCompression0 = coalesce(options.uploadCompression, DD_PROFILING_DEBUG_UPLOAD_COMPRESSION, 'on')
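As a quick check of the new default (not part of the diff): 1e3 / 99 ≈ 10.101, i.e. roughly 10.1 ms between samples, or 99 samples per second, which is what the '99hz in millis' comment describes. Consumers that need microseconds, such as wall.js at the end of this diff, multiply the value by 1000.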
23 changes: 2 additions & 21 deletions packages/dd-trace/src/profiling/exporters/event_serializer.js
@@ -5,23 +5,7 @@
const os = require('os')
const perf = require('perf_hooks').performance
const version = require('../../../../../package.json').version
const { getEnvironmentVariable } = require('../../config-helper')

const libuvThreadPoolSize = (() => {
const ss = getEnvironmentVariable('UV_THREADPOOL_SIZE')
if (ss === undefined) {
// Backend will apply the default size based on Node version.
return
}
// libuv uses atoi to parse the value, which is almost the same as parseInt, except that parseInt
// will return NaN on invalid input, while atoi will return 0. This is handled at return.
const s = Number.parseInt(ss)
// We don't interpret the value further here in the library. Backend will interpret the number
// based on Node version. In all currently known Node versions, 0 results in 1 worker thread,
// negative values (because they're assigned to an unsigned int) become very high positive values,
// and the value is finally capped at 1024.
return Number.isNaN(s) ? 0 : s
})()
const { availableParallelism, libuvThreadPoolSize } = require('../libuv-size')

class EventSerializer {
constructor ({ env, host, service, version, libraryInjected, activation } = {}) {
@@ -77,10 +61,7 @@ class EventSerializer {
version
},
runtime: {
// os.availableParallelism only available in node 18.14.0/19.4.0 and above
available_processors: typeof os.availableParallelism === 'function'
? os.availableParallelism()
: os.cpus().length,
available_processors: availableParallelism(),
// Using `nodejs` for consistency with the existing `runtime` tag.
// Note that the event `family` property uses `node`, as that's what's
// proscribed by the Intake API, but that's an internal enum and is
49 changes: 49 additions & 0 deletions packages/dd-trace/src/profiling/libuv-size.js
@@ -0,0 +1,49 @@
'use strict'

const { getEnvironmentVariable } = require('../config-helper')
const os = require('node:os')

function getLibuvThreadPoolSize (envVar) {
if (envVar === undefined) {
return
}
// libuv uses atoi to parse the value, which is almost the same as parseInt, except that parseInt
// will return NaN on invalid input, while atoi will return 0. This is handled at return.
const s = Number.parseInt(envVar, 10)
// We don't interpret the value further here in the library. Backend will interpret the number
// based on Node version.
return Number.isNaN(s) ? 0 : s
}

const libuvThreadPoolSize = getLibuvThreadPoolSize(getEnvironmentVariable('UV_THREADPOOL_SIZE'))

function getEffectiveLibuvThreadCount (size) {
// In all currently known Node versions, 0 results in 1 worker thread, negative values (because
// they're assigned to an unsigned int) become very high positive values, and the value is finally
// capped at 1024.
if (size === undefined) {
return 4
} else if (size < 0 || size > 1024) {
return 1024
} else if (size === 0) {
return 1
}
return size
}

const effectiveLibuvThreadCount = getEffectiveLibuvThreadCount(libuvThreadPoolSize)

function availableParallelism () {
// os.availableParallelism only available in node 18.14.0/19.4.0 and above
// eslint-disable-next-line n/no-unsupported-features/node-builtins
return typeof os.availableParallelism === 'function' ? os.availableParallelism() : os.cpus().length
}

module.exports = {
availableParallelism,
effectiveLibuvThreadCount,
libuvThreadPoolSize,
// Only used for testing
getLibuvThreadPoolSize,
getEffectiveLibuvThreadCount
}
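
A few illustrative calls against the helpers above, with the values the code paths imply (a sketch for clarity; the require path and usage are assumptions, not part of the diff):

const { getLibuvThreadPoolSize, getEffectiveLibuvThreadCount } = require('./libuv-size')

// UV_THREADPOOL_SIZE unset: size is undefined, so libuv's default of 4 workers applies.
getEffectiveLibuvThreadCount(getLibuvThreadPoolSize(undefined)) // 4
// '0' parses to 0, which currently known Node versions treat as a single worker thread.
getEffectiveLibuvThreadCount(getLibuvThreadPoolSize('0')) // 1
// Negative values wrap around to a huge unsigned value and get capped at 1024.
getEffectiveLibuvThreadCount(getLibuvThreadPoolSize('-1')) // 1024
// Non-numeric input mirrors atoi and becomes 0, hence 1 effective worker.
getEffectiveLibuvThreadCount(getLibuvThreadPoolSize('banana')) // 1
// Ordinary values pass through unchanged.
getEffectiveLibuvThreadCount(getLibuvThreadPoolSize('8')) // 8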
103 changes: 91 additions & 12 deletions packages/dd-trace/src/profiling/profilers/events.js
@@ -3,6 +3,13 @@
const { performance, constants, PerformanceObserver } = require('perf_hooks')
const { END_TIMESTAMP_LABEL, SPAN_ID_LABEL, LOCAL_ROOT_SPAN_ID_LABEL, encodeProfileAsync } = require('./shared')
const { Function, Label, Line, Location, Profile, Sample, StringTable, ValueType } = require('pprof-format')
const { availableParallelism, effectiveLibuvThreadCount } = require('../libuv-size')
const { getEnvironmentVariable } = require('../../config-helper')
const dc = require('dc-polyfill')

const testEventChannel = ['true', '1'].includes(getEnvironmentVariable('TESTING_PROFILING_EVENTS'))
? dc.channel('dd-trace:profiling:events-test')
: undefined

// perf_hooks uses millis, with fractional part representing nanos. We emit nanos into the pprof file.
const MS_TO_NS = 1_000_000
@@ -38,6 +45,20 @@ function labelFromStrStr (stringTable, keyStr, valStr) {
return labelFromStr(stringTable, stringTable.dedup(keyStr), valStr)
}

function getMaxSamples (options) {
const cpuSamplingInterval = (options.samplingInterval || 1e3 / 99) // 99hz
const flushInterval = options.flushInterval || 65 * 1e3 // 65 seconds
const maxCpuSamples = flushInterval / cpuSamplingInterval

// The lesser of max parallelism and libuv thread pool size, plus one so we can detect
// oversubscription on libuv thread pool, plus another one for GC.
const factor = Math.max(1, Math.min(availableParallelism(), effectiveLibuvThreadCount)) + 2

// Let's not go overboard with a too-large limit; cap it at 100k. With the current defaults, the
// value will be 65000 / 10.1 * (4 + 2) ≈ 38613.
return Math.min(100_000, Math.floor(maxCpuSamples * factor))
}

class GCDecorator {
constructor (stringTable) {
this.stringTable = stringTable
@@ -181,12 +202,13 @@ const decoratorTypes = {

// Translates performance entries into pprof samples.
class EventSerializer {
constructor () {
constructor (maxSamples) {
this.stringTable = new StringTable()
this.samples = []
this.locations = []
this.functions = []
this.decorators = {}
this.maxSamples = maxSamples

// A synthetic single-frame location to serve as the location for timeline
// samples. We need these as the profiling backend (mimicking official pprof
@@ -204,6 +226,29 @@
}

addEvent (item) {
if (this.samples.length < this.maxSamples) {
const sample = this.createSample(item)
if (sample !== undefined) {
this.samples.push(sample)
}
} else {
// Choose one sample to drop. Using rnd(max + 1) - 1 allows the incoming
// sample itself to be the dropped one, with the same likelihood as any
// other sample.
const replacementIndex = Math.floor(Math.random() * (this.maxSamples + 1)) - 1
if (replacementIndex !== -1) {
const sample = this.createSample(item)
if (sample !== undefined) {
// This will cause the samples to no longer be sorted in their array
// by their end time. This is fine as the backend has no ordering
// expectations.
this.samples[replacementIndex] = sample
}
}
}
}

createSample (item) {
const { entryType, startTime, duration, _ddSpanId, _ddRootSpanId } = item
let decorator = this.decorators[entryType]
if (!decorator) {
@@ -236,7 +281,7 @@
label
}
decorator.decorateSample(sampleInput, item)
this.samples.push(new Sample(sampleInput))
return new Sample(sampleInput)
}

createProfile (startDate, endDate) {
@@ -324,6 +369,27 @@ class DatadogInstrumentationEventSource {
}
}

class TestEventSource {
constructor (eventHandler) {
this.eventHandler = eventHandler
this.started = false
}

start () {
if (!this.started) {
testEventChannel.subscribe(this.eventHandler)
this.started = true
}
}

stop () {
if (this.started) {
testEventChannel.unsubscribe(this.eventHandler)
this.started = false
}
}
}

class CompositeEventSource {
constructor (sources) {
this.sources = sources
@@ -338,7 +404,7 @@
}
}

function createPossionProcessSamplingFilter (samplingIntervalMillis) {
function createPoissonProcessSamplingFilter (samplingIntervalMillis) {
let nextSamplingInstant = performance.now()
let currentSamplingInstant = 0
setNextSamplingInstant()
@@ -377,28 +443,35 @@ function createPossionProcessSamplingFilter (samplingIntervalMillis) {
class EventsProfiler {
constructor (options = {}) {
this.type = 'events'
this.eventSerializer = new EventSerializer()
this.maxSamples = getMaxSamples(options)
this.eventSerializer = new EventSerializer(this.maxSamples)

const eventHandler = event => this.eventSerializer.addEvent(event)
const eventFilter = options.timelineSamplingEnabled
// options.samplingInterval comes in microseconds, we need millis
? createPossionProcessSamplingFilter((options.samplingInterval ?? 1e6 / 99) / 1000)
? createPoissonProcessSamplingFilter((options.samplingInterval ?? 1e6 / 99) / 1000)
: _ => true
const filteringEventHandler = event => {
if (eventFilter(event)) {
eventHandler(event)
}
}

this.eventSource = options.codeHotspotsEnabled
const eventSources = options.codeHotspotsEnabled
// Use Datadog instrumentation to collect events with span IDs. Still use
// Node API for GC events.
? new CompositeEventSource([
new DatadogInstrumentationEventSource(eventHandler, eventFilter),
new NodeApiEventSource(filteringEventHandler, ['gc'])
])
? [
new DatadogInstrumentationEventSource(eventHandler, eventFilter),
new NodeApiEventSource(filteringEventHandler, ['gc']),
]
// Use Node API instrumentation to collect events without span IDs
: new NodeApiEventSource(filteringEventHandler)
: [
new NodeApiEventSource(filteringEventHandler)
]
if (testEventChannel !== undefined) {
eventSources.push(new TestEventSource(filteringEventHandler))
}
this.eventSource = new CompositeEventSource(eventSources)
}

start () {
@@ -414,13 +487,19 @@
this.stop()
}
const thatEventSerializer = this.eventSerializer
this.eventSerializer = new EventSerializer()
this.eventSerializer = new EventSerializer(this.maxSamples)
return () => thatEventSerializer.createProfile(startDate, endDate)
}

encode (profile) {
return encodeProfileAsync(profile())
}

static emitTestEvent (event) {
if (testEventChannel !== undefined) {
testEventChannel.publish(event)
}
}
}

module.exports = EventsProfiler
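
One note on the cap enforced in addEvent above: once the buffer holds maxSamples entries, each incoming event and each buffered sample is an equally likely candidate to be dropped at that step. A tiny standalone sketch of the index choice (illustrative only, reusing the same formula as the PR):

function chooseReplacementIndex (maxSamples) {
  // Uniformly pick one of (maxSamples + 1) candidates: -1 means "drop the
  // incoming event", while 0..maxSamples-1 names the buffered sample it replaces.
  return Math.floor(Math.random() * (maxSamples + 1)) - 1
}

// With maxSamples = 3, each of -1, 0, 1, 2 comes back with probability 1/4.
chooseReplacementIndex(3)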
2 changes: 1 addition & 1 deletion packages/dd-trace/src/profiling/profilers/wall.js
@@ -71,7 +71,7 @@ function ensureChannelsActivated () {
class NativeWallProfiler {
constructor (options = {}) {
this.type = 'wall'
this._samplingIntervalMicros = options.samplingInterval || 1e6 / 99 // 99hz
this._samplingIntervalMicros = (options.samplingInterval || 1e3 / 99) * 1000 // 99hz
this._flushIntervalMillis = options.flushInterval || 60 * 1e3 // 60 seconds
this._codeHotspotsEnabled = !!options.codeHotspotsEnabled
this._endpointCollectionEnabled = !!options.endpointCollectionEnabled
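The wall profiler change keeps the effective default rate intact while switching the option's unit (a quick check, not part of the diff): the old default was 1e6 / 99 ≈ 10101 µs between samples, and the new expression (1e3 / 99) * 1000 evaluates to the same ≈ 10101 µs. Only the unit of options.samplingInterval changes: it now arrives in milliseconds, matching the samplingInterval default added to config.js earlier in this diff.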