diff --git a/core-api/src/builtin_queries.rs b/core-api/src/builtin_queries.rs
new file mode 100644
index 000000000..0476baace
--- /dev/null
+++ b/core-api/src/builtin_queries.rs
@@ -0,0 +1,145 @@
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use temporal_sdk_core_protos::{
+    coresdk::{
+        workflow_activation::{workflow_activation_job, QueryWorkflow, WorkflowActivationJob},
+        workflow_commands::{query_result, QueryResult, QuerySuccess},
+    },
+    ENHANCED_STACK_QUERY,
+};
+
+pub static FAKE_ENHANCED_STACK_QUERY_ID: &str = "__fake_enhanced_stack";
+// Must start with the above prefix, since matching is done via `starts_with`
+pub static FAKE_ENHANCED_STACK_QUERY_ID_FINAL_LEGACY: &str = "__fake_enhanced_stack_final";
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct SDKInfo {
+    pub name: String,
+    pub version: String,
+}
+
+/// Represents a slice of a file starting at `line_offset`
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct FileSlice {
+    /// Slice of a file with `\n` (newline) line terminators.
+    pub content: String,
+    /// Used so the file can be trimmed without breaking syntax highlighting.
+    pub line_offset: u64,
+}
+
+/// A pointer to a location in a file
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct FileLocation {
+    /// Path to source file (absolute or relative).
+    /// When using a relative path, make sure all paths are relative to the same root.
+    pub file_path: Option<String>,
+    /// If possible, SDK should send this, required for displaying the code location.
+    pub line: Option<u64>,
+    /// If possible, SDK should send this.
+    pub column: Option<u64>,
+    /// Function name this line belongs to (if applicable).
+    /// Used for falling back to stack trace view.
+    pub function_name: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum InternalCommandType {
+    ScheduleActivity,
+    StartTimer,
+    StartChildWorkflow,
+}
+
+/// An internal (Lang<->Core) command identifier
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct InternalCommand {
+    pub r#type: InternalCommandType,
+    pub seq: u32,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct InternalStackTrace {
+    pub locations: Vec<FileLocation>,
+    pub commands: Vec<InternalCommand>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct InternalEnhancedStackTrace {
+    pub sdk: SDKInfo,
+    /// Mapping of file path to file contents. SDK may choose to send no, some, or all sources.
+    /// Sources might be trimmed, and sometimes only the file(s) of the top element of the trace
+    /// will be sent.
+    pub sources: HashMap<String, Vec<FileSlice>>,
+    pub stacks: Vec<InternalStackTrace>,
+}
+
+impl TryFrom<&QueryResult> for InternalEnhancedStackTrace {
+    type Error = ();
+
+    fn try_from(qr: &QueryResult) -> Result<Self, Self::Error> {
+        if let Some(query_result::Variant::Succeeded(QuerySuccess {
+            response: Some(ref payload),
+        })) = qr.variant
+        {
+            if payload.is_json_payload() {
+                if let Ok(internal_trace) =
+                    serde_json::from_slice::<InternalEnhancedStackTrace>(payload.data.as_slice())
+                {
+                    return Ok(internal_trace);
+                }
+            }
+        }
+        Err(())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct StackTrace {
+    pub locations: Vec<FileLocation>,
+    pub correlating_event_ids: Vec<i64>,
+}
+
+// Used as the result for the enhanced stack trace query
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct EnhancedStackTrace {
+    pub sdk: SDKInfo,
+    /// Mapping of file path to file contents. SDK may choose to send no, some, or all sources.
+    /// Sources might be trimmed, and sometimes only the file(s) of the top element of the trace
+    /// will be sent.
+    pub sources: HashMap<String, Vec<FileSlice>>,
+    pub stacks: Vec<StackTrace>,
+}
+
+// Used as the result for the time travel stack trace query
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct TimeTravelStackTrace {
+    pub sdk: SDKInfo,
+    /// Mapping of file path to file contents. SDK may choose to send no, some, or all sources.
+    /// Sources might be trimmed, and sometimes only the file(s) of the top element of the trace
+    /// will be sent.
+    pub sources: HashMap<String, Vec<FileSlice>>,
+    /// Maps WFT started event ids to active stack traces upon completion of that WFT
+    pub stacks: HashMap<u32, Vec<StackTrace>>,
+}
+
+/// Generate an activation job with an enhanced stack trace query.
+/// * `final_legacy` - Set to true if this is the final query before aggregation completes, and we
+///   should respond to the *legacy query* with the aggregation result when ready.
+pub fn enhanced_stack_query_job(final_legacy: bool) -> WorkflowActivationJob {
+    let id = if final_legacy {
+        FAKE_ENHANCED_STACK_QUERY_ID_FINAL_LEGACY
+    } else {
+        FAKE_ENHANCED_STACK_QUERY_ID
+    };
+    workflow_activation_job::Variant::QueryWorkflow(QueryWorkflow {
+        query_id: id.to_string(),
+        query_type: ENHANCED_STACK_QUERY.to_string(),
+        arguments: vec![],
+        headers: Default::default(),
+    })
+    .into()
+}
diff --git a/core-api/src/lib.rs b/core-api/src/lib.rs
index 47533e9c8..17497c6ad 100644
--- a/core-api/src/lib.rs
+++ b/core-api/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod builtin_queries;
 pub mod errors;
 pub mod telemetry;
 pub mod worker;
diff --git a/core/src/abstractions.rs b/core/src/abstractions.rs
index 3dffc363f..0f314f69c 100644
--- a/core/src/abstractions.rs
+++ b/core/src/abstractions.rs
@@ -268,6 +268,22 @@ macro_rules!
dbg_panic { } pub(crate) use dbg_panic; +/// Get rid of me once https://doc.rust-lang.org/std/vec/struct.Vec.html#method.drain_filter is +/// stable +pub(crate) fn lame_drain_filter(vec: &mut Vec, pred: impl Fn(&T) -> bool) -> Vec { + let mut drained = vec![]; + let mut i = 0; + while i < vec.len() { + if pred(&vec[i]) { + let val = vec.remove(i); + drained.push(val); + } else { + i += 1; + } + } + drained +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 127671708..4863b102b 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -794,6 +794,7 @@ pub(crate) async fn poll_and_reply_clears_outstanding_evicts<'a>( // Eviction plus some work, we still want to issue the reply WorkflowActivationCompletion { run_id: res.run_id.clone(), + alternate_cache_key: "".to_string(), status: Some(reply.clone()), } }; diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 48051cfa1..8f49ac3a6 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -549,11 +549,12 @@ impl Worker { /// Request a workflow eviction pub(crate) fn request_wf_eviction( &self, - run_id: &str, + cache_key: &str, message: impl Into, reason: EvictionReason, ) { - self.workflows.request_eviction(run_id, message, reason); + self.workflows + .request_eviction(cache_key.parse().unwrap(), message, reason); } /// Sets a function to be called at the end of each activation completion @@ -583,8 +584,9 @@ impl Worker { ) } - fn notify_local_result(&self, run_id: &str, res: LocalResolution) { - self.workflows.notify_of_local_result(run_id, res); + fn notify_local_result(&self, cache_key: &str, res: LocalResolution) { + self.workflows + .notify_of_local_result(cache_key.parse().unwrap(), res); } } diff --git a/core/src/worker/workflow/history_update.rs b/core/src/worker/workflow/history_update.rs index 9d38493cd..bcf6a6a97 100644 --- a/core/src/worker/workflow/history_update.rs +++ b/core/src/worker/workflow/history_update.rs @@ -2,7 +2,7 @@ use crate::{ protosext::ValidPollWFTQResponse, worker::{ client::WorkerClient, - workflow::{CacheMissFetchReq, PermittedWFT, PreparedWFT}, + workflow::{run_cache::RunCacheKey, CacheMissFetchReq, PermittedWFT, PreparedWFT}, }, }; use futures::{future::BoxFuture, FutureExt, Stream}; @@ -17,9 +17,12 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use temporal_sdk_core_protos::temporal::api::{ - enums::v1::EventType, - history::v1::{history_event, History, HistoryEvent, WorkflowTaskCompletedEventAttributes}, +use temporal_sdk_core_protos::{ + temporal::api::{ + enums::v1::EventType, + history::v1::{history_event, History, HistoryEvent, WorkflowTaskCompletedEventAttributes}, + }, + TIME_TRAVEL_QUERY, }; use tracing::Instrument; @@ -73,7 +76,7 @@ pub enum NextWFT { } #[derive(derive_more::DebugCustom)] -#[debug(fmt = "HistoryPaginator(run_id: {run_id})")] +#[debug(fmt = "HistoryPaginator(cache_key: {cache_key})")] #[cfg_attr( feature = "save_wf_inputs", derive(serde::Serialize, serde::Deserialize), @@ -81,7 +84,7 @@ pub enum NextWFT { )] pub struct HistoryPaginator { pub(crate) wf_id: String, - pub(crate) run_id: String, + pub(crate) cache_key: RunCacheKey, pub(crate) previous_wft_started_id: i64, pub(crate) wft_started_event_id: i64, @@ -122,9 +125,27 @@ impl HistoryPaginator { /// Use a new poll response to create a new [WFTPaginator], returning it and the /// [PreparedWFT] extracted from it that can be fed into workflow state. 
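+    /// If the incoming task carries the special time travel stack trace query, the history it
+    /// came with is discarded so that all history is re-fetched from the start, and the resulting
+    /// work is keyed under an alternate cache entry (see the body below).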
pub(super) async fn from_poll( - wft: ValidPollWFTQResponse, + mut wft: ValidPollWFTQResponse, client: Arc, ) -> Result<(Self, PreparedWFT), tonic::Status> { + let mut alternate_cache = 0; + // Intercept and special time travel query + let mut all_queries = wft + .query_requests + .iter() + .map(|q| q.query_type.as_str()) + .chain(wft.legacy_query.iter().map(|q| q.query_type.as_str())); + if all_queries.any(|qt| qt == TIME_TRAVEL_QUERY) { + // If we see one of these, we need to fetch *all* history, attach the alternate cache + // key, and then break things into made up WFTs and issue enhanced stack trace queries + // for each one + info!("Time travel query from poll"); + + // This will force fetching all history. + wft.history.events = vec![]; + alternate_cache = 1; + } + let empty_hist = wft.history.events.is_empty(); let npt = if empty_hist { NextPageToken::FetchFromStart @@ -162,6 +183,7 @@ impl HistoryPaginator { legacy_query: wft.legacy_query, query_requests: wft.query_requests, update, + alternate_cache, }; Ok((paginator, prepared)) } @@ -172,7 +194,7 @@ impl HistoryPaginator { ) -> Result { let mut paginator = Self { wf_id: req.original_wft.work.execution.workflow_id.clone(), - run_id: req.original_wft.work.execution.run_id.clone(), + cache_key: req.original_wft.work.cache_key(), previous_wft_started_id: req.original_wft.work.update.previous_wft_started_id, wft_started_event_id: req.original_wft.work.update.wft_started_id, client, @@ -206,7 +228,9 @@ impl HistoryPaginator { client, event_queue, wf_id, - run_id, + // TODO: At least, with what's happening for this hack, the key never needs to start + // with an alternate, but this is a bit ugly + cache_key: run_id.parse().unwrap(), next_page_token, final_events, previous_wft_started_id, @@ -221,7 +245,10 @@ impl HistoryPaginator { client: Arc::new(mock_manual_workflow_client()), event_queue: Default::default(), wf_id: "".to_string(), - run_id: "".to_string(), + cache_key: RunCacheKey { + run_id: "".to_string(), + alternate_key: 0, + }, next_page_token: NextPageToken::FetchFromStart, final_events: vec![], previous_wft_started_id: -2, @@ -292,10 +319,14 @@ impl HistoryPaginator { NextPageToken::FetchFromStart => vec![], NextPageToken::Next(v) => v, }; - debug!(run_id=%self.run_id, "Fetching new history page"); + debug!(run_id=%self.cache_key, "Fetching new history page"); let fetch_res = self .client - .get_workflow_execution_history(self.wf_id.clone(), Some(self.run_id.clone()), npt) + .get_workflow_execution_history( + self.wf_id.clone(), + Some(self.cache_key.run_id.clone()), + npt, + ) .instrument(span!(tracing::Level::TRACE, "fetch_history_in_paginator")) .await?; diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index d9f671f28..f166225d0 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -27,6 +27,7 @@ use crate::{ upsert_search_attributes_state_machine::upsert_search_attrs_internal, HistEventData, }, + run_cache::RunCacheKey, CommandID, DrivenWorkflow, HistoryUpdate, InternalFlagsRef, LocalResolution, OutgoingJob, RunBasics, WFCommand, WFMachinesError, WorkflowFetcher, WorkflowStartedInfo, @@ -95,6 +96,8 @@ pub(crate) struct WorkflowMachines { pub workflow_type: String, /// Identifies the current run pub run_id: String, + /// Cache key used by this run TODO: Duplicates run_id + pub cache_key: RunCacheKey, /// The time the workflow execution began, as told by the WEStarted 
event workflow_start_time: Option, /// The time the workflow execution finished, as determined by when the machines handled @@ -120,7 +123,8 @@ pub(crate) struct WorkflowMachines { /// Maps command ids as created by workflow authors to their associated machines. id_to_machine: HashMap, - + // TODO: consdier mantaining this guy + // command_id_to_machine_id: HashMap, /// Queued commands which have been produced by machines and await processing / being sent to /// the server. commands: VecDeque, @@ -228,7 +232,8 @@ impl WorkflowMachines { namespace: basics.namespace, workflow_id: basics.workflow_id, workflow_type: basics.workflow_type, - run_id: basics.run_id, + run_id: basics.cache_key.run_id.clone(), + cache_key: basics.cache_key, drive_me: driven_wf, replaying, metrics: basics.metrics, @@ -273,6 +278,10 @@ impl WorkflowMachines { Ok(()) } + pub(crate) fn get_current_wft_started_id(&self) -> i64 { + self.current_started_event_id + } + /// Let this workflow know that something we've been waiting locally on has resolved, like a /// local activity or side effect /// @@ -366,6 +375,7 @@ impl WorkflowMachines { .iter() .copied() .collect(), + alternate_cache_key: self.cache_key.to_string(), } } @@ -1289,6 +1299,16 @@ impl WorkflowMachines { } attrs } + + pub fn command_id_to_event_id(&self, id: &CommandID) -> Option { + self.id_to_machine.get(id).and_then(|machine_id| { + self.machines_by_event_id + .iter() + .find(|(_, m_id)| &machine_id == m_id) + .map(|(event_id, _)| event_id) + .copied() + }) + } } fn str_to_randomness_seed(run_id: &str) -> u64 { diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 5335f6f28..8ef8ad8db 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -5,11 +5,12 @@ mod managed_wf_test; pub(crate) use managed_wf_test::ManagedWFFunc; use crate::{ - abstractions::dbg_panic, + abstractions::{dbg_panic, lame_drain_filter}, protosext::WorkflowActivationExt, worker::{ workflow::{ - history_update::HistoryPaginator, machines::WorkflowMachines, ActivationAction, + history_update::HistoryPaginator, internal_to_enhanced_stack_trace, + machines::WorkflowMachines, run_cache::RunCacheKey, ActivationAction, ActivationCompleteOutcome, ActivationCompleteResult, ActivationOrAuto, EvictionRequestResult, FailedActivationWFTReport, HeartbeatTimeoutMsg, HistoryUpdate, LocalActivityRequestSink, LocalResolution, NextPageReq, OutgoingServerCommands, @@ -22,6 +23,7 @@ use crate::{ MetricsContext, }; use futures_util::future::AbortHandle; +use itertools::{Either, Itertools}; use std::{ collections::HashSet, ops::Add, @@ -29,14 +31,18 @@ use std::{ sync::mpsc::Sender, time::{Duration, Instant}, }; +use temporal_sdk_core_api::builtin_queries::{ + enhanced_stack_query_job, TimeTravelStackTrace, FAKE_ENHANCED_STACK_QUERY_ID, + FAKE_ENHANCED_STACK_QUERY_ID_FINAL_LEGACY, +}; use temporal_sdk_core_protos::{ coresdk::{ workflow_activation::{ create_evict_activation, query_to_job, remove_from_cache::EvictionReason, workflow_activation_job, RemoveFromCache, WorkflowActivation, }, - workflow_commands::QueryResult, - workflow_completion, + workflow_commands::{QueryResult, QuerySuccess}, + workflow_completion, AsJsonPayloadExt, }, temporal::api::{enums::v1::WorkflowTaskFailedCause, failure::v1::Failure}, TaskToken, @@ -83,6 +89,7 @@ pub(super) struct ManagedRun { buffered_resp: Option, /// Is set if an eviction has been requested for this run trying_to_evict: Option, + started_id_upon_activation: i64, /// We track 
if we have recorded useful debugging values onto a certain span yet, to overcome /// duplicating field values. Remove this once https://github.com/tokio-rs/tracing/issues/2334 @@ -109,6 +116,7 @@ impl ManagedRun { activation: None, buffered_resp: None, trying_to_evict: None, + started_id_upon_activation: 0, recorded_span_ids: Default::default(), metrics, paginator: None, @@ -176,6 +184,13 @@ impl ManagedRun { wf_id: work.execution.workflow_id.clone(), }; + let time_travel_aggregation = if work.alternate_cache == 1 { + info!("Special incoming WFT for time travel replay"); + Some(TimeTravelStackTrace::default()) + } else { + None + }; + let legacy_query_from_poll = work .legacy_query .map(|q| query_to_job(LEGACY_QUERY_ID.to_string(), q)); @@ -204,6 +219,7 @@ impl ManagedRun { pending_queries, start_time, permit: pwft.permit, + time_travel_aggregation, }); // The update field is only populated in the event we hit the cache @@ -234,7 +250,7 @@ impl ManagedRun { } lawait.hb_timeout_handle.abort(); lawait.hb_timeout_handle = sink_heartbeat_timeout_start( - self.wfm.machines.run_id.clone(), + self.wfm.machines.cache_key.clone(), self.local_activity_request_sink.as_ref(), start_time, lawait.wft_timeout, @@ -250,7 +266,7 @@ impl ManagedRun { } } else { return Ok(Some(ActivationOrAuto::Autocomplete { - run_id: self.wfm.machines.run_id.clone(), + cache_key: self.wfm.machines.cache_key.clone(), })); } } @@ -264,8 +280,8 @@ impl ManagedRun { &mut self, report_status: WFTReportStatus, ) -> Option { - debug!("Marking WFT completed"); let retme = self.wft.take(); + info!("Marking wft complete"); // Only record latency metrics if we genuinely reported to server if matches!(report_status, WFTReportStatus::Reported) { @@ -386,12 +402,27 @@ impl ManagedRun { // If the only command from the activation is a legacy query response, that means we need // to respond differently than a typical activation. 
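+        // The fake "final legacy" enhanced stack trace query is treated the same way: its result
+        // is folded into the time travel aggregation and the aggregate is returned as the legacy
+        // query response.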
if matches!(&commands.as_slice(), - &[WFCommand::QueryResponse(qr)] if qr.query_id == LEGACY_QUERY_ID) + &[WFCommand::QueryResponse(qr)] + if qr.query_id == LEGACY_QUERY_ID || + qr.query_id == FAKE_ENHANCED_STACK_QUERY_ID_FINAL_LEGACY) { - let qr = match commands.remove(0) { + let mut qr = match commands.remove(0) { WFCommand::QueryResponse(qr) => qr, _ => unreachable!("We just verified this is the only command"), }; + self.fixup_enhanced_stack_query(&mut qr); + + qr = if qr.query_id == FAKE_ENHANCED_STACK_QUERY_ID_FINAL_LEGACY { + // Merge with the time travel aggregation, then reply with it + self.aggregate_time_travel_query(&mut vec![qr]); + QueryResult { + query_id: LEGACY_QUERY_ID.to_string(), + variant: Some(self.take_time_travel_as_query_success().into()), + } + } else { + qr + }; + self.reply_to_complete( ActivationCompleteOutcome::ReportWFTSuccess(ServerCommandsWithWorkflowInfo { task_token, @@ -404,18 +435,11 @@ impl ManagedRun { Ok(None) } else { // First strip out query responses from other commands that actually affect machines - // Would be prettier with `drain_filter` - let mut i = 0; - let mut query_responses = vec![]; - while i < commands.len() { - if matches!(commands[i], WFCommand::QueryResponse(_)) { - if let WFCommand::QueryResponse(qr) = commands.remove(i) { - query_responses.push(qr); - } - } else { - i += 1; - } - } + let (query_responses, commands): (Vec<_>, Vec<_>) = + commands.into_iter().partition_map(|c| match c { + WFCommand::QueryResponse(qr) => Either::Left(qr), + o => Either::Right(o), + }); if activation_was_only_eviction && !commands.is_empty() { dbg_panic!("Reply to an eviction only containing an eviction included commands"); @@ -463,6 +487,22 @@ impl ManagedRun { } } + /// Will update a query result to map command ids to event ids in an enhanced stack trace. + fn fixup_enhanced_stack_query(&self, qr: &mut QueryResult) { + if let Ok(as_internal) = (&*qr).try_into() { + if let Some(payload) = qr.payload_mut() { + let trace = internal_to_enhanced_stack_trace(as_internal, |c| { + self.wfm.machines.command_id_to_event_id(c) + }); + if let Ok(data) = serde_json::to_vec(&trace) { + payload.data = data; + } else { + warn!("Could not encode enhanced stack trace to JSON"); + } + } + } + } + /// Called after the higher-up machinery has fetched more pages of event history needed to apply /// the next workflow task. 
The history update and paginator used to perform the fetch are /// passed in, with the update being used to apply the task, and the paginator stored to be @@ -521,7 +561,7 @@ impl ManagedRun { let message = format!("Workflow activation completion failed: {:?}", &failure); // Blow up any cached data associated with the workflow let evict_req_outcome = self.request_eviction(RequestEvictMsg { - run_id: self.run_id().to_string(), + cache_key: self.cache_key().clone(), message, reason, auto_reply_fail_tt: None, @@ -622,6 +662,10 @@ impl ManagedRun { } })(); + info!( + "Completion done, wft_started: {}", + self.wfm.machines.get_current_wft_started_id() + ); match outcome { Ok(None) => Ok(Some(self.prepare_complete_resp( completion.resp_chan, @@ -636,7 +680,7 @@ impl ManagedRun { wft_timeout, completion_dat: Some((data, completion.resp_chan)), hb_timeout_handle: sink_heartbeat_timeout_start( - self.run_id().to_string(), + self.cache_key().clone(), self.local_activity_request_sink.as_ref(), start_t, wft_timeout, @@ -676,7 +720,7 @@ impl ManagedRun { pub(super) fn heartbeat_timeout(&mut self) -> RunUpdateAct { let maybe_act = if self._heartbeat_timeout() { Some(ActivationOrAuto::Autocomplete { - run_id: self.wfm.machines.run_id.clone(), + cache_key: self.wfm.machines.cache_key.clone(), }) } else { None @@ -780,7 +824,7 @@ impl ManagedRun { } if !self.activation_has_eviction() && self.trying_to_evict.is_none() { - debug!(run_id=%info.run_id, reason=%info.message, "Eviction requested"); + debug!(run_id=%info.cache_key, reason=%info.message, "Eviction requested"); self.trying_to_evict = Some(info); EvictionRequestResult::EvictionRequested(attempts, self.check_more_activations()) } else { @@ -893,8 +937,8 @@ impl ManagedRun { None } } - Some(r) => { - self.insert_outstanding_activation(&r); + Some(mut r) => { + self.insert_outstanding_activation(&mut r); Some(r) } None => None, @@ -919,7 +963,7 @@ impl ManagedRun { } else { warn!(error=?fail.source, "Error while updating workflow"); Some(ActivationOrAuto::AutoFail { - run_id: self.run_id().to_owned(), + cache_key: self.cache_key().clone(), machines_err: fail.source, }) }; @@ -928,10 +972,34 @@ impl ManagedRun { } } - fn insert_outstanding_activation(&mut self, act: &ActivationOrAuto) { - let act_type = match &act { + fn insert_outstanding_activation(&mut self, act: &mut ActivationOrAuto) { + self.started_id_upon_activation = self.wfm.machines.get_current_wft_started_id(); + let act_type = match act { ActivationOrAuto::LangActivation(act) | ActivationOrAuto::ReadyForQueries(act) => { - if act.is_legacy_query() { + let is_leg_q = act.is_legacy_query(); + + // If the time travel query is pending, insert an enhanced stack trace query for + // the activation + // TODO: Optimization: Somehow do only at WFT boundaries - we only end up keeping + // the final stack before we complete the WFT, so capturing them for activations + // before the last is technically a waste. + if let Some(ref wft) = self.wft { + if wft.has_pending_time_travel_query() { + // Insert enhanced stack trace query (and remove tt query, if present) + act.jobs.retain(|j| !j.is_time_travel_query()); + act.jobs.push(enhanced_stack_query_job(false)); + } else { + // Check if the time travel query is *in* the activation, IE: it is no + // longer pending. This means we should conclude aggregation after this + // final activation. 
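+                        // Do so by swapping the real time travel query for a "final legacy"
+                        // enhanced stack query, so lang sends back one last stack capture before
+                        // the aggregate is returned.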
+ if let Some(ttq) = act.jobs.iter().position(|j| j.is_time_travel_query()) { + act.jobs.remove(ttq); + act.jobs.push(enhanced_stack_query_job(true)); + } + } + } + + if is_leg_q { OutstandingActivation::LegacyQuery } else { OutstandingActivation::Normal { @@ -958,7 +1026,7 @@ impl ManagedRun { fn prepare_complete_resp( &mut self, resp_chan: Option>, - data: CompletionDataForWFT, + mut data: CompletionDataForWFT, due_to_heartbeat_timeout: bool, ) -> FulfillableActivationComplete { let mut outgoing_cmds = self.wfm.get_server_commands(); @@ -975,6 +1043,11 @@ impl ManagedRun { } } + self.aggregate_time_travel_query(&mut data.query_responses); + + for qr in data.query_responses.iter_mut() { + self.fixup_enhanced_stack_query(qr); + } let query_responses = data.query_responses; let has_query_responses = !query_responses.is_empty(); let is_query_playback = data.has_pending_query && !has_query_responses; @@ -1017,6 +1090,52 @@ impl ManagedRun { } } + fn aggregate_time_travel_query(&mut self, query_resps: &mut Vec) { + if let Some(tta) = self + .wft + .as_mut() + .and_then(|wft| wft.time_travel_aggregation.as_mut()) + { + let wft_id = self.started_id_upon_activation; + // If time travel aggregation is ongoing, strip any inserted fake enhanced stack trace + // queries and merge them into the time travel aggregation. + let enhanced_queries = lame_drain_filter(query_resps, |qr| { + qr.query_id.starts_with(FAKE_ENHANCED_STACK_QUERY_ID) + }); + for qr in enhanced_queries { + if let Ok(internal) = (&qr).try_into() { + let mapped = internal_to_enhanced_stack_trace(internal, |c| { + self.wfm.machines.command_id_to_event_id(c) + }); + tta.sdk = mapped.sdk; + tta.sources.extend(mapped.sources.into_iter()); + info!("Aggregating stacks for wft id {}", wft_id); + if !mapped.stacks.is_empty() { + tta.stacks.insert(wft_id as u32, mapped.stacks); + } + } else { + error!("Couldn't decode fake stack trace query"); + } + } + } + } + + fn take_time_travel_as_query_success(&mut self) -> QuerySuccess { + let payload = if let Some(tta) = self + .wft + .as_mut() + .and_then(|wft| wft.time_travel_aggregation.take()) + { + tta.as_json_payload().expect("Serializes fine") + } else { + error!("Expected to take time travel aggregation, but it wasn't there."); + Default::default() + }; + QuerySuccess { + response: Some(payload), + } + } + /// Pump some local activity requests into the sink, applying any immediate results to the /// workflow machines. 
fn sink_la_requests( @@ -1087,6 +1206,10 @@ impl ManagedRun { fn run_id(&self) -> &str { &self.wfm.machines.run_id } + + fn cache_key(&self) -> &RunCacheKey { + &self.wfm.machines.cache_key + } } /// Drains pending queries from the workflow task and appends them to the activation's jobs @@ -1111,7 +1234,7 @@ fn put_queries_in_act(act: &mut WorkflowActivation, wft: &mut OutstandingTask) { act.jobs.extend(query_jobs); } fn sink_heartbeat_timeout_start( - run_id: String, + cache_key: RunCacheKey, sink: &dyn LocalActivityRequestSink, wft_start_time: Instant, wft_timeout: Duration, @@ -1121,7 +1244,7 @@ fn sink_heartbeat_timeout_start( let (abort_handle, abort_reg) = AbortHandle::new_pair(); sink.sink_reqs(vec![LocalActRequest::StartHeartbeatTimeout { send_on_elapse: HeartbeatTimeoutMsg { - run_id, + cache_key, span: Span::current(), }, deadline, diff --git a/core/src/worker/workflow/managed_run/managed_wf_test.rs b/core/src/worker/workflow/managed_run/managed_wf_test.rs index 2386f89b3..cb4a32241 100644 --- a/core/src/worker/workflow/managed_run/managed_wf_test.rs +++ b/core/src/worker/workflow/managed_run/managed_wf_test.rs @@ -92,7 +92,7 @@ impl ManagedWFFunc { namespace: "test_namespace".to_string(), workflow_id: "wfid".to_string(), workflow_type: "wftype".to_string(), - run_id: "runid".to_string(), + cache_key: "runid".parse().unwrap(), history: hist, metrics: MetricsContext::no_op(), capabilities: &DEFAULT_TEST_CAPABILITIES, diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 4172c1c4f..be7359c68 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -38,6 +38,7 @@ use crate::{ workflow::{ history_update::HistoryPaginator, managed_run::RunUpdateAct, + run_cache::{CacheKeyGetter, RunCacheKey}, wft_extraction::{HistoryFetchReq, WFTExtractor}, wft_poller::validate_wft, workflow_stream::{LocalInput, LocalInputs, WFStream}, @@ -65,7 +66,13 @@ use std::{ thread, time::{Duration, Instant}, }; -use temporal_sdk_core_api::errors::{CompleteWfError, PollWfError}; +use temporal_sdk_core_api::{ + builtin_queries::{ + EnhancedStackTrace, InternalCommandType, InternalEnhancedStackTrace, StackTrace, + TimeTravelStackTrace, + }, + errors::{CompleteWfError, PollWfError}, +}; use temporal_sdk_core_protos::{ coresdk::{ workflow_activation::{ @@ -87,7 +94,7 @@ use temporal_sdk_core_protos::{ taskqueue::v1::StickyExecutionAttributes, workflowservice::v1::{get_system_info_response, PollActivityTaskQueueResponse}, }, - TaskToken, + TaskToken, TIME_TRAVEL_QUERY, }; use tokio::{ sync::{ @@ -150,7 +157,7 @@ pub(crate) struct RunBasics<'a> { pub namespace: String, pub workflow_id: String, pub workflow_type: String, - pub run_id: String, + pub cache_key: RunCacheKey, pub history: HistoryUpdate, pub metrics: MetricsContext, pub capabilities: &'a get_system_info_response::Capabilities, @@ -286,10 +293,11 @@ impl Workflows { debug!(activation=%act, "Sending activation to lang"); break Ok(act); } - ActivationOrAuto::Autocomplete { run_id } => { + ActivationOrAuto::Autocomplete { cache_key } => { self.activation_completed( WorkflowActivationCompletion { - run_id, + alternate_cache_key: cache_key.to_string(), + run_id: cache_key.run_id, status: Some( workflow_completion::Success::from_variants(vec![]).into(), ), @@ -301,12 +309,13 @@ impl Workflows { .await?; } ActivationOrAuto::AutoFail { - run_id, + cache_key, machines_err, } => { self.activation_completed( WorkflowActivationCompletion { - run_id, + alternate_cache_key: cache_key.to_string(), + run_id: 
cache_key.run_id, status: Some(auto_fail_to_complete_status(machines_err)), }, Option::>::None, @@ -328,6 +337,7 @@ impl Workflows { ) -> Result { let is_empty_completion = completion.is_empty(); let completion = validate_completion(completion)?; + let cache_key = completion.cache_key().clone(); let run_id = completion.run_id().to_string(); let (tx, rx) = oneshot::channel(); let was_sent = self.send_local(WFActCompleteMsg { @@ -388,7 +398,7 @@ impl Workflows { } completion.sticky_attributes = sticky_attrs; - self.handle_wft_reporting_errs(&run_id, || async { + self.handle_wft_reporting_errs(&cache_key, || async { let maybe_wft = self.client.complete_workflow_task(completion).await?; if let Some(wft) = maybe_wft.workflow_task { wft_from_complete = Some(validate_wft(wft)?); @@ -413,7 +423,7 @@ impl Workflows { ActivationCompleteOutcome::ReportWFTFail(outcome) => match outcome { FailedActivationWFTReport::Report(tt, cause, failure) => { warn!(run_id=%run_id, failure=?failure, "Failing workflow task"); - self.handle_wft_reporting_errs(&run_id, || async { + self.handle_wft_reporting_errs(&cache_key, || async { self.client .fail_workflow_task(tt, cause, failure.failure.map(Into::into)) .await @@ -437,7 +447,7 @@ impl Workflows { Ok((paginator, pwft)) => Some((pwft, paginator)), Err(e) => { self.request_eviction( - &run_id, + cache_key.clone(), format!("Failed to paginate workflow task from completion: {e:?}"), EvictionReason::Fatal, ); @@ -457,7 +467,7 @@ impl Workflows { } self.post_activation(PostActivationMsg { - run_id, + cache_key, wft_report_status, wft_from_complete: maybe_pwft, }); @@ -466,13 +476,9 @@ impl Workflows { } /// Tell workflow that a local activity has finished with the provided result - pub(super) fn notify_of_local_result( - &self, - run_id: impl Into, - resolved: LocalResolution, - ) { + pub(super) fn notify_of_local_result(&self, cache_key: RunCacheKey, resolved: LocalResolution) { self.send_local(LocalResolutionMsg { - run_id: run_id.into(), + cache_key, res: resolved, }); } @@ -480,12 +486,12 @@ impl Workflows { /// Request eviction of a workflow pub(super) fn request_eviction( &self, - run_id: impl Into, + cache_key: RunCacheKey, message: impl Into, reason: EvictionReason, ) { self.send_local(RequestEvictMsg { - run_id: run_id.into(), + cache_key, message: message.into(), reason, auto_reply_fail_tt: None, @@ -538,8 +544,11 @@ impl Workflows { /// Handle server errors from either completing or failing a workflow task. Un-handleable errors /// trigger a workflow eviction and are logged. - async fn handle_wft_reporting_errs(&self, run_id: &str, completer: impl FnOnce() -> Fut) - where + async fn handle_wft_reporting_errs( + &self, + cache_key: &RunCacheKey, + completer: impl FnOnce() -> Fut, + ) where Fut: Future>, { let mut should_evict = None; @@ -548,11 +557,12 @@ impl Workflows { // Silence unhandled command errors since the lang SDK cannot do anything // about them besides poll again, which it will do anyway. 
tonic::Code::InvalidArgument if err.message() == "UnhandledCommand" => { - debug!(error = %err, run_id, "Unhandled command response when completing"); + debug!(error = %err, cache_key=%cache_key, + "Unhandled command response when completing"); should_evict = Some(EvictionReason::UnhandledCommand); } tonic::Code::NotFound => { - warn!(error = %err, run_id, "Task not found when completing"); + warn!(error = %err, cache_key=%cache_key, "Task not found when completing"); should_evict = Some(EvictionReason::TaskNotFound); } _ => { @@ -562,7 +572,7 @@ impl Workflows { } } if let Some(reason) = should_evict { - self.request_eviction(run_id, "Error reporting WFT to server", reason); + self.request_eviction(cache_key.clone(), "Error reporting WFT to server", reason); } } @@ -722,13 +732,13 @@ enum ActivationOrAuto { /// This type should only be filled with an empty activation which is ready to have queries /// inserted into the joblist ReadyForQueries(WorkflowActivation), - #[display(fmt = "Autocomplete(run_id={run_id})")] + #[display(fmt = "Autocomplete(run_id={0})", "cache_key.run_id")] Autocomplete { - run_id: String, + cache_key: RunCacheKey, }, - #[display(fmt = "AutoFail(run_id={run_id})")] + #[display(fmt = "AutoFail(run_id={0})", "cache_key.run_id")] AutoFail { - run_id: String, + cache_key: RunCacheKey, machines_err: WFMachinesError, }, } @@ -736,9 +746,9 @@ impl ActivationOrAuto { pub fn run_id(&self) -> &str { match self { ActivationOrAuto::LangActivation(act) => &act.run_id, - ActivationOrAuto::Autocomplete { run_id, .. } => run_id, + ActivationOrAuto::Autocomplete { cache_key, .. } => cache_key.run_id.as_str(), ActivationOrAuto::ReadyForQueries(act) => &act.run_id, - ActivationOrAuto::AutoFail { run_id, .. } => run_id, + ActivationOrAuto::AutoFail { cache_key, .. } => cache_key.run_id.as_str(), } } } @@ -776,6 +786,7 @@ struct PreparedWFT { legacy_query: Option, query_requests: Vec, update: HistoryUpdate, + alternate_cache: u32, } impl PreparedWFT { /// Returns true if the contained history update is incremental (IE: expects to hit a cached @@ -785,6 +796,13 @@ impl PreparedWFT { let poll_resp_is_incremental = start_event_id.map(|eid| eid > 1).unwrap_or_default(); poll_resp_is_incremental || start_event_id.is_none() } + + pub fn cache_key(&self) -> RunCacheKey { + RunCacheKey { + run_id: self.execution.run_id.clone(), + alternate_key: self.alternate_cache, + } + } } #[derive(Debug)] @@ -797,6 +815,7 @@ pub(crate) struct OutstandingTask { /// The WFT permit owned by this task, ensures we don't exceed max concurrent WFT, and makes /// sure the permit is automatically freed when we delete the task. 
pub permit: UsedMeteredSemPermit, + pub time_travel_aggregation: Option, } impl OutstandingTask { @@ -805,6 +824,11 @@ impl OutstandingTask { .iter() .any(|q| q.query_id == LEGACY_QUERY_ID) } + pub(crate) fn has_pending_time_travel_query(&self) -> bool { + self.pending_queries + .iter() + .any(|q| q.query_type == TIME_TRAVEL_QUERY) + } } #[derive(Copy, Clone, Debug)] @@ -920,7 +944,7 @@ struct WFActCompleteMsg { derive(serde::Serialize, serde::Deserialize) )] struct LocalResolutionMsg { - run_id: String, + cache_key: RunCacheKey, res: LocalResolution, } #[derive(Debug)] @@ -929,7 +953,7 @@ struct LocalResolutionMsg { derive(serde::Serialize, serde::Deserialize) )] struct PostActivationMsg { - run_id: String, + cache_key: RunCacheKey, wft_report_status: WFTReportStatus, wft_from_complete: Option<(PreparedWFT, HistoryPaginator)>, } @@ -939,7 +963,7 @@ struct PostActivationMsg { derive(serde::Serialize, serde::Deserialize) )] struct RequestEvictMsg { - run_id: String, + cache_key: RunCacheKey, message: String, reason: EvictionReason, /// If set, we requested eviction because something went wrong processing a brand new poll task, @@ -949,7 +973,7 @@ struct RequestEvictMsg { } #[derive(Debug)] pub(crate) struct HeartbeatTimeoutMsg { - pub(crate) run_id: String, + pub(crate) cache_key: RunCacheKey, pub(crate) span: Span, } #[derive(Debug)] @@ -997,6 +1021,7 @@ enum WFTReportStatus { fn validate_completion( completion: WorkflowActivationCompletion, ) -> Result { + let cache_key = completion.cache_key(); match completion.status { Some(workflow_activation_completion::Status::Successful(success)) => { // Convert to wf commands @@ -1038,16 +1063,13 @@ fn validate_completion( } Ok(ValidatedCompletion::Success { - run_id: completion.run_id, + cache_key, commands, used_flags: success.used_internal_flags, }) } Some(workflow_activation_completion::Status::Failed(failure)) => { - Ok(ValidatedCompletion::Fail { - run_id: completion.run_id, - failure, - }) + Ok(ValidatedCompletion::Fail { cache_key, failure }) } None => Err(CompleteWfError::MalformedWorkflowCompletion { reason: "Workflow completion had empty status field".to_owned(), @@ -1064,21 +1086,25 @@ fn validate_completion( #[allow(clippy::large_enum_variant)] enum ValidatedCompletion { Success { - run_id: String, + cache_key: RunCacheKey, commands: Vec, used_flags: Vec, }, Fail { - run_id: String, + cache_key: RunCacheKey, failure: Failure, }, } impl ValidatedCompletion { pub fn run_id(&self) -> &str { + self.cache_key().run_id.as_str() + } + + pub fn cache_key(&self) -> &RunCacheKey { match self { - ValidatedCompletion::Success { run_id, .. } => run_id, - ValidatedCompletion::Fail { run_id, .. } => run_id, + ValidatedCompletion::Success { cache_key, .. } => cache_key, + ValidatedCompletion::Fail { cache_key, .. 
} => cache_key, } } } @@ -1207,7 +1233,7 @@ impl WFCommand { } #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -enum CommandID { +pub(crate) enum CommandID { Timer(u32), Activity(u32), LocalActivity(u32), @@ -1351,6 +1377,37 @@ fn sort_act_jobs(wfa: &mut WorkflowActivation) { }) } +fn internal_to_enhanced_stack_trace( + internal_trace: InternalEnhancedStackTrace, + mapper: impl Fn(&CommandID) -> Option, +) -> EnhancedStackTrace { + let mut stacks = Vec::with_capacity(internal_trace.stacks.len()); + for stack in internal_trace.stacks.iter() { + let mut correlating_event_ids = vec![]; + for command in stack.commands.iter() { + let command_id = match command.r#type { + InternalCommandType::ScheduleActivity => CommandID::Activity(command.seq), + InternalCommandType::StartTimer => CommandID::Timer(command.seq), + InternalCommandType::StartChildWorkflow => { + CommandID::ChildWorkflowStart(command.seq) + } + }; + if let Some(event_id) = mapper(&command_id) { + correlating_event_ids.push(event_id); + } + } + stacks.push(StackTrace { + locations: stack.locations.clone(), + correlating_event_ids, + }); + } + EnhancedStackTrace { + sdk: internal_trace.sdk, + stacks, + sources: internal_trace.sources, + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/worker/workflow/run_cache.rs b/core/src/worker/workflow/run_cache.rs index af28e4f4e..3fe927225 100644 --- a/core/src/worker/workflow/run_cache.rs +++ b/core/src/worker/workflow/run_cache.rs @@ -7,15 +7,25 @@ use crate::{ MetricsContext, }; use lru::LruCache; -use std::{mem, num::NonZeroUsize, rc::Rc}; -use temporal_sdk_core_protos::temporal::api::workflowservice::v1::get_system_info_response; +use std::{ + convert::Infallible, + fmt::{Display, Formatter}, + mem, + num::NonZeroUsize, + rc::Rc, + str::FromStr, +}; +use temporal_sdk_core_protos::{ + coresdk::workflow_completion::WorkflowActivationCompletion, + temporal::api::workflowservice::v1::get_system_info_response, +}; pub(super) struct RunCache { max: usize, namespace: String, server_capabilities: get_system_info_response::Capabilities, /// Run id -> Data - runs: LruCache, + runs: LruCache, local_activity_request_sink: Rc, metrics: MetricsContext, @@ -50,9 +60,12 @@ impl RunCache { pub fn instantiate_or_update(&mut self, mut pwft: PermittedWFT) -> RunUpdateAct { let cur_num_cached_runs = self.runs.len(); - let run_id = &pwft.work.execution.run_id; + let cache_key = RunCacheKey { + run_id: pwft.work.execution.run_id.clone(), + alternate_key: pwft.work.alternate_cache, + }; - if let Some(run_handle) = self.runs.get_mut(run_id) { + if let Some(run_handle) = self.runs.get_mut(&cache_key) { let rur = run_handle.incoming_wft(pwft); self.metrics.cache_size(cur_num_cached_runs as u64); return rur; @@ -71,46 +84,45 @@ impl RunCache { namespace: self.namespace.clone(), workflow_id: pwft.work.execution.workflow_id.clone(), workflow_type: pwft.work.workflow_type.clone(), - run_id: pwft.work.execution.run_id.clone(), + cache_key: pwft.work.cache_key(), history: history_update, metrics, capabilities: &self.server_capabilities, }, self.local_activity_request_sink.clone(), ); - let run_id = run_id.to_string(); let rur = mrh.incoming_wft(pwft); - if self.runs.push(run_id, mrh).is_some() { + if self.runs.push(cache_key, mrh).is_some() { panic!("Overflowed run cache! 
Cache owner is expected to avoid this!"); } self.metrics.cache_size(cur_num_cached_runs as u64 + 1); rur } - pub fn remove(&mut self, k: &str) -> Option { + pub fn remove(&mut self, k: &RunCacheKey) -> Option { let r = self.runs.pop(k); self.metrics.cache_size(self.len() as u64); r } - pub fn get_mut(&mut self, k: &str) -> Option<&mut ManagedRun> { + pub fn get_mut(&mut self, k: &RunCacheKey) -> Option<&mut ManagedRun> { self.runs.get_mut(k) } - pub fn get(&mut self, k: &str) -> Option<&ManagedRun> { + pub fn get(&mut self, k: &RunCacheKey) -> Option<&ManagedRun> { self.runs.get(k) } /// Returns the current least-recently-used run. Returns `None` when cache empty. - pub fn current_lru_run(&self) -> Option<&str> { - self.runs.peek_lru().map(|(run_id, _)| run_id.as_str()) + pub fn current_lru_run(&self) -> Option<&RunCacheKey> { + self.runs.peek_lru().map(|(run_id, _)| run_id) } /// Returns an iterator yielding cached runs in LRU order - pub fn runs_lru_order(&self) -> impl Iterator { - self.runs.iter().rev().map(|(k, v)| (k.as_str(), v)) + pub fn runs_lru_order(&self) -> impl Iterator { + self.runs.iter().rev().map(|(k, v)| (k, v)) } - pub fn peek(&self, k: &str) -> Option<&ManagedRun> { + pub fn peek(&self, k: &RunCacheKey) -> Option<&ManagedRun> { self.runs.peek(k) } - pub fn has_run(&self, k: &str) -> bool { + pub fn has_run(&self, k: &RunCacheKey) -> bool { self.runs.contains(k) } pub fn handles(&self) -> impl Iterator { @@ -126,3 +138,56 @@ impl RunCache { self.max } } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr( + feature = "save_wf_inputs", + derive(serde::Serialize, serde::Deserialize) +)] +pub(crate) struct RunCacheKey { + pub run_id: String, + pub alternate_key: u32, +} + +impl Display for RunCacheKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if self.alternate_key == 0 { + return f.write_str(&self.run_id); + } + f.write_fmt(format_args!("!k{}-{}", self.alternate_key, self.run_id)) + } +} +impl FromStr for RunCacheKey { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(if s.starts_with("!k") { + let dash_ix = s.find('-').unwrap(); + Self { + run_id: s[dash_ix + 1..].to_string(), + alternate_key: s[2..dash_ix].parse().unwrap(), + } + } else { + Self { + run_id: s.to_string(), + alternate_key: 0, + } + }) + } +} + +pub(crate) trait CacheKeyGetter { + fn cache_key(&self) -> RunCacheKey; +} +impl CacheKeyGetter for WorkflowActivationCompletion { + fn cache_key(&self) -> RunCacheKey { + if self.alternate_cache_key.is_empty() { + RunCacheKey { + run_id: self.run_id.clone(), + alternate_key: 0, + } + } else { + self.alternate_cache_key.parse().unwrap() + } + } +} diff --git a/core/src/worker/workflow/wft_extraction.rs b/core/src/worker/workflow/wft_extraction.rs index 01c6d2bdb..f5bf04913 100644 --- a/core/src/worker/workflow/wft_extraction.rs +++ b/core/src/worker/workflow/wft_extraction.rs @@ -4,8 +4,8 @@ use crate::{ worker::{ client::WorkerClient, workflow::{ - history_update::HistoryPaginator, CacheMissFetchReq, HistoryUpdate, NextPageReq, - PermittedWFT, + history_update::HistoryPaginator, run_cache::RunCacheKey, CacheMissFetchReq, + HistoryUpdate, NextPageReq, PermittedWFT, }, }, }; @@ -29,7 +29,7 @@ pub(super) enum WFTExtractorOutput { rc: Arc, }, FailedFetch { - run_id: String, + cache_key: RunCacheKey, err: tonic::Status, auto_reply_fail_tt: Option, }, @@ -72,7 +72,7 @@ impl WFTExtractor { paginator: pag, }), Err(err) => WFTExtractorOutput::FailedFetch { - run_id, + cache_key: run_id.parse().unwrap(), err, 
auto_reply_fail_tt: Some(tt), }, @@ -100,11 +100,11 @@ impl WFTExtractor { // It's OK to simply drop the refcounters in the event of fetch // failure. We'll just proceed with shutdown. HistoryFetchReq::Full(req, rc) => { - let run_id = req.original_wft.work.execution.run_id.clone(); + let cache_key = req.original_wft.work.cache_key(); match HistoryPaginator::from_fetchreq(req, client).await { Ok(r) => WFTExtractorOutput::FetchResult(r, rc), Err(err) => WFTExtractorOutput::FailedFetch { - run_id, + cache_key, err, auto_reply_fail_tt: None, }, @@ -119,7 +119,7 @@ impl WFTExtractor { rc, }, Err(err) => WFTExtractorOutput::FailedFetch { - run_id: req.paginator.run_id, + cache_key: req.paginator.cache_key, err, auto_reply_fail_tt: None, }, diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index 44bdd86e7..deccb0b98 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -126,8 +126,8 @@ impl WFStream { } WFStreamInput::Local(local_input) => { let _span_g = local_input.span.enter(); - if let Some(rid) = local_input.input.run_id() { - if let Some(rh) = state.runs.get_mut(rid) { + if let Some(key) = local_input.input.cache_key() { + if let Some(rh) = state.runs.get_mut(key) { rh.record_span_fields(&local_input.span); } } @@ -149,7 +149,7 @@ impl WFStream { } LocalInputs::LocalResolution(res) => state.local_resolution(res), LocalInputs::HeartbeatTimeout(hbt) => { - state.process_heartbeat_timeout(hbt) + state.process_heartbeat_timeout(&hbt) } LocalInputs::RequestEviction(evict) => { state.request_eviction(evict).into_run_update_resp() @@ -164,12 +164,12 @@ impl WFStream { } } WFStreamInput::FailedFetch { - run_id, + cache_key: run_id, err, auto_reply_fail_tt, } => state .request_eviction(RequestEvictMsg { - run_id, + cache_key: run_id, message: format!("Fetching history failed: {err:?}"), reason: EvictionReason::PaginationOrHistoryFetch, auto_reply_fail_tt, @@ -241,7 +241,7 @@ impl WFStream { ) -> Result { // If the run already exists, possibly buffer the work and return early if we can't handle // it yet. - let pwft = if let Some(rh) = self.runs.get_mut(&pwft.work.execution.run_id) { + let pwft = if let Some(rh) = self.runs.get_mut(&pwft.work.cache_key()) { if let Some(w) = rh.buffer_wft_if_outstanding_work(pwft) { w } else { @@ -251,10 +251,10 @@ impl WFStream { pwft }; - let run_id = pwft.work.execution.run_id.clone(); + let cache_key = pwft.work.cache_key(); // If our cache is full and this WFT is for an unseen run we must first evict a run before // we can deal with this task. So, buffer the task in that case. - if !self.runs.has_run(&run_id) && self.runs.is_full() { + if !self.runs.has_run(&cache_key) && self.runs.is_full() { self.buffer_resp_on_full_cache(pwft); return Ok(None); } @@ -262,8 +262,8 @@ impl WFStream { // This check can't really be lifted up higher since we could EX: See it's in the cache, // not fetch more history, send the task, see cache is full, buffer it, then evict that // run, and now we still have a cache miss. - if !self.runs.has_run(&run_id) && pwft.work.is_incremental() { - debug!(run_id=?run_id, "Workflow task has partial history, but workflow is not in \ + if !self.runs.has_run(&cache_key) && pwft.work.is_incremental() { + debug!(run_id=?cache_key, "Workflow task has partial history, but workflow is not in \ cache. 
Will fetch history"); self.metrics.sticky_cache_miss(); return Err(HistoryFetchReq::Full( @@ -277,7 +277,7 @@ impl WFStream { } fn process_completion(&mut self, complete: NewOrFetchedComplete) -> Vec { - let rh = if let Some(rh) = self.runs.get_mut(complete.run_id()) { + let rh = if let Some(rh) = self.runs.get_mut(complete.cache_key()) { rh } else { dbg_panic!("Run missing during completion {:?}", complete); @@ -321,15 +321,15 @@ impl WFStream { } fn process_post_activation(&mut self, report: PostActivationMsg) -> RunUpdateAct { - let run_id = &report.run_id; + let cache_key = &report.cache_key; let wft_from_complete = report.wft_from_complete; if let Some((wft, _)) = &wft_from_complete { - if &wft.execution.run_id != run_id { + if &wft.cache_key() != cache_key { dbg_panic!( "Server returned a WFT on completion for a different run ({}) than the \ one being completed ({}). This is a server bug.", wft.execution.run_id, - run_id + cache_key.run_id ); } } @@ -337,19 +337,19 @@ impl WFStream { let mut res = None; // If we reported to server, we always want to mark it complete. - let maybe_t = self.complete_wft(run_id, report.wft_report_status); + let maybe_t = self.complete_wft(cache_key, report.wft_report_status); // Delete the activation let activation = self .runs - .get_mut(run_id) + .get_mut(cache_key) .and_then(|rh| rh.delete_activation()); // Evict the run if the activation contained an eviction let mut applied_buffered_poll_for_this_run = false; if activation.map(|a| a.has_eviction()).unwrap_or_default() { - debug!(run_id=%run_id, "Evicting run"); + debug!(run_id=%cache_key, "Evicting run"); - if let Some(mut rh) = self.runs.remove(run_id) { + if let Some(mut rh) = self.runs.remove(cache_key) { if let Some(buff) = rh.take_buffered_wft() { // Don't try to apply a buffered poll for this run if we just got a new WFT // from completing, because by definition that buffered poll is now an @@ -382,7 +382,7 @@ impl WFStream { } if res.is_none() { - if let Some(rh) = self.runs.get_mut(run_id) { + if let Some(rh) = self.runs.get_mut(cache_key) { // Attempt to produce the next activation if needed res = rh.check_more_activations(); } @@ -391,7 +391,7 @@ impl WFStream { } fn local_resolution(&mut self, msg: LocalResolutionMsg) -> RunUpdateAct { - let run_id = msg.run_id; + let run_id = msg.cache_key; if let Some(rh) = self.runs.get_mut(&run_id) { rh.local_resolution(msg.res) } else { @@ -403,8 +403,8 @@ impl WFStream { } } - fn process_heartbeat_timeout(&mut self, run_id: String) -> RunUpdateAct { - if let Some(rh) = self.runs.get_mut(&run_id) { + fn process_heartbeat_timeout(&mut self, cache_key: &RunCacheKey) -> RunUpdateAct { + if let Some(rh) = self.runs.get_mut(cache_key) { rh.heartbeat_timeout() } else { None @@ -415,19 +415,18 @@ impl WFStream { /// activation to evict the workflow from the lang side. 
Workflow will not *actually* be evicted /// until lang replies to that activation fn request_eviction(&mut self, info: RequestEvictMsg) -> EvictionRequestResult { - if let Some(rh) = self.runs.get_mut(&info.run_id) { + if let Some(rh) = self.runs.get_mut(&info.cache_key) { rh.request_eviction(info) } else { - debug!(run_id=%info.run_id, "Eviction requested for unknown run"); + debug!(run_id=%info.cache_key, "Eviction requested for unknown run"); EvictionRequestResult::NotFound } } fn request_eviction_of_lru_run(&mut self) -> EvictionRequestResult { if let Some(lru_run_id) = self.runs.current_lru_run() { - let run_id = lru_run_id.to_string(); self.request_eviction(RequestEvictMsg { - run_id, + cache_key: lru_run_id.clone(), message: "Workflow cache full".to_string(), reason: EvictionReason::CacheFull, auto_reply_fail_tt: None, @@ -440,7 +439,7 @@ impl WFStream { fn complete_wft( &mut self, - run_id: &str, + cache_key: &RunCacheKey, wft_report_status: WFTReportStatus, ) -> Option { // If the WFT completion wasn't sent to the server, but we did see the final event, we still @@ -450,14 +449,14 @@ impl WFStream { // but they are very useful for testing complete replay. let saw_final = self .runs - .get(run_id) + .get(cache_key) .map(|r| r.have_seen_terminal_event()) .unwrap_or_default(); if !saw_final && matches!(wft_report_status, WFTReportStatus::NotReported) { return None; } - if let Some(rh) = self.runs.get_mut(run_id) { + if let Some(rh) = self.runs.get_mut(cache_key) { // Can't mark the WFT complete if there are pending queries, as doing so would destroy // them. if rh @@ -509,14 +508,14 @@ impl WFStream { } if !handle.has_buffered_wft() { num_evicts_needed -= 1; - evict_these.push(rid.to_string()); + evict_these.push(rid.clone()); } } let mut acts = vec![]; - for run_id in evict_these { + for cache_key in evict_these { acts.extend( self.request_eviction(RequestEvictMsg { - run_id, + cache_key, message: "Workflow cache full".to_string(), reason: EvictionReason::CacheFull, auto_reply_fail_tt: None, @@ -550,13 +549,13 @@ impl WFStream { // Useful when debugging #[allow(dead_code)] - fn info_dump(&self, run_id: &str) { - if let Some(r) = self.runs.peek(run_id) { - info!(run_id, wft=?r.wft(), activation=?r.activation(), + fn info_dump(&self, cache_key: &RunCacheKey) { + if let Some(r) = self.runs.peek(cache_key) { + info!(cache_key=%cache_key, wft=?r.wft(), activation=?r.activation(), buffered_wft=r.has_buffered_wft(), trying_to_evict=r.is_trying_to_evict(), more_work=r.more_pending_work()); } else { - info!(run_id, "Run not found"); + info!(cache_key=%cache_key, "Run not found"); } } } @@ -582,7 +581,7 @@ enum WFStreamInput { tonic::Status, ), FailedFetch { - run_id: String, + cache_key: RunCacheKey, #[cfg_attr( feature = "save_wf_inputs", serde(with = "tonic_status_serde::SerdeStatus") @@ -607,7 +606,7 @@ pub(super) struct LocalInput { impl From for LocalInput { fn from(hb: HeartbeatTimeoutMsg) -> Self { Self { - input: LocalInputs::HeartbeatTimeout(hb.run_id), + input: LocalInputs::HeartbeatTimeout(hb.cache_key), span: hb.span, } } @@ -628,18 +627,18 @@ pub(super) enum LocalInputs { LocalResolution(LocalResolutionMsg), PostActivation(PostActivationMsg), RequestEviction(RequestEvictMsg), - HeartbeatTimeout(String), + HeartbeatTimeout(RunCacheKey), #[cfg_attr(feature = "save_wf_inputs", serde(skip))] GetStateInfo(GetStateInfoMsg), } impl LocalInputs { - fn run_id(&self) -> Option<&str> { + fn cache_key(&self) -> Option<&RunCacheKey> { Some(match self { - LocalInputs::Completion(c) => 
c.completion.run_id(), - LocalInputs::FetchedPageCompletion { paginator, .. } => &paginator.run_id, - LocalInputs::LocalResolution(lr) => &lr.run_id, - LocalInputs::PostActivation(pa) => &pa.run_id, - LocalInputs::RequestEviction(re) => &re.run_id, + LocalInputs::Completion(c) => c.completion.cache_key(), + LocalInputs::FetchedPageCompletion { paginator, .. } => &paginator.cache_key, + LocalInputs::LocalResolution(lr) => &lr.cache_key, + LocalInputs::PostActivation(pa) => &pa.cache_key, + LocalInputs::RequestEviction(re) => &re.cache_key, LocalInputs::HeartbeatTimeout(hb) => hb, LocalInputs::GetStateInfo(_) => return None, }) @@ -658,7 +657,7 @@ enum ExternalPollerInputs { span: Span, }, FailedFetch { - run_id: String, + cache_key: RunCacheKey, err: tonic::Status, auto_reply_fail_tt: Option, }, @@ -671,11 +670,11 @@ impl From for WFStreamInput { ExternalPollerInputs::PollerError(e) => WFStreamInput::PollerError(e), ExternalPollerInputs::FetchedUpdate(wft) => WFStreamInput::NewWft(wft), ExternalPollerInputs::FailedFetch { - run_id, + cache_key, err, auto_reply_fail_tt, } => WFStreamInput::FailedFetch { - run_id, + cache_key, err, auto_reply_fail_tt, }, @@ -708,11 +707,11 @@ impl From> for ExternalPollerInputs { span, }, Ok(WFTExtractorOutput::FailedFetch { - run_id, + cache_key: run_id, err, auto_reply_fail_tt, }) => ExternalPollerInputs::FailedFetch { - run_id, + cache_key: run_id, err, auto_reply_fail_tt, }, @@ -727,10 +726,10 @@ enum NewOrFetchedComplete { Fetched(HistoryUpdate, HistoryPaginator), } impl NewOrFetchedComplete { - fn run_id(&self) -> &str { + fn cache_key(&self) -> &RunCacheKey { match self { - NewOrFetchedComplete::New(c) => c.completion.run_id(), - NewOrFetchedComplete::Fetched(_, p) => &p.run_id, + NewOrFetchedComplete::New(c) => c.completion.cache_key(), + NewOrFetchedComplete::Fetched(_, p) => &p.cache_key, } } } diff --git a/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index 6e25e846f..b526e857a 100644 --- a/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -35,6 +35,10 @@ message WorkflowActivation { // internal flags may be used. This is not a delta - all previously used flags always // appear since this representation is cheap. repeated uint32 available_internal_flags = 6; + // If set, this activation should be cached under they key, so as not to collide with any + // existing cached instance of this run. Useful for capturing stack trace state over time + // without blowing up any already-cached instance of the run. + string alternate_cache_key = 7; } message WorkflowActivationJob { diff --git a/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto b/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto index 226bb828f..fdc1ca13d 100644 --- a/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto +++ b/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto @@ -12,6 +12,8 @@ import "temporal/sdk/core/workflow_commands/workflow_commands.proto"; message WorkflowActivationCompletion { // The run id from the workflow activation you are completing string run_id = 1; + // If this was set on the incoming activation, you must use the same key on this completion. 
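+    // Core uses it to route this completion back to the correct (possibly alternate) cached run.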
+ string alternate_cache_key = 4; oneof status { Success successful = 2; Failure failed = 3; diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index f71e9493a..21b36e52b 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -23,6 +23,8 @@ pub use task_token::TaskToken; pub static ENCODING_PAYLOAD_KEY: &str = "encoding"; pub static JSON_ENCODING_VAL: &str = "json/plain"; pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data"; +pub static TIME_TRAVEL_QUERY: &str = "__time_travel_stack_trace"; +pub static ENHANCED_STACK_QUERY: &str = "__enhanced_stack_trace"; #[allow(clippy::large_enum_variant, clippy::derive_partial_eq_without_eq)] // I'd prefer not to do this, but there are some generated things that just don't need it. @@ -428,6 +430,7 @@ pub mod coresdk { }, query::v1::WorkflowQuery, }, + TIME_TRAVEL_QUERY, }; use prost_wkt_types::Timestamp; use std::{ @@ -454,6 +457,7 @@ pub mod coresdk { }), )], available_internal_flags: vec![], + alternate_cache_key: "".to_string(), } } @@ -515,6 +519,16 @@ pub mod coresdk { } } + impl WorkflowActivationJob { + pub fn is_time_travel_query(&self) -> bool { + matches!( + &self.variant, + Some(workflow_activation_job::Variant::QueryWorkflow(q)) + if q.query_type == TIME_TRAVEL_QUERY + ) + } + } + impl Display for EvictionReason { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{self:?}") @@ -706,7 +720,10 @@ pub mod coresdk { pub mod workflow_commands { tonic::include_proto!("coresdk.workflow_commands"); - use crate::temporal::api::{common::v1::Payloads, enums::v1::QueryResultType}; + use crate::temporal::api::{ + common::v1::{Payload, Payloads}, + enums::v1::QueryResultType, + }; use std::fmt::{Display, Formatter}; impl Display for WorkflowCommand { @@ -880,6 +897,15 @@ pub mod coresdk { ), } } + + pub fn payload_mut(&mut self) -> Option<&mut Payload> { + match self.variant.as_mut() { + Some(query_result::Variant::Succeeded(QuerySuccess { response: Some(p) })) => { + Some(p) + } + _ => None, + } + } } } @@ -922,6 +948,7 @@ pub mod coresdk { let success = workflow_completion::Success::from_variants(vec![]); Self { run_id: run_id.into(), + alternate_cache_key: "".to_string(), status: Some(workflow_activation_completion::Status::Successful(success)), } } @@ -931,6 +958,7 @@ pub mod coresdk { let success = workflow_completion::Success::from_variants(cmds); Self { run_id: run_id.into(), + alternate_cache_key: "".to_string(), status: Some(workflow_activation_completion::Status::Successful(success)), } } @@ -940,6 +968,7 @@ pub mod coresdk { let success = workflow_completion::Success::from_variants(vec![cmd]); Self { run_id: run_id.into(), + alternate_cache_key: "".to_string(), status: Some(workflow_activation_completion::Status::Successful(success)), } } @@ -947,6 +976,7 @@ pub mod coresdk { pub fn fail(run_id: impl Into, failure: Failure) -> Self { Self { run_id: run_id.into(), + alternate_cache_key: "".to_string(), status: Some(workflow_activation_completion::Status::Failed( workflow_completion::Failure { failure: Some(failure), @@ -1063,6 +1093,7 @@ pub mod coresdk { let success = self.into_iter().map(Into::into).collect::>().into(); WorkflowActivationCompletion { run_id, + alternate_cache_key: "".to_string(), status: Some(workflow_activation_completion::Status::Successful(success)), } } diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index 1dd7d0f26..438881778 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ 
-12,6 +12,7 @@ mod replay; mod resets; mod signals; mod stickyness; +mod time_travel_stack; mod timers; mod upsert_search_attrs; diff --git a/tests/integ_tests/workflow_tests/time_travel_stack.rs b/tests/integ_tests/workflow_tests/time_travel_stack.rs new file mode 100644 index 000000000..281d8e314 --- /dev/null +++ b/tests/integ_tests/workflow_tests/time_travel_stack.rs @@ -0,0 +1,41 @@ +use std::time::Duration; +use temporal_client::WorkflowOptions; +use temporal_sdk::{ActContext, ActivityOptions, WfContext}; +use temporal_sdk_core_protos::coresdk::AsJsonPayloadExt; +use temporal_sdk_core_test_utils::CoreWfStarter; + +#[tokio::test] +async fn time_travel_stacks_example() { + let wf_name = "time_travel_stacks"; + let mut starter = CoreWfStarter::new(wf_name); + let mut worker = starter.worker().await; + worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + for _ in 1..=5 { + ctx.timer(Duration::from_millis(1)).await; + ctx.activity(ActivityOptions { + activity_type: "echo".to_string(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(1)), + ..Default::default() + }) + .await; + } + Ok(().into()) + }); + worker.register_activity("echo", |_: ActContext, str: String| async { Ok(str) }); + + worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + // starter + // .fetch_history_and_replay(wf_name, run_id, worker.inner_mut()) + // .await + // .unwrap(); +}
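
Two illustrative sketches of the new pieces above, not part of the diff. First, a round-trip check of the alternate cache key string format that travels through `alternate_cache_key` on activations and completions; the module name, run id, and placement inside core/src/worker/workflow/run_cache.rs (where `RunCacheKey` is visible via `super::*`) are assumptions for the example:

#[cfg(test)]
mod run_cache_key_format {
    use super::*;

    #[test]
    fn alternate_key_round_trips_through_display_and_parse() {
        // Keys with a nonzero alternate component render with the `!k{alternate}-` prefix.
        let key = RunCacheKey {
            run_id: "some-run-id".to_string(),
            alternate_key: 1,
        };
        assert_eq!(key.to_string(), "!k1-some-run-id");
        assert_eq!("!k1-some-run-id".parse::<RunCacheKey>().unwrap(), key);

        // Plain keys are just the run id, so completions that only echo back `run_id`
        // keep resolving to the primary cache entry (alternate_key defaults to 0).
        let plain: RunCacheKey = "some-run-id".parse().unwrap();
        assert_eq!(plain.alternate_key, 0);
        assert_eq!(plain.to_string(), "some-run-id");
    }
}

Second, a sketch of how `internal_to_enhanced_stack_trace` correlates the commands reported by lang with history event ids. It assumes the involved types are in scope inside the core crate; the closure stands in for `WorkflowMachines::command_id_to_event_id`, and the SDK name/version and event id 5 are made up for the example:

let internal = InternalEnhancedStackTrace {
    sdk: SDKInfo {
        name: "typescript".into(),
        version: "x.y.z".into(),
    },
    sources: Default::default(),
    stacks: vec![InternalStackTrace {
        locations: vec![],
        commands: vec![InternalCommand {
            r#type: InternalCommandType::StartTimer,
            seq: 1,
        }],
    }],
};
let enhanced = internal_to_enhanced_stack_trace(internal, |cmd| match cmd {
    // Pretend the timer with seq 1 was started by history event 5.
    CommandID::Timer(1) => Some(5),
    _ => None,
});
assert_eq!(enhanced.stacks[0].correlating_event_ids, vec![5]);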