Skip to content

Commit d18811d

Browse files
feat(status): fully compute status for status updates
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
de bugh
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
what if not efficient
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
manifest unpublished
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
ci: update checkout to v3
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
update only on full reconcile ez
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
cheating
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
cleanup
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
moar cleanup
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
more cleanup
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
status cheat
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
correct error messages
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
update on hint too
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
wait
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
refactor
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
no cheating all cleanup
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
cleanup
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
1 parent 1fa321a commit d18811d

File tree

11 files changed

+110
-120
lines changed

11 files changed

+110
-120
lines changed

.github/workflows/e2e.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
e2e_test: [e2e_multiple_hosts, e2e_multitenant, e2e_upgrades]
1616

1717
steps:
18-
- uses: actions/checkout@v2
18+
- uses: actions/checkout@v3
1919

2020
- name: Install latest Rust stable toolchain
2121
uses: actions-rs/toolchain@v1

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
nats_version: [2.9.15]
1616

1717
steps:
18-
- uses: actions/checkout@v2
18+
- uses: actions/checkout@v3
1919

2020
- name: Install latest Rust stable toolchain
2121
uses: actions-rs/toolchain@v1

src/scaler/spreadscaler/link.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ where
6262
}
6363

6464
async fn status(&self) -> StatusInfo {
65+
let _ = self.reconcile().await;
6566
self.status.read().await.to_owned()
6667
}
6768

src/scaler/spreadscaler/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorSpreadScaler<S> {
5858
}
5959

6060
async fn status(&self) -> StatusInfo {
61+
let _ = self.reconcile().await;
6162
self.status.read().await.to_owned()
6263
}
6364

src/scaler/spreadscaler/provider.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderSpreadScaler<S> {
6161
}
6262

6363
async fn status(&self) -> StatusInfo {
64+
let _ = self.reconcile().await;
6465
self.status.read().await.to_owned()
6566
}
6667

src/server/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ pub struct TraitStatus {
187187
}
188188

189189
/// Common high-level status information
190-
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
190+
#[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)]
191191
pub struct StatusInfo {
192192
#[serde(rename = "type")]
193193
pub status_type: StatusType,

src/workers/event.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -742,14 +742,13 @@ where
742742
)
743743
.await;
744744

745-
trace!(?commands, "Publishing commands");
746-
747745
let status = if commands.is_empty() {
748746
scaler_status(&scalers).await
749747
} else {
750748
StatusInfo::compensating("Model deployed, running initial compensating commands")
751749
};
752750

751+
trace!(?status, "Setting status");
753752
if let Err(e) = self
754753
.status_publisher
755754
.publish_status(data.manifest.metadata.name.as_ref(), status)
@@ -759,6 +758,7 @@ where
759758
};
760759

761760
// Now handle the result from reconciliation
761+
trace!(?commands, "Publishing commands");
762762
self.command_publisher.publish_commands(commands).await?;
763763

764764
res
@@ -779,20 +779,20 @@ where
779779
)
780780
.await;
781781

782-
trace!(?commands, "Publishing commands");
783-
784782
let status = if commands.is_empty() {
785783
scaler_status(&scalers).await
786784
} else {
787785
StatusInfo::compensating(&format!(
788-
"Event modified scaler {} state, running compensating commands.",
789-
name.to_owned()
786+
"Event modified scaler \"{}\" state, running compensating commands",
787+
name.to_owned(),
790788
))
791789
};
790+
trace!(?status, "Setting status");
792791
if let Err(e) = self.status_publisher.publish_status(name, status).await {
793792
warn!(error = ?e, "Failed to set status for scaler");
794793
};
795794

795+
trace!(?commands, "Publishing commands");
796796
self.command_publisher.publish_commands(commands).await?;
797797

798798
res
@@ -803,25 +803,27 @@ where
803803
let scalers = self.scalers.get_all_scalers().await;
804804

805805
let futs = scalers.iter().map(|(name, scalers)| async {
806-
let (cmds, res) = get_commands_and_result(
806+
let (commands, res) = get_commands_and_result(
807807
scalers.iter().map(|scaler| scaler.handle_event(event)),
808808
"Errors occurred while handling event with all scalers",
809809
)
810810
.await;
811811

812-
let status = if cmds.is_empty() {
812+
let status = if commands.is_empty() {
813813
scaler_status(scalers).await
814814
} else {
815815
StatusInfo::compensating(&format!(
816-
"Event modified scaler {} state, running compensating commands.",
817-
name.to_owned()
816+
"Event modified scaler \"{}\" state, running compensating commands",
817+
name.to_owned(),
818818
))
819819
};
820+
821+
trace!(?status, "Setting status");
820822
if let Err(e) = self.status_publisher.publish_status(name, status).await {
821823
warn!(error = ?e, "Failed to set status for scaler");
822824
};
823825

824-
(cmds, res)
826+
(commands, res)
825827
});
826828

827829
// Resolve futures, computing commands for scalers, publishing statuses, and combining any errors
@@ -944,9 +946,12 @@ where
944946
{
945947
warn!(error = ?e, "Failed to set status to undeployed");
946948
}
947-
Ok(None)
949+
return message.ack().await.map_err(WorkError::from);
950+
}
951+
Some(Err(e)) => {
952+
message.nack().await;
953+
return Err(WorkError::Other(e.into()));
948954
}
949-
Some(Err(e)) => Err(e),
950955
None => Ok(None),
951956
}
952957
}

tests/e2e.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use tokio::{
1919
use wadm::{
2020
model::Manifest,
2121
server::{
22-
DeployModelRequest, DeployModelResponse, PutModelResponse, StatusInfo, UndeployModelRequest,
22+
DeployModelRequest, DeployModelResponse, PutModelResponse, StatusInfo, StatusType,
23+
UndeployModelRequest,
2324
},
2425
APP_SPEC_ANNOTATION, MANAGED_BY_ANNOTATION, MANAGED_BY_IDENTIFIER,
2526
};
@@ -462,6 +463,30 @@ pub enum ExpectedCount {
462463
Exactly(usize),
463464
}
464465

466+
/// Polls the status of `manifest_name` until its `status_type` equals `expected_status`.
///
/// Calls `get_manifest_status` up to five times, sleeping one second after each of the
/// first four attempts that did not match. Returns `Ok(())` as soon as the status type
/// matches.
///
/// # Errors
///
/// Returns an error if the fifth attempt still does not match; the message reports either
/// the status that was found or that no status was found at all.
pub async fn check_status(
467+
stream: &Stream,
468+
lattice_id: &str,
469+
manifest_name: &str,
470+
expected_status: StatusType,
471+
) -> anyhow::Result<()> {
472+
for i in 0..5 {
473+
let status = get_manifest_status(stream, lattice_id, manifest_name).await;
474+
match status.as_ref() {
475+
// Desired status reached: stop polling and fall through to `Ok(())`.
Some(status) if status.status_type == expected_status => break,
476+
// Wrong or missing status with attempts remaining: wait a second and retry.
_ if i < 4 => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
477+
// The two arms below are only reachable on the final attempt (i == 4).
Some(status) => {
478+
anyhow::bail!(
479+
"Expected {manifest_name} to have status {expected_status:?}, found {status:?}"
480+
)
481+
}
482+
None => anyhow::bail!(
483+
"Expected {manifest_name} to have status {expected_status:?}, found no status"
484+
),
485+
}
486+
}
487+
Ok(())
488+
}
489+
465490
pub fn check_providers(
466491
inventory: &HashMap<String, HostInventory>,
467492
image_ref: &str,

tests/e2e_multiple_hosts.rs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ mod helpers;
1111
use e2e::{assert_status, check_actors, check_providers, ClientInfo, ExpectedCount};
1212
use helpers::{ECHO_ACTOR_ID, HTTP_SERVER_PROVIDER_ID};
1313

14-
use crate::e2e::get_manifest_status;
14+
use crate::e2e::check_status;
1515

1616
const MANIFESTS_PATH: &str = "test/data";
1717
const DOCKER_COMPOSE_FILE: &str = "test/docker-compose-e2e.yaml";
@@ -40,6 +40,7 @@ async fn run_multiple_host_tests() {
4040
for _ in 0..10 {
4141
match client_info.ctl_client("default").get_hosts().await {
4242
Ok(hosts) if hosts.len() == 5 => {
43+
eprintln!("Hosts {}/5 currently available", hosts.len());
4344
did_start = true;
4445
break;
4546
}
@@ -110,14 +111,9 @@ async fn test_no_requirements(client_info: &ClientInfo) {
110111
);
111112

112113
// Once manifest is deployed, first status should be compensating
113-
for _ in 0..5 {
114-
if let Some(status) = get_manifest_status(&stream, "default", "echo-simple").await {
115-
assert_eq!(status.status_type, StatusType::Compensating);
116-
break;
117-
} else {
118-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
119-
}
120-
}
114+
check_status(&stream, "default", "echo-simple", StatusType::Compensating)
115+
.await
116+
.unwrap();
121117

122118
// NOTE: This runs for a while, but it's because we're waiting for the provider to download,
123119
// which can take a bit
@@ -152,12 +148,9 @@ async fn test_no_requirements(client_info: &ClientInfo) {
152148
)
153149
}
154150

155-
// SAFETY: we already know some status existed when we checked for compensating. If there's no status now, it means
156-
// we borked our stream and this _should_ fail
157-
let status = get_manifest_status(&stream, "default", "echo-simple")
151+
check_status(&stream, "default", "echo-simple", StatusType::Ready)
158152
.await
159153
.unwrap();
160-
assert_eq!(status.status_type, StatusType::Ready);
161154

162155
Ok(())
163156
})
@@ -175,14 +168,9 @@ async fn test_no_requirements(client_info: &ClientInfo) {
175168
);
176169

177170
// Once manifest is undeployed, status should be undeployed
178-
for _ in 0..5 {
179-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
180-
if let Some(status) = get_manifest_status(&stream, "default", "echo-simple").await {
181-
assert_eq!(status.status_type, StatusType::Undeployed);
182-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
183-
break;
184-
}
185-
}
171+
check_status(&stream, "default", "echo-simple", StatusType::Undeployed)
172+
.await
173+
.unwrap();
186174

187175
// assert that no actors or providers with annotations exist
188176
assert_status(None, None, || async {
@@ -200,12 +188,10 @@ async fn test_no_requirements(client_info: &ClientInfo) {
200188
ExpectedCount::Exactly(0),
201189
)?;
202190

203-
let status = get_manifest_status(&stream, "default", "echo-simple")
191+
check_status(&stream, "default", "echo-simple", StatusType::Undeployed)
204192
.await
205193
.unwrap();
206194

207-
assert_eq!(status.status_type, StatusType::Undeployed);
208-
209195
Ok(())
210196
})
211197
.await;

0 commit comments

Comments
 (0)