diff --git a/ocaml/idl/datamodel_cluster.ml b/ocaml/idl/datamodel_cluster.ml index 10c30bb540b..dba9b76c73b 100644 --- a/ocaml/idl/datamodel_cluster.ml +++ b/ocaml/idl/datamodel_cluster.ml @@ -169,6 +169,16 @@ let pool_resync = ~params:[(Ref _cluster, "self", "The cluster to resync")] ~lifecycle ~allowed_roles:_R_POOL_OP ~errs:[] () +let cstack_sync = + call ~name:"cstack_sync" + ~doc: + "Sync xapi db with the cluster stack synchronously, and generate alerts \ + as needed. Only happens on the coordinator as this is where the cluster \ + watcher performs updates." + ~params:[(Ref _cluster, "self", "The cluster to sync")] + ~hide_from_docs:true ~pool_internal:true ~lifecycle + ~allowed_roles:_R_POOL_OP ~errs:[] () + let t = create_obj ~name:_cluster ~descr:"Cluster-wide Cluster metadata" ~doccomments:[] ~gen_constructor_destructor:false ~gen_events:true @@ -245,5 +255,6 @@ let t = ; pool_force_destroy ; pool_destroy ; pool_resync + ; cstack_sync ] () diff --git a/ocaml/tests/test_cluster.ml b/ocaml/tests/test_cluster.ml index d34258c512c..b42621a300f 100644 --- a/ocaml/tests/test_cluster.ml +++ b/ocaml/tests/test_cluster.ml @@ -95,6 +95,10 @@ let test_rpc ~__context call = Rpc.{success= true; contents= Rpc.String ""; is_notification= false} | "Cluster_host.get_cluster_config", _ -> Rpc.{success= true; contents= Rpc.String ""; is_notification= false} + | "Cluster.cstack_sync", [_session; self] -> + let open API in + Xapi_cluster.cstack_sync ~__context ~self:(ref_Cluster_of_rpc self) ; + Rpc.{success= true; contents= Rpc.String ""; is_notification= false} | name, params -> Alcotest.failf "Unexpected RPC: %s(%s)" name (String.concat " " (List.map Rpc.to_string params)) diff --git a/ocaml/xapi/message_forwarding.ml b/ocaml/xapi/message_forwarding.ml index e323bd4248b..34e420259b8 100644 --- a/ocaml/xapi/message_forwarding.ml +++ b/ocaml/xapi/message_forwarding.ml @@ -6419,6 +6419,14 @@ functor ) ; debug "Cluster.pool_resync for host %s" (Ref.string_of host) ) + + let cstack_sync ~__context ~self = + info "Cluster.cstack_sync cluster %s" (Ref.string_of self) ; + let local_fn = Local.Cluster.cstack_sync ~self in + let coor = Helpers.get_master ~__context in + do_op_on ~local_fn ~__context ~host:coor (fun session_id rpc -> + Client.Cluster.cstack_sync ~rpc ~session_id ~self + ) end module Cluster_host = struct diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index dda26d201f4..cfa55fde2c7 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -115,7 +115,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~verify ; (* Create the watcher here in addition to resync_host since pool_create in resync_host only calls cluster_host.create for pool member nodes *) - create_cluster_watcher_on_master ~__context ~host ; + Watcher.create_as_necessary ~__context ~host ; Xapi_cluster_host_helpers.update_allowed_operations ~__context ~self:cluster_host_ref ; D.debug "Created Cluster: %s and Cluster_host: %s" @@ -294,3 +294,10 @@ let pool_resync ~__context ~self:_ = ) (* If host.clustering_enabled then resync_host should successfully find or create a matching cluster_host which is also enabled *) + +let cstack_sync ~__context ~self = + if Xapi_cluster_helpers.cluster_health_enabled ~__context then ( + debug "%s: sync db data with cluster stack" __FUNCTION__ ; + Watcher.on_corosync_update ~__context ~cluster:self + ["Updates due to cluster api calls"] + ) diff --git a/ocaml/xapi/xapi_cluster.mli b/ocaml/xapi/xapi_cluster.mli index 
a9a71f275ad..bcdc029c49b 100644 --- a/ocaml/xapi/xapi_cluster.mli +++ b/ocaml/xapi/xapi_cluster.mli @@ -74,3 +74,13 @@ val pool_resync : __context:Context.t -> self:API.ref_Cluster -> unit Cluster_host objects (ie., one for each host in the pool if the Cluster has [pool_auto_join] set. If there is a failure, this function must return an error that enables the administrator to fix the problem. *) + +val cstack_sync : __context:Context.t -> self:API.ref_Cluster -> unit +(** [cstack_sync ~__context ~self] is the implementation of the internal XenAPI method +which synchronously performs a diagnostics call to xapi-clusterd and updates the +xapi db according to the result. It is used internally by cluster-host-create/destroy +to generate the correct alert as a result of the API call. Alerts caused by network +failures (e.g. a host leaving because its network is down) are generated by the +cluster watcher instead. This call is only made on the coordinator, as that is where +the cluster watcher performs its updates; the watcher and this function share the +same underlying code. *) diff --git a/ocaml/xapi/xapi_cluster_helpers.ml b/ocaml/xapi/xapi_cluster_helpers.ml index 31a655a3e72..f7ea78eab9d 100644 --- a/ocaml/xapi/xapi_cluster_helpers.ml +++ b/ocaml/xapi/xapi_cluster_helpers.ml @@ -114,7 +114,7 @@ let corosync3_enabled ~__context = let restrictions = Db.Pool.get_restrictions ~__context ~self:pool in List.assoc_opt "restrict_corosync3" restrictions = Some "false" -let maybe_generate_alert ~__context ~num_hosts ~missing_hosts ~new_hosts ~quorum +let maybe_generate_alert ~__context ~num_hosts ~hosts_left ~hosts_joined ~quorum = let generate_alert join cluster_host = let host = Db.Cluster_host.get_host ~__context ~self:cluster_host in @@ -148,10 +148,10 @@ let maybe_generate_alert ~__context ~num_hosts ~missing_hosts ~new_hosts ~quorum ) in if cluster_health_enabled ~__context then ( - List.iter (generate_alert false) missing_hosts ; - List.iter (generate_alert true) new_hosts ; + List.iter (generate_alert false) hosts_left ; + List.iter (generate_alert true) hosts_joined ; (* only generate this alert when the number of hosts is decreasing *) - if missing_hosts <> [] && num_hosts <= quorum then + if hosts_left <> [] && num_hosts <= quorum then let pool = Helpers.get_pool ~__context in let pool_uuid = Db.Pool.get_uuid ~__context ~self:pool in let name, priority = Api_messages.cluster_quorum_approaching_lost in diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml index 782d5a240f5..c55d789b8d9 100644 --- a/ocaml/xapi/xapi_cluster_host.ml +++ b/ocaml/xapi/xapi_cluster_host.ml @@ -13,7 +13,6 @@ *) open Xapi_clustering -open Xapi_cluster_helpers open Ipaddr_rpc_type module D = Debug.Make (struct let name = "xapi_cluster_host" end) @@ -55,20 +54,6 @@ let call_api_function_with_alert ~__context ~msg ~cls ~obj_uuid ~body raise err ) -let alert_for_cluster_host ~__context ~cluster_host ~missing_hosts ~new_hosts = - let num_hosts = Db.Cluster_host.get_all ~__context |> List.length in - let cluster = Db.Cluster_host.get_cluster ~__context ~self:cluster_host in - let quorum = Db.Cluster.get_quorum ~__context ~self:cluster |> Int64.to_int in - maybe_generate_alert ~__context ~missing_hosts ~new_hosts ~num_hosts ~quorum -let alert_for_cluster_host_leave ~__context ~cluster_host = - alert_for_cluster_host ~__context ~cluster_host ~missing_hosts:[cluster_host] - ~new_hosts:[] -let alert_for_cluster_host_join ~__context ~cluster_host = - alert_for_cluster_host ~__context ~cluster_host 
~missing_hosts:[] - ~new_hosts:[cluster_host] - (* Create xapi db object for cluster_host, resync_host calls clusterd *) let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = with_clustering_lock __LOC__ (fun () -> @@ -81,7 +66,6 @@ let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = ~enabled:false ~current_operations:[] ~allowed_operations:[] ~other_config:[] ~joined:false ~live:false ~last_update_live:API.Date.epoch ; - alert_for_cluster_host_join ~__context ~cluster_host:ref ; ref ) @@ -232,7 +216,7 @@ let resync_host ~__context ~host = (* If we have just joined, enable will prevent concurrent clustering ops *) if not (Db.Cluster_host.get_joined ~__context ~self) then ( join_internal ~__context ~self ; - create_cluster_watcher_on_master ~__context ~host ; + Watcher.create_as_necessary ~__context ~host ; Xapi_observer.initialise_observer ~__context Xapi_observer_components.Xapi_clusterd ) else if Db.Cluster_host.get_enabled ~__context ~self then ( @@ -269,16 +253,21 @@ let destroy_op ~__context ~self ~force = (Cluster_client.LocalClient.leave, "destroy") in let result = local_fn (rpc ~__context) dbg in + let cluster = Db.Cluster_host.get_cluster ~__context ~self in match Idl.IdM.run @@ Cluster_client.IDL.T.get result with | Ok () -> - alert_for_cluster_host_leave ~__context ~cluster_host:self ; + Helpers.call_api_functions ~__context (fun rpc session_id -> + Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster + ) ; Db.Cluster_host.destroy ~__context ~self ; debug "Cluster_host.%s was successful" fn_str ; Xapi_clustering.Daemon.disable ~__context | Error error -> warn "Error occurred during Cluster_host.%s" fn_str ; if force then ( - alert_for_cluster_host_leave ~__context ~cluster_host:self ; + Helpers.call_api_functions ~__context (fun rpc session_id -> + Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster + ) ; let ref_str = Ref.string_of self in Db.Cluster_host.destroy ~__context ~self ; debug "Cluster_host %s force destroyed." 
ref_str @@ -326,7 +315,9 @@ let forget ~__context ~self = Db.Cluster.set_pending_forget ~__context ~self:cluster ~value:[] ; (* must not disable the daemon here, because we declared another unreachable node dead, * not the current one *) - alert_for_cluster_host_leave ~__context ~cluster_host:self ; + Helpers.call_api_functions ~__context (fun rpc session_id -> + Client.Client.Cluster.cstack_sync ~rpc ~session_id ~self:cluster + ) ; Db.Cluster_host.destroy ~__context ~self ; debug "Cluster_host.forget was successful" | Error error -> @@ -375,7 +366,7 @@ let enable ~__context ~self = "Cluster_host.enable: xapi-clusterd not running - attempting to start" ; Xapi_clustering.Daemon.enable ~__context ) ; - create_cluster_watcher_on_master ~__context ~host ; + Watcher.create_as_necessary ~__context ~host ; Xapi_observer.initialise_observer ~__context Xapi_observer_components.Xapi_clusterd ; let verify = Stunnel_client.get_verify_by_default () in diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 4fc9314aa2e..21794537268 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -426,133 +426,169 @@ let compute_corosync_max_host_failures ~__context = in corosync_ha_max_hosts -let on_corosync_update ~__context ~cluster updates = - debug - "%s: Received %d updates from corosync_notifyd , run diagnostics to get \ - new state" - __FUNCTION__ (List.length updates) ; - let m = - Cluster_client.LocalClient.diagnostics (rpc ~__context) - "update quorum api fields with diagnostics" - in - match Idl.IdM.run @@ Cluster_client.IDL.T.get m with - | Ok diag -> - Db.Cluster.set_is_quorate ~__context ~self:cluster ~value:diag.is_quorate ; - let all_cluster_hosts = Db.Cluster_host.get_all ~__context in - let ip_ch = - List.map - (fun ch -> - let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in - let ipstr = - ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF) - |> ipstr_of_address - in - (ipstr, ch) - ) - all_cluster_hosts - in - let current_time = API.Date.now () in - ( match diag.quorum_members with - | None -> - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:false ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time - ) - all_cluster_hosts - | Some nodel -> - let quorum_hosts = - List.filter_map - (fun {addr; _} -> - let ipstr = ipstr_of_address addr in - match List.assoc_opt ipstr ip_ch with - | None -> - error - "%s: cannot find cluster host with network address %s, \ - ignoring this host" - __FUNCTION__ ipstr ; - None - | Some ch -> - Some ch - ) - nodel +module Watcher = struct + let on_corosync_update ~__context ~cluster updates = + debug + "%s: Received %d updates from corosync_notifyd, run diagnostics to get \ + new state" + __FUNCTION__ (List.length updates) ; + let m = + Cluster_client.LocalClient.diagnostics (rpc ~__context) + "update quorum api fields with diagnostics" + in + match Idl.IdM.run @@ Cluster_client.IDL.T.get m with + | Ok diag -> + ( Db.Cluster.set_is_quorate ~__context ~self:cluster + ~value:diag.is_quorate ; + let all_cluster_hosts = Db.Cluster_host.get_all ~__context in + let live_hosts = + Db.Cluster_host.get_refs_where ~__context + ~expr:(Eq (Field "live", Literal "true")) in - let missing_hosts = - List.filter - (fun h -> not (List.mem h quorum_hosts)) - all_cluster_hosts + let dead_hosts = + List.filter (fun h -> not (List.mem h live_hosts)) all_cluster_hosts in - let new_hosts = - List.filter - (fun h -> not (Db.Cluster_host.get_live ~__context ~self:h)) - 
quorum_hosts + let ip_ch = + List.map + (fun ch -> + let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in + let ipstr = + ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF) + |> ipstr_of_address + in + (ipstr, ch) + ) + all_cluster_hosts in - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:true ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time - ) - new_hosts ; - List.iter - (fun self -> - Db.Cluster_host.set_live ~__context ~self ~value:false ; - Db.Cluster_host.set_last_update_live ~__context ~self - ~value:current_time - ) - missing_hosts ; - maybe_generate_alert ~__context ~missing_hosts ~new_hosts - ~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum - ) ; - Db.Cluster.set_quorum ~__context ~self:cluster - ~value:(Int64.of_int diag.quorum) ; - Db.Cluster.set_live_hosts ~__context ~self:cluster - ~value:(Int64.of_int diag.total_votes) - | Error (InternalError message) | Error (Unix_error message) -> - warn "%s Cannot query diagnostics due to %s, not performing update" - __FUNCTION__ message - | exception exn -> - warn - "%s: Got exception %s while retrieving diagnostics info, not \ - performing update" - __FUNCTION__ (Printexc.to_string exn) - -let create_cluster_watcher_on_master ~__context ~host = - if Helpers.is_pool_master ~__context ~host then - let watch () = - while !Daemon.enabled do - let m = - Cluster_client.LocalClient.UPDATES.get (rpc ~__context) - "call cluster watcher" 3. - in - match Idl.IdM.run @@ Cluster_client.IDL.T.get m with - | Ok updates -> ( - match find_cluster_host ~__context ~host with - | Some ch -> - let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in - on_corosync_update ~__context ~cluster updates + let current_time = API.Date.now () in + match diag.quorum_members with | None -> - () - ) - | Error (InternalError "UPDATES.Timeout") -> - (* UPDATES.get timed out, this is normal, now retry *) + List.iter + (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:false ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time + ) + all_cluster_hosts + | Some nodel -> + (* nodel contains the current members of the cluster, according to corosync *) + let quorum_hosts = + List.filter_map + (fun {addr; _} -> + let ipstr = ipstr_of_address addr in + match List.assoc_opt ipstr ip_ch with + | None -> + error + "%s: cannot find cluster host with network address \ + %s, ignoring this host" + __FUNCTION__ ipstr ; + None + | Some ch -> + Some ch + ) + nodel + in + + (* hosts_left contains the hosts that were live, but not in the list + of live hosts according to the cluster stack *) + let hosts_left = + List.filter (fun h -> not (List.mem h quorum_hosts)) live_hosts + in + (* hosts_joined contains the hosts that were dead but exists in the db, + and is now viewed as a member of the cluster by the cluster stack *) + let hosts_joined = + List.filter (fun h -> List.mem h quorum_hosts) dead_hosts + in + debug "%s: there are %d hosts joined and %d hosts left" + __FUNCTION__ (List.length hosts_joined) (List.length hosts_left) ; + + List.iter + (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:true ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time + ) + quorum_hosts ; + List.filter + (fun h -> not (List.mem h quorum_hosts)) + all_cluster_hosts + |> List.iter (fun self -> + Db.Cluster_host.set_live ~__context ~self ~value:false ; + Db.Cluster_host.set_last_update_live ~__context ~self + ~value:current_time + ) ; + 
maybe_generate_alert ~__context ~hosts_left ~hosts_joined + ~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum + ) ; + Db.Cluster.set_quorum ~__context ~self:cluster + ~value:(Int64.of_int diag.quorum) ; + Db.Cluster.set_live_hosts ~__context ~self:cluster + ~value:(Int64.of_int diag.total_votes) + | Error (InternalError message) | Error (Unix_error message) -> + warn "%s Cannot query diagnostics due to %s, not performing update" + __FUNCTION__ message + | exception exn -> + warn + "%s: Got exception %s while retrieving diagnostics info, not \ + performing update" + __FUNCTION__ (Printexc.to_string exn) + + let cluster_change_watcher : bool Atomic.t = Atomic.make false + + (* this is the time it takes for the update request to time out. It is ok to set + it to a relatively long value since the call will return immediately if there + is an update *) + let cluster_change_interval = Mtime.Span.min + + (* we handle unclean hosts join and leave in the watcher, i.e. hosts joining and leaving + due to network problems, power cut, etc. Join and leave initiated by the + API will be handled in the API call themselves, but they share the same code + as the watcher. *) + let watch_cluster_change ~__context ~host = + while !Daemon.enabled do + let m = + Cluster_client.LocalClient.UPDATES.get (rpc ~__context) + "call cluster watcher" + (Clock.Timer.span_to_s cluster_change_interval) + in + match Idl.IdM.run @@ Cluster_client.IDL.T.get m with + | Ok updates -> ( + match find_cluster_host ~__context ~host with + | Some ch -> + let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in + on_corosync_update ~__context ~cluster updates + | None -> () - | Error (InternalError message) | Error (Unix_error message) -> - warn "%s: Cannot query cluster host updates with error %s" - __FUNCTION__ message - | exception exn -> - warn - "%s: Got exception %s while query cluster host updates, retrying" - __FUNCTION__ (Printexc.to_string exn) ; - Thread.delay 3. - done - in - if Xapi_cluster_helpers.cluster_health_enabled ~__context then ( - debug "%s: create watcher for corosync-notifyd on master" __FUNCTION__ ; - ignore @@ Thread.create watch () - ) else - debug - "%s: not creating watcher for corosync-notifyd: feature cluster_health \ - not enabled" - __FUNCTION__ + ) + | Error (InternalError "UPDATES.Timeout") -> + (* UPDATES.get timed out, this is normal, now retry *) + () + | Error (InternalError message) | Error (Unix_error message) -> + warn "%s: Cannot query cluster host updates with error %s" + __FUNCTION__ message + | exception exn -> + warn "%s: Got exception %s while query cluster host updates, retrying" + __FUNCTION__ (Printexc.to_string exn) ; + Thread.delay (Clock.Timer.span_to_s cluster_change_interval) + done ; + Atomic.set cluster_change_watcher false + + (** [create_as_necessary] will create cluster watchers on the coordinator if they are not + already created. + There is no need to destroy them: once the clustering daemon is disabled, + these threads will exit as well. 
*) + let create_as_necessary ~__context ~host = + if Helpers.is_pool_master ~__context ~host then + if Xapi_cluster_helpers.cluster_health_enabled ~__context then + if Atomic.compare_and_set cluster_change_watcher false true then ( + debug "%s: create watcher for corosync-notifyd on coordinator" + __FUNCTION__ ; + ignore + @@ Thread.create (fun () -> watch_cluster_change ~__context ~host) () + ) else + (* someone else must have gone into the if branch above and created the thread + before us, leave it to them *) + debug + "%s: not creating watcher for corosync-notifyd as it already exists" + __FUNCTION__ +end
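
Note (not part of the patch): the thread-creation guard in Watcher.create_as_necessary above can be illustrated in isolation. Below is a minimal, self-contained OCaml sketch of the same single-watcher pattern: Atomic.compare_and_set lets exactly one caller start the watcher thread, and the thread clears the flag when its loop exits so a later call can start a fresh one. The names watcher_running, enabled, interval_s and poll_updates are placeholders standing in for cluster_change_watcher, Daemon.enabled, cluster_change_interval and the UPDATES.get / on_corosync_update round trip.

let watcher_running : bool Atomic.t = Atomic.make false

let enabled = ref true (* stands in for Daemon.enabled *)

let interval_s = 60. (* stands in for cluster_change_interval *)

(* stands in for the UPDATES.get / on_corosync_update round trip *)
let poll_updates () = Thread.delay interval_s

let watch () =
  while !enabled do
    poll_updates ()
  done ;
  (* clearing the flag allows a later create_as_necessary to start a new watcher *)
  Atomic.set watcher_running false

let create_as_necessary () =
  (* compare_and_set succeeds for exactly one caller, so concurrent callers
     (e.g. two API calls racing to create the watcher) cannot spawn two
     watcher threads *)
  if Atomic.compare_and_set watcher_running false true then
    ignore (Thread.create watch ())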