Skip to content

Commit cb08b1b

Browse files
authored
Query buffering fixes (#673)
1 parent f673f1d commit cb08b1b

File tree

5 files changed

+143
-24
lines changed

5 files changed

+143
-24
lines changed

core/src/core_tests/queries.rs

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use crate::{
22
test_help::{
33
build_mock_pollers, canned_histories, hist_to_poll_resp, mock_worker, single_hist_mock_sg,
4-
MockPollCfg, ResponseType, WorkerExt,
4+
MockPollCfg, MocksHolder, ResponseType, WorkerExt,
55
},
66
worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID},
77
};
8+
use futures_util::stream;
89
use std::{
910
collections::{HashMap, VecDeque},
1011
time::Duration,
@@ -891,3 +892,91 @@ async fn build_id_set_properly_on_query_on_first_task() {
891892
.unwrap();
892893
core.drain_pollers_and_shutdown().await;
893894
}
895+
896+
#[tokio::test]
897+
async fn queries_arent_lost_in_buffer_void() {
898+
let wfid = "fake_wf_id";
899+
let mut t = TestHistoryBuilder::default();
900+
t.add_by_type(EventType::WorkflowExecutionStarted);
901+
t.add_full_wf_task();
902+
t.add_we_signaled("sig", vec![]);
903+
t.add_full_wf_task();
904+
t.add_workflow_execution_completed();
905+
let tasks = [
906+
hist_to_poll_resp(&t, wfid.to_owned(), 1.into()),
907+
{
908+
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), 1.into());
909+
pr.query = Some(WorkflowQuery {
910+
query_type: "1".to_string(),
911+
..Default::default()
912+
});
913+
pr.started_event_id = 0;
914+
pr
915+
},
916+
{
917+
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), 1.into());
918+
pr.query = Some(WorkflowQuery {
919+
query_type: "2".to_string(),
920+
..Default::default()
921+
});
922+
pr.started_event_id = 0;
923+
pr
924+
},
925+
hist_to_poll_resp(&t, wfid.to_owned(), 2.into()),
926+
]
927+
.map(|r| r.resp);
928+
929+
let mut mock = mock_workflow_client();
930+
mock.expect_complete_workflow_task()
931+
.returning(|_| Ok(Default::default()));
932+
mock.expect_respond_legacy_query()
933+
.times(2)
934+
.returning(|_, _| Ok(Default::default()));
935+
let mut mock = MocksHolder::from_wft_stream(mock, stream::iter(tasks));
936+
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
937+
let core = mock_worker(mock);
938+
939+
let task = core.poll_workflow_activation().await.unwrap();
940+
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
941+
.await
942+
.unwrap();
943+
944+
let task = core.poll_workflow_activation().await.unwrap();
945+
assert_matches!(
946+
task.jobs.as_slice(),
947+
[WorkflowActivationJob {
948+
variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)),
949+
}] => q.query_type == "1"
950+
);
951+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
952+
task.run_id,
953+
query_ok(LEGACY_QUERY_ID.to_string(), "hi"),
954+
))
955+
.await
956+
.unwrap();
957+
958+
let task = core.poll_workflow_activation().await.unwrap();
959+
assert_matches!(
960+
task.jobs.as_slice(),
961+
[WorkflowActivationJob {
962+
variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)),
963+
}] => q.query_type == "2"
964+
);
965+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
966+
task.run_id,
967+
query_ok(LEGACY_QUERY_ID.to_string(), "hi"),
968+
))
969+
.await
970+
.unwrap();
971+
972+
let task = core.poll_workflow_activation().await.unwrap();
973+
assert_matches!(
974+
task.jobs.as_slice(),
975+
[WorkflowActivationJob {
976+
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
977+
}]
978+
);
979+
core.complete_execution(&task.run_id).await;
980+
981+
core.shutdown().await;
982+
}

core/src/test_help/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use mockall::TimesRange;
2020
use parking_lot::RwLock;
2121
use std::{
2222
collections::{BTreeMap, HashMap, HashSet, VecDeque},
23+
fmt::Debug,
2324
ops::{Deref, DerefMut},
2425
pin::Pin,
2526
sync::{
@@ -722,6 +723,14 @@ impl<T> DerefMut for QueueResponse<T> {
722723
&mut self.resp
723724
}
724725
}
726+
impl<T> Debug for QueueResponse<T>
727+
where
728+
T: Debug,
729+
{
730+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
731+
self.resp.fmt(f)
732+
}
733+
}
725734

726735
pub fn hist_to_poll_resp(
727736
t: &TestHistoryBuilder,

core/src/worker/workflow/mod.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use std::{
5050
collections::VecDeque,
5151
fmt::Debug,
5252
future::Future,
53+
mem,
5354
mem::discriminant,
5455
ops::DerefMut,
5556
rc::Rc,
@@ -1023,17 +1024,31 @@ struct BufferedTasks {
10231024
/// supersede any old one.
10241025
wft: Option<PermittedWFT>,
10251026
/// For query only tasks, multiple may be received concurrently and it's OK to buffer more
1026-
/// than one - however they should be dropped if, by the time we try to process them, we
1027-
/// have already processed a newer real WFT than the one the query was targeting (otherwise
1028-
/// we'd return data from the "future").
1027+
/// than one - however they must all be handled before applying the next "real" wft (after the
1028+
/// current one has been processed).
10291029
query_only_tasks: VecDeque<PermittedWFT>,
1030+
/// These are query-only tasks for the *buffered* wft, if any. They will all be discarded if
1031+
/// a buffered wft is replaced before being handled. They move to `query_only_tasks` once the
1032+
/// buffered task is taken.
1033+
query_only_tasks_for_buffered: VecDeque<PermittedWFT>,
10301034
}
10311035

10321036
impl BufferedTasks {
1037+
/// Buffers a new task. If it is a query-only task, multiple such tasks may be buffered which
1038+
/// all will be handled at the end of the current WFT. If a new WFT which would advance history
1039+
/// is provided, it will be buffered - but if another such task comes in while there is already
1040+
/// one buffered, the old one will be overridden, and all queries will be invalidated.
10331041
fn buffer(&mut self, task: PermittedWFT) {
10341042
if task.work.is_query_only() {
1035-
self.query_only_tasks.push_back(task);
1043+
if self.wft.is_none() {
1044+
self.query_only_tasks.push_back(task);
1045+
} else {
1046+
self.query_only_tasks_for_buffered.push_back(task);
1047+
}
10361048
} else {
1049+
if self.wft.is_some() {
1050+
self.query_only_tasks_for_buffered.clear();
1051+
}
10371052
let _ = self.wft.insert(task);
10381053
}
10391054
}
@@ -1042,12 +1057,18 @@ impl BufferedTasks {
10421057
self.wft.is_some() || !self.query_only_tasks.is_empty()
10431058
}
10441059

1045-
/// Remove and return the next WFT from the buffer that should be applied. WFTs which would
1046-
/// advance workflow state are returned before query-only tasks.
1060+
/// Remove and return the next WFT from the buffer that should be applied. Queries are returned
1061+
/// first for the current workflow task, if there are any. If not, the next WFT that would
1062+
/// advance history is returned.
10471063
fn get_next_wft(&mut self) -> Option<PermittedWFT> {
1048-
self.wft
1049-
.take()
1050-
.or_else(|| self.query_only_tasks.pop_front())
1064+
if let Some(q) = self.query_only_tasks.pop_front() {
1065+
return Some(q);
1066+
}
1067+
if let Some(t) = self.wft.take() {
1068+
self.query_only_tasks = mem::take(&mut self.query_only_tasks_for_buffered);
1069+
return Some(t);
1070+
}
1071+
None
10511072
}
10521073
}
10531074

sdk-core-protos/src/history_info.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::temporal::api::{
66
workflowservice::v1::{GetWorkflowExecutionHistoryResponse, PollWorkflowTaskQueueResponse},
77
};
88
use anyhow::{anyhow, bail};
9-
use rand::{thread_rng, Rng};
9+
use rand::random;
1010

1111
/// Contains information about a validated history. Used for replay and other testing.
1212
#[derive(Clone, Debug, PartialEq)]
@@ -154,7 +154,7 @@ impl HistoryInfo {
154154
/// randomly generated task token. Caller should attach a meaningful `workflow_execution` if
155155
/// needed.
156156
pub fn as_poll_wft_response(&self) -> PollWorkflowTaskQueueResponse {
157-
let task_token: [u8; 16] = thread_rng().gen();
157+
let task_token: [u8; 16] = random();
158158
PollWorkflowTaskQueueResponse {
159159
history: Some(History {
160160
events: self.events.clone(),

tests/integ_tests/queries_tests.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,8 @@ async fn multiple_concurrent_queries_no_new_history() {
384384
}
385385

386386
#[tokio::test]
387-
async fn query_superseded_by_newer_wft_is_discarded() {
388-
let mut starter = init_core_and_create_wf("query_superseded_by_newer_wft_is_discarded").await;
387+
async fn queries_handled_before_next_wft() {
388+
let mut starter = init_core_and_create_wf("queries_handled_before_next_wft").await;
389389
let core = starter.get_worker().await;
390390
let workflow_id = starter.get_task_queue().to_string();
391391
let task = core.poll_workflow_activation().await.unwrap();
@@ -447,16 +447,7 @@ async fn query_superseded_by_newer_wft_is_discarded() {
447447
))
448448
.await
449449
.unwrap();
450-
// We should get the signal activation since the in-buffer query should've been failed
451-
let task = core.poll_workflow_activation().await.unwrap();
452-
assert_matches!(
453-
task.jobs.as_slice(),
454-
[WorkflowActivationJob {
455-
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
456-
}]
457-
);
458-
core.complete_execution(&task.run_id).await;
459-
// Query will get retried by server since we fail the task w/ the stale query
450+
// We now get the second query
460451
let task = core.poll_workflow_activation().await.unwrap();
461452
let query = assert_matches!(
462453
task.jobs.as_slice(),
@@ -479,6 +470,15 @@ async fn query_superseded_by_newer_wft_is_discarded() {
479470
))
480471
.await
481472
.unwrap();
473+
// Then the signal afterward
474+
let task = core.poll_workflow_activation().await.unwrap();
475+
assert_matches!(
476+
task.jobs.as_slice(),
477+
[WorkflowActivationJob {
478+
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
479+
}]
480+
);
481+
core.complete_execution(&task.run_id).await;
482482
};
483483
join!(join_all(query_futs), complete_fut);
484484
drain_pollers_and_shutdown(&core).await;

0 commit comments

Comments
 (0)