Skip to content

Commit 25a7695

Browse files
authored
fix(worker): Fix prometheus and otel regressions (#1345)
1 parent 6b35787 commit 25a7695

File tree

8 files changed

+213
-82
lines changed

8 files changed

+213
-82
lines changed

packages/core-bridge/src/conversions.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ impl ArrayHandleConversionsExt for Handle<'_, JsArray> {
4444
}
4545
}
4646

47-
pub(crate) type TelemOptsRes = (
48-
TelemetryOptions,
49-
Option<Box<dyn FnOnce() -> Arc<dyn CoreMeter> + Send>>,
50-
);
47+
type BoxedMeterMaker = Box<dyn FnOnce() -> Result<Arc<dyn CoreMeter>, String> + Send + Sync>;
48+
49+
pub(crate) type TelemOptsRes = (TelemetryOptions, Option<BoxedMeterMaker>);
5150

5251
pub trait ObjectHandleConversionsExt {
5352
fn set_default(&self, cx: &mut FunctionContext, key: &str, value: &str) -> NeonResult<()>;
@@ -220,20 +219,24 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
220219
.unwrap_err()
221220
})?;
222221

223-
meter_maker = Some(Box::new(move || {
224-
let prom_info = start_prometheus_metric_exporter(options)
225-
.expect("Failed creating prometheus exporter");
226-
prom_info.meter as Arc<dyn CoreMeter>
227-
})
228-
as Box<dyn FnOnce() -> Arc<dyn CoreMeter> + Send>);
222+
meter_maker =
223+
Some(
224+
Box::new(move || match start_prometheus_metric_exporter(options) {
225+
Ok(prom_info) => Ok(prom_info.meter as Arc<dyn CoreMeter>),
226+
Err(e) => Err(format!("Failed to start prometheus exporter: {}", e)),
227+
}) as BoxedMeterMaker,
228+
);
229229
} else if let Some(ref otel) = js_optional_getter!(cx, metrics, "otel", JsObject) {
230230
let mut options = OtelCollectorOptionsBuilder::default();
231231

232232
let url = js_value_getter!(cx, otel, "url", JsString);
233233
match Url::parse(&url) {
234234
Ok(url) => options.url(url),
235-
Err(_) => {
236-
return cx.throw_type_error("Invalid telemetryOptions.metrics.otel.url");
235+
Err(e) => {
236+
return cx.throw_type_error(format!(
237+
"Invalid telemetryOptions.metrics.otel.url: {}",
238+
e
239+
))?;
237240
}
238241
};
239242

@@ -269,11 +272,10 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
269272
.unwrap_err()
270273
})?;
271274

272-
meter_maker = Some(Box::new(move || {
273-
let otlp_exporter =
274-
build_otlp_metric_exporter(options).expect("Failed to build otlp exporter");
275-
Arc::new(otlp_exporter) as Arc<dyn CoreMeter>
276-
}));
275+
meter_maker = Some(Box::new(move || match build_otlp_metric_exporter(options) {
276+
Ok(otlp_exporter) => Ok(Arc::new(otlp_exporter) as Arc<dyn CoreMeter>),
277+
Err(e) => Err(format!("Failed to start otlp exporter: {}", e)),
278+
}) as BoxedMeterMaker);
277279
} else {
278280
cx.throw_type_error(
279281
"Invalid telemetryOptions.metrics, missing `prometheus` or `otel` option",

packages/core-bridge/src/runtime.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use temporal_sdk_core::{
1919
replay::{HistoryForReplay, ReplayWorkerInput},
2020
ClientOptions, RetryClient, WorkerConfig,
2121
};
22+
use tokio::sync::oneshot;
2223
use tokio::sync::{
2324
mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
2425
Mutex,
@@ -119,6 +120,7 @@ pub fn start_bridge_loop(
119120
telemetry_options: TelemOptsRes,
120121
channel: Arc<Channel>,
121122
receiver: &mut UnboundedReceiver<RuntimeRequest>,
123+
result_sender: oneshot::Sender<Result<(), String>>,
122124
) {
123125
let mut tokio_builder = tokio::runtime::Builder::new_multi_thread();
124126
tokio_builder.enable_all().thread_name("core");
@@ -129,10 +131,24 @@ pub fn start_bridge_loop(
129131

130132
core_runtime.tokio_handle().block_on(async {
131133
if let Some(meter_maker) = meter_maker {
132-
core_runtime
133-
.telemetry_mut()
134-
.attach_late_init_metrics(meter_maker());
134+
match meter_maker() {
135+
Ok(meter) => {
136+
core_runtime.telemetry_mut().attach_late_init_metrics(meter);
137+
}
138+
Err(err) => {
139+
result_sender
140+
.send(Err(format!("Failed to create meter: {}", err)))
141+
.unwrap_or_else(|_| {
142+
panic!("Failed to report runtime start error: {}", err)
143+
});
144+
return;
145+
}
146+
}
135147
}
148+
result_sender
149+
.send(Ok(()))
150+
.expect("Failed to report runtime start success");
151+
136152
loop {
137153
let request_option = receiver.recv().await;
138154
let request = match request_option {
@@ -386,7 +402,19 @@ pub fn runtime_new(mut cx: FunctionContext) -> JsResult<BoxedRuntime> {
386402
let channel = Arc::new(cx.channel());
387403
let (sender, mut receiver) = unbounded_channel::<RuntimeRequest>();
388404

389-
std::thread::spawn(move || start_bridge_loop(telemetry_options, channel, &mut receiver));
405+
// FIXME: This is a temporary fix to get sync notifications of errors while initializing the runtime.
406+
// The proper fix would be to avoid spawning a new thread here, so that start_bridge_loop
407+
// can simply yield back a Result. But early attempts to do just that caused panics
408+
// on runtime shutdown, so let's use this hack until we can dig deeper.
409+
let (result_sender, result_receiver) = oneshot::channel::<Result<(), String>>();
410+
411+
std::thread::spawn(move || {
412+
start_bridge_loop(telemetry_options, channel, &mut receiver, result_sender)
413+
});
414+
415+
if let Ok(Err(e)) = result_receiver.blocking_recv() {
416+
Err(cx.throw_error::<_, String>(e).unwrap_err())?;
417+
}
390418

391419
Ok(cx.boxed(Arc::new(RuntimeHandle { sender })))
392420
}

packages/test/src/helpers.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as net from 'net';
12
import path from 'path';
23
import StackUtils from 'stack-utils';
34
import ava, { TestFn } from 'ava';
@@ -93,6 +94,7 @@ export const bundlerOptions = {
9394
'@grpc/grpc-js',
9495
'async-retry',
9596
'uuid',
97+
'net',
9698
],
9799
};
98100

@@ -183,3 +185,18 @@ export async function registerDefaultCustomSearchAttributes(connection: Connecti
183185
const timeTaken = Date.now() - startTime;
184186
console.log(`... Registered (took ${timeTaken / 1000} sec)!`);
185187
}
188+
189+
/**
 * Binds a throwaway TCP server to an OS-assigned port on 127.0.0.1 and returns that port.
 *
 * While the port is still held by the throwaway server, `fn` is invoked with the port
 * number; the server is closed only after `fn` settles. This lets callers exercise
 * "port already in use" scenarios. With the default no-op `fn`, the port is simply
 * reserved, released, and returned.
 *
 * @param fn async callback run while the port is occupied; defaults to a no-op
 * @returns the port number that was bound
 * @throws (rejects) if the server fails to listen, reports an unexpected address type,
 *         or `fn` throws/rejects
 */
export async function getRandomPort(fn = (_port: number) => Promise.resolve()): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    const srv = net.createServer();
    // Without a listener, a failed listen() surfaces as an unhandled 'error' event
    // instead of rejecting this promise.
    srv.once('error', reject);
    srv.listen({ port: 0, host: '127.0.0.1' }, () => {
      const addr = srv.address();
      if (typeof addr === 'string' || addr === null) {
        // Throwing here would escape the promise executor; reject explicitly instead.
        srv.close(() => reject(new Error('Unexpected server address type')));
        return;
      }
      const { port } = addr;
      // Start from a resolved promise so that a synchronous throw in `fn` is also
      // routed to `reject`.
      Promise.resolve()
        .then(() => fn(port))
        .catch(reject)
        // If `reject` already ran, this `resolve` is a no-op; the server is closed either way.
        .finally(() => srv.close(() => resolve(port)));
    });
  });
}

packages/test/src/test-otel.ts

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
/**
33
* Manual tests to inspect tracing output
44
*/
5+
import * as http2 from 'http2';
56
import { SpanStatusCode } from '@opentelemetry/api';
67
import { ExportResultCode } from '@opentelemetry/core';
78
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
@@ -17,11 +18,98 @@ import {
1718
} from '@temporalio/interceptors-opentelemetry/lib/worker';
1819
import { OpenTelemetrySinks, SpanName, SPAN_DELIMITER } from '@temporalio/interceptors-opentelemetry/lib/workflow';
1920
import { DefaultLogger, InjectedSinks, Runtime } from '@temporalio/worker';
21+
import { TestWorkflowEnvironment } from '@temporalio/testing';
2022
import * as activities from './activities';
2123
import { ConnectionInjectorInterceptor } from './activities/interceptors';
2224
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
2325
import * as workflows from './workflows';
2426

27+
async function withHttp2Server(
28+
fn: (port: number) => Promise<void>,
29+
requestListener?: (request: http2.Http2ServerRequest, response: http2.Http2ServerResponse) => void
30+
): Promise<void> {
31+
return new Promise<void>((resolve, reject) => {
32+
const srv = http2.createServer();
33+
srv.listen({ port: 0, host: '127.0.0.1' }, () => {
34+
const addr = srv.address();
35+
if (typeof addr === 'string' || addr === null) {
36+
throw new Error('Unexpected server address type');
37+
}
38+
srv.on('request', async (req, res) => {
39+
if (requestListener) await requestListener(req, res);
40+
res.end();
41+
});
42+
fn(addr.port)
43+
.catch((e) => reject(e))
44+
.finally(() => srv.close((_) => resolve()));
45+
});
46+
});
47+
}
48+
49+
test.serial('Runtime.install() throws meaningful error when passed invalid metrics.otel.url', async (t) => {
50+
t.throws(() => Runtime.install({ telemetryOptions: { metrics: { otel: { url: ':invalid' } } } }), {
51+
instanceOf: TypeError,
52+
message: /Invalid telemetryOptions.metrics.otel.url/,
53+
});
54+
});
55+
56+
test.serial('Runtime.install() accepts metrics.otel.url without headers', async (t) => {
57+
try {
58+
Runtime.install({ telemetryOptions: { metrics: { otel: { url: 'http://127.0.0.1:1234' } } } });
59+
t.pass();
60+
} finally {
61+
// Cleanup the runtime so that it doesn't interfere with other tests
62+
await Runtime._instance?.shutdown();
63+
}
64+
});
65+
66+
// Installs a Runtime whose OTEL metrics exporter points at a local HTTP/2 server, runs a
// trivial workflow, and checks that an export request carrying the configured custom
// header reaches that server.
test.serial('Exporting OTEL metrics from Core works', async (t) => {
  let resolveCapturedRequest = (_req: http2.Http2ServerRequest) => undefined as void;
  // The executor runs synchronously, so `resolveCapturedRequest` is already the promise's
  // resolver by the time it is handed to withHttp2Server below.
  const capturedRequest = new Promise<http2.Http2ServerRequest>((r) => (resolveCapturedRequest = r));
  await withHttp2Server(async (port: number) => {
    Runtime.install({
      telemetryOptions: {
        metrics: {
          otel: {
            url: `http://127.0.0.1:${port}`,
            headers: {
              'x-test-header': 'test-value',
            },
            metricsExportInterval: 10,
          },
        },
      },
    });

    const localEnv = await TestWorkflowEnvironment.createLocal();
    try {
      const worker = await Worker.create({
        connection: localEnv.nativeConnection,
        workflowsPath: require.resolve('./workflows'),
        taskQueue: 'test-otel',
      });
      const client = new WorkflowClient({
        connection: localEnv.connection,
      });
      await worker.runUntil(async () => {
        await client.execute(workflows.successString, {
          taskQueue: 'test-otel',
          workflowId: uuid4(),
        });
        // Race the captured request against a 2s timeout. The timeout promise must NOT be
        // awaited inside the array literal: doing so blocks for the full 2s before the
        // race even starts.
        let timer: ReturnType<typeof setTimeout> | undefined;
        const timedOut = new Promise<undefined>((resolve) => {
          timer = setTimeout(() => resolve(undefined), 2000);
        });
        const req = await Promise.race([capturedRequest, timedOut]).finally(() => clearTimeout(timer));
        t.truthy(req);
        t.is(req?.url, '/opentelemetry.proto.collector.metrics.v1.MetricsService/Export');
        t.is(req?.headers['x-test-header'], 'test-value');
      });
    } finally {
      await localEnv.teardown();
    }
  }, resolveCapturedRequest);
});
112+
25113
if (RUN_INTEGRATION_TESTS) {
26114
test.serial('Otel interceptor spans are connected and complete', async (t) => {
27115
const spans = Array<opentelemetry.tracing.ReadableSpan>();
@@ -178,7 +266,7 @@ if (RUN_INTEGRATION_TESTS) {
178266
});
179267
await worker.runUntil(client.execute(workflows.smorgasbord, { taskQueue: 'test-otel', workflowId: uuid4() }));
180268
// Allow some time to ensure spans are flushed out to collector
181-
await new Promise((resolve) => setTimeout(resolve, 5000));
269+
await new Promise<void>((resolve) => setTimeout(resolve, 5000));
182270
t.pass();
183271
});
184272

packages/test/src/test-prometheus.ts

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,57 @@
1-
import * as net from 'net';
21
import test from 'ava';
32
import { v4 as uuid4 } from 'uuid';
43
import fetch from 'node-fetch';
54
import { WorkflowClient } from '@temporalio/client';
6-
import { NativeConnection, Runtime } from '@temporalio/worker';
7-
import * as activities from './activities';
8-
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
5+
import { Runtime } from '@temporalio/worker';
6+
import { TestWorkflowEnvironment } from '@temporalio/testing';
7+
import { Worker, getRandomPort } from './helpers';
98
import * as workflows from './workflows';
109

11-
async function getRandomPort(): Promise<number> {
12-
return new Promise<number>((res) => {
13-
const srv = net.createServer();
14-
srv.listen(0, () => {
15-
const addr = srv.address();
16-
if (typeof addr === 'string' || addr === null) {
17-
throw new Error('Unexpected server address type');
18-
}
19-
srv.close((_) => res(addr.port));
20-
});
10+
test.serial('Runtime.install() throws meaningful error when passed invalid metrics.prometheus.bindAddress', (t) => {
11+
t.throws(() => Runtime.install({ telemetryOptions: { metrics: { prometheus: { bindAddress: ':invalid' } } } }), {
12+
instanceOf: TypeError,
13+
message: 'Invalid telemetryOptions.metrics.prometheus.bindAddress',
2114
});
22-
}
15+
});
16+
17+
test.serial(
18+
'Runtime.install() throws meaningful error when metrics.prometheus.bindAddress port is already taken',
19+
async (t) => {
20+
await getRandomPort(async (port: number) => {
21+
t.throws(
22+
() => Runtime.install({ telemetryOptions: { metrics: { prometheus: { bindAddress: `127.0.0.1:${port}` } } } }),
23+
{
24+
instanceOf: Error,
25+
message: /(Address already in use|socket address)/,
26+
}
27+
);
28+
});
29+
}
30+
);
2331

24-
if (RUN_INTEGRATION_TESTS) {
25-
test.serial('Prometheus metrics work', async (t) => {
26-
const port = await getRandomPort();
27-
Runtime.install({
28-
telemetryOptions: {
29-
metrics: {
30-
prometheus: {
31-
bindAddress: `127.0.0.1:${port}`,
32-
},
32+
test.serial('Exporting Prometheus metrics from Core works', async (t) => {
33+
const port = await getRandomPort();
34+
Runtime.install({
35+
telemetryOptions: {
36+
metrics: {
37+
prometheus: {
38+
bindAddress: `127.0.0.1:${port}`,
3339
},
3440
},
35-
});
36-
const connection = await NativeConnection.connect({
37-
address: '127.0.0.1:7233',
38-
});
41+
},
42+
});
43+
const localEnv = await TestWorkflowEnvironment.createLocal();
44+
try {
3945
const worker = await Worker.create({
40-
connection,
46+
connection: localEnv.nativeConnection,
4147
workflowsPath: require.resolve('./workflows'),
42-
activities,
4348
taskQueue: 'test-prometheus',
4449
});
45-
46-
const client = new WorkflowClient();
50+
const client = new WorkflowClient({
51+
connection: localEnv.connection,
52+
});
4753
await worker.runUntil(async () => {
48-
await client.execute(workflows.cancelFakeProgress, {
54+
await client.execute(workflows.successString, {
4955
taskQueue: 'test-prometheus',
5056
workflowId: uuid4(),
5157
});
@@ -54,7 +60,7 @@ if (RUN_INTEGRATION_TESTS) {
5460
const text = await resp.text();
5561
t.assert(text.includes('task_slots'));
5662
});
57-
58-
t.pass();
59-
});
60-
}
63+
} finally {
64+
await localEnv.teardown();
65+
}
66+
});

0 commit comments

Comments
 (0)