From ff08a18dc446d3e69220c8fc8540a05683e9fa41 Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Wed, 22 May 2024 16:09:03 +0100 Subject: [PATCH 1/6] Refactor watcher creation code Signed-off-by: Vincent Liu --- ocaml/xapi/xapi_cluster.ml | 2 +- ocaml/xapi/xapi_cluster_host.ml | 4 +- ocaml/xapi/xapi_clustering.ml | 255 ++++++++++++++++---------------- 3 files changed, 133 insertions(+), 128 deletions(-) diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index dda26d201f4..cfc9147434b 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -115,7 +115,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~verify ; (* Create the watcher here in addition to resync_host since pool_create in resync_host only calls cluster_host.create for pool member nodes *) - create_cluster_watcher_on_master ~__context ~host ; + Watcher.create_as_necessary ~__context ~host ; Xapi_cluster_host_helpers.update_allowed_operations ~__context ~self:cluster_host_ref ; D.debug "Created Cluster: %s and Cluster_host: %s" diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml index 782d5a240f5..291a522fe89 100644 --- a/ocaml/xapi/xapi_cluster_host.ml +++ b/ocaml/xapi/xapi_cluster_host.ml @@ -232,7 +232,7 @@ let resync_host ~__context ~host = (* If we have just joined, enable will prevent concurrent clustering ops *) if not (Db.Cluster_host.get_joined ~__context ~self) then ( join_internal ~__context ~self ; - create_cluster_watcher_on_master ~__context ~host ; + Watcher.create_as_necessary ~__context ~host ; Xapi_observer.initialise_observer ~__context Xapi_observer_components.Xapi_clusterd ) else if Db.Cluster_host.get_enabled ~__context ~self then ( @@ -375,7 +375,7 @@ let enable ~__context ~self = "Cluster_host.enable: xapi-clusterd not running - attempting to start" ; Xapi_clustering.Daemon.enable ~__context ) ; - create_cluster_watcher_on_master ~__context ~host ; + Watcher.create_as_necessary ~__context ~host ; Xapi_observer.initialise_observer ~__context Xapi_observer_components.Xapi_clusterd ; let verify = Stunnel_client.get_verify_by_default () in diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 4fc9314aa2e..8886984cf7a 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -426,133 +426,138 @@ let compute_corosync_max_host_failures ~__context = in corosync_ha_max_hosts -let on_corosync_update ~__context ~cluster updates = - debug - "%s: Received %d updates from corosync_notifyd , run diagnostics to get \ - new state" - __FUNCTION__ (List.length updates) ; - let m = - Cluster_client.LocalClient.diagnostics (rpc ~__context) - "update quorum api fields with diagnostics" - in - match Idl.IdM.run @@ Cluster_client.IDL.T.get m with - | Ok diag -> - Db.Cluster.set_is_quorate ~__context ~self:cluster ~value:diag.is_quorate ; - let all_cluster_hosts = Db.Cluster_host.get_all ~__context in - let ip_ch = - List.map - (fun ch -> - let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in - let ipstr = - ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF) - |> ipstr_of_address - in - (ipstr, ch) - ) - all_cluster_hosts - in - let current_time = API.Date.now () in - ( match diag.quorum_members with - | None -> - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:false ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time +module Watcher = struct + let on_corosync_update ~__context ~cluster updates = + debug + "%s: Received %d updates from corosync_notifyd, run diagnostics to get \ + new state" + __FUNCTION__ (List.length updates) ; + let m = + Cluster_client.LocalClient.diagnostics (rpc ~__context) + "update quorum api fields with diagnostics" + in + match Idl.IdM.run @@ Cluster_client.IDL.T.get m with + | Ok diag -> + Db.Cluster.set_is_quorate ~__context ~self:cluster + ~value:diag.is_quorate ; + let all_cluster_hosts = Db.Cluster_host.get_all ~__context in + let ip_ch = + List.map + (fun ch -> + let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in + let ipstr = + ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF) + |> ipstr_of_address + in + (ipstr, ch) ) all_cluster_hosts - | Some nodel -> - let quorum_hosts = - List.filter_map - (fun {addr; _} -> - let ipstr = ipstr_of_address addr in - match List.assoc_opt ipstr ip_ch with - | None -> - error - "%s: cannot find cluster host with network address %s, \ - ignoring this host" - __FUNCTION__ ipstr ; - None - | Some ch -> - Some ch + in + let current_time = API.Date.now () in + ( match diag.quorum_members with + | None -> + List.iter + (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:false ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time ) - nodel - in - let missing_hosts = - List.filter - (fun h -> not (List.mem h quorum_hosts)) all_cluster_hosts - in - let new_hosts = - List.filter - (fun h -> not (Db.Cluster_host.get_live ~__context ~self:h)) - quorum_hosts - in - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:true ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time - ) - new_hosts ; - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:false ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time - ) - missing_hosts ; - maybe_generate_alert ~__context ~missing_hosts ~new_hosts - ~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum - ) ; - Db.Cluster.set_quorum ~__context ~self:cluster - ~value:(Int64.of_int diag.quorum) ; - Db.Cluster.set_live_hosts ~__context ~self:cluster - ~value:(Int64.of_int diag.total_votes) - | Error (InternalError message) | Error (Unix_error message) -> - warn "%s Cannot query diagnostics due to %s, not performing update" - __FUNCTION__ message - | exception exn -> - warn - "%s: Got exception %s while retrieving diagnostics info, not \ - performing update" - __FUNCTION__ (Printexc.to_string exn) - -let create_cluster_watcher_on_master ~__context ~host = - if Helpers.is_pool_master ~__context ~host then - let watch () = - while !Daemon.enabled do - let m = - Cluster_client.LocalClient.UPDATES.get (rpc ~__context) - "call cluster watcher" 3. - in - match Idl.IdM.run @@ Cluster_client.IDL.T.get m with - | Ok updates -> ( - match find_cluster_host ~__context ~host with - | Some ch -> - let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in - on_corosync_update ~__context ~cluster updates - | None -> - () - ) - | Error (InternalError "UPDATES.Timeout") -> - (* UPDATES.get timed out, this is normal, now retry *) + | Some nodel -> + let quorum_hosts = + List.filter_map + (fun {addr; _} -> + let ipstr = ipstr_of_address addr in + match List.assoc_opt ipstr ip_ch with + | None -> + error + "%s: cannot find cluster host with network address %s, \ + ignoring this host" + __FUNCTION__ ipstr ; + None + | Some ch -> + Some ch + ) + nodel + in + let missing_hosts = + List.filter + (fun h -> not (List.mem h quorum_hosts)) + all_cluster_hosts + in + let new_hosts = + List.filter + (fun h -> not (Db.Cluster_host.get_live ~__context ~self:h)) + quorum_hosts + in + List.iter + (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:true ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time + ) + new_hosts ; + List.iter + (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:false ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time + ) + missing_hosts ; + maybe_generate_alert ~__context ~missing_hosts ~new_hosts + ~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum + ) ; + Db.Cluster.set_quorum ~__context ~self:cluster + ~value:(Int64.of_int diag.quorum) ; + Db.Cluster.set_live_hosts ~__context ~self:cluster + ~value:(Int64.of_int diag.total_votes) + | Error (InternalError message) | Error (Unix_error message) -> + warn "%s Cannot query diagnostics due to %s, not performing update" + __FUNCTION__ message + | exception exn -> + warn + "%s: Got exception %s while retrieving diagnostics info, not \ + performing update" + __FUNCTION__ (Printexc.to_string exn) + + let watch_cluster_change ~__context ~host = + while !Daemon.enabled do + let m = + Cluster_client.LocalClient.UPDATES.get (rpc ~__context) + "call cluster watcher" 3. + in + match Idl.IdM.run @@ Cluster_client.IDL.T.get m with + | Ok updates -> ( + match find_cluster_host ~__context ~host with + | Some ch -> + let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in + on_corosync_update ~__context ~cluster updates + | None -> () - | Error (InternalError message) | Error (Unix_error message) -> - warn "%s: Cannot query cluster host updates with error %s" - __FUNCTION__ message - | exception exn -> - warn - "%s: Got exception %s while query cluster host updates, retrying" - __FUNCTION__ (Printexc.to_string exn) ; - Thread.delay 3. - done - in - if Xapi_cluster_helpers.cluster_health_enabled ~__context then ( - debug "%s: create watcher for corosync-notifyd on master" __FUNCTION__ ; - ignore @@ Thread.create watch () - ) else - debug - "%s: not creating watcher for corosync-notifyd: feature cluster_health \ - not enabled" - __FUNCTION__ + ) + | Error (InternalError "UPDATES.Timeout") -> + (* UPDATES.get timed out, this is normal, now retry *) + () + | Error (InternalError message) | Error (Unix_error message) -> + warn "%s: Cannot query cluster host updates with error %s" + __FUNCTION__ message + | exception exn -> + warn "%s: Got exception %s while query cluster host updates, retrying" + __FUNCTION__ (Printexc.to_string exn) ; + Thread.delay 3. + done + + (** [create_as_necessary] will create cluster watchers on the coordinator if they are not + already created. + There is no need to destroy them: once the clustering daemon is disabled, + these threads will exit as well. *) + let create_as_necessary ~__context ~host = + if Helpers.is_pool_master ~__context ~host then + if Xapi_cluster_helpers.cluster_health_enabled ~__context then ( + debug "%s: create watcher for corosync-notifyd on coordinator" + __FUNCTION__ ; + + ignore + @@ Thread.create (fun () -> watch_cluster_change ~__context ~host) () + ) +end From 733882a2c3a2d026024e354b28918be3382a5ead Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Wed, 22 May 2024 16:57:47 +0100 Subject: [PATCH 2/6] Only create watcher once Use the Atomic module to track whether a watcher has been created. Signed-off-by: Vincent Liu --- ocaml/xapi/xapi_clustering.ml | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 8886984cf7a..d355b0b731a 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -520,6 +520,8 @@ module Watcher = struct performing update" __FUNCTION__ (Printexc.to_string exn) + let cluster_change_watcher : bool Atomic.t = Atomic.make false + let watch_cluster_change ~__context ~host = while !Daemon.enabled do let m = @@ -545,7 +547,8 @@ module Watcher = struct warn "%s: Got exception %s while query cluster host updates, retrying" __FUNCTION__ (Printexc.to_string exn) ; Thread.delay 3. - done + done ; + Atomic.set cluster_change_watcher false (** [create_as_necessary] will create cluster watchers on the coordinator if they are not already created. @@ -553,11 +556,16 @@ module Watcher = struct these threads will exit as well. *) let create_as_necessary ~__context ~host = if Helpers.is_pool_master ~__context ~host then - if Xapi_cluster_helpers.cluster_health_enabled ~__context then ( - debug "%s: create watcher for corosync-notifyd on coordinator" - __FUNCTION__ ; - - ignore - @@ Thread.create (fun () -> watch_cluster_change ~__context ~host) () - ) + if Xapi_cluster_helpers.cluster_health_enabled ~__context then + if Atomic.compare_and_set cluster_change_watcher false true then ( + debug "%s: create watcher for corosync-notifyd on coordinator" + __FUNCTION__ ; + ignore + @@ Thread.create (fun () -> watch_cluster_change ~__context ~host) () + ) else + (* someone else must have gone into the if branch above and created the thread + before us, leave it to them *) + debug + "%s: not create watcher for corosync-notifyd as it already exists" + __FUNCTION__ end From e612873d7e3a7d552f97b603fa6ad2a60e67e9ee Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Wed, 19 Jun 2024 22:33:20 +0100 Subject: [PATCH 3/6] Refactor cluster change watcher interval Signed-off-by: Vincent Liu --- ocaml/xapi/xapi_clustering.ml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index d355b0b731a..a470546135d 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -522,11 +522,16 @@ module Watcher = struct let cluster_change_watcher : bool Atomic.t = Atomic.make false + (* this is the time it takes for the update request to time out. It is ok to set + it to a relatively long value since the call will return immediately if there + is an update *) + let cluster_change_interval = Mtime.Span.min + let watch_cluster_change ~__context ~host = while !Daemon.enabled do let m = Cluster_client.LocalClient.UPDATES.get (rpc ~__context) - "call cluster watcher" 3. + "call cluster watcher" (Clock.Timer.span_to_s cluster_change_interval) in match Idl.IdM.run @@ Cluster_client.IDL.T.get m with | Ok updates -> ( @@ -546,7 +551,7 @@ module Watcher = struct | exception exn -> warn "%s: Got exception %s while query cluster host updates, retrying" __FUNCTION__ (Printexc.to_string exn) ; - Thread.delay 3. + Thread.delay (Clock.Timer.span_to_s cluster_change_interval) done ; Atomic.set cluster_change_watcher false From af142fc413928250c14fcb2b38f04fe8ba27ce73 Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Thu, 20 Jun 2024 20:00:40 +0100 Subject: [PATCH 4/6] Add new internal API cstack_sync This allows one to force sync the state of xapi db with the cluster stack, useful for cluster API methods change the state of the cluster. Signed-off-by: Vincent Liu --- ocaml/idl/datamodel_cluster.ml | 11 +++++++++++ ocaml/xapi/message_forwarding.ml | 8 ++++++++ ocaml/xapi/xapi_cluster.ml | 5 +++++ ocaml/xapi/xapi_cluster.mli | 10 ++++++++++ ocaml/xapi/xapi_clustering.ml | 3 ++- 5 files changed, 36 insertions(+), 1 deletion(-) diff --git a/ocaml/idl/datamodel_cluster.ml b/ocaml/idl/datamodel_cluster.ml index 10c30bb540b..dba9b76c73b 100644 --- a/ocaml/idl/datamodel_cluster.ml +++ b/ocaml/idl/datamodel_cluster.ml @@ -169,6 +169,16 @@ let pool_resync = ~params:[(Ref _cluster, "self", "The cluster to resync")] ~lifecycle ~allowed_roles:_R_POOL_OP ~errs:[] () +let cstack_sync = + call ~name:"cstack_sync" + ~doc: + "Sync xapi db with the cluster stack synchronously, and generate alerts \ + as needed. Only happens on the coordinator as this is where the cluster \ + watcher performs updates." + ~params:[(Ref _cluster, "self", "The cluster to sync")] + ~hide_from_docs:true ~pool_internal:true ~lifecycle + ~allowed_roles:_R_POOL_OP ~errs:[] () + let t = create_obj ~name:_cluster ~descr:"Cluster-wide Cluster metadata" ~doccomments:[] ~gen_constructor_destructor:false ~gen_events:true @@ -245,5 +255,6 @@ let t = ; pool_force_destroy ; pool_destroy ; pool_resync + ; cstack_sync ] () diff --git a/ocaml/xapi/message_forwarding.ml b/ocaml/xapi/message_forwarding.ml index e323bd4248b..34e420259b8 100644 --- a/ocaml/xapi/message_forwarding.ml +++ b/ocaml/xapi/message_forwarding.ml @@ -6419,6 +6419,14 @@ functor ) ; debug "Cluster.pool_resync for host %s" (Ref.string_of host) ) + + let cstack_sync ~__context ~self = + info "Cluster.cstack_sync cluster %s" (Ref.string_of self) ; + let local_fn = Local.Cluster.cstack_sync ~self in + let coor = Helpers.get_master ~__context in + do_op_on ~local_fn ~__context ~host:coor (fun session_id rpc -> + Client.Cluster.cstack_sync ~rpc ~session_id ~self + ) end module Cluster_host = struct diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index cfc9147434b..26fc0317ac2 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -294,3 +294,8 @@ let pool_resync ~__context ~self:_ = ) (* If host.clustering_enabled then resync_host should successfully find or create a matching cluster_host which is also enabled *) + +let cstack_sync ~__context ~self = + debug "%s: sync db data with cluster stack" __FUNCTION__ ; + Watcher.on_corosync_update ~__context ~cluster:self + ["Updates due to cluster api calls"] diff --git a/ocaml/xapi/xapi_cluster.mli b/ocaml/xapi/xapi_cluster.mli index a9a71f275ad..bcdc029c49b 100644 --- a/ocaml/xapi/xapi_cluster.mli +++ b/ocaml/xapi/xapi_cluster.mli @@ -74,3 +74,13 @@ val pool_resync : __context:Context.t -> self:API.ref_Cluster -> unit Cluster_host objects (ie., one for each host in the pool if the Cluster has [pool_auto_join] set. If there is a failure, this function must return an error that enables the administrator to fix the problem. *) + +val cstack_sync : __context:Context.t -> self:API.ref_Cluster -> unit +(** [cstack_sync ~__context ~self] is the implementation of the internal XenAPI method, +which synchronously performs a diagnostics call to xapi-clusterd and updates the +xapi db according to the call. This is used internally by cluster-host-create/destroy +to generate the correct alert as a result of the API call. The other part of the +alerts generated due to network failure (e.g. a host left as its network is down) +is handled by the cluster watcher. This call only happens on the coordinator as that +is where the cluster watcher performs the updates, which shares the code with +this function. *) diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index a470546135d..eedc60290bc 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -531,7 +531,8 @@ module Watcher = struct while !Daemon.enabled do let m = Cluster_client.LocalClient.UPDATES.get (rpc ~__context) - "call cluster watcher" (Clock.Timer.span_to_s cluster_change_interval) + "call cluster watcher" + (Clock.Timer.span_to_s cluster_change_interval) in match Idl.IdM.run @@ Cluster_client.IDL.T.get m with | Ok updates -> ( From 3dc79e0c4435ae9535dabc39e8c93b4a61fc6bc6 Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Wed, 19 Jun 2024 22:14:53 +0100 Subject: [PATCH 5/6] CP-394109: Alert only once for cluster host leave/join Previously there were two ways an alert for a cluster host join/leave can be raised: 1. through the cluster change watcher; 2. through the api call. These two can generate duplicate alerts as an API call can cause the cluster change watcher to notice the change as well. The idea of the fix here is still to let API and watcher raise alerts separately, but now add synchronous API calls to allow API call (cluster-host-join, etc) to call the cluster change update code at the right time so that the cluster change watcher won't see the change again, hence not generating duplicate alerts. Signed-off-by: Vincent Liu --- ocaml/tests/test_cluster.ml | 4 + ocaml/xapi/xapi_cluster_helpers.ml | 8 +- ocaml/xapi/xapi_cluster_host.ml | 29 ++---- ocaml/xapi/xapi_clustering.ml | 141 ++++++++++++++++------------- 4 files changed, 97 insertions(+), 85 deletions(-) diff --git a/ocaml/tests/test_cluster.ml b/ocaml/tests/test_cluster.ml index d34258c512c..b42621a300f 100644 --- a/ocaml/tests/test_cluster.ml +++ b/ocaml/tests/test_cluster.ml @@ -95,6 +95,10 @@ let test_rpc ~__context call = Rpc.{success= true; contents= Rpc.String ""; is_notification= false} | "Cluster_host.get_cluster_config", _ -> Rpc.{success= true; contents= Rpc.String ""; is_notification= false} + | "Cluster.cstack_sync", [_session; self] -> + let open API in + Xapi_cluster.cstack_sync ~__context ~self:(ref_Cluster_of_rpc self) ; + Rpc.{success= true; contents= Rpc.String ""; is_notification= false} | name, params -> Alcotest.failf "Unexpected RPC: %s(%s)" name (String.concat " " (List.map Rpc.to_string params)) diff --git a/ocaml/xapi/xapi_cluster_helpers.ml b/ocaml/xapi/xapi_cluster_helpers.ml index 31a655a3e72..f7ea78eab9d 100644 --- a/ocaml/xapi/xapi_cluster_helpers.ml +++ b/ocaml/xapi/xapi_cluster_helpers.ml @@ -114,7 +114,7 @@ let corosync3_enabled ~__context = let restrictions = Db.Pool.get_restrictions ~__context ~self:pool in List.assoc_opt "restrict_corosync3" restrictions = Some "false" -let maybe_generate_alert ~__context ~num_hosts ~missing_hosts ~new_hosts ~quorum +let maybe_generate_alert ~__context ~num_hosts ~hosts_left ~hosts_joined ~quorum = let generate_alert join cluster_host = let host = Db.Cluster_host.get_host ~__context ~self:cluster_host in @@ -148,10 +148,10 @@ let maybe_generate_alert ~__context ~num_hosts ~missing_hosts ~new_hosts ~quorum ) in if cluster_health_enabled ~__context then ( - List.iter (generate_alert false) missing_hosts ; - List.iter (generate_alert true) new_hosts ; + List.iter (generate_alert false) hosts_left ; + List.iter (generate_alert true) hosts_joined ; (* only generate this alert when the number of hosts is decreasing *) - if missing_hosts <> [] && num_hosts <= quorum then + if hosts_left <> [] && num_hosts <= quorum then let pool = Helpers.get_pool ~__context in let pool_uuid = Db.Pool.get_uuid ~__context ~self:pool in let name, priority = Api_messages.cluster_quorum_approaching_lost in diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml index 291a522fe89..c55d789b8d9 100644 --- a/ocaml/xapi/xapi_cluster_host.ml +++ b/ocaml/xapi/xapi_cluster_host.ml @@ -13,7 +13,6 @@ *) open Xapi_clustering -open Xapi_cluster_helpers open Ipaddr_rpc_type module D = Debug.Make (struct let name = "xapi_cluster_host" end) @@ -55,20 +54,6 @@ let call_api_function_with_alert ~__context ~msg ~cls ~obj_uuid ~body raise err ) -let alert_for_cluster_host ~__context ~cluster_host ~missing_hosts ~new_hosts = - let num_hosts = Db.Cluster_host.get_all ~__context |> List.length in - let cluster = Db.Cluster_host.get_cluster ~__context ~self:cluster_host in - let quorum = Db.Cluster.get_quorum ~__context ~self:cluster |> Int64.to_int in - maybe_generate_alert ~__context ~missing_hosts ~new_hosts ~num_hosts ~quorum - -let alert_for_cluster_host_leave ~__context ~cluster_host = - alert_for_cluster_host ~__context ~cluster_host ~missing_hosts:[cluster_host] - ~new_hosts:[] - -let alert_for_cluster_host_join ~__context ~cluster_host = - alert_for_cluster_host ~__context ~cluster_host ~missing_hosts:[] - ~new_hosts:[cluster_host] - (* Create xapi db object for cluster_host, resync_host calls clusterd *) let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = with_clustering_lock __LOC__ (fun () -> @@ -81,7 +66,6 @@ let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = ~enabled:false ~current_operations:[] ~allowed_operations:[] ~other_config:[] ~joined:false ~live:false ~last_update_live:API.Date.epoch ; - alert_for_cluster_host_join ~__context ~cluster_host:ref ; ref ) @@ -269,16 +253,21 @@ let destroy_op ~__context ~self ~force = (Cluster_client.LocalClient.leave, "destroy") in let result = local_fn (rpc ~__context) dbg in + let cluster = Db.Cluster_host.get_cluster ~__context ~self in match Idl.IdM.run @@ Cluster_client.IDL.T.get result with | Ok () -> - alert_for_cluster_host_leave ~__context ~cluster_host:self ; + Helpers.call_api_functions ~__context (fun rpc session_id -> + Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster + ) ; Db.Cluster_host.destroy ~__context ~self ; debug "Cluster_host.%s was successful" fn_str ; Xapi_clustering.Daemon.disable ~__context | Error error -> warn "Error occurred during Cluster_host.%s" fn_str ; if force then ( - alert_for_cluster_host_leave ~__context ~cluster_host:self ; + Helpers.call_api_functions ~__context (fun rpc session_id -> + Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster + ) ; let ref_str = Ref.string_of self in Db.Cluster_host.destroy ~__context ~self ; debug "Cluster_host %s force destroyed." ref_str @@ -326,7 +315,9 @@ let forget ~__context ~self = Db.Cluster.set_pending_forget ~__context ~self:cluster ~value:[] ; (* must not disable the daemon here, because we declared another unreachable node dead, * not the current one *) - alert_for_cluster_host_leave ~__context ~cluster_host:self ; + Helpers.call_api_functions ~__context (fun rpc session_id -> + Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster + ) ; Db.Cluster_host.destroy ~__context ~self ; debug "Cluster_host.forget was successful" | Error error -> diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index eedc60290bc..21794537268 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -438,74 +438,87 @@ module Watcher = struct in match Idl.IdM.run @@ Cluster_client.IDL.T.get m with | Ok diag -> - Db.Cluster.set_is_quorate ~__context ~self:cluster - ~value:diag.is_quorate ; - let all_cluster_hosts = Db.Cluster_host.get_all ~__context in - let ip_ch = - List.map - (fun ch -> - let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in - let ipstr = - ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF) - |> ipstr_of_address - in - (ipstr, ch) - ) - all_cluster_hosts - in - let current_time = API.Date.now () in - ( match diag.quorum_members with - | None -> - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:false ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time + ( Db.Cluster.set_is_quorate ~__context ~self:cluster + ~value:diag.is_quorate ; + let all_cluster_hosts = Db.Cluster_host.get_all ~__context in + let live_hosts = + Db.Cluster_host.get_refs_where ~__context + ~expr:(Eq (Field "live", Literal "true")) + in + let dead_hosts = + List.filter (fun h -> not (List.mem h live_hosts)) all_cluster_hosts + in + let ip_ch = + List.map + (fun ch -> + let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in + let ipstr = + ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF) + |> ipstr_of_address + in + (ipstr, ch) ) all_cluster_hosts - | Some nodel -> - let quorum_hosts = - List.filter_map - (fun {addr; _} -> - let ipstr = ipstr_of_address addr in - match List.assoc_opt ipstr ip_ch with - | None -> - error - "%s: cannot find cluster host with network address %s, \ - ignoring this host" - __FUNCTION__ ipstr ; - None - | Some ch -> - Some ch + in + let current_time = API.Date.now () in + match diag.quorum_members with + | None -> + List.iter + (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:false ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time ) - nodel - in - let missing_hosts = + all_cluster_hosts + | Some nodel -> + (* nodel contains the current members of the cluster, according to corosync *) + let quorum_hosts = + List.filter_map + (fun {addr; _} -> + let ipstr = ipstr_of_address addr in + match List.assoc_opt ipstr ip_ch with + | None -> + error + "%s: cannot find cluster host with network address \ + %s, ignoring this host" + __FUNCTION__ ipstr ; + None + | Some ch -> + Some ch + ) + nodel + in + + (* hosts_left contains the hosts that were live, but not in the list + of live hosts according to the cluster stack *) + let hosts_left = + List.filter (fun h -> not (List.mem h quorum_hosts)) live_hosts + in + (* hosts_joined contains the hosts that were dead but exists in the db, + and is now viewed as a member of the cluster by the cluster stack *) + let hosts_joined = + List.filter (fun h -> List.mem h quorum_hosts) dead_hosts + in + debug "%s: there are %d hosts joined and %d hosts left" + __FUNCTION__ (List.length hosts_joined) (List.length hosts_left) ; + + List.iter + (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:true ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time + ) + quorum_hosts ; List.filter (fun h -> not (List.mem h quorum_hosts)) all_cluster_hosts - in - let new_hosts = - List.filter - (fun h -> not (Db.Cluster_host.get_live ~__context ~self:h)) - quorum_hosts - in - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:true ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time - ) - new_hosts ; - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:false ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time - ) - missing_hosts ; - maybe_generate_alert ~__context ~missing_hosts ~new_hosts - ~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum + |> List.iter (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:false ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time + ) ; + maybe_generate_alert ~__context ~hosts_left ~hosts_joined + ~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum ) ; Db.Cluster.set_quorum ~__context ~self:cluster ~value:(Int64.of_int diag.quorum) ; @@ -527,6 +540,10 @@ module Watcher = struct is an update *) let cluster_change_interval = Mtime.Span.min + (* we handle unclean hosts join and leave in the watcher, i.e. hosts joining and leaving + due to network problems, power cut, etc. Join and leave initiated by the + API will be handled in the API call themselves, but they share the same code + as the watcher. *) let watch_cluster_change ~__context ~host = while !Daemon.enabled do let m = From cca43a4956c3e1db8584622d322d456d34beab74 Mon Sep 17 00:00:00 2001 From: Vincent Liu Date: Wed, 10 Jul 2024 11:25:39 +0100 Subject: [PATCH 6/6] Feature flag the cstack_sync call Signed-off-by: Vincent Liu --- ocaml/xapi/xapi_cluster.ml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index 26fc0317ac2..cfa55fde2c7 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -296,6 +296,8 @@ let pool_resync ~__context ~self:_ = find or create a matching cluster_host which is also enabled *) let cstack_sync ~__context ~self = - debug "%s: sync db data with cluster stack" __FUNCTION__ ; - Watcher.on_corosync_update ~__context ~cluster:self - ["Updates due to cluster api calls"] + if Xapi_cluster_helpers.cluster_health_enabled ~__context then ( + debug "%s: sync db data with cluster stack" __FUNCTION__ ; + Watcher.on_corosync_update ~__context ~cluster:self + ["Updates due to cluster api calls"] + )