|
1 | 1 | use std::{
|
2 |
| - borrow::Cow, |
3 | 2 | collections::{BTreeMap, HashMap},
|
4 | 3 | num::TryFromIntError,
|
5 | 4 | };
|
6 | 5 |
|
7 |
| -use futures::future::try_join_all; |
8 | 6 | use product_config::types::PropertyNameKind;
|
9 | 7 | use security::AuthenticationConfig;
|
10 | 8 | use serde::{Deserialize, Serialize};
|
@@ -37,7 +35,6 @@ use stackable_operator::{
|
37 | 35 | schemars::{self, JsonSchema},
|
38 | 36 | status::condition::{ClusterCondition, HasStatusCondition},
|
39 | 37 | time::Duration,
|
40 |
| - utils::cluster_info::KubernetesClusterInfo, |
41 | 38 | versioned::versioned,
|
42 | 39 | };
|
43 | 40 | use strum::{Display, EnumIter, EnumString};
|
@@ -591,169 +588,6 @@ impl v1alpha1::HbaseCluster {
|
591 | 588 | role_group: group_name.into(),
|
592 | 589 | }
|
593 | 590 | }
|
594 |
| - |
595 |
| - /// Returns rolegroup and replica information for a specific role. |
596 |
| - /// We can't pass through the merged config for a particular role-group |
597 |
| - /// here as we need more than the config. As this will be called by role, |
598 |
| - /// the merged listener-class is called so that only role-group information |
599 |
| - /// for externally-reachable services (based on their listener class) are |
600 |
| - /// included in the collection. |
601 |
| - pub fn rolegroup_ref_and_replicas( |
602 |
| - &self, |
603 |
| - role: &HbaseRole, |
604 |
| - ) -> Vec<(RoleGroupRef<v1alpha1::HbaseCluster>, u16)> { |
605 |
| - match role { |
606 |
| - HbaseRole::Master => self |
607 |
| - .spec |
608 |
| - .masters |
609 |
| - .iter() |
610 |
| - .flat_map(|role| &role.role_groups) |
611 |
| - // Order rolegroups consistently, to avoid spurious downstream rewrites |
612 |
| - .collect::<BTreeMap<_, _>>() |
613 |
| - .into_iter() |
614 |
| - .map(|(rolegroup_name, role_group)| { |
615 |
| - ( |
616 |
| - self.rolegroup_ref(HbaseRole::Master.to_string(), rolegroup_name), |
617 |
| - role_group.replicas.unwrap_or_default(), |
618 |
| - ) |
619 |
| - }) |
620 |
| - .collect(), |
621 |
| - HbaseRole::RegionServer => self |
622 |
| - .spec |
623 |
| - .region_servers |
624 |
| - .iter() |
625 |
| - .flat_map(|role| &role.role_groups) |
626 |
| - // Order rolegroups consistently, to avoid spurious downstream rewrites |
627 |
| - .collect::<BTreeMap<_, _>>() |
628 |
| - .into_iter() |
629 |
| - .map(|(rolegroup_name, role_group)| { |
630 |
| - ( |
631 |
| - self.rolegroup_ref(HbaseRole::RegionServer.to_string(), rolegroup_name), |
632 |
| - role_group.replicas.unwrap_or_default(), |
633 |
| - ) |
634 |
| - }) |
635 |
| - .collect(), |
636 |
| - HbaseRole::RestServer => self |
637 |
| - .spec |
638 |
| - .rest_servers |
639 |
| - .iter() |
640 |
| - .flat_map(|role| &role.role_groups) |
641 |
| - // Order rolegroups consistently, to avoid spurious downstream rewrites |
642 |
| - .collect::<BTreeMap<_, _>>() |
643 |
| - .into_iter() |
644 |
| - .map(|(rolegroup_name, role_group)| { |
645 |
| - ( |
646 |
| - self.rolegroup_ref(HbaseRole::RestServer.to_string(), rolegroup_name), |
647 |
| - role_group.replicas.unwrap_or_default(), |
648 |
| - ) |
649 |
| - }) |
650 |
| - .collect(), |
651 |
| - } |
652 |
| - } |
653 |
| - |
654 |
| - pub fn pod_refs( |
655 |
| - &self, |
656 |
| - role: &HbaseRole, |
657 |
| - hbase_version: &str, |
658 |
| - ) -> Result<Vec<HbasePodRef>, Error> { |
659 |
| - let ns = self.metadata.namespace.clone().context(NoNamespaceSnafu)?; |
660 |
| - let rolegroup_ref_and_replicas = self.rolegroup_ref_and_replicas(role); |
661 |
| - |
662 |
| - Ok(rolegroup_ref_and_replicas |
663 |
| - .iter() |
664 |
| - .flat_map(|(rolegroup_ref, replicas)| { |
665 |
| - let ns = ns.clone(); |
666 |
| - (0..*replicas).map(move |i| HbasePodRef { |
667 |
| - namespace: ns.clone(), |
668 |
| - role_group_service_name: rolegroup_ref.object_name(), |
669 |
| - pod_name: format!("{}-{}", rolegroup_ref.object_name(), i), |
670 |
| - ports: self |
671 |
| - .ports(role, hbase_version) |
672 |
| - .iter() |
673 |
| - .map(|(n, p)| (n.clone(), *p)) |
674 |
| - .collect(), |
675 |
| - fqdn_override: None, |
676 |
| - }) |
677 |
| - }) |
678 |
| - .collect()) |
679 |
| - } |
680 |
| - |
681 |
| - pub async fn listener_refs( |
682 |
| - &self, |
683 |
| - client: &stackable_operator::client::Client, |
684 |
| - role: &HbaseRole, |
685 |
| - hbase_version: &str, |
686 |
| - ) -> Result<Vec<HbasePodRef>, Error> { |
687 |
| - let pod_refs = self.pod_refs(role, hbase_version)?; |
688 |
| - try_join_all(pod_refs.into_iter().map(|pod_ref| async { |
689 |
| - // N.B. use the naming convention for persistent listener volumes as we |
690 |
| - // have specified above that we only want externally-reachable endpoints. |
691 |
| - let listener_name = format!("{LISTENER_VOLUME_NAME}-{}", pod_ref.pod_name); |
692 |
| - let listener_ref = |
693 |
| - || ObjectRef::<Listener>::new(&listener_name).within(&pod_ref.namespace); |
694 |
| - let pod_obj_ref = |
695 |
| - || ObjectRef::<Pod>::new(&pod_ref.pod_name).within(&pod_ref.namespace); |
696 |
| - let listener = client |
697 |
| - .get::<Listener>(&listener_name, &pod_ref.namespace) |
698 |
| - .await |
699 |
| - .context(GetPodListenerSnafu { |
700 |
| - listener: listener_ref(), |
701 |
| - pod: pod_obj_ref(), |
702 |
| - })?; |
703 |
| - let listener_address = listener |
704 |
| - .status |
705 |
| - .and_then(|s| s.ingress_addresses?.into_iter().next()) |
706 |
| - .context(PodListenerHasNoAddressSnafu { |
707 |
| - listener: listener_ref(), |
708 |
| - pod: pod_obj_ref(), |
709 |
| - })?; |
710 |
| - Ok(HbasePodRef { |
711 |
| - fqdn_override: Some(listener_address.address), |
712 |
| - ports: listener_address |
713 |
| - .ports |
714 |
| - .into_iter() |
715 |
| - .map(|(port_name, port)| { |
716 |
| - let port = u16::try_from(port).context(PortOutOfBoundsSnafu { |
717 |
| - port_name: &port_name, |
718 |
| - port, |
719 |
| - })?; |
720 |
| - Ok((port_name, port)) |
721 |
| - }) |
722 |
| - .collect::<Result<_, _>>()?, |
723 |
| - ..pod_ref |
724 |
| - }) |
725 |
| - })) |
726 |
| - .await |
727 |
| - } |
728 |
| -} |
729 |
| - |
730 |
| -/// Reference to a single `Pod` that is a component of a [`HbaseCluster`] |
731 |
| -/// |
732 |
| -/// Used for service discovery. |
733 |
| -#[derive(Debug)] |
734 |
| -pub struct HbasePodRef { |
735 |
| - pub namespace: String, |
736 |
| - pub role_group_service_name: String, |
737 |
| - pub pod_name: String, |
738 |
| - pub fqdn_override: Option<String>, |
739 |
| - pub ports: HashMap<String, u16>, |
740 |
| -} |
741 |
| - |
742 |
| -impl HbasePodRef { |
743 |
| - pub fn fqdn(&self, cluster_info: &KubernetesClusterInfo) -> Cow<str> { |
744 |
| - self.fqdn_override.as_deref().map_or_else( |
745 |
| - || { |
746 |
| - Cow::Owned(format!( |
747 |
| - "{pod_name}.{role_group_service_name}.{namespace}.svc.{cluster_domain}", |
748 |
| - pod_name = self.pod_name, |
749 |
| - role_group_service_name = self.role_group_service_name, |
750 |
| - namespace = self.namespace, |
751 |
| - cluster_domain = cluster_info.cluster_domain, |
752 |
| - )) |
753 |
| - }, |
754 |
| - Cow::Borrowed, |
755 |
| - ) |
756 |
| - } |
757 | 591 | }
|
758 | 592 |
|
759 | 593 | pub fn merged_env(rolegroup_config: Option<&BTreeMap<String, String>>) -> Vec<EnvVar> {
|
|
0 commit comments