Skip to content

implement KubernetesExecutor #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 54 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
9762dfa
wip: using template mounted as config map
adwk67 Jul 28, 2023
0172890
wip: build config map
adwk67 Aug 1, 2023
3ffbb2f
wip: initial working configmap
adwk67 Aug 1, 2023
94065da
wip: cleanup
adwk67 Aug 1, 2023
6f8b14e
wip: clippy allow
adwk67 Aug 1, 2023
4f91708
rebuild charts
adwk67 Aug 2, 2023
e0b9730
merged main
adwk67 Aug 2, 2023
4321e77
fixed merged conflict
adwk67 Aug 2, 2023
4623cbd
fixed merged conflict
adwk67 Aug 2, 2023
0cd4824
added provisional comment re. worker pod usage
adwk67 Aug 2, 2023
3e3a304
updated changelog
adwk67 Aug 2, 2023
0fa8dec
pin serde/serde_derive at 1.0.171
adwk67 Aug 2, 2023
d6b448b
fixed typo
adwk67 Aug 2, 2023
cfa37a2
reverted executor change
adwk67 Aug 3, 2023
14bec71
added namespace for k8s executors
adwk67 Aug 3, 2023
f89eca9
add alternative env-var for older versions
adwk67 Aug 3, 2023
27814b9
simplified config map
adwk67 Aug 4, 2023
21d71ff
wip: replace role with enum
adwk67 Aug 7, 2023
965b255
flatten celery executor config
adwk67 Aug 8, 2023
6d01971
wip: adapting tests
adwk67 Aug 8, 2023
e2105ee
wip: added templating
adwk67 Aug 8, 2023
b6e2efb
working smoke/celery test
adwk67 Aug 8, 2023
41966b8
working smoke/kubernetes test
adwk67 Aug 8, 2023
6ed22bf
linting
adwk67 Aug 8, 2023
1f4800f
fix: added pre-2.5 name of env-var
adwk67 Aug 9, 2023
ecfd1cb
added env overrides for config map test
adwk67 Aug 9, 2023
c34c244
added gitysnc elements to pod template and adapted tests
adwk67 Aug 10, 2023
a892133
adapted ldap tests
adwk67 Aug 10, 2023
a8a0e33
adapted cluster-operation test
adwk67 Aug 10, 2023
fdf4114
adapted orphaned resources test
adwk67 Aug 10, 2023
67b9281
adapted resources test
adwk67 Aug 10, 2023
0f56477
adapted logging test
adwk67 Aug 10, 2023
2181d7d
make redis conditional on executor for smoke tests
adwk67 Aug 10, 2023
a890f58
added logging components to config map
adwk67 Aug 10, 2023
c0b95a4
moved template code into a function, added some comments
adwk67 Aug 11, 2023
b14f5f0
fix merge conflicts
adwk67 Aug 11, 2023
a6e255c
linting
adwk67 Aug 11, 2023
a5d87ba
updated docs and examples
adwk67 Aug 14, 2023
b628846
move constants
adwk67 Aug 14, 2023
cc7f695
specified the resources for workers
adwk67 Aug 14, 2023
7ed321a
make executor non-optional, remove enum-discriminant usage
adwk67 Aug 15, 2023
70cd55a
regenerate charts
adwk67 Aug 15, 2023
ca331ab
added note on image name and shutdown hook
adwk67 Aug 15, 2023
0d2613e
added shell capture for statefulsets
adwk67 Aug 17, 2023
e08a07c
wip:crd refactoring
adwk67 Aug 17, 2023
23983db
fixed tests, rebuilt charts
adwk67 Aug 18, 2023
d3ab0e5
rebuilt charts, adapted integration tests
adwk67 Aug 18, 2023
d612a59
further integration test fixes
adwk67 Aug 18, 2023
aa88c1d
updated docs
adwk67 Aug 18, 2023
b35966a
corrected docs
adwk67 Aug 18, 2023
eaec70b
Update rust/operator-binary/src/airflow_controller.rs
adwk67 Aug 23, 2023
86d9493
Update docs/modules/airflow/pages/index.adoc
adwk67 Aug 23, 2023
8412573
Update docs/modules/airflow/pages/getting_started/first_steps.adoc
adwk67 Aug 23, 2023
c1f43d9
input from review
adwk67 Aug 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions deploy/helm/airflow-operator/templates/roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ rules:
- serviceaccounts
verbs:
- get
- apiGroups:
- ""
resources:
- pods
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- events.k8s.io
resources:
Expand Down
42 changes: 37 additions & 5 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ pub struct AirflowClusterConfig {
pub dags_git_sync: Vec<GitSync>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub database_initialization: Option<airflowdb::AirflowDbConfigFragment>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub executor: Option<String>,
pub executor: AirflowExecutor,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expose_config: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -303,20 +302,53 @@ pub enum AirflowRole {
Worker,
}

#[derive(
Clone,
Debug,
Default,
Deserialize,
Display,
EnumIter,
Eq,
Hash,
JsonSchema,
PartialEq,
Serialize,
EnumString,
)]
pub enum AirflowExecutor {
#[strum(serialize = "CeleryExecutor")]
#[default]
CeleryExecutor,
#[strum(serialize = "KubernetesExecutor")]
KubernetesExecutor,
#[strum(serialize = "CeleryKubernetesExecutor")]
CeleryKubernetesExecutor,
}

impl AirflowRole {
/// Returns the start commands for the different airflow components. Airflow expects all
/// components to have the same image/configuration (e.g. DAG folder location), even if not all
/// configuration settings are used everywhere. For this reason we ensure that the webserver
/// config file is in the Airflow home directory on all pods.
pub fn get_commands(&self) -> Vec<String> {
pub fn get_commands(&self, executor: &AirflowExecutor) -> Vec<String> {
let copy_config = format!(
"cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} \
{AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}"
);
match &self {
AirflowRole::Webserver => vec![copy_config, "airflow webserver".to_string()],
AirflowRole::Scheduler => vec![copy_config, "airflow scheduler".to_string()],
AirflowRole::Worker => vec![copy_config, "airflow celery worker".to_string()],
AirflowRole::Worker => {
match executor {
// TODO-ke which command for CeleryKubernetes?
&AirflowExecutor::CeleryExecutor
| &AirflowExecutor::CeleryKubernetesExecutor => {
vec![copy_config, "airflow celery worker".to_string()]
}
&AirflowExecutor::KubernetesExecutor => vec![],
}
}
}
}

Expand Down Expand Up @@ -706,7 +738,7 @@ mod tests {
assert_eq!("2.6.1", &resolved_airflow_image.product_version);
assert_eq!(
"KubernetesExecutor",
cluster.spec.cluster_config.executor.unwrap_or_default()
cluster.spec.cluster_config.executor.to_string()
);
assert!(cluster.spec.cluster_config.load_examples.unwrap_or(false));
assert!(cluster.spec.cluster_config.expose_config.unwrap_or(false));
Expand Down
3 changes: 1 addition & 2 deletions rust/operator-binary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ clap = "4.1"
fnv = "1.0"
futures = { version = "0.3", features = ["compat"] }
serde = "1.0"
serde_yaml = "0.8"
snafu = "0.7"
stackable-airflow-crd = { path = "../crd" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.44.0" }
Expand All @@ -26,5 +27,3 @@ built = { version = "0.6", features = ["chrono", "git2"] }
stackable-airflow-crd = { path = "../crd" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.44.0" }

[dev-dependencies]
serde_yaml = "0.8"
Loading