Skip to content

Commit 8313995

Browse files
committed
resolve listener-class across role-groups/role
1 parent 0030bfb commit 8313995

File tree

7 files changed

+198
-109
lines changed

7 files changed

+198
-109
lines changed

rust/operator-binary/src/crd/mod.rs

Lines changed: 133 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,12 @@ impl v1alpha1::HbaseCluster {
583583
}
584584
}
585585

586+
/// Returns rolegroup and replica information for a specific role.
587+
/// We can't pass through the merged config for a particular role-group
588+
/// here as we need more than the config. As this will be called by role,
589+
/// the merged listener-class is resolved so that only role-group information
590+
/// for externally-reachable services (based on their listener class) is
591+
/// included in the collection.
586592
pub fn rolegroup_ref_and_replicas(
587593
&self,
588594
role: &HbaseRole,
@@ -596,6 +602,9 @@ impl v1alpha1::HbaseCluster {
596602
// Order rolegroups consistently, to avoid spurious downstream rewrites
597603
.collect::<BTreeMap<_, _>>()
598604
.into_iter()
605+
.filter(|(rolegroup_name, _)| {
606+
self.resolved_listener_class_discoverable(role, rolegroup_name)
607+
})
599608
.map(|(rolegroup_name, role_group)| {
600609
(
601610
self.rolegroup_ref(HbaseRole::Master.to_string(), rolegroup_name),
@@ -611,6 +620,9 @@ impl v1alpha1::HbaseCluster {
611620
// Order rolegroups consistently, to avoid spurious downstream rewrites
612621
.collect::<BTreeMap<_, _>>()
613622
.into_iter()
623+
.filter(|(rolegroup_name, _)| {
624+
self.resolved_listener_class_discoverable(role, rolegroup_name)
625+
})
614626
.map(|(rolegroup_name, role_group)| {
615627
(
616628
self.rolegroup_ref(HbaseRole::RegionServer.to_string(), rolegroup_name),
@@ -626,6 +638,9 @@ impl v1alpha1::HbaseCluster {
626638
// Order rolegroups consistently, to avoid spurious downstream rewrites
627639
.collect::<BTreeMap<_, _>>()
628640
.into_iter()
641+
.filter(|(rolegroup_name, _)| {
642+
self.resolved_listener_class_discoverable(role, rolegroup_name)
643+
})
629644
.map(|(rolegroup_name, role_group)| {
630645
(
631646
self.rolegroup_ref(HbaseRole::RestServer.to_string(), rolegroup_name),
@@ -636,6 +651,19 @@ impl v1alpha1::HbaseCluster {
636651
}
637652
}
638653

654+
/// Reports whether the listener class resolved for `rolegroup_name` within
/// `role` is discoverable.
///
/// The listener class is looked up via `merged_listener_class`. When that
/// lookup yields `None`, the role group is treated as not discoverable and
/// `false` is returned.
///
/// NOTE(review): the `&&String` parameter presumably mirrors the item type
/// handed to the `filter` closures that call this helper - confirm before
/// simplifying the signature.
fn resolved_listener_class_discoverable(
655+
&self,
656+
role: &HbaseRole,
657+
rolegroup_name: &&String,
658+
) -> bool {
659+
let listener_class = self.merged_listener_class(role, rolegroup_name);
660+
if let Some(listener_class) = listener_class {
661+
listener_class.discoverable()
662+
} else {
663+
false
664+
}
665+
}
666+
639667
pub fn pod_refs(
640668
&self,
641669
role: &HbaseRole,
@@ -667,53 +695,113 @@ impl v1alpha1::HbaseCluster {
667695
&self,
668696
client: &stackable_operator::client::Client,
669697
role: &HbaseRole,
670-
merged_config: &AnyServiceConfig,
671698
hbase_version: &str,
672699
) -> Result<Vec<HbasePodRef>, Error> {
673-
// only externally-reachable listeners are relevant
674-
if merged_config.listener_class().discoverable() {
675-
let pod_refs = self.pod_refs(role, hbase_version)?;
676-
try_join_all(pod_refs.into_iter().map(|pod_ref| async {
677-
// N.B. use the naming convention for persistent listener volumes as we
678-
// have specified above that we only want externally-reachable endpoints.
679-
let listener_name = format!("{LISTENER_VOLUME_NAME}-{}", pod_ref.pod_name);
680-
let listener_ref =
681-
|| ObjectRef::<Listener>::new(&listener_name).within(&pod_ref.namespace);
682-
let pod_obj_ref =
683-
|| ObjectRef::<Pod>::new(&pod_ref.pod_name).within(&pod_ref.namespace);
684-
let listener = client
685-
.get::<Listener>(&listener_name, &pod_ref.namespace)
686-
.await
687-
.context(GetPodListenerSnafu {
688-
listener: listener_ref(),
689-
pod: pod_obj_ref(),
690-
})?;
691-
let listener_address = listener
692-
.status
693-
.and_then(|s| s.ingress_addresses?.into_iter().next())
694-
.context(PodListenerHasNoAddressSnafu {
695-
listener: listener_ref(),
696-
pod: pod_obj_ref(),
697-
})?;
698-
Ok(HbasePodRef {
699-
fqdn_override: Some(listener_address.address),
700-
ports: listener_address
701-
.ports
702-
.into_iter()
703-
.map(|(port_name, port)| {
704-
let port = u16::try_from(port).context(PortOutOfBoundsSnafu {
705-
port_name: &port_name,
706-
port,
707-
})?;
708-
Ok((port_name, port))
709-
})
710-
.collect::<Result<_, _>>()?,
711-
..pod_ref
712-
})
713-
}))
714-
.await
715-
} else {
716-
Ok(vec![])
700+
let pod_refs = self.pod_refs(role, hbase_version)?;
701+
try_join_all(pod_refs.into_iter().map(|pod_ref| async {
702+
// N.B. use the naming convention for persistent listener volumes, as
703+
// only externally-reachable endpoints are relevant here.
704+
let listener_name = format!("{LISTENER_VOLUME_NAME}-{}", pod_ref.pod_name);
705+
let listener_ref =
706+
|| ObjectRef::<Listener>::new(&listener_name).within(&pod_ref.namespace);
707+
let pod_obj_ref =
708+
|| ObjectRef::<Pod>::new(&pod_ref.pod_name).within(&pod_ref.namespace);
709+
let listener = client
710+
.get::<Listener>(&listener_name, &pod_ref.namespace)
711+
.await
712+
.context(GetPodListenerSnafu {
713+
listener: listener_ref(),
714+
pod: pod_obj_ref(),
715+
})?;
716+
let listener_address = listener
717+
.status
718+
.and_then(|s| s.ingress_addresses?.into_iter().next())
719+
.context(PodListenerHasNoAddressSnafu {
720+
listener: listener_ref(),
721+
pod: pod_obj_ref(),
722+
})?;
723+
Ok(HbasePodRef {
724+
fqdn_override: Some(listener_address.address),
725+
ports: listener_address
726+
.ports
727+
.into_iter()
728+
.map(|(port_name, port)| {
729+
let port = u16::try_from(port).context(PortOutOfBoundsSnafu {
730+
port_name: &port_name,
731+
port,
732+
})?;
733+
Ok((port_name, port))
734+
})
735+
.collect::<Result<_, _>>()?,
736+
..pod_ref
737+
})
738+
}))
739+
.await
740+
}
741+
742+
/// Resolves the listener class for the role group `rolegroup_name` of `role`.
///
/// For the matching role in the cluster spec (masters, region servers or
/// REST servers) three values are combined:
///
/// * the listener class configured on the role group (the type's default
///   value if no role group with that name exists),
/// * the listener class configured on the role,
/// * `SupportedListenerClasses::ClusterInternal` as the built-in default.
///
/// The default is merged into the role-level value, and the result is then
/// merged into the role-group-level value, which is logged at debug level
/// and returned.
///
/// NOTE(review): presumably `merge` only fills in values that are still
/// unset, which would give the precedence role group > role > default -
/// confirm against the `merge` implementation.
///
/// Returns `None` when the requested role is not present in the spec.
pub fn merged_listener_class(
743+
&self,
744+
role: &HbaseRole,
745+
rolegroup_name: &String,
746+
) -> Option<SupportedListenerClasses> {
747+
match role {
748+
HbaseRole::Master => {
749+
if let Some(masters) = self.spec.masters.as_ref() {
750+
let conf_defaults = Some(SupportedListenerClasses::ClusterInternal);
751+
let mut conf_role = masters.config.config.listener_class.to_owned();
752+
let mut conf_rolegroup = masters
753+
.role_groups
754+
.get(rolegroup_name)
755+
.map(|rg| rg.config.config.listener_class.clone())
756+
.unwrap_or_default();
757+
758+
conf_role.merge(&conf_defaults);
759+
conf_rolegroup.merge(&conf_role);
760+
761+
tracing::debug!("Merged listener-class: {:?} for {role}", conf_rolegroup);
762+
conf_rolegroup
763+
} else {
764+
None
765+
}
766+
}
767+
HbaseRole::RegionServer => {
768+
if let Some(region_servers) = self.spec.region_servers.as_ref() {
769+
let conf_defaults = Some(SupportedListenerClasses::ClusterInternal);
770+
let mut conf_role = region_servers.config.config.listener_class.to_owned();
771+
let mut conf_rolegroup = region_servers
772+
.role_groups
773+
.get(rolegroup_name)
774+
.map(|rg| rg.config.config.listener_class.clone())
775+
.unwrap_or_default();
776+
777+
conf_role.merge(&conf_defaults);
778+
conf_rolegroup.merge(&conf_role);
779+
780+
tracing::debug!("Merged listener-class: {:?} for {role}", conf_rolegroup);
781+
conf_rolegroup
782+
} else {
783+
None
784+
}
785+
}
786+
HbaseRole::RestServer => {
787+
if let Some(rest_servers) = self.spec.rest_servers.as_ref() {
788+
let conf_defaults = Some(SupportedListenerClasses::ClusterInternal);
789+
let mut conf_role = rest_servers.config.config.listener_class.to_owned();
790+
let mut conf_rolegroup = rest_servers
791+
.role_groups
792+
.get(rolegroup_name)
793+
.map(|rg| rg.config.config.listener_class.clone())
794+
.unwrap_or_default();
795+
796+
conf_role.merge(&conf_defaults);
797+
conf_rolegroup.merge(&conf_role);
798+
799+
tracing::debug!("Merged listener-class: {:?} for {role}", conf_rolegroup);
800+
conf_rolegroup
801+
} else {
802+
None
803+
}
804+
}
717805
}
718806
}
719807
}

rust/operator-binary/src/discovery.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,23 +112,19 @@ pub fn build_endpoint_configmap(
112112
);
113113

114114
for role_podref in role_podrefs {
115-
let role_name = role_podref.0;
116-
// podrefs are written into the collection by replica index
117-
// and can be retrieved in the same order
118-
let mut i = 0;
119115
for podref in role_podref.1 {
120116
if let HbasePodRef {
121117
fqdn_override: Some(fqdn_override),
122118
ports,
119+
pod_name,
123120
..
124121
} = podref
125122
{
126123
if let Some(ui_port) = ports.get(&hbase.ui_port_name()) {
127124
cmm.add_data(
128-
format!("hbase.{role_name}-{i}.ui"),
125+
format!("{pod_name}.http"),
129126
format!("{fqdn_override}:{ui_port}"),
130127
);
131-
i += 1;
132128
}
133129
}
134130
}

rust/operator-binary/src/hbase_controller.rs

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -482,30 +482,6 @@ pub async fn reconcile_hbase(
482482
rolegroup: rolegroup.clone(),
483483
})?,
484484
);
485-
// if the replicas are changed at the same time as the reconciliation
486-
// being paused, it may be possible to have listeners that are *expected*
487-
// (according to their replica number) but which are not yet created, so
488-
// deactivate this action in such cases.
489-
if hbase.spec.cluster_operation.reconciliation_paused
490-
|| hbase.spec.cluster_operation.stopped
491-
{
492-
tracing::info!(
493-
"Cluster is in a transitional state so do not attempt to collect listener information that will only be active once cluster has returned to a non-transitional state."
494-
);
495-
} else {
496-
listener_refs.insert(
497-
hbase_role.to_string(),
498-
hbase
499-
.listener_refs(
500-
client,
501-
&hbase_role,
502-
&merged_config,
503-
&resolved_product_image.product_version,
504-
)
505-
.await
506-
.context(CollectDiscoveryConfigSnafu)?,
507-
);
508-
}
509485
}
510486

511487
let role_config = hbase.role_config(&hbase_role);
@@ -517,6 +493,26 @@ pub async fn reconcile_hbase(
517493
.await
518494
.context(FailedToCreatePdbSnafu)?;
519495
}
496+
497+
// if the replicas are changed at the same time as the reconciliation
498+
// is paused, it may be possible to have listeners that are *expected*
499+
// (according to their replica number) but which are not yet created, so
500+
// skip collecting listener references in such cases.
501+
if hbase.spec.cluster_operation.reconciliation_paused
502+
|| hbase.spec.cluster_operation.stopped
503+
{
504+
tracing::info!(
505+
"Cluster is in a transitional state so do not attempt to collect listener information that will only be active once cluster has returned to a non-transitional state."
506+
);
507+
} else {
508+
listener_refs.insert(
509+
hbase_role.to_string(),
510+
hbase
511+
.listener_refs(client, &hbase_role, &resolved_product_image.product_version)
512+
.await
513+
.context(CollectDiscoveryConfigSnafu)?,
514+
);
515+
}
520516
}
521517

522518
tracing::debug!(

tests/templates/kuttl/external-access/03-assert.yaml

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,29 @@ kind: StatefulSet
1616
metadata:
1717
name: test-hbase-regionserver-default
1818
status:
19-
readyReplicas: 2
20-
replicas: 2
19+
readyReplicas: 1
20+
replicas: 1
21+
---
22+
apiVersion: apps/v1
23+
kind: StatefulSet
24+
metadata:
25+
name: test-hbase-regionserver-cluster-internal
26+
status:
27+
readyReplicas: 1
28+
replicas: 1
2129
---
2230
apiVersion: apps/v1
2331
kind: StatefulSet
2432
metadata:
2533
name: test-hbase-restserver-default
2634
status:
27-
readyReplicas: 2
28-
replicas: 2
35+
readyReplicas: 1
36+
replicas: 1
37+
---
38+
apiVersion: apps/v1
39+
kind: StatefulSet
40+
metadata:
41+
name: test-hbase-restserver-external-unstable
42+
status:
43+
readyReplicas: 1
44+
replicas: 1

tests/templates/kuttl/external-access/03-install-hbase.yaml.j2

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ spec:
2323
gracefulShutdownTimeout: 1m
2424
logging:
2525
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
26-
listenerClass: {{ test_scenario['values']['listener-class'] }}
26+
listenerClass: external-unstable
2727
roleGroups:
2828
default:
2929
replicas: 2
@@ -32,16 +32,24 @@ spec:
3232
gracefulShutdownTimeout: 1m
3333
logging:
3434
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
35-
listenerClass: {{ test_scenario['values']['listener-class'] }}
35+
listenerClass: external-unstable
3636
roleGroups:
3737
default:
38-
replicas: 2
38+
replicas: 1
39+
cluster-internal:
40+
replicas: 1
41+
config:
42+
listenerClass: cluster-internal
3943
restServers:
4044
config:
4145
gracefulShutdownTimeout: 1m
4246
logging:
4347
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
44-
listenerClass: {{ test_scenario['values']['listener-class'] }}
48+
listenerClass: cluster-internal
4549
roleGroups:
4650
default:
47-
replicas: 2
51+
replicas: 1
52+
external-unstable:
53+
replicas: 1
54+
config:
55+
listenerClass: external-unstable

0 commit comments

Comments
 (0)