Skip to content

Commit 7e3c23f

Browse files
authored
Handle queries-in-same-wft case properly (#666)
1 parent c5b6445 commit 7e3c23f

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

core/src/core_tests/queries.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
test_help::{
33
build_mock_pollers, canned_histories, hist_to_poll_resp, mock_worker, single_hist_mock_sg,
4-
MockPollCfg, ResponseType,
4+
MockPollCfg, ResponseType, WorkerExt,
55
},
66
worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID},
77
};
@@ -850,3 +850,44 @@ async fn legacy_query_combined_with_timer_fire_repro() {
850850
.unwrap();
851851
core.shutdown().await;
852852
}
853+
854+
#[tokio::test]
855+
async fn build_id_set_properly_on_query_on_first_task() {
856+
let wfid = "fake_wf_id";
857+
let mut t = TestHistoryBuilder::default();
858+
t.add_by_type(EventType::WorkflowExecutionStarted);
859+
t.add_workflow_task_scheduled_and_started();
860+
let tasks = VecDeque::from(vec![{
861+
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::AllHistory);
862+
pr.queries.insert(
863+
"q".to_string(),
864+
WorkflowQuery {
865+
query_type: "query-type".to_string(),
866+
query_args: Some(b"hi".into()),
867+
header: None,
868+
},
869+
);
870+
pr
871+
}]);
872+
let mut mock_client = mock_workflow_client();
873+
mock_client.expect_respond_legacy_query().times(0);
874+
let mh = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client());
875+
let mut mock = build_mock_pollers(mh);
876+
mock.worker_cfg(|wc| {
877+
wc.max_cached_workflows = 10;
878+
wc.worker_build_id = "1.0".to_string();
879+
});
880+
let core = mock_worker(mock);
881+
882+
let task = core.poll_workflow_activation().await.unwrap();
883+
assert_eq!(task.build_id_for_current_task, "1.0");
884+
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
885+
.await
886+
.unwrap();
887+
let task = core.poll_workflow_activation().await.unwrap();
888+
assert_eq!(task.build_id_for_current_task, "1.0");
889+
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
890+
.await
891+
.unwrap();
892+
core.drain_pollers_and_shutdown().await;
893+
}

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ impl WorkflowMachines {
422422
let build_id_for_current_task = if is_replaying {
423423
self.current_wft_build_id.clone().unwrap_or_default()
424424
} else {
425+
self.current_wft_build_id = Some(self.worker_config.worker_build_id.clone());
425426
self.worker_config.worker_build_id.clone()
426427
};
427428
WorkflowActivation {
@@ -560,9 +561,6 @@ impl WorkflowMachines {
560561
// Save this tasks' Build ID if it had one
561562
if let Some(bid) = next_complete.worker_version.as_ref().map(|wv| &wv.build_id) {
562563
self.current_wft_build_id = Some(bid.to_string());
563-
} else {
564-
// Otherwise we do not want to keep anything previously stored
565-
self.current_wft_build_id = None;
566564
}
567565
}
568566

0 commit comments

Comments
 (0)