From 7cf7dfba7f5993732205503fe00f1e9658db1b69 Mon Sep 17 00:00:00 2001 From: Attila Szegedi Date: Mon, 14 Jul 2025 16:45:34 +0200 Subject: [PATCH 1/4] libuv-size --- .../profiling/exporters/event_serializer.js | 23 +-------- packages/dd-trace/src/profiling/libuv-size.js | 49 ++++++++++++++++++ .../test/profiling/libuv-size.spec.js | 50 +++++++++++++++++++ 3 files changed, 101 insertions(+), 21 deletions(-) create mode 100644 packages/dd-trace/src/profiling/libuv-size.js create mode 100644 packages/dd-trace/test/profiling/libuv-size.spec.js diff --git a/packages/dd-trace/src/profiling/exporters/event_serializer.js b/packages/dd-trace/src/profiling/exporters/event_serializer.js index 58f028e7983..44c80033bf2 100644 --- a/packages/dd-trace/src/profiling/exporters/event_serializer.js +++ b/packages/dd-trace/src/profiling/exporters/event_serializer.js @@ -5,23 +5,7 @@ const os = require('os') const perf = require('perf_hooks').performance const version = require('../../../../../package.json').version -const { getEnvironmentVariable } = require('../../config-helper') - -const libuvThreadPoolSize = (() => { - const ss = getEnvironmentVariable('UV_THREADPOOL_SIZE') - if (ss === undefined) { - // Backend will apply the default size based on Node version. - return - } - // libuv uses atoi to parse the value, which is almost the same as parseInt, except that parseInt - // will return NaN on invalid input, while atoi will return 0. This is handled at return. - const s = Number.parseInt(ss) - // We don't interpret the value further here in the library. Backend will interpret the number - // based on Node version. In all currently known Node versions, 0 results in 1 worker thread, - // negative values (because they're assigned to an unsigned int) become very high positive values, - // and the value is finally capped at 1024. - return Number.isNaN(s) ? 
0 : s -})() +const { availableParallelism, libuvThreadPoolSize } = require('../libuv-size') class EventSerializer { constructor ({ env, host, service, version, libraryInjected, activation } = {}) { @@ -77,10 +61,7 @@ class EventSerializer { version }, runtime: { - // os.availableParallelism only available in node 18.14.0/19.4.0 and above - available_processors: typeof os.availableParallelism === 'function' - ? os.availableParallelism() - : os.cpus().length, + available_processors: availableParallelism(), // Using `nodejs` for consistency with the existing `runtime` tag. // Note that the event `family` property uses `node`, as that's what's // proscribed by the Intake API, but that's an internal enum and is diff --git a/packages/dd-trace/src/profiling/libuv-size.js b/packages/dd-trace/src/profiling/libuv-size.js new file mode 100644 index 00000000000..d4b42ee2110 --- /dev/null +++ b/packages/dd-trace/src/profiling/libuv-size.js @@ -0,0 +1,49 @@ +'use strict' + +const { getEnvironmentVariable } = require('../config-helper') +const os = require('node:os') + +function getLibuvThreadPoolSize (envVar) { + if (envVar === undefined) { + return + } + // libuv uses atoi to parse the value, which is almost the same as parseInt, except that parseInt + // will return NaN on invalid input, while atoi will return 0. This is handled at return. + const s = Number.parseInt(envVar, 10) + // We don't interpret the value further here in the library. Backend will interpret the number + // based on Node version. + return Number.isNaN(s) ? 0 : s +} + +const libuvThreadPoolSize = getLibuvThreadPoolSize(getEnvironmentVariable('UV_THREADPOOL_SIZE')) + +function getEffectiveLibuvThreadCount (size) { + // In all currently known Node versions, 0 results in 1 worker thread, negative values (because + // they're assigned to an unsigned int) become very high positive values, and the value is finally + // capped at 1024. 
+ if (size === undefined) { + return 4 + } else if (size < 0 || size > 1024) { + return 1024 + } else if (size === 0) { + return 1 + } + return size +} + +const effectiveLibuvThreadCount = getEffectiveLibuvThreadCount(libuvThreadPoolSize) + +function availableParallelism () { + // os.availableParallelism only available in node 18.14.0/19.4.0 and above + // eslint-disable-next-line n/no-unsupported-features/node-builtins + return typeof os.availableParallelism === 'function' ? os.availableParallelism() : os.cpus().length +} + +module.exports = { + availableParallelism, + effectiveLibuvThreadCount, + libuvThreadPoolSize, + // Only used for testing + getLibuvThreadPoolSize, + getEffectiveLibuvThreadCount +} diff --git a/packages/dd-trace/test/profiling/libuv-size.spec.js b/packages/dd-trace/test/profiling/libuv-size.spec.js new file mode 100644 index 00000000000..db42fef3e1e --- /dev/null +++ b/packages/dd-trace/test/profiling/libuv-size.spec.js @@ -0,0 +1,50 @@ +'use strict' + +require('../setup/tap') + +const { expect } = require('chai') +const libuvSize = require('../../src/profiling/libuv-size') + +describe('libuv-size', function () { + describe('getLibuvThreadPoolSize should return', function () { + it('undefined if no environment variable is set', function () { + expect(libuvSize.getLibuvThreadPoolSize()).to.equal(undefined) + }) + + it('0 for an empty environment variable', function () { + expect(libuvSize.getLibuvThreadPoolSize('')).to.equal(0) + }) + + it('0 for an invalid environment variable', function () { + expect(libuvSize.getLibuvThreadPoolSize('invalid')).to.equal(0) + }) + + it('a parsed numeric value', function () { + expect(libuvSize.getLibuvThreadPoolSize('100')).to.equal(100) + }) + }) + + describe('getEffectiveLibuvThreadPoolSize should return', function () { + it('the libuv thread pool size if set', function () { + expect(libuvSize.getEffectiveLibuvThreadCount(100)).to.equal(100) + }) + + it('the default value if not set', function () { + 
expect(libuvSize.getEffectiveLibuvThreadCount()).to.equal(4) + }) + + it('1 if set to 0', function () { + expect(libuvSize.getEffectiveLibuvThreadCount(0)).to.equal(1) + }) + + it('1024 if set to a negative value', function () { + expect(libuvSize.getEffectiveLibuvThreadCount(-1)).to.equal(1024) + expect(libuvSize.getEffectiveLibuvThreadCount(-100000)).to.equal(1024) + }) + + it('1024 if set to a very large value', function () { + expect(libuvSize.getEffectiveLibuvThreadCount(1025)).to.equal(1024) + expect(libuvSize.getEffectiveLibuvThreadCount(100000)).to.equal(1024) + }) + }) +}) From aebfedd4ea553fe79fd2c6c8fd6a1d250e80951a Mon Sep 17 00:00:00 2001 From: Attila Szegedi Date: Mon, 14 Jul 2025 16:50:21 +0200 Subject: [PATCH 2/4] Set default sampling interval in millis in config --- packages/dd-trace/src/profiling/config.js | 2 ++ packages/dd-trace/src/profiling/profilers/wall.js | 2 +- packages/dd-trace/test/profiling/profilers/wall.spec.js | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/dd-trace/src/profiling/config.js b/packages/dd-trace/src/profiling/config.js index 9cddf3769c7..b65940af3ba 100644 --- a/packages/dd-trace/src/profiling/config.js +++ b/packages/dd-trace/src/profiling/config.js @@ -171,6 +171,8 @@ class Config { samplingContextsAvailable)) checkOptionWithSamplingContextAllowed(this.cpuProfilingEnabled, 'CPU profiling') + this.samplingInterval = coalesce(options.samplingInterval, 1e3 / 99) // 99hz in millis + this.heapSamplingInterval = coalesce(options.heapSamplingInterval, Number(DD_PROFILING_HEAP_SAMPLING_INTERVAL)) const uploadCompression0 = coalesce(options.uploadCompression, DD_PROFILING_DEBUG_UPLOAD_COMPRESSION, 'on') diff --git a/packages/dd-trace/src/profiling/profilers/wall.js b/packages/dd-trace/src/profiling/profilers/wall.js index b619668566e..316a98a3298 100644 --- a/packages/dd-trace/src/profiling/profilers/wall.js +++ b/packages/dd-trace/src/profiling/profilers/wall.js @@ -71,7 +71,7 @@ function 
ensureChannelsActivated () { class NativeWallProfiler { constructor (options = {}) { this.type = 'wall' - this._samplingIntervalMicros = options.samplingInterval || 1e6 / 99 // 99hz + this._samplingIntervalMicros = (options.samplingInterval || 1e3 / 99) * 1000 // 99hz this._flushIntervalMillis = options.flushInterval || 60 * 1e3 // 60 seconds this._codeHotspotsEnabled = !!options.codeHotspotsEnabled this._endpointCollectionEnabled = !!options.endpointCollectionEnabled diff --git a/packages/dd-trace/test/profiling/profilers/wall.spec.js b/packages/dd-trace/test/profiling/profilers/wall.spec.js index dd8183c656f..822e25843f0 100644 --- a/packages/dd-trace/test/profiling/profilers/wall.spec.js +++ b/packages/dd-trace/test/profiling/profilers/wall.spec.js @@ -66,7 +66,7 @@ describe('profilers/native/wall', () => { }) it('should use the provided configuration options', () => { - const samplingInterval = 500 + const samplingInterval = 0.5 const profiler = new NativeWallProfiler({ samplingInterval }) profiler.start() From bf17096d95337e3714fd5b755b6bf62b91508c0e Mon Sep 17 00:00:00 2001 From: Attila Szegedi Date: Mon, 14 Jul 2025 16:50:49 +0200 Subject: [PATCH 3/4] Set an upper bound for the number of timeline events gathered within a single flush period --- .../src/profiling/profilers/events.js | 52 ++++++++++++++++--- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/packages/dd-trace/src/profiling/profilers/events.js b/packages/dd-trace/src/profiling/profilers/events.js index 440e12d493e..6f40ebc700e 100644 --- a/packages/dd-trace/src/profiling/profilers/events.js +++ b/packages/dd-trace/src/profiling/profilers/events.js @@ -3,6 +3,7 @@ const { performance, constants, PerformanceObserver } = require('perf_hooks') const { END_TIMESTAMP_LABEL, SPAN_ID_LABEL, LOCAL_ROOT_SPAN_ID_LABEL, encodeProfileAsync } = require('./shared') const { Function, Label, Line, Location, Profile, Sample, StringTable, ValueType } = require('pprof-format') +const { 
availableParallelism, effectiveLibuvThreadCount } = require('../libuv-size') // perf_hooks uses millis, with fractional part representing nanos. We emit nanos into the pprof file. const MS_TO_NS = 1_000_000 @@ -38,6 +39,20 @@ function labelFromStrStr (stringTable, keyStr, valStr) { return labelFromStr(stringTable, stringTable.dedup(keyStr), valStr) } +function getMaxSamples (options) { + const cpuSamplingInterval = (options.samplingInterval || 1e3 / 99) // 99hz + const flushInterval = options.flushInterval || 65 * 1e3 // 65 seconds + const maxCpuSamples = flushInterval / cpuSamplingInterval + + // The lesser of max parallelism and libuv thread pool size, plus one so we can detect + // oversubscription on libuv thread pool, plus another one for GC. + const factor = Math.max(1, Math.min(availableParallelism(), effectiveLibuvThreadCount)) + 2 + + // Let's not go overboard with too large limit and cap it at 100k. With current defaults, the + // value will be 65000/10.1*(4+2) = 38613. + return Math.min(100_000, Math.floor(maxCpuSamples * factor)) +} + class GCDecorator { constructor (stringTable) { this.stringTable = stringTable @@ -181,12 +196,13 @@ const decoratorTypes = { // Translates performance entries into pprof samples. class EventSerializer { - constructor () { + constructor (maxSamples) { this.stringTable = new StringTable() this.samples = [] this.locations = [] this.functions = [] this.decorators = {} + this.maxSamples = maxSamples // A synthetic single-frame location to serve as the location for timeline // samples. We need these as the profiling backend (mimicking official pprof @@ -204,6 +220,29 @@ class EventSerializer { } addEvent (item) { + if (this.samples.length < this.maxSamples) { + const sample = this.createSample(item) + if (sample !== undefined) { + this.samples.push(sample) + } + } else { + // Choose one sample to be dropped. 
Using rnd(max + 1) - 1 we allow the + // current sample too to be the dropped one, with the equal likelihood as + // any other sample. + const replacementIndex = Math.floor(Math.random() * (this.maxSamples + 1)) - 1 + if (replacementIndex !== -1) { + const sample = this.createSample(item) + if (sample !== undefined) { + // This will cause the samples to no longer be sorted in their array + // by their end time. This is fine as the backend has no ordering + // expectations. + this.samples[replacementIndex] = sample + } + } + } + } + + createSample (item) { const { entryType, startTime, duration, _ddSpanId, _ddRootSpanId } = item let decorator = this.decorators[entryType] if (!decorator) { @@ -236,7 +275,7 @@ class EventSerializer { label } decorator.decorateSample(sampleInput, item) - this.samples.push(new Sample(sampleInput)) + return new Sample(sampleInput) } createProfile (startDate, endDate) { @@ -338,7 +377,7 @@ class CompositeEventSource { } } -function createPossionProcessSamplingFilter (samplingIntervalMillis) { +function createPoissonProcessSamplingFilter (samplingIntervalMillis) { let nextSamplingInstant = performance.now() let currentSamplingInstant = 0 setNextSamplingInstant() @@ -377,12 +416,13 @@ function createPossionProcessSamplingFilter (samplingIntervalMillis) { class EventsProfiler { constructor (options = {}) { this.type = 'events' - this.eventSerializer = new EventSerializer() + this.maxSamples = getMaxSamples(options) + this.eventSerializer = new EventSerializer(this.maxSamples) const eventHandler = event => this.eventSerializer.addEvent(event) const eventFilter = options.timelineSamplingEnabled - // options.samplingInterval comes in microseconds, we need millis - ? createPossionProcessSamplingFilter((options.samplingInterval ?? 1e6 / 99) / 1000) + // options.samplingInterval is already in millis (default 99hz), which is what the filter takes + ? createPoissonProcessSamplingFilter(options.samplingInterval ?? 1e3 / 99)
: _ => true const filteringEventHandler = event => { if (eventFilter(event)) { @@ -414,7 +454,7 @@ class EventsProfiler { this.stop() } const thatEventSerializer = this.eventSerializer - this.eventSerializer = new EventSerializer() + this.eventSerializer = new EventSerializer(this.maxSamples) return () => thatEventSerializer.createProfile(startDate, endDate) } From 23f4c494951fcd24e38f1b6a5e6ca7f890f5e349 Mon Sep 17 00:00:00 2001 From: Attila Szegedi Date: Mon, 14 Jul 2025 14:53:49 +0200 Subject: [PATCH 4/4] Testing --- integration-tests/profiler/eventlimits.js | 21 ++++++++ integration-tests/profiler/profiler.spec.js | 27 ++++++++++ .../src/profiling/profilers/events.js | 51 ++++++++++++++++--- 3 files changed, 93 insertions(+), 6 deletions(-) create mode 100644 integration-tests/profiler/eventlimits.js diff --git a/integration-tests/profiler/eventlimits.js b/integration-tests/profiler/eventlimits.js new file mode 100644 index 00000000000..a7373d114f3 --- /dev/null +++ b/integration-tests/profiler/eventlimits.js @@ -0,0 +1,21 @@ +'use strict' + +const performance = require('perf_hooks').performance + +const EVENT_COUNT = parseInt(process.argv[2]) +require('dd-trace').init().profilerStarted().then(() => { + const EventSource = require('dd-trace/packages/dd-trace/src/profiling/profilers/events.js') + const template = { + entryType: 'dns', + duration: 10, + name: 'lookup', + _ddSpanId: '1234567890abcdef', + _ddRootSpanId: 'abcdef1234567890', + detail: { + hostname: 'example.com' + } + } + for (let i = 0; i < EVENT_COUNT; i++) { + EventSource.emitTestEvent({ startTime: performance.now(), ...template }) + } +}) diff --git a/integration-tests/profiler/profiler.spec.js b/integration-tests/profiler/profiler.spec.js index c7a52eecdfd..af066cb6dab 100644 --- a/integration-tests/profiler/profiler.spec.js +++ b/integration-tests/profiler/profiler.spec.js @@ -454,6 +454,33 @@ describe('profiler', () => { assert.equal(endpoints.size, 3, encoded) }) + 
it('number of events is limited', async () => { + const threads = 1 + const uploadPeriodSec = 10 + const samplingPeriodMs = 1000 / 99 + // Recreates getMaxSamples from packages/dd-trace/src/profiling/profilers/events.js + const maxEvents = Math.floor((threads + 2) * uploadPeriodSec * 1000 / samplingPeriodMs) + + // Test will try to force maxEvents + 10 events, we'll assert that only maxEvents are captured. + const proc = fork(path.join(cwd, 'profiler/eventlimits.js'), [String(maxEvents + 10)], { + cwd, + env: { + DD_PROFILING_EXPORTERS: 'file', + DD_PROFILING_ENABLED: 1, + DD_INTERNAL_PROFILING_TIMELINE_SAMPLING_ENABLED: 0, // capture all events + DD_TRACE_AGENT_PORT: agent.port, + TESTING_PROFILING_EVENTS: 'true', // enable test event source + DD_PROFILING_UPLOAD_PERIOD: String(uploadPeriodSec), + UV_THREADPOOL_SIZE: String(threads) + } + }) + + await processExitPromise(proc, TIMEOUT) + + const { profile, encoded } = await getLatestProfile(cwd, /^events_.+\.pprof$/) + assert.equal(profile.sample.length, maxEvents, encoded) + }) + it('fs timeline events work', async () => { const fsEvents = await gatherFilesystemTimelineEvents(cwd, 'profiler/fstest.js', agent.port) assert.equal(fsEvents.length, 6) diff --git a/packages/dd-trace/src/profiling/profilers/events.js b/packages/dd-trace/src/profiling/profilers/events.js index 6f40ebc700e..d16b13a2d4b 100644 --- a/packages/dd-trace/src/profiling/profilers/events.js +++ b/packages/dd-trace/src/profiling/profilers/events.js @@ -4,6 +4,12 @@ const { performance, constants, PerformanceObserver } = require('perf_hooks') const { END_TIMESTAMP_LABEL, SPAN_ID_LABEL, LOCAL_ROOT_SPAN_ID_LABEL, encodeProfileAsync } = require('./shared') const { Function, Label, Line, Location, Profile, Sample, StringTable, ValueType } = require('pprof-format') const { availableParallelism, effectiveLibuvThreadCount } = require('../libuv-size') +const { getEnvironmentVariable } = require('../../config-helper') +const dc = require('dc-polyfill') + 
+const testEventChannel = ['true', '1'].includes(getEnvironmentVariable('TESTING_PROFILING_EVENTS')) + ? dc.channel('dd-trace:profiling:events-test') + : undefined // perf_hooks uses millis, with fractional part representing nanos. We emit nanos into the pprof file. const MS_TO_NS = 1_000_000 @@ -363,6 +369,27 @@ class DatadogInstrumentationEventSource { } } +class TestEventSource { + constructor (eventHandler) { + this.eventHandler = eventHandler + this.started = false + } + + start () { + if (!this.started) { + testEventChannel.subscribe(this.eventHandler) + this.started = true + } + } + + stop () { + if (this.started) { + testEventChannel.unsubscribe(this.eventHandler) + this.started = false + } + } +} + class CompositeEventSource { constructor (sources) { this.sources = sources @@ -430,15 +457,21 @@ class EventsProfiler { } } - this.eventSource = options.codeHotspotsEnabled + const eventSources = options.codeHotspotsEnabled // Use Datadog instrumentation to collect events with span IDs. Still use // Node API for GC events. - ? new CompositeEventSource([ - new DatadogInstrumentationEventSource(eventHandler, eventFilter), - new NodeApiEventSource(filteringEventHandler, ['gc']) - ]) + ? [ + new DatadogInstrumentationEventSource(eventHandler, eventFilter), + new NodeApiEventSource(filteringEventHandler, ['gc']), + ] // Use Node API instrumentation to collect events without span IDs - : new NodeApiEventSource(filteringEventHandler) + : [ + new NodeApiEventSource(filteringEventHandler) + ] + if (testEventChannel !== undefined) { + eventSources.push(new TestEventSource(filteringEventHandler)) + } + this.eventSource = new CompositeEventSource(eventSources) } start () { @@ -461,6 +494,12 @@ class EventsProfiler { encode (profile) { return encodeProfileAsync(profile()) } + + static emitTestEvent (event) { + if (testEventChannel !== undefined) { + testEventChannel.publish(event) + } + } } module.exports = EventsProfiler