Skip to content

Commit 55e80a5

Browse files
committed
add release upgrade functionality
1 parent 480443d commit 55e80a5

File tree

3 files changed

+199
-0
lines changed

3 files changed

+199
-0
lines changed

rust/stackable-cockpit/src/platform/release/spec.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
operator::{self, ChartSourceType, OperatorSpec},
1414
product,
1515
},
16+
utils::k8s::{self, Client},
1617
};
1718

1819
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -30,6 +31,15 @@ pub enum Error {
3031

3132
#[snafu(display("failed to launch background task"))]
3233
BackgroundTask { source: JoinError },
34+
35+
#[snafu(display("failed to deploy manifests using the kube client"))]
36+
DeployManifest { source: k8s::Error },
37+
38+
#[snafu(display("failed to access the CRDs from source"))]
39+
AccessCRDs { source: reqwest::Error },
40+
41+
#[snafu(display("failed to read CRD manifests"))]
42+
ReadManifests { source: reqwest::Error },
3343
}
3444

3545
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -108,6 +118,72 @@ impl ReleaseSpec {
108118
.await
109119
}
110120

121+
/// Upgrades a release by upgrading individual operators.
122+
#[instrument(skip_all, fields(
123+
%namespace,
124+
))]
125+
pub async fn upgrade_crds(
126+
&self,
127+
include_products: &[String],
128+
exclude_products: &[String],
129+
namespace: &str,
130+
k8s_client: &Client,
131+
) -> Result<()> {
132+
debug!("Upgrading CRDs for release");
133+
134+
include_products.iter().for_each(|product| {
135+
Span::current().record("product.included", product);
136+
});
137+
exclude_products.iter().for_each(|product| {
138+
Span::current().record("product.excluded", product);
139+
});
140+
141+
let client = reqwest::Client::new();
142+
143+
let operators = self.filter_products(include_products, exclude_products);
144+
145+
Span::current().pb_set_style(
146+
&ProgressStyle::with_template("Upgrading CRDs {wide_bar} {pos}/{len}").unwrap(),
147+
);
148+
Span::current().pb_set_length(operators.len() as u64);
149+
150+
for (product_name, product) in operators {
151+
Span::current().record("product_name", &product_name);
152+
debug!("Upgrading CRDs for {product_name}-operator");
153+
154+
let release = match product.version.pre.as_str() {
155+
"dev" => "main".to_string(),
156+
_ => {
157+
format!("{}", product.version)
158+
}
159+
};
160+
161+
let request_url = format!(
162+
"https://raw.githubusercontent.com/stackabletech/{product_name}-operator/{release}/deploy/helm/{product_name}-operator/crds/crds.yaml"
163+
);
164+
165+
// Get CRD manifests
166+
// TODO bei nicht 200 Status, Fehler werfen
167+
let response = client
168+
.get(request_url)
169+
.send()
170+
.await
171+
.context(AccessCRDsSnafu)?;
172+
let crd_manifests = response.text().await.context(ReadManifestsSnafu)?;
173+
174+
// Upgrade CRDs
175+
k8s_client
176+
.replace_crds(&crd_manifests)
177+
.await
178+
.context(DeployManifestSnafu)?;
179+
180+
debug!("Upgraded {product_name}-operator CRDs");
181+
Span::current().pb_inc(1);
182+
}
183+
184+
Ok(())
185+
}
186+
111187
#[instrument(skip_all)]
112188
pub fn uninstall(&self, namespace: &str) -> Result<()> {
113189
info!("Uninstalling release");

rust/stackable-cockpit/src/utils/k8s/client.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ pub enum Error {
6565
#[snafu(display("failed to retrieve cluster information"))]
6666
ClusterInformation { source: cluster::Error },
6767

68+
#[snafu(display("failed to retrieve previous resource version information"))]
69+
ResourceVersion { source: kube::error::Error },
70+
6871
#[snafu(display("invalid or empty secret data in '{secret_name}'"))]
6972
InvalidSecretData { secret_name: String },
7073

@@ -148,6 +151,46 @@ impl Client {
148151
Ok(())
149152
}
150153

154+
pub async fn replace_crds(&self, crds: &str) -> Result<()> {
155+
for crd in serde_yaml::Deserializer::from_str(crds) {
156+
let mut object = DynamicObject::deserialize(crd).context(DeserializeYamlSnafu)?;
157+
158+
let object_type = object.types.as_ref().ok_or(
159+
ObjectTypeSnafu {
160+
object: object.clone(),
161+
}
162+
.build(),
163+
)?;
164+
165+
let gvk = Self::gvk_of_typemeta(object_type);
166+
let (resource, _) = self
167+
.resolve_gvk(&gvk)
168+
.await?
169+
.context(GVKUnkownSnafu { gvk })?;
170+
171+
// CRDs are cluster scoped
172+
let api: Api<DynamicObject> = Api::all_with(self.client.clone(), &resource);
173+
174+
if let Some(resource) = api
175+
.get_opt(&object.name_any())
176+
.await
177+
.context(KubeClientFetchSnafu)?
178+
{
179+
object.metadata.resource_version = resource.resource_version();
180+
api.replace(&object.name_any(), &PostParams::default(), &object)
181+
.await
182+
.context(KubeClientPatchSnafu)?;
183+
} else {
184+
// Create CRD if a previous version wasn't found
185+
api.create(&PostParams::default(), &object)
186+
.await
187+
.context(KubeClientCreateSnafu)?;
188+
}
189+
}
190+
191+
Ok(())
192+
}
193+
151194
/// Lists objects by looking up a GVK via the discovery. It returns an
152195
/// optional list of dynamic objects. The method returns [`Ok(None)`]
153196
/// if the client was unable to resolve the GVK. An error is returned

rust/stackablectl/src/cmds/release.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ pub enum ReleaseCommands {
4444
/// Uninstall a release
4545
#[command(aliases(["rm", "un"]))]
4646
Uninstall(ReleaseUninstallArgs),
47+
48+
// Upgrade a release
49+
Upgrade(ReleaseUpgradeArgs),
4750
}
4851

4952
#[derive(Debug, Args)]
@@ -83,6 +86,25 @@ pub struct ReleaseInstallArgs {
8386
local_cluster: CommonClusterArgs,
8487
}
8588

89+
#[derive(Debug, Args)]
90+
pub struct ReleaseUpgradeArgs {
91+
/// Release to upgrade to
92+
#[arg(name = "RELEASE")]
93+
release: String,
94+
95+
/// List of product operators to upgrade
96+
#[arg(short, long = "include", group = "products")]
97+
included_products: Vec<String>,
98+
99+
/// Blacklist of product operators to install
100+
#[arg(short, long = "exclude", group = "products")]
101+
excluded_products: Vec<String>,
102+
103+
/// Namespace in the cluster used to deploy the operators
104+
#[arg(long, default_value = DEFAULT_OPERATOR_NAMESPACE, visible_aliases(["operator-ns"]))]
105+
pub operator_namespace: String,
106+
}
107+
86108
#[derive(Debug, Args)]
87109
pub struct ReleaseUninstallArgs {
88110
/// Name of the release to uninstall
@@ -111,6 +133,9 @@ pub enum CmdError {
111133
#[snafu(display("failed to install release"))]
112134
ReleaseInstall { source: release::Error },
113135

136+
#[snafu(display("failed to upgrade CRDs for release"))]
137+
CrdUpgrade { source: release::Error },
138+
114139
#[snafu(display("failed to uninstall release"))]
115140
ReleaseUninstall { source: release::Error },
116141

@@ -146,6 +171,7 @@ impl ReleaseArgs {
146171
ReleaseCommands::Describe(args) => describe_cmd(args, cli, release_list).await,
147172
ReleaseCommands::Install(args) => install_cmd(args, cli, release_list).await,
148173
ReleaseCommands::Uninstall(args) => uninstall_cmd(args, cli, release_list).await,
174+
ReleaseCommands::Upgrade(args) => upgrade_cmd(args, cli, release_list).await,
149175
}
150176
}
151177
}
@@ -314,6 +340,60 @@ async fn install_cmd(
314340
}
315341
}
316342

343+
#[instrument(skip(cli, release_list))]
344+
async fn upgrade_cmd(
345+
args: &ReleaseUpgradeArgs,
346+
cli: &Cli,
347+
release_list: release::ReleaseList,
348+
) -> Result<String, CmdError> {
349+
debug!(release = %args.release, "Upgrading release");
350+
Span::current().pb_set_style(&ProgressStyle::with_template("").unwrap());
351+
352+
match release_list.get(&args.release) {
353+
Some(release) => {
354+
let mut output = cli.result();
355+
let client = Client::new().await.context(KubeClientCreateSnafu)?;
356+
357+
// Uninstall the old operator release first
358+
release
359+
.uninstall(&args.operator_namespace)
360+
.context(ReleaseUninstallSnafu)?;
361+
362+
// Upgrade the CRDs for all the operators to be upgraded
363+
release
364+
.upgrade_crds(
365+
&args.included_products,
366+
&args.excluded_products,
367+
&args.operator_namespace,
368+
&client,
369+
)
370+
.await
371+
.context(CrdUpgradeSnafu)?;
372+
373+
// Install the new operator release
374+
release
375+
.install(
376+
&args.included_products,
377+
&args.excluded_products,
378+
&args.operator_namespace,
379+
&ChartSourceType::from(cli.chart_type()),
380+
)
381+
.await
382+
.context(ReleaseInstallSnafu)?;
383+
384+
output
385+
.with_command_hint(
386+
"stackablectl operator installed",
387+
"list installed operators",
388+
)
389+
.with_output(format!("Upgraded to release '{}'", args.release));
390+
391+
Ok(output.render())
392+
}
393+
None => Ok("No such release".into()),
394+
}
395+
}
396+
317397
#[instrument(skip(cli, release_list))]
318398
async fn uninstall_cmd(
319399
args: &ReleaseUninstallArgs,

0 commit comments

Comments
 (0)