Skip to content

Commit 35c6005

Browse files
authored
feat(worker): Retain meta on log entries forwarded from core (#1225)
1 parent 2b585b6 commit 35c6005

File tree

7 files changed

+142
-6
lines changed

7 files changed

+142
-6
lines changed

packages/core-bridge/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core-bridge/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ opentelemetry = "0.18"
2424
parking_lot = "0.12"
2525
prost = "0.11"
2626
prost-types = "0.11"
27+
serde_json = "1.0"
2728
tokio = "1.13"
2829
once_cell = "1.7.2"
2930
temporal-sdk-core = { version = "*", path = "./sdk-core/core", features = ["ephemeral-server"] }

packages/core-bridge/src/helpers.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,70 @@ where
187187
cx.throw_type_error::<_, Vec<u8>>(format!("Invalid or missing {}", full_attr_path))
188188
}
189189
}
190+
191+
// Recursively convert a Serde value to a JS value
192+
pub fn serde_value_to_js_value<'a>(cx: &mut impl Context<'a>, val: serde_json::Value) -> JsResult<'a, JsValue> {
193+
match val {
194+
serde_json::Value::String(s) => Ok(cx.string(s).upcast()),
195+
serde_json::Value::Number(n) => Ok(cx.number(n.as_f64().unwrap()).upcast()),
196+
serde_json::Value::Bool(b) => Ok(cx.boolean(b).upcast()),
197+
serde_json::Value::Null => Ok(cx.null().upcast()),
198+
serde_json::Value::Array(vec ) => {
199+
let arr: Handle<'a, JsArray> = JsArray::new(cx, vec.len() as u32);
200+
for (i, v) in vec.into_iter().enumerate() {
201+
let v = serde_value_to_js_value(cx, v)?;
202+
arr.set(cx, i as u32, v)?;
203+
}
204+
Ok(arr.upcast())
205+
}
206+
serde_json::Value::Object(map) => {
207+
hashmap_to_js_value(cx, map).map(|v| v.upcast())
208+
}
209+
}
210+
}
211+
212+
pub fn hashmap_to_js_value<'a>(cx: &mut impl Context<'a>, map: impl IntoIterator<Item = (String, serde_json::Value)>) -> JsResult<'a, JsObject> {
213+
let obj: Handle<'a, JsObject> = cx.empty_object();
214+
for (k, v) in map {
215+
let k = cx.string(snake_to_camel(k));
216+
let v = serde_value_to_js_value(cx, v)?;
217+
obj.set(cx, k, v)?;
218+
}
219+
Ok(obj)
220+
}
221+
222+
fn snake_to_camel(input: String) -> String {
223+
match input.find('_') {
224+
None => input,
225+
Some(first) => {
226+
let mut result = String::with_capacity(input.len());
227+
if first > 0 {
228+
result.push_str(&input[..first]);
229+
}
230+
let mut capitalize = true;
231+
for c in input[first+1..].chars() {
232+
if c == '_' {
233+
capitalize = true;
234+
} else if capitalize {
235+
result.push(c.to_ascii_uppercase());
236+
capitalize = false;
237+
} else {
238+
result.push(c.to_ascii_lowercase());
239+
}
240+
}
241+
result
242+
}
243+
}
244+
}
245+
246+
#[cfg(test)]
247+
mod tests {
248+
use super::*;
249+
250+
#[test]
251+
fn snake_to_camel_works() {
252+
assert_eq!(snake_to_camel("this_is_a_test".into()), "thisIsATest");
253+
assert_eq!(snake_to_camel("this___IS_a_TEST".into()), "thisIsATest");
254+
assert_eq!(snake_to_camel("éàç_this_is_a_test".into()), "éàçThisIsATest");
255+
}
256+
}

packages/core-bridge/src/runtime.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,24 @@ pub fn start_bridge_loop(
195195
send_result(channel.clone(), callback, |cx| {
196196
let logarr = cx.empty_array();
197197
for (i, cl) in logs.into_iter().enumerate() {
198-
// Not much to do here except for panic when there's an
199-
// error here.
198+
// Not much to do here except for panic when there's an error here.
200199
let logobj = cx.empty_object();
200+
201201
let level = cx.string(cl.level.to_string());
202202
logobj.set(cx, "level", level).unwrap();
203+
203204
let ts = system_time_to_js(cx, cl.timestamp).unwrap();
204205
logobj.set(cx, "timestamp", ts).unwrap();
206+
205207
let msg = cx.string(cl.message);
206208
logobj.set(cx, "message", msg).unwrap();
209+
210+
let fieldsobj = hashmap_to_js_value(cx, cl.fields);
211+
logobj.set(cx, "fields", fieldsobj.unwrap()).unwrap();
212+
213+
let target = cx.string(cl.target);
214+
logobj.set(cx, "target", target).unwrap();
215+
207216
logarr.set(cx, i as u32, logobj).unwrap();
208217
}
209218
Ok(logarr)

packages/core-bridge/ts/index.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,10 @@ export interface WorkerOptions {
344344
maxActivitiesPerSecond?: number;
345345
}
346346

347+
export type LogEntryMetadata = {
348+
[key: string]: string | number | boolean | LogEntryMetadata;
349+
};
350+
347351
export interface LogEntry {
348352
/** Log message */
349353
message: string;
@@ -353,8 +357,15 @@ export interface LogEntry {
353357
* Should be switched to bigint once it is supported in neon.
354358
*/
355359
timestamp: [number, number];
360+
356361
/** Log level */
357362
level: LogLevel;
363+
364+
/** Name of the Core subsystem that emitted that log entry */
365+
target: string;
366+
367+
/*** Metadata fields */
368+
fields: LogEntryMetadata;
358369
}
359370

360371
/**

packages/test/src/test-runtime.ts

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
*/
55
import test from 'ava';
66
import { v4 as uuid4 } from 'uuid';
7-
import { Runtime, DefaultLogger } from '@temporalio/worker';
8-
import { WorkflowClient } from '@temporalio/client';
7+
import asyncRetry from 'async-retry';
8+
import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString } from '@temporalio/worker';
9+
import { Client, WorkflowClient } from '@temporalio/client';
910
import { defaultOptions } from './mock-native-worker';
1011
import * as workflows from './workflows';
1112
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
@@ -73,6 +74,50 @@ if (RUN_INTEGRATION_TESTS) {
7374
}
7475
});
7576

77+
test.serial('Runtime.instance() Core forwarded logs contains metadata', async (t) => {
78+
const logEntries: LogEntry[] = [];
79+
const logger = new DefaultLogger('DEBUG', (entry) => logEntries.push(entry));
80+
Runtime.install({
81+
logger,
82+
telemetryOptions: { logging: { forward: {}, filter: makeTelemetryFilterString({ core: 'DEBUG' }) } },
83+
});
84+
try {
85+
{
86+
const runtime = Runtime.instance();
87+
t.is(runtime.options.logger, logger);
88+
}
89+
await new Client().workflow.start('not-existant', { taskQueue: 'q1', workflowId: uuid4() });
90+
const worker = await Worker.create({
91+
...defaultOptions,
92+
taskQueue: 'q1',
93+
});
94+
await worker.runUntil(() =>
95+
asyncRetry(
96+
() => {
97+
if (!logEntries.some((x) => x.message === 'Failing workflow task'))
98+
throw new Error('Waiting for failing workflow task');
99+
},
100+
{ minTimeout: 100, factor: 1, maxTimeout: 5000 }
101+
)
102+
);
103+
104+
const initWorkerEntry = logEntries.filter((x) => x.message === 'Initializing worker')?.[0];
105+
t.true(initWorkerEntry !== undefined);
106+
t.is(initWorkerEntry.meta?.['taskQueue'], 'q1');
107+
108+
const failingWftEntry = logEntries.filter((x) => x.message === 'Failing workflow task')?.[0];
109+
t.true(failingWftEntry !== undefined);
110+
t.is(failingWftEntry.meta?.['taskQueue'], 'q1');
111+
t.is(typeof failingWftEntry.meta?.['completion'], 'string');
112+
t.is(typeof failingWftEntry.meta?.['failure'], 'string');
113+
t.is(typeof failingWftEntry.meta?.['runId'], 'string');
114+
t.is(typeof failingWftEntry.meta?.['workflowId'], 'string');
115+
t.is(typeof failingWftEntry.meta?.['subsystem'], 'string');
116+
} finally {
117+
await Runtime.instance().shutdown();
118+
}
119+
});
120+
76121
test.serial('Runtime.instance() throws meaningful error when passed invalid tracing.otel.url', (t) => {
77122
t.throws(() => Runtime.install({ telemetryOptions: { tracing: { otel: { url: ':invalid' } } } }), {
78123
instanceOf: TypeError,

packages/worker/src/runtime.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
OtelCollectorExporter,
1717
} from '@temporalio/core-bridge';
1818
import { filterNullAndUndefined, normalizeTlsConfig } from '@temporalio/common/lib/internal-non-workflow';
19-
import { IllegalStateError } from '@temporalio/common';
19+
import { IllegalStateError, LogMetadata } from '@temporalio/common';
2020
import { temporal } from '@temporalio/proto';
2121
import { History } from '@temporalio/common/lib/proto-utils';
2222
import { msToNumber } from '@temporalio/common/lib/time';
@@ -273,8 +273,10 @@ export class Runtime {
273273
const doPoll = async () => {
274274
const logs = await poll(this.native);
275275
for (const log of logs) {
276-
const meta: Record<string | symbol, unknown> = {
276+
const meta: LogMetadata = {
277277
[LogTimestamp]: timeOfDayToBigint(log.timestamp),
278+
subsystem: log.target,
279+
...log.fields,
278280
};
279281
logger.log(log.level, log.message, meta);
280282
}

0 commit comments

Comments
 (0)