|
| 1 | +use futures::future::BoxFuture; |
| 2 | +use std::{error::Error, time::Duration}; |
| 3 | +use temporal_sdk_core_protos::temporal::api::common::v1::Payload; |
| 4 | + |
| 5 | +/// Workflow authors must implement this trait to create Temporal Rust workflows |
| 6 | +pub trait Workflow: Sized { |
| 7 | + /// Type of the input argument to the workflow |
| 8 | + type Input: TemporalDeserializable; |
| 9 | + /// Type of the output of the workflow |
| 10 | + type Output: TemporalSerializable; |
| 11 | + /// The workflow's name |
| 12 | + const NAME: &'static str; |
| 13 | + |
| 14 | + /// Called when an instance of a Workflow is first initialized. |
| 15 | + /// |
| 16 | + /// `input` contains the input argument to the workflow as defined by the client who requested |
| 17 | + /// the Workflow Execution. |
| 18 | + fn new(input: Self::Input, ctx: SafeWfContext) -> Self; |
| 19 | + |
| 20 | + /// Defines the actual workflow logic. The function must return a future, and this future is |
| 21 | + /// cached and polled as updates to the workflow history are received. |
| 22 | + /// |
| 23 | + /// `ctx` should be used to perform various Temporal commands like starting timers and |
| 24 | + /// activities. |
| 25 | + fn run(&mut self, ctx: WfContext) -> BoxFuture<Self::Output>; |
| 26 | + |
| 27 | + /// All signals this workflow can handle. Typically you won't implement this directly, it will |
| 28 | + /// automatically contain all signals defined with the `#[signal]` attribute. |
| 29 | + fn signals() -> &'static [&'static SignalDefinition<Self>] { |
| 30 | + // TODO |
| 31 | + &[] |
| 32 | + } |
| 33 | + /// All queries this workflow can handle. Typically you won't implement this directly, it will |
| 34 | + /// automatically contain all queries defined with the `#[query]` attribute. |
| 35 | + fn queries() -> &'static [&'static QueryDefinition<Self>] { |
| 36 | + // TODO |
| 37 | + &[] |
| 38 | + } |
| 39 | +} |
| 40 | + |
| 41 | +/// A workflow context which contains only information, but does not allow any commands to |
| 42 | +/// be created. |
| 43 | +pub struct SafeWfContext { |
| 44 | + // TODO |
| 45 | +} |
| 46 | + |
| 47 | +/// TODO: Placeholder, move from SDK |
| 48 | +pub struct WfContext {} |
| 49 | +impl WfContext { |
| 50 | + pub async fn timer(&self, _: Duration) { |
| 51 | + todo!() |
| 52 | + } |
| 53 | +} |
| 54 | + |
| 55 | +pub struct SignalDefinition<WF: Workflow> { |
| 56 | + // TODO: Could be a matching predicate |
| 57 | + name: String, |
| 58 | + // The handler input type must be erased here, since otherwise we couldn't store/return the |
| 59 | + // heterogeneous collection of definition types in the workflow itself. The signal macro |
| 60 | + // will wrap the user's function with code that performs deserialization, as well as error |
| 61 | + // boxing. |
| 62 | + handler: Box<dyn FnMut(&mut WF, Payload) -> Result<(), Box<dyn Error>>>, |
| 63 | +} |
| 64 | +pub struct QueryDefinition<WF: Workflow> { |
| 65 | + // TODO: Could be a matching predicate |
| 66 | + name: String, |
| 67 | + // The query macro will wrap the user's function with code that performs deserialization of |
| 68 | + // input and serialization of output, as well as error boxing. |
| 69 | + handler: Box<dyn FnMut(&WF, Payload) -> Result<Payload, Box<dyn Error>>>, |
| 70 | +} |
| 71 | + |
| 72 | +/// TODO: Placeholder, move from (and improve in) SDK |
| 73 | +pub trait TemporalSerializable {} |
| 74 | +impl<T> TemporalSerializable for T {} |
| 75 | +/// TODO: Placeholder, move from (and improve in) SDK |
| 76 | +pub trait TemporalDeserializable {} |
| 77 | +impl<T> TemporalDeserializable for T {} |
| 78 | + |
| 79 | +#[cfg(test)] |
| 80 | +mod tests { |
| 81 | + use super::*; |
| 82 | + use futures::FutureExt; |
| 83 | + use std::{collections::HashMap, marker::PhantomData}; |
| 84 | + |
| 85 | + // Workflow implementation example |
| 86 | + struct MyWorkflow { |
| 87 | + foo: u64, |
| 88 | + bar: HashMap<String, u64>, |
| 89 | + } |
| 90 | + |
| 91 | + impl Workflow for MyWorkflow { |
| 92 | + type Input = String; |
| 93 | + type Output = u64; |
| 94 | + const NAME: &'static str = "MyWorkflowType"; |
| 95 | + |
| 96 | + fn new(input: Self::Input, _ctx: SafeWfContext) -> Self { |
| 97 | + let mut bar = HashMap::new(); |
| 98 | + bar.insert(input, 10); |
| 99 | + Self { foo: 0, bar } |
| 100 | + } |
| 101 | + |
| 102 | + fn run(&mut self, ctx: WfContext) -> BoxFuture<Self::Output> { |
| 103 | + async move { |
| 104 | + ctx.timer(Duration::from_secs(1)).await; |
| 105 | + self.foo = 1; |
| 106 | + self.foo |
| 107 | + } |
| 108 | + .boxed() |
| 109 | + // TODO: The need to box here is slightly unfortunate, but it's either that or require |
| 110 | + // users to depend on `async_trait` (which just hides the same thing). IMO this is the |
| 111 | + // best option until more language features stabilize and this can go away. |
| 112 | + } |
| 113 | + } |
| 114 | + |
| 115 | + // #[workflow] miiiight be necessary here, but, ideally is not. |
| 116 | + impl MyWorkflow { |
| 117 | + // Attrib commented out since nonexistent for now, but that's what it'd look like. |
| 118 | + // #[signal] |
| 119 | + pub fn my_signal(&mut self, arg: String) { |
| 120 | + self.bar.insert(arg, 1); |
| 121 | + } |
| 122 | + // #[query] |
| 123 | + pub fn my_query(&self, arg: String) -> Option<u64> { |
| 124 | + self.bar.get(&arg).cloned() |
| 125 | + } |
| 126 | + } |
| 127 | + |
| 128 | + // This would need to be moved into this crate and depended on by client |
| 129 | + struct WorkflowHandle<WF: Workflow> { |
| 130 | + _d: PhantomData<WF>, |
| 131 | + } |
| 132 | + struct SignalError; // just a placeholder |
| 133 | + struct QueryError; // just a placeholder |
| 134 | + |
| 135 | + // The signal/query macros would generate this trait and impl: |
| 136 | + trait MyWorkflowClientExtension { |
| 137 | + fn my_signal(&self, arg: String) -> BoxFuture<Result<(), SignalError>>; |
| 138 | + fn my_query(&self, arg: String) -> BoxFuture<Result<Option<u64>, QueryError>>; |
| 139 | + } |
| 140 | + impl MyWorkflowClientExtension for WorkflowHandle<MyWorkflow> { |
| 141 | + fn my_signal(&self, arg: String) -> BoxFuture<Result<(), SignalError>> { |
| 142 | + // Is actually something like: |
| 143 | + // self.signal("my_signal", arg.serialize()) |
| 144 | + todo!() |
| 145 | + } |
| 146 | + |
| 147 | + fn my_query(&self, arg: String) -> BoxFuture<Result<Option<u64>, QueryError>> { |
| 148 | + todo!() |
| 149 | + } |
| 150 | + } |
| 151 | + |
| 152 | + async fn client_example() { |
| 153 | + // Now you can use the client like: |
| 154 | + // (actually comes from client.start() or client.get_handle() etc) |
| 155 | + let wfh = WorkflowHandle { |
| 156 | + _d: PhantomData::<MyWorkflow>, |
| 157 | + }; |
| 158 | + let _ = wfh.my_signal("hi!".to_string()).await; |
| 159 | + } |
| 160 | +} |
0 commit comments