diff --git a/dune-project b/dune-project index 8d762f8e07a..806b80b189b 100644 --- a/dune-project +++ b/dune-project @@ -37,6 +37,13 @@ ) ) +(package + (name tgroup) + (depends + xapi-log + xapi-stdext-unix) +) + (package (name xml-light2) ) @@ -321,6 +328,7 @@ (synopsis "The toolstack daemon which implements the XenAPI") (description "This daemon exposes the XenAPI and is used by clients such as 'xe' and 'XenCenter' to manage clusters of Xen-enabled hosts.") (depends + (ocaml (>= 4.09)) (alcotest :with-test) angstrom astring @@ -374,6 +382,7 @@ tar tar-unix uri + tgroup (uuid (= :version)) uutf uuidm @@ -587,6 +596,7 @@ This package provides an Lwt compatible interface to the library.") (safe-resources(= :version)) sha (stunnel (= :version)) + tgroup uri (uuid (= :version)) xapi-backtrace diff --git a/http-lib.opam b/http-lib.opam index df1b7735eb7..ea91e9c942d 100644 --- a/http-lib.opam +++ b/http-lib.opam @@ -22,6 +22,7 @@ depends: [ "safe-resources" {= version} "sha" "stunnel" {= version} + "tgroup" "uri" "uuid" {= version} "xapi-backtrace" diff --git a/ocaml/idl/datamodel.ml b/ocaml/idl/datamodel.ml index 27068cf4d28..75137852c71 100644 --- a/ocaml/idl/datamodel.ml +++ b/ocaml/idl/datamodel.ml @@ -8517,11 +8517,18 @@ module Event = struct ] ~doc: "Blocking call which returns a (possibly empty) batch of events. This \ - method is only recommended for legacy use. New development should use \ - event.from which supersedes this method." + method is only recommended for legacy use.It stores events in a \ + buffer of limited size, raising EVENTS_LOST if too many events got \ + generated. New development should use event.from which supersedes \ + this method." ~custom_marshaller:true ~flags:[`Session] ~result:(Set (Record _event), "A set of events") - ~errs:[Api_errors.session_not_registered; Api_errors.events_lost] + ~errs: + [ + Api_errors.session_not_registered + ; Api_errors.events_lost + ; Api_errors.event_subscription_parse_failure + ] ~allowed_roles:_R_ALL () let from = @@ -8551,7 +8558,8 @@ module Event = struct ~doc: "Blocking call which returns a new token and a (possibly empty) batch \ of events. The returned token can be used in subsequent calls to this \ - function." + function. It eliminates redundant events (e.g. same field updated \ + multiple times)." ~custom_marshaller:true ~flags:[`Session] ~result: ( Set (Record _event) @@ -8562,7 +8570,11 @@ module Event = struct (*In reality the event batch is not a set of records as stated here. Due to the difficulty of representing this in the datamodel, the doc is generated manually, so ensure the markdown_backend.ml and gen_json.ml is updated if something changes. *) - ~errs:[Api_errors.session_not_registered; Api_errors.events_lost] + ~errs: + [ + Api_errors.event_from_token_parse_failure + ; Api_errors.event_subscription_parse_failure + ] ~allowed_roles:_R_ALL () let get_current_id = diff --git a/ocaml/libs/http-lib/dune b/ocaml/libs/http-lib/dune index 42286576aa4..781a555e18f 100644 --- a/ocaml/libs/http-lib/dune +++ b/ocaml/libs/http-lib/dune @@ -43,6 +43,7 @@ http_lib ipaddr polly + tgroup threads.posix tracing tracing_propagator diff --git a/ocaml/libs/http-lib/http.ml b/ocaml/libs/http-lib/http.ml index 554f3ed6217..c979e1f7d98 100644 --- a/ocaml/libs/http-lib/http.ml +++ b/ocaml/libs/http-lib/http.ml @@ -132,6 +132,8 @@ module Hdr = struct let location = "location" + let originator = "originator" + let hsts = "strict-transport-security" end @@ -674,6 +676,14 @@ module Request = struct let headers, body = to_headers_and_body x in let frame_header = if x.frame then make_frame_header headers else "" in frame_header ^ headers ^ body + + let with_originator_of req f = + Option.iter + (fun req -> + let originator = List.assoc_opt Hdr.originator req.additional_headers in + f originator + ) + req end module Response = struct diff --git a/ocaml/libs/http-lib/http.mli b/ocaml/libs/http-lib/http.mli index 13b8bcaa4fa..114ddbc4f45 100644 --- a/ocaml/libs/http-lib/http.mli +++ b/ocaml/libs/http-lib/http.mli @@ -126,6 +126,8 @@ module Request : sig val to_wire_string : t -> string (** [to_wire_string t] returns a string which could be sent to a server *) + + val with_originator_of : t option -> (string option -> unit) -> unit end (** Parsed form of the HTTP response *) diff --git a/ocaml/libs/http-lib/http_svr.ml b/ocaml/libs/http-lib/http_svr.ml index 017587f3737..d84ba6ad627 100644 --- a/ocaml/libs/http-lib/http_svr.ml +++ b/ocaml/libs/http-lib/http_svr.ml @@ -574,6 +574,8 @@ let handle_connection ~header_read_timeout ~header_total_timeout ~max_length:max_header_length ss in + Http.Request.with_originator_of req Tgroup.of_req_originator ; + (* 2. now we attempt to process the request *) let finished = Option.fold ~none:true diff --git a/ocaml/libs/stunnel/stunnel_cache.ml b/ocaml/libs/stunnel/stunnel_cache.ml index d69fbf10091..be865a216dc 100644 --- a/ocaml/libs/stunnel/stunnel_cache.ml +++ b/ocaml/libs/stunnel/stunnel_cache.ml @@ -40,15 +40,19 @@ let debug = if debug_enabled then debug else ignore_log type endpoint = {host: string; port: int} (* Need to limit the absolute number of stunnels as well as the maximum age *) -let max_stunnel = 70 +let max_stunnel = Atomic.make 70 -let max_age = 180. *. 60. (* seconds *) +let set_max_stunnel n = + D.info "Setting max_stunnel = %d" n ; + Atomic.set max_stunnel n -let max_idle = 5. *. 60. (* seconds *) +let max_age = ref (180. *. 60.) (* seconds *) + +let max_idle = ref (5. *. 60.) (* seconds *) (* The add function adds the new stunnel before doing gc, so the cache *) (* can briefly contain one more than maximum. *) -let capacity = max_stunnel + 1 +let capacity = Atomic.get max_stunnel + 1 (** An index of endpoints to stunnel IDs *) let index : (endpoint, int list) Hashtbl.t ref = ref (Hashtbl.create capacity) @@ -104,6 +108,7 @@ let unlocked_gc () = let to_gc = ref [] in (* Find the ones which are too old *) let now = Unix.gettimeofday () in + let max_age = !max_age and max_idle = !max_idle in Tbl.iter !stunnels (fun idx stunnel -> match Hashtbl.find_opt !times idx with | Some time -> @@ -122,6 +127,7 @@ let unlocked_gc () = debug "%s: found no entry for idx=%d" __FUNCTION__ idx ) ; let num_remaining = List.length all_ids - List.length !to_gc in + let max_stunnel = Atomic.get max_stunnel in if num_remaining > max_stunnel then ( let times' = Hashtbl.fold (fun k v acc -> (k, v) :: acc) !times [] in let times' = diff --git a/ocaml/libs/stunnel/stunnel_cache.mli b/ocaml/libs/stunnel/stunnel_cache.mli index 9a2923dfcbf..724642d1dc0 100644 --- a/ocaml/libs/stunnel/stunnel_cache.mli +++ b/ocaml/libs/stunnel/stunnel_cache.mli @@ -19,6 +19,11 @@ HTTP 1.1 should be used and the connection should be kept-alive. *) +val set_max_stunnel : int -> unit +(** [set_max_stunnel] set the maximum number of unusued, but cached client stunnel connections. + This should be a low number on pool members, to avoid hitting limits on the coordinator with large pools. + *) + val with_connect : ?use_fork_exec_helper:bool -> ?write_to_log:(string -> unit) @@ -46,3 +51,9 @@ val flush : unit -> unit val gc : unit -> unit (** GCs old stunnels *) + +val max_age : float ref +(** maximum time a connection is kept in the stunnel cache, counted from the time it got initially added to the cache *) + +val max_idle : float ref +(** maximum time a connection is kept in the stunnel cache, counted from the most recent time it got (re)added to the cache. *) diff --git a/ocaml/libs/tgroup/dune b/ocaml/libs/tgroup/dune new file mode 100644 index 00000000000..cff00ee1157 --- /dev/null +++ b/ocaml/libs/tgroup/dune @@ -0,0 +1,11 @@ +(library + (name tgroup) + (modules tgroup) + (public_name tgroup) + (libraries xapi-log xapi-stdext-unix xapi-stdext-std)) + +(test + (name test_tgroup) + (modules test_tgroup) + (package tgroup) + (libraries tgroup alcotest xapi-log)) diff --git a/ocaml/libs/tgroup/test_tgroup.ml b/ocaml/libs/tgroup/test_tgroup.ml new file mode 100644 index 00000000000..7623a0c01ee --- /dev/null +++ b/ocaml/libs/tgroup/test_tgroup.ml @@ -0,0 +1,83 @@ +module D = Debug.Make (struct let name = __MODULE__ end) + +let test_identity () = + let specs = + [ + ((Some "XenCenter2024", "u1000"), "u1000/XenCenter2024") + ; ((None, "u1001"), "u1001") + ; ((None, "Special!@#"), "Special") + ; ((Some "With-Hyphen", "123"), "123/WithHyphen") + ; ((Some "", ""), "root") + ; ((Some " Xen Center 2024 ", ", u 1000 "), "u1000/XenCenter2024") + ; ((Some "Xen Center ,/@.~# 2024", "root"), "root/XenCenter2024") + ; ((Some "XenCenter 2024.3.18", ""), "root/XenCenter2024318") + ; ((Some "", "S-R-X-Y1-Y2-Yn-1-Yn"), "SRXY1Y2Yn1Yn") + ; ( (Some "XenCenter2024", "S-R-X-Y1-Y2-Yn-1-Yn") + , "SRXY1Y2Yn1Yn/XenCenter2024" + ) + ] + in + + let test_make ((user_agent, subject_sid), expected_identity) = + let actual_identity = + Tgroup.Group.Identity.(make ?user_agent subject_sid |> to_string) + in + Alcotest.(check string) + "Check expected identity" expected_identity actual_identity + in + List.iter test_make specs + +let test_of_creator () = + let dummy_identity = + Tgroup.Group.Identity.make ~user_agent:"XenCenter2024" "root" + in + let specs = + [ + ((None, None, None, None), "external/unauthenticated") + ; ((Some true, None, None, None), "external/intrapool") + ; ( ( Some true + , Some Tgroup.Group.Endpoint.External + , Some dummy_identity + , Some "sm" + ) + , "external/intrapool" + ) + ; ( ( Some true + , Some Tgroup.Group.Endpoint.Internal + , Some dummy_identity + , Some "sm" + ) + , "external/intrapool" + ) + ; ( ( None + , Some Tgroup.Group.Endpoint.Internal + , Some dummy_identity + , Some "cli" + ) + , "internal/cli" + ) + ; ( (None, None, Some dummy_identity, Some "sm") + , "external/authenticated/root/XenCenter2024" + ) + ] + in + let test_make ((intrapool, endpoint, identity, originator), expected_group) = + let originator = Option.map Tgroup.Group.Originator.of_string originator in + let actual_group = + Tgroup.Group.( + Creator.make ?intrapool ?endpoint ?identity ?originator () + |> of_creator + |> to_string + ) + in + Alcotest.(check string) "Check expected group" expected_group actual_group + in + List.iter test_make specs + +let tests = + [ + ("identity make", `Quick, test_identity) + ; ("group of creator", `Quick, test_of_creator) + ] + +let () = Alcotest.run "Tgroup library" [("Thread classification", tests)] diff --git a/ocaml/libs/tgroup/test_tgroup.mli b/ocaml/libs/tgroup/test_tgroup.mli new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ocaml/libs/tgroup/tgroup.ml b/ocaml/libs/tgroup/tgroup.ml new file mode 100644 index 00000000000..171b78ee2b2 --- /dev/null +++ b/ocaml/libs/tgroup/tgroup.ml @@ -0,0 +1,327 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module D = Debug.Make (struct let name = __MODULE__ end) + +open D + +let ( // ) = Filename.concat + +module Group = struct + module Internal = struct + type t + + let name = "internal" + end + + module External = struct + type t + + let name = "external" + + module Intrapool = struct + type t + + let name = "intrapool" + end + + module Authenticated = struct + type t = string + + let name = "authenticated" + end + + module Unauthenticated = struct + type t + + let name = "unauthenticated" + end + end + + module SM = struct + type t + + let name = "SM" + end + + module CLI = struct + type t + + let name = "cli" + end + + module Identity = struct + type t = {user_agent: string option; subject_sid: string} + + let is_alphanum = function + | '0' .. '9' | 'a' .. 'z' | 'A' .. 'Z' -> + true + | _ -> + false + + let sanitize s = + Xapi_stdext_std.Xstringext.String.filter_chars s is_alphanum + + let make ?user_agent subject_sid = + let user_agent = + user_agent + |> Option.map sanitize + |> Option.map (fun user_agent -> + let len = Int.min (String.length user_agent) 16 in + String.sub user_agent 0 len + ) + in + + let user_agent = if user_agent = Some "" then None else user_agent in + let subject_sid = + if subject_sid = "" then "root" else sanitize subject_sid + in + {user_agent; subject_sid} + + let to_string i = + match i.user_agent with + | Some user_agent -> + i.subject_sid // user_agent + | None -> + i.subject_sid + + let root_identity = make "root" + end + + type _ group = + | Internal_SM : (Internal.t * SM.t) group + | Internal_CLI : (Internal.t * CLI.t) group + | External_Intrapool : (External.t * External.Intrapool.t) group + | External_Authenticated : + Identity.t + -> (External.t * External.Authenticated.t) group + | External_Unauthenticated : (External.t * External.Unauthenticated.t) group + + type t = Group : 'a group -> t + + let all = + [ + Group Internal_SM + ; Group Internal_CLI + ; Group External_Intrapool + ; Group (External_Authenticated Identity.root_identity) + ; Group External_Unauthenticated + ] + + module Endpoint = struct type t = Internal | External end + + module Kind = struct + type t = Intrapool | Authenticated of Identity.t | Unautheticated + + let to_string = function + | Intrapool -> + External.Intrapool.name + | Authenticated identity -> + External.Authenticated.name // Identity.to_string identity + | Unautheticated -> + External.Unauthenticated.name + end + + module Originator = struct + type t = Internal_SM | Internal_CLI | External + + let of_string = function + | s + when String.equal + (String.lowercase_ascii SM.name) + (String.lowercase_ascii s) -> + Internal_SM + | s + when String.equal + (String.lowercase_ascii CLI.name) + (String.lowercase_ascii s) -> + Internal_CLI + | _ -> + External + + let to_string = function + | Internal_SM -> + SM.name + | Internal_CLI -> + CLI.name + | External -> + External.name + end + + module Creator = struct + type t = {endpoint: Endpoint.t; kind: Kind.t; originator: Originator.t} + + let make ?(intrapool = false) ?(endpoint = Endpoint.External) ?identity + ?originator () = + match (intrapool, endpoint, identity, originator) with + | true, _, _, _ -> + { + endpoint= Endpoint.External + ; kind= Kind.Intrapool + ; originator= Originator.External + } + | false, Endpoint.Internal, _, Some originator -> + { + endpoint= Endpoint.Internal + ; kind= Kind.Authenticated Identity.root_identity + ; originator + } + | false, Endpoint.Internal, _, None -> + { + endpoint= Endpoint.External + ; kind= Kind.Authenticated Identity.root_identity + ; originator= Originator.External + } + | false, Endpoint.External, Some identity, _ -> + { + endpoint= Endpoint.External + ; kind= Kind.Authenticated identity + ; originator= Originator.External + } + | false, Endpoint.External, None, _ -> + { + endpoint= Endpoint.External + ; kind= Kind.Unautheticated + ; originator= Originator.External + } + + let default_creator = + { + endpoint= Endpoint.External + ; kind= Kind.Authenticated Identity.root_identity + ; originator= Originator.External + } + + let to_string c = + Printf.sprintf "Creator -> kind:%s originator:%s" (Kind.to_string c.kind) + (Originator.to_string c.originator) + end + + let get_originator = function + | Group Internal_SM -> + Originator.Internal_SM + | Group Internal_CLI -> + Originator.Internal_CLI + | _ -> + Originator.External + + let of_creator creator = + match + ( creator.Creator.endpoint + , creator.Creator.originator + , creator.Creator.kind + ) + with + | _, _, Intrapool -> + Group External_Intrapool + | Endpoint.Internal, Internal_SM, _ -> + Group Internal_SM + | Endpoint.Internal, Internal_CLI, _ -> + Group Internal_CLI + | Endpoint.External, Internal_CLI, Authenticated identity + | Endpoint.External, Internal_SM, Authenticated identity + | _, External, Authenticated identity -> + Group (External_Authenticated identity) + | Endpoint.External, Internal_CLI, Unautheticated + | Endpoint.External, Internal_SM, Unautheticated + | _, External, Unautheticated -> + Group External_Unauthenticated + + let to_cgroup : type a. a group -> string = function + | Internal_SM -> + Internal.name // SM.name + | Internal_CLI -> + Internal.name // CLI.name + | External_Authenticated identity -> + External.name + // External.Authenticated.name + // Identity.to_string identity + | External_Intrapool -> + External.name // External.Intrapool.name + | External_Unauthenticated -> + External.name // External.Unauthenticated.name + + let to_string g = match g with Group group -> to_cgroup group +end + +module Cgroup = struct + type t = string + + let cgroup_dir = Atomic.make None + + let dir_of group : t option = + match group with + | Group.Group group -> + Option.map + (fun dir -> dir // Group.to_cgroup group) + (Atomic.get cgroup_dir) + + let with_dir dir f arg = + Xapi_stdext_unix.Unixext.mkdir_rec dir 0o755 ; + f arg + + let write_cur_tid_to_cgroup_file filename = + try + let perms = 0o640 in + let mode = [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC] in + Xapi_stdext_unix.Unixext.with_file filename mode perms @@ fun fd -> + (* Writing 0 to the task file will automatically transform in writing + the current caller tid to the file. + + Writing 0 to the processes file will automatically write the caller's + pid to file. *) + let buf = "0\n" in + let len = String.length buf in + if Unix.write fd (Bytes.unsafe_of_string buf) 0 len <> len then + warn "writing current tid to %s failed" filename + with exn -> + warn "writing current tid to %s failed with exception: %s" filename + (Printexc.to_string exn) + + let attach_task group = + Option.iter + (fun dir -> + let tasks_file = dir // "tasks" in + with_dir dir write_cur_tid_to_cgroup_file tasks_file + ) + (dir_of group) + + let set_cur_cgroup ~creator = attach_task (Group.of_creator creator) + + let set_cgroup creator = set_cur_cgroup ~creator + + let init dir = + let () = Atomic.set cgroup_dir (Some dir) in + Group.all + |> List.filter_map dir_of + |> List.iter (fun dir -> with_dir dir debug "created cgroup for: %s" dir) ; + set_cur_cgroup ~creator:Group.Creator.default_creator +end + +let of_req_originator originator = + Option.iter + (fun _ -> + try + originator + |> Option.iter (fun originator -> + let originator = Group.Originator.of_string originator in + Group.Creator.make ~endpoint:Group.Endpoint.Internal ~originator + () + |> Cgroup.set_cgroup + ) + with _ -> () + ) + (Atomic.get Cgroup.cgroup_dir) + +let of_creator creator = creator |> Cgroup.set_cgroup diff --git a/ocaml/libs/tgroup/tgroup.mli b/ocaml/libs/tgroup/tgroup.mli new file mode 100644 index 00000000000..b9316967ae3 --- /dev/null +++ b/ocaml/libs/tgroup/tgroup.mli @@ -0,0 +1,107 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** [Group] module helps with the classification of different xapi execution + threads.*) +module Group : sig + (** Abstract type that represents a group of execution threads in xapi. Each + group corresponds to a Creator, and has a designated level of priority.*) + type t + + (** Data structures that represents the identity *) + module Identity : sig + type t + + val root_identity : t + + val make : ?user_agent:string -> string -> t + + val to_string : t -> string + end + + (** Generic representation of different xapi threads originators. *) + module Originator : sig + (** Type that represents different originators of xapi threads. *) + type t = Internal_SM | Internal_CLI | External + + val of_string : string -> t + (** [of_string s] creates an originator from a string [s]. + + e.g create an originator based on a http header. *) + + val to_string : t -> string + (** [to_string o] converts an originator [o] to its string representation.*) + end + + (** Generic representation of different xapi threads origin endpoints. *) + module Endpoint : sig + (** Type that represents different origin endpoints of xapi threads. *) + type t = Internal | External + end + + (** Generic representation of different xapi threads creators. *) + module Creator : sig + (** Abstract type that represents different creators of xapi threads.*) + type t + + val make : + ?intrapool:bool + -> ?endpoint:Endpoint.t + -> ?identity:Identity.t + -> ?originator:Originator.t + -> unit + -> t + (** [make o] creates a creator type based on a given originator [o].*) + + val to_string : t -> string + (** [to_string c] converts a creator [c] to its string representation.*) + end + + val get_originator : t -> Originator.t + (** [get_originator group] returns the originator that maps to group [group].*) + + val of_creator : Creator.t -> t + (** [of_creator c] returns the corresponding group based on the creator [c].*) + + val to_string : t -> string + (** [to_string g] returns the string representation of the group [g].*) +end + +(** [Cgroup] module encapsulates different function for managing the cgroups + corresponding with [Groups].*) +module Cgroup : sig + (** Represents one of the children of the cgroup directory.*) + type t = string + + val dir_of : Group.t -> t option + (** [dir_of group] returns the full path of the cgroup directory corresponding + to the group [group] as [Some dir]. + + Returns [None] if [init dir] has not been called. *) + + val init : string -> unit + (** [init dir] initializes the hierachy of cgroups associated to all [Group.t] + types under the directory [dir].*) + + val set_cgroup : Group.Creator.t -> unit + (** [set_cgroup c] sets the current xapi thread in a cgroup based on the + creator [c].*) +end + +val of_creator : Group.Creator.t -> unit +(** [of_creator g] classifies the current thread based based on the creator [c].*) + +val of_req_originator : string option -> unit +(** [of_req_originator o] same as [of_creator] but it classifies based on the + http request header.*) diff --git a/ocaml/libs/timeslice/dune b/ocaml/libs/timeslice/dune new file mode 100644 index 00000000000..94eff6b3a39 --- /dev/null +++ b/ocaml/libs/timeslice/dune @@ -0,0 +1,5 @@ +(library + (name xapi_timeslice) + (package xapi-idl) + (libraries threads.posix mtime mtime.clock.os xapi-log) +) diff --git a/ocaml/libs/timeslice/recommended.ml b/ocaml/libs/timeslice/recommended.ml new file mode 100644 index 00000000000..e57af54ed3f --- /dev/null +++ b/ocaml/libs/timeslice/recommended.ml @@ -0,0 +1,48 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module D = Debug.Make (struct let name = "timeslice_recommended" end) + +let yield_stop = Atomic.make false + +let yield_worker () = + while not (Atomic.get yield_stop) do + Thread.yield () + done + +let yield_overhead () = + (* Thread.yield only has an effect if another thread exists, + so create one that yields back immediately *) + D.debug "Measuring Thread.yield overhead" ; + Atomic.set yield_stop false ; + let t = Thread.create yield_worker () in + let measured = Simple_measure.measure Thread.yield in + D.debug "Thread.yield overhead: %.6fs <= %.6fs <= %.6fs" measured.low + measured.median measured.high ; + D.debug "Waiting for worker thread to stop" ; + Atomic.set yield_stop true ; + Thread.join t ; + measured.median + +let measure ?(max_overhead_percentage = 1.0) () = + let overhead = yield_overhead () in + let interval = overhead /. (max_overhead_percentage /. 100.) in + D.debug "Recommended timeslice interval = %.4fs" interval ; + (* Avoid too high or too low intervals: + do not go below 1ms (our HZ is 250, and max is 1000, the kernel would round up anyway) + do not go above 50ms (the current default in OCaml 4.14) + *) + let interval = interval |> Float.max 0.001 |> Float.min 0.050 in + D.debug "Final recommeded timeslice interval = %.4fs" interval ; + interval diff --git a/ocaml/libs/timeslice/recommended.mli b/ocaml/libs/timeslice/recommended.mli new file mode 100644 index 00000000000..6d902419345 --- /dev/null +++ b/ocaml/libs/timeslice/recommended.mli @@ -0,0 +1,22 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +val measure : ?max_overhead_percentage:float -> unit -> float +(** [measure ?max_overhead_percentage ()] returns the recommended timeslice for the current system. + + The returned value should be used in a call to {!val:Timeslice.set}. + + @param max_overhead_percentage default 1% + @returns [interval] such that [overhead / interval <= max_overhead_percentage / 100] + *) diff --git a/ocaml/libs/timeslice/simple_measure.ml b/ocaml/libs/timeslice/simple_measure.ml new file mode 100644 index 00000000000..1b271643f8d --- /dev/null +++ b/ocaml/libs/timeslice/simple_measure.ml @@ -0,0 +1,61 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** 95% confidence interval, and median value *) +type t = {low: float; median: float; high: float} + +let span_to_s s = Mtime.Span.to_float_ns s *. 1e-9 + +let ci95 measurements = + let n = Array.length measurements in + Array.sort Float.compare measurements ; + let median = measurements.(n / 2) in + (* "Performance Evaluation of Computer and Communication Systems", Table A. 1 *) + let n = float n in + let d = 0.98 *. sqrt n in + let lo = (n /. 2.) -. d |> Float.to_int + and hi = (n /. 2.) +. 1. +. d |> Float.ceil |> Float.to_int in + {low= measurements.(lo - 1); median; high= measurements.(hi - 1)} + +let measure ?(n = 1001) ?(inner = 10) f = + if n <= 70 then (* some of the formulas below are not valid for smaller [n] *) + invalid_arg (Printf.sprintf "n must be at least 70: %d" n) ; + (* warmup *) + Sys.opaque_identity (f ()) ; + + let measure_inner _ = + let m = Mtime_clock.counter () in + for _ = 1 to inner do + (* opaque_identity prevents the call from being optimized away *) + Sys.opaque_identity (f ()) + done ; + let elapsed = Mtime_clock.count m in + span_to_s elapsed /. float inner + in + let measurements = Array.init n measure_inner in + ci95 measurements + +let measure_min ?(n = 1001) f arg = + (* warmup *) + Sys.opaque_identity (f arg) ; + let measure_one _ = + let m = Mtime_clock.counter () in + Sys.opaque_identity (f arg) ; + let elapsed = Mtime_clock.count m in + span_to_s elapsed + in + Seq.ints 0 + |> Seq.take n + |> Seq.map measure_one + |> Seq.fold_left Float.min Float.max_float diff --git a/ocaml/libs/timeslice/simple_measure.mli b/ocaml/libs/timeslice/simple_measure.mli new file mode 100644 index 00000000000..76183a948e7 --- /dev/null +++ b/ocaml/libs/timeslice/simple_measure.mli @@ -0,0 +1,47 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** Measure the speed of an operation in a very simple and robust way. + More detailed measurements can be dune using [Bechamel]. +*) + +(** 95% confidence interval, and median value *) +type t = {low: float; median: float; high: float} + +val measure : ?n:int -> ?inner:int -> (unit -> unit) -> t +(** [measure ?n ?inner f] measures [n] times the duration of [inner] iterations of [f ()]. + + Returns the median of the inner measurements, and a 95% confidence interval. + The median is used, because it makes no assumptions about the distribution of the samples, + i.e. it doesn't require a normal (Gaussian) distribution. + + The inner measurements use a simple average, because we only know the duration of [inner] iterations, + not the duration of each individual call to [f ()]. + The purpose of the [inner] iterations is to reduce measurement overhead. + + @param n iteration count for the outer loop, must be more than [70]. + @param n iteration count for the inner loop + @param f function to measure + + @raises Invalid_argument if [n<70] + *) + +val measure_min : ?n:int -> ('a -> unit) -> 'a -> float +(** [measure_min ?n:int f arg] is the minimum amount of time that [f arg] takes. + + This should be used when we try to measure the maximum speed of some operation (e.g. cached memory accesses), + while ignoring latencies/hickups introduced by other processes on the system. + + It shouldn't be used for measuring the overhead of an operation, because the hickups may be part of that overhead. + *) diff --git a/ocaml/libs/timeslice/timeslice.ml b/ocaml/libs/timeslice/timeslice.ml new file mode 100644 index 00000000000..c414b321d64 --- /dev/null +++ b/ocaml/libs/timeslice/timeslice.ml @@ -0,0 +1,72 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(* avoid allocating an extra option every time *) +let invalid_holder = -1 + +let last_lock_holder = Atomic.make invalid_holder + +let me () = Thread.self () |> Thread.id + +let lock_acquired () = + (* these need to be very low overhead, so just keep track of the last lock holder, + i.e. track only one high-priority lock at a time + *) + Atomic.set last_lock_holder (me ()) + +let lock_released () = Atomic.set last_lock_holder invalid_holder + +let[@inline always] am_i_holding_locks () = + let last = Atomic.get last_lock_holder in + last <> invalid_holder && last = me () + +let yield_interval = Atomic.make Mtime.Span.zero + +(* TODO: use bechamel.monotonic-clock instead, which has lower overhead, + but not in the right place in xs-opam yet +*) +let last_yield = Atomic.make (Mtime_clock.counter ()) + +let failures = Atomic.make 0 + +let periodic_hook (_ : Gc.Memprof.allocation) = + let () = + try + if not (am_i_holding_locks ()) then + let elapsed = Mtime_clock.count (Atomic.get last_yield) in + if Mtime.Span.compare elapsed (Atomic.get yield_interval) > 0 then ( + let now = Mtime_clock.counter () in + Atomic.set last_yield now ; Thread.yield () + ) + with _ -> + (* It is not safe to raise exceptions here, it'd require changing all code to be safe to asynchronous interrupts/exceptions, + see https://guillaume.munch.name/software/ocaml/memprof-limits/index.html#isolation + Because this is just a performance optimization, we fall back to safe behaviour: do nothing, and just keep track that we failed + *) + Atomic.incr failures + in + None + +let periodic = + Gc.Memprof. + {null_tracker with alloc_minor= periodic_hook; alloc_major= periodic_hook} + +let set ?(sampling_rate = 1e-4) interval = + Atomic.set yield_interval + (Mtime.Span.of_float_ns @@ (interval *. 1e9) |> Option.get) ; + Gc.Memprof.start ~sampling_rate ~callstack_size:0 periodic + +let clear () = + Gc.Memprof.stop () ; + Atomic.set yield_interval Mtime.Span.zero diff --git a/ocaml/libs/timeslice/timeslice.mli b/ocaml/libs/timeslice/timeslice.mli new file mode 100644 index 00000000000..8fa54677b38 --- /dev/null +++ b/ocaml/libs/timeslice/timeslice.mli @@ -0,0 +1,44 @@ +(* + * Copyright (C) Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +val set : ?sampling_rate:float -> float -> unit +(** [set ?sampling_rate interval] calls [Thread.yield ()] at most [interval] seconds. + + The implementation of [Thread.yield] guarantees since OCaml 4.09 that we'll switch to a different OCaml thread, + if one exists that is not blocked (i.e. it doesn't rely on [sched_yield] which may run the same thread again, + but uses pthread mutexes and condition variables to ensure the current thread isn't immediately runnable). + + The setting is global for the entire process, and currently uses [Gc.Memprof] to ensure that a hook function is called periodically, + although it depends on the allocation rate of the program whether it gets called at all. + + Another alternative would be to use {!val:Unix.set_itimer}, but XAPI doesn't cope with [EINTR] in a lot of places, + and POSIX interval timers rely on signals to notify of elapsed time. + + We could also have a dedicated thread that sleeps for a certain amount of time, but if it is an OCaml thread, + we'd have no guarantees it'd get scheduled often enough (and it couldn't interrupt other threads anyway, + by the time you'd be running the handler you already gave up running something else). + + It may be desirable to avoid yielding if we are currently holding a lock, see {!val:lock_acquired}, and {!val:lock_released} + to notify this module when that happens. +*) + +val clear : unit -> unit +(** [clear ()] undoes the changes made by [set]. + This is useful for testing multiple timeslices in the same program. *) + +val lock_acquired : unit -> unit +(** [lock_acquired ()] notifies about lock acquisition. *) + +val lock_released : unit -> unit +(** [lock_acquired ()] notifies about lock release. *) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index d8036380cd7..5d61f52cfc4 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -18,7 +18,7 @@ (public_name xapi-stdext-threads.scheduler) (name xapi_stdext_threads_scheduler) (modules ipq scheduler) - (libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads) + (libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock) ) (tests diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml index 4cf29ed3d9b..7293ae625e1 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml @@ -13,7 +13,7 @@ *) (* Imperative priority queue *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array} @@ -23,7 +23,7 @@ let create n default = if n <= 0 then invalid_arg "create" else - let default = {ev= default; time= Mtime_clock.now ()} in + let default = {ev= default; time= Mtime_clock.elapsed ()} in {default; size= 0; data= Array.make n default} let is_empty h = h.size <= 0 @@ -45,7 +45,7 @@ let add h x = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -69,7 +69,7 @@ let remove h s = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -83,7 +83,7 @@ let remove h s = let j' = j + 1 in if j' < n && d.(j').time < d.(j).time then j' else j in - if Mtime.is_earlier d.(j).time ~than:x.time then ( + if Mtime.Span.is_shorter d.(j).time ~than:x.time then ( d.(i) <- d.(j) ; movedown j ) else @@ -93,7 +93,7 @@ let remove h s = in if s = n then () - else if Mtime.is_later d.(s).time ~than:x.time then + else if Mtime.Span.is_longer d.(s).time ~than:x.time then moveup s else movedown s ; @@ -129,7 +129,7 @@ let check h = let d = h.data in for i = 1 to h.size - 1 do let fi = (i - 1) / 2 in - let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in + let ordered = Mtime.Span.is_longer d.(i).time ~than:d.(fi).time in assert ordered done diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli index b7c4974e642..19f8bf1e33f 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli @@ -12,7 +12,7 @@ * GNU Lesser General Public License for more details. *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml index e8e64093e16..a9cc2611da8 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml @@ -17,7 +17,7 @@ module Ipq = Xapi_stdext_threads_scheduler.Ipq (* test we get "out of bound" exception calling Ipq.remove *) let test_out_of_index () = let q = Ipq.create 10 0 in - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; let is_oob = function | Invalid_argument s when String.ends_with ~suffix:" out of bounds" s -> true @@ -43,18 +43,18 @@ let test_leak () = let use_array () = array.(0) <- 'a' in let allocated = Atomic.make true in Gc.finalise (fun _ -> Atomic.set allocated false) array ; - Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.elapsed ()} ; Ipq.remove q 0 ; Gc.full_major () ; Gc.full_major () ; Alcotest.(check bool) "allocated" false (Atomic.get allocated) ; - Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()} + Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.elapsed ()} (* test Ipq.is_empty call *) let test_empty () = let q = Ipq.create 10 0 in Alcotest.(check bool) "same value" true (Ipq.is_empty q) ; - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; Alcotest.(check bool) "same value" false (Ipq.is_empty q) ; Ipq.remove q 0 ; Alcotest.(check bool) "same value" true (Ipq.is_empty q) @@ -75,7 +75,7 @@ let set queue = Ipq.iter (fun d -> let t = d.time in - let t = Mtime.to_uint64_ns t in + let t = Mtime.Span.to_uint64_ns t in s := Int64Set.add t !s ) queue ; @@ -86,7 +86,7 @@ let test_old () = let s = ref Int64Set.empty in let add i = let ti = Random.int64 1000000L in - let t = Mtime.of_uint64_ns ti in + let t = Mtime.Span.of_uint64_ns ti in let e = {Ipq.time= t; Ipq.ev= i} in Ipq.add test e ; s := Int64Set.add ti !s @@ -123,7 +123,7 @@ let test_old () = let prev = ref 0L in for _ = 0 to 49 do let e = Ipq.pop_maximum test in - let t = Mtime.to_uint64_ns e.time in + let t = Mtime.Span.to_uint64_ns e.time in Alcotest.(check bool) (Printf.sprintf "%Ld bigger than %Ld" t !prev) true (t >= !prev) ; diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 200b9925786..27cf3069955 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -33,30 +33,23 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default let lock = Mutex.create () -module Clock = struct - let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9)) - - let span_to_s span = Mtime.Span.to_float_ns span |> fun ns -> ns /. 1e9 - - let add_span clock secs = - (* return mix or max available value if the add overflows *) - match Mtime.add_span clock (span secs) with - | Some t -> - t - | None when secs > 0. -> - Mtime.max_stamp - | None -> - Mtime.min_stamp -end - -let add_to_queue name ty start newfunc = - let ( ++ ) = Clock.add_span in +let add_to_queue_span name ty start_span newfunc = + let ( ++ ) = Mtime.Span.add in let item = - {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} + { + Ipq.ev= {func= newfunc; ty; name} + ; Ipq.time= Mtime_clock.elapsed () ++ start_span + } in with_lock lock (fun () -> Ipq.add queue item) ; Delay.signal delay +let add_to_queue name ty start newfunc = + let start_span = + Clock.Timer.s_to_span start |> Option.value ~default:Mtime.Span.max_span + in + add_to_queue_span name ty start_span newfunc + let remove_from_queue name = with_lock lock @@ fun () -> match !pending_event with @@ -71,8 +64,11 @@ let add_periodic_pending () = with_lock lock @@ fun () -> match !pending_event with | Some ({ty= Periodic timer; _} as ev) -> - let ( ++ ) = Clock.add_span in - let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in + let ( ++ ) = Mtime.Span.add in + let delta = + Clock.Timer.s_to_span timer |> Option.value ~default:Mtime.Span.max_span + in + let item = {Ipq.ev; Ipq.time= Mtime_clock.elapsed () ++ delta} in Ipq.add queue item ; pending_event := None | Some {ty= OneShot; _} -> @@ -84,15 +80,15 @@ let loop () = debug "%s started" __MODULE__ ; try while true do - let now = Mtime_clock.now () in + let now = Mtime_clock.elapsed () in let deadline, item = with_lock lock @@ fun () -> (* empty: wait till we get something *) if Ipq.is_empty queue then - (Clock.add_span now 10.0, None) + (Mtime.Span.add now Mtime.Span.(10 * s), None) else let next = Ipq.maximum queue in - if Mtime.is_later next.Ipq.time ~than:now then + if Mtime.Span.is_longer next.Ipq.time ~than:now then (* not expired: wait till time or interrupted *) (next.Ipq.time, None) else ( @@ -110,7 +106,9 @@ let loop () = | None -> ( (* Sleep until next event. *) let sleep = - Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s + Mtime.(Span.abs_diff deadline now) + |> Mtime.Span.(add ms) + |> Clock.Timer.span_to_s in try ignore (Delay.wait delay sleep) with e -> diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli index d4d19b1f790..53a7c764dcc 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli @@ -18,6 +18,10 @@ type func_ty = | OneShot (** Fire just once *) | Periodic of float (** Fire periodically with a given period in seconds *) +val add_to_queue_span : + string -> func_ty -> Mtime.span -> (unit -> unit) -> unit +(** Start a new timer. *) + val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit (** Start a new timer. *) diff --git a/ocaml/tests/bench/bench_throttle2.ml b/ocaml/tests/bench/bench_throttle2.ml new file mode 100644 index 00000000000..50582eff4cc --- /dev/null +++ b/ocaml/tests/bench/bench_throttle2.ml @@ -0,0 +1,86 @@ +open Bechamel + +let () = + Suite_init.harness_init () ; + Debug.set_level Syslog.Warning + +let __context, _ = Test_event_common.event_setup_common () + +let allocate_tasks n = + ( __context + , Array.init n @@ fun i -> + let label = Printf.sprintf "task %d" i in + Xapi_task.create ~__context ~label ~description:"test task" + ) + +let free_tasks (__context, tasks) = + let () = + tasks |> Array.iter @@ fun self -> Xapi_task.destroy ~__context ~self + in + () + +let set_pending tasks = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`pending + +let run_tasks _n (__context, tasks) = + set_pending tasks ; + let () = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success + in + tasks |> Array.iter @@ fun t -> Helpers.Task.wait_for ~__context ~tasks:[t] + +let run_tasks' _n (__context, tasks) = + set_pending tasks ; + let () = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success + in + Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) + +module D = Debug.Make (struct let name = __MODULE__ end) + +let run_tasks'' n (__context, tasks) = + set_pending tasks ; + let finished = Atomic.make 0 in + let (t : Thread.t) = + Thread.create + (fun () -> + for _ = 1 to 10 do + Thread.yield () + done ; + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success ; + Atomic.incr finished + ) + () + in + Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) ; + let f = Atomic.get finished in + assert (f = n || f = n - 1) ; + Thread.join t + +let benchmarks = + Test.make_grouped ~name:"Task latency" + [ + Test.make_indexed_with_resource ~name:"task complete+wait latency" + ~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks + ~free:free_tasks (fun n -> Staged.stage (run_tasks n) + ) + ; Test.make_indexed_with_resource ~name:"task complete+wait all latency" + ~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks + ~free:free_tasks (fun n -> Staged.stage (run_tasks' n) + ) + ; Test.make_indexed_with_resource + ~name:"task complete+wait all latency (thread)" ~args:[1; 10; 100] + Test.multiple ~allocate:allocate_tasks ~free:free_tasks (fun n -> + Staged.stage (run_tasks'' n) + ) + ] + +let () = Bechamel_simple_cli.cli benchmarks diff --git a/ocaml/tests/bench/dune b/ocaml/tests/bench/dune index dcd61813e1e..10cffadb857 100644 --- a/ocaml/tests/bench/dune +++ b/ocaml/tests/bench/dune @@ -1,4 +1,4 @@ (executables - (names bench_tracing bench_uuid) - (libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid) + (names bench_tracing bench_uuid bench_throttle2) + (libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid xapi_aux tests_common log xapi_internal) ) diff --git a/ocaml/tests/common/dune b/ocaml/tests/common/dune index 29acca3d2cb..a8ab57a4a23 100644 --- a/ocaml/tests/common/dune +++ b/ocaml/tests/common/dune @@ -28,6 +28,7 @@ xapi-stdext-date xapi-stdext-threads.scheduler xapi-stdext-unix + xapi_timeslice ) ) diff --git a/ocaml/tests/common/mock_rpc.ml b/ocaml/tests/common/mock_rpc.ml index 808308afb1c..9edf87897e7 100644 --- a/ocaml/tests/common/mock_rpc.ml +++ b/ocaml/tests/common/mock_rpc.ml @@ -25,7 +25,7 @@ let rpc __context call = Rpc. { success= true - ; contents= contents |> Xmlrpc.to_string |> Xmlrpc.of_string + ; contents= contents |> Jsonrpc.to_string |> Jsonrpc.of_string ; is_notification= false } | "VM.update_allowed_operations", [session_id_rpc; self_rpc] -> diff --git a/ocaml/tests/common/suite_init.ml b/ocaml/tests/common/suite_init.ml index e63deae17b5..adb9c501e88 100644 --- a/ocaml/tests/common/suite_init.ml +++ b/ocaml/tests/common/suite_init.ml @@ -11,4 +11,6 @@ let harness_init () = Filename.concat Test_common.working_area "xapi-inventory" ; Xcp_client.use_switch := false ; Pool_role.set_pool_role_for_test () ; - Message_forwarding.register_callback_fns () + Message_forwarding.register_callback_fns () ; + (* for unit tests use a fixed value *) + Xapi_timeslice.Timeslice.set 0.004 diff --git a/ocaml/tests/dune b/ocaml/tests/dune index fbdc42b66fc..f26f17cab5d 100644 --- a/ocaml/tests/dune +++ b/ocaml/tests/dune @@ -113,6 +113,7 @@ xapi-idl.xen xapi-stdext-date xapi-stdext-threads + xapi-stdext-threads.scheduler xapi-stdext-unix xapi-test-utils xapi-tracing diff --git a/ocaml/tests/test_event.ml b/ocaml/tests/test_event.ml index 9078244462b..821bb3bb52d 100644 --- a/ocaml/tests/test_event.ml +++ b/ocaml/tests/test_event.ml @@ -277,6 +277,37 @@ let object_level_event_test _session_id = if !failure then Alcotest.fail "failed to see object-level event change" +let test_short_oneshot () = + (* don't call event_setup_common here, it'll register a dummy event and hide the bug *) + let started = ref false in + let m = Mutex.create () in + let cond = Condition.create () in + let scheduler () = + Mutex.lock m ; + started := true ; + Condition.broadcast cond ; + Mutex.unlock m ; + Xapi_stdext_threads_scheduler.Scheduler.loop () + in + ignore (Thread.create scheduler ()) ; + (* ensure scheduler sees an empty queue , by waiting for it to start *) + Mutex.lock m ; + while not !started do + Condition.wait cond m + done ; + Mutex.unlock m ; + (* run the scheduler, let it realize its queue is empty, + a Thread.yield is not enough due to the use of debug which would yield back almost immediately. + *) + Thread.delay 1. ; + let fired = Atomic.make false in + let fire () = Atomic.set fired true in + let task = "test_oneshot" in + Xapi_stdext_threads_scheduler.Scheduler.add_to_queue task + Xapi_stdext_threads_scheduler.Scheduler.OneShot 1. fire ; + Thread.delay 2. ; + assert (Atomic.get fired) + let test = [ ("test_event_from_timeout", `Slow, test_event_from_timeout) @@ -287,4 +318,5 @@ let test = ; ("test_event_from", `Quick, event_from_test) ; ("test_event_from_parallel", `Slow, event_from_parallel_test) ; ("test_event_object_level_event", `Slow, object_level_event_test) + ; ("test_short_oneshot", `Slow, test_short_oneshot) ] diff --git a/ocaml/xapi-aux/dune b/ocaml/xapi-aux/dune index 86fbd8647c9..d334769d655 100644 --- a/ocaml/xapi-aux/dune +++ b/ocaml/xapi-aux/dune @@ -3,6 +3,7 @@ (modes best) (libraries astring + clock cstruct forkexec ipaddr diff --git a/ocaml/xapi-aux/throttle.ml b/ocaml/xapi-aux/throttle.ml index a9dacf7f164..26eeff877d1 100644 --- a/ocaml/xapi-aux/throttle.ml +++ b/ocaml/xapi-aux/throttle.ml @@ -39,3 +39,48 @@ module Make (Size : SIZE) = struct let execute f = execute (get_semaphore ()) f end + +module Batching = struct + type t = { + delay_initial: Mtime.span + ; delay_before: Mtime.span + ; delay_between: Mtime.span + } + + let make ~delay_before ~delay_between = + (* we are dividing, cannot overflow *) + let delay_initial = + Mtime.Span.to_float_ns delay_between /. 16. + |> Mtime.Span.of_float_ns + |> Option.get + in + {delay_initial; delay_before; delay_between} + + let span_min a b = if Mtime.Span.is_shorter a ~than:b then a else b + + (** [perform_delay delay] calls {!val:Thread.delay} when [delay] is non-zero. + + Thread.delay 0 provides no fairness guarantees, the current thread may actually be the one that gets the global lock again. + Instead {!val:Thread.yield} could be used, which does provide fairness guarantees, but it may also introduce large latencies + when there are lots of threads waiting for the OCaml runtime lock. Only invoke this once, in the [delay_before] section. + *) + let perform_delay ~yield delay = + if Mtime.Span.is_longer delay ~than:Mtime.Span.zero then + Thread.delay (Clock.Timer.span_to_s delay) + else if yield then + (* this is a low-priority thread, if there are any other threads waiting, then run them now. + If there are no threads waiting then this a noop. + Requires OCaml >= 4.09 (older versions had fairness issues in Thread.yield) + *) + Thread.yield () + + let with_recursive_loop config f = + let rec self arg input = + let arg = span_min config.delay_between Mtime.Span.(2 * arg) in + perform_delay ~yield:false arg ; + (f [@tailcall]) (self arg) input + in + let self0 input = (f [@tailcall]) (self config.delay_initial) input in + perform_delay ~yield:true config.delay_before ; + f self0 +end diff --git a/ocaml/xapi-aux/throttle.mli b/ocaml/xapi-aux/throttle.mli index 897fe5ed6ce..7c5ca1e916c 100644 --- a/ocaml/xapi-aux/throttle.mli +++ b/ocaml/xapi-aux/throttle.mli @@ -22,3 +22,39 @@ module Make (_ : SIZE) : sig val execute : (unit -> 'a) -> 'a end + +module Batching : sig + (** batching delay configuration *) + type t + + val make : delay_before:Mtime.Span.t -> delay_between:Mtime.Span.t -> t + (** [make ~delay_before ~delay_between] creates a configuration, + where we delay the API call by [delay_before] once, + and then with [delay_between] between each recursive call. + *) + + val with_recursive_loop : t -> (('a -> 'b) -> 'a -> 'b) -> 'a -> 'b + (** [with_recursive_loop config f arg] calls [f self arg], where [self] can be used + for recursive calls. + + [arg] is an argument that the implementation of [f] can change between recursive calls for its own purposes, + otherwise [()] can be used. + + A [delay_before] amount of seconds is inserted once, and [delay_between/8] is inserted between recursive calls, + except the first one, and delays increase exponentially until [delay_between] is reached + {v + delay_before + f ... + (self[@tailcall]) ... + f ... + (self[@tailcall]) ... + delay_between/8 + f ... + (self[@tailcall]) ... + delay_between/4 + f ... + v} + + The delays are determined by [config], and [delay_between] uses an exponential backoff, up to [config.delay_between] delay. + *) +end diff --git a/ocaml/xapi-cli-server/cli_operations.ml b/ocaml/xapi-cli-server/cli_operations.ml index 9a6381892b1..265c6f86a3a 100644 --- a/ocaml/xapi-cli-server/cli_operations.ml +++ b/ocaml/xapi-cli-server/cli_operations.ml @@ -2881,8 +2881,6 @@ exception Finished let event_wait_gen rpc session_id classname record_matches = (* Immediately register *) let classes = [classname] in - Client.Event.register ~rpc ~session_id ~classes ; - debug "Registered for events" ; (* Check to see if the condition is already satisfied - get all objects of whatever class specified... *) let poll () = let current_tbls = @@ -2963,96 +2961,111 @@ let event_wait_gen rpc session_id classname record_matches = in List.exists record_matches all_recs in - finally - (fun () -> - if not (poll ()) then - try - while true do - try - let events = - Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) - in - let doevent event = - let tbl = - match Event_helper.record_of_event event with - | Event_helper.VM (r, Some x) -> - let record = vm_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VDI (r, Some x) -> - let record = vdi_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.SR (r, Some x) -> - let record = sr_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Host (r, Some x) -> - let record = host_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Network (r, Some x) -> - let record = net_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VIF (r, Some x) -> - let record = vif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PIF (r, Some x) -> - let record = pif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VBD (r, Some x) -> - let record = vbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PBD (r, Some x) -> - let record = pbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Pool (r, Some x) -> - let record = pool_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Task (r, Some x) -> - let record = task_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VMSS (r, Some x) -> - let record = vmss_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Secret (r, Some x) -> - let record = secret_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | _ -> - failwith - ("Cli listening for class '" - ^ classname - ^ "' not currently implemented" - ) - in - let record = - List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl - in - if record_matches record then raise Finished + let use_event_next = !Constants.use_event_next in + let run () = + if not (poll ()) then + try + let token = ref "" in + while true do + let events = + if use_event_next then + Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + else + let event_from = + Event_types.event_from_of_rpc + (Client.Event.from ~rpc ~session_id ~timeout:30. ~token:!token + ~classes + ) in - List.iter doevent - (List.filter (fun e -> e.Event_types.snapshot <> None) events) - with - | Api_errors.Server_error (code, _) - when code = Api_errors.events_lost - -> - debug "Got EVENTS_LOST; reregistering" ; - Client.Event.unregister ~rpc ~session_id ~classes ; - Client.Event.register ~rpc ~session_id ~classes ; - if poll () then raise Finished - done - with Finished -> () - ) - (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + token := event_from.token ; + event_from.events + in + let doevent event = + let tbl = + match Event_helper.record_of_event event with + | Event_helper.VM (r, Some x) -> + let record = vm_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VDI (r, Some x) -> + let record = vdi_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.SR (r, Some x) -> + let record = sr_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Host (r, Some x) -> + let record = host_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Network (r, Some x) -> + let record = net_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VIF (r, Some x) -> + let record = vif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PIF (r, Some x) -> + let record = pif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VBD (r, Some x) -> + let record = vbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PBD (r, Some x) -> + let record = pbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Pool (r, Some x) -> + let record = pool_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Task (r, Some x) -> + let record = task_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VMSS (r, Some x) -> + let record = vmss_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Secret (r, Some x) -> + let record = secret_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | _ -> + failwith + ("Cli listening for class '" + ^ classname + ^ "' not currently implemented" + ) + in + let record = + List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl + in + if record_matches record then raise_notrace Finished + in + List.iter doevent + (List.filter (fun e -> e.Event_types.snapshot <> None) events) + done + with + | Api_errors.Server_error (code, _) + when code = Api_errors.events_lost && use_event_next -> + debug "Got EVENTS_LOST; reregistering" ; + Client.Event.unregister ~rpc ~session_id ~classes ; + Client.Event.register ~rpc ~session_id ~classes ; + if poll () then raise Finished + | Finished -> + () + in + if use_event_next then ( + Client.Event.register ~rpc ~session_id ~classes ; + debug "Registered for events" ; + finally run (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + ) else + run () (* We're done. Unregister and finish *) diff --git a/ocaml/xapi-cli-server/cli_util.ml b/ocaml/xapi-cli-server/cli_util.ml index 48fd9392ef5..75c4f30360f 100644 --- a/ocaml/xapi-cli-server/cli_util.ml +++ b/ocaml/xapi-cli-server/cli_util.ml @@ -42,21 +42,41 @@ exception Cli_failure of string (** call [callback task_record] on every update to the task, until it completes or fails *) let track callback rpc (session_id : API.ref_session) task = - let classes = ["task"] in + let use_event_next = !Constants.use_event_next in + let classes = + if use_event_next then + ["task"] + else + [Printf.sprintf "task/%s" (Ref.string_of task)] + in finally (fun () -> let finished = ref false in while not !finished do - Client.Event.register ~rpc ~session_id ~classes ; + if use_event_next then + Client.Event.register ~rpc ~session_id ~classes ; try (* Need to check once after registering to avoid a race *) finished := Client.Task.get_status ~rpc ~session_id ~self:task <> `pending ; + let token = ref "" in while not !finished do let events = - Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + if use_event_next then + let events = + Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + in + List.map Event_helper.record_of_event events + else + let event_from = + Event_types.event_from_of_rpc + (Client.Event.from ~rpc ~session_id ~classes ~token:!token + ~timeout:30. + ) + in + token := event_from.token ; + List.map Event_helper.record_of_event event_from.events in - let events = List.map Event_helper.record_of_event events in List.iter (function | Event_helper.Task (t, Some t_rec) when t = task -> diff --git a/ocaml/xapi-consts/constants.ml b/ocaml/xapi-consts/constants.ml index 2c7fc49e179..d3ee0bf8531 100644 --- a/ocaml/xapi-consts/constants.ml +++ b/ocaml/xapi-consts/constants.ml @@ -275,6 +275,9 @@ let owner_key = "owner" (* set in VBD other-config to indicate that clients can delete the attached VDI on VM uninstall if they want.. *) +(* xapi-cli-server doesn't link xapi-globs *) +let use_event_next = ref true + (* the time taken to wait before restarting in a different mode for pool eject/join operations *) let fuse_time = ref 10. diff --git a/ocaml/xapi-idl/lib/dune b/ocaml/xapi-idl/lib/dune index fed65ab1257..8f0d7ca27de 100644 --- a/ocaml/xapi-idl/lib/dune +++ b/ocaml/xapi-idl/lib/dune @@ -26,6 +26,7 @@ unix uri uuidm + xapi_timeslice xapi-backtrace xapi-consts xapi-log diff --git a/ocaml/xapi-idl/lib/xcp_service.ml b/ocaml/xapi-idl/lib/xcp_service.ml index 01c65bc49fb..817825c44fe 100644 --- a/ocaml/xapi-idl/lib/xcp_service.ml +++ b/ocaml/xapi-idl/lib/xcp_service.ml @@ -163,6 +163,26 @@ let setify = in loop [] +(** How long to let an OCaml thread run, before + switching to another thread. + This needs to be as small as possible to reduce latency. + + Too small values reduce performance due to context switching overheads + + 4ms = 1/HZ in Dom0 seems like a good default, + a better value will be written by a boot time service. + *) +let timeslice = ref 0.05 + +let adjust_timeslice () = + let interval = !timeslice in + D.debug "%s: Setting timeslice to %.3fs" __FUNCTION__ interval ; + if interval >= 0.05 then + D.debug "%s: Timeslice same as or larger than OCaml's default: not setting" + __FUNCTION__ + else + Xapi_timeslice.Timeslice.set interval + let common_options = [ ( "use-switch" @@ -236,6 +256,11 @@ let common_options = , (fun () -> !config_dir) , "Location of directory containing configuration file fragments" ) + ; ( "timeslice" + , Arg.Set_float timeslice + , (fun () -> Printf.sprintf "%.3f" !timeslice) + , "timeslice in seconds" + ) ] let loglevel () = !log_level @@ -454,6 +479,7 @@ let configure_common ~options ~resources arg_parse_fn = failwith (String.concat "\n" lines) ) resources ; + adjust_timeslice () ; Sys.set_signal Sys.sigpipe Sys.Signal_ignore let configure ?(argv = Sys.argv) ?(options = []) ?(resources = []) () = diff --git a/ocaml/xapi-storage-script/main.ml b/ocaml/xapi-storage-script/main.ml index a5b86ff533e..ae725b98459 100644 --- a/ocaml/xapi-storage-script/main.ml +++ b/ocaml/xapi-storage-script/main.ml @@ -73,8 +73,8 @@ let missing_uri () = or vbd/domid/device. For regular guests keep the domain as passed by XAPI (an integer). *) -let domain_of ~dp ~vm' = - let vm = Storage_interface.Vm.string_of vm' in +let domain_of ~dp ~vm = + let vm = Storage_interface.Vm.string_of vm in match vm with | "0" -> (* SM tries to use this in filesystem paths, so cannot have /, @@ -790,7 +790,7 @@ let vdi_of_volume x = ; persistent= true } -let choose_datapath ?(persistent = true) domain response = +let choose_datapath ?(persistent = true) response = (* We can only use a URI with a valid scheme, since we use the scheme to name the datapath plugin. *) let possible = @@ -835,7 +835,7 @@ let choose_datapath ?(persistent = true) domain response = | [] -> fail (missing_uri ()) | (script_dir, scheme, u) :: _us -> - return (fork_exec_rpc ~script_dir, scheme, u, domain) + return (fork_exec_rpc ~script_dir, scheme, u) let convert_implementation = function | Xapi_storage.Data.XenDisk {params; extra; backend_type} -> @@ -919,7 +919,7 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi:temporary ) >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + choose_datapath response >>>= fun (rpc, _datapath, uri) -> return_data_rpc (fun () -> Datapath_client.attach (rpc ~dbg) dbg uri domain) in let wrap th = Rpc_lwt.T.put th in @@ -1488,9 +1488,9 @@ let bind ~volume_script_dir = |> wrap in S.VDI.introduce vdi_introduce_impl ; - let vdi_attach3_impl dbg dp sr vdi' vm' _readwrite = + let vdi_attach3_impl dbg dp sr vdi' vm _readwrite = (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in + let domain = domain_of ~dp ~vm in vdi_attach_common dbg sr vdi domain >>>= fun response -> return { @@ -1506,9 +1506,9 @@ let bind ~volume_script_dir = vdi_attach3_impl dbg dp sr vdi vm () in S.DP.attach_info dp_attach_info_impl ; - let vdi_activate_common dbg dp sr vdi' vm' readonly = + let vdi_activate_common dbg dp sr vdi' vm readonly = (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in + let domain = domain_of ~dp ~vm in Attached_SRs.find sr >>>= fun sr -> (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> @@ -1522,7 +1522,7 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi:temporary ) >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + choose_datapath response >>>= fun (rpc, _datapath, uri) -> return_data_rpc (fun () -> let rpc = rpc ~dbg in if readonly then @@ -1533,17 +1533,17 @@ let bind ~volume_script_dir = ) |> wrap in - let vdi_activate3_impl dbg dp sr vdi' vm' = - vdi_activate_common dbg dp sr vdi' vm' false + let vdi_activate3_impl dbg dp sr vdi' vm = + vdi_activate_common dbg dp sr vdi' vm false in S.VDI.activate3 vdi_activate3_impl ; - let vdi_activate_readonly_impl dbg dp sr vdi' vm' = - vdi_activate_common dbg dp sr vdi' vm' true + let vdi_activate_readonly_impl dbg dp sr vdi' vm = + vdi_activate_common dbg dp sr vdi' vm true in S.VDI.activate_readonly vdi_activate_readonly_impl ; - let vdi_deactivate_impl dbg dp sr vdi' vm' = + let vdi_deactivate_impl dbg dp sr vdi' vm = (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in + let domain = domain_of ~dp ~vm in Attached_SRs.find sr >>>= fun sr -> (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> @@ -1556,7 +1556,7 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi:temporary ) >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + choose_datapath response >>>= fun (rpc, _datapath, uri) -> return_data_rpc (fun () -> Datapath_client.deactivate (rpc ~dbg) dbg uri domain ) @@ -1564,9 +1564,9 @@ let bind ~volume_script_dir = |> wrap in S.VDI.deactivate vdi_deactivate_impl ; - let vdi_detach_impl dbg dp sr vdi' vm' = + let vdi_detach_impl dbg dp sr vdi' vm = (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in + let domain = domain_of ~dp ~vm in Attached_SRs.find sr >>>= fun sr -> (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> @@ -1579,7 +1579,7 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi:temporary ) >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + choose_datapath response >>>= fun (rpc, _datapath, uri) -> return_data_rpc (fun () -> Datapath_client.detach (rpc ~dbg) dbg uri domain) ) |> wrap @@ -1614,14 +1614,12 @@ let bind ~volume_script_dir = |> wrap in S.SR.stat sr_stat_impl ; - let vdi_epoch_begin_impl dbg sr vdi' vm' persistent = + let vdi_epoch_begin_impl dbg sr vdi' _vm persistent = (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = Storage_interface.Vm.string_of vm' in Attached_SRs.find sr >>>= fun sr -> (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> - choose_datapath ~persistent domain response - >>>= fun (rpc, datapath, uri, _domain) -> + choose_datapath ~persistent response >>>= fun (rpc, datapath, uri) -> (* If non-persistent and the datapath plugin supports NONPERSISTENT then we delegate this to the datapath plugin. Otherwise we will make a temporary clone now and attach/detach etc this file. *) @@ -1652,13 +1650,12 @@ let bind ~volume_script_dir = |> wrap in S.VDI.epoch_begin vdi_epoch_begin_impl ; - let vdi_epoch_end_impl dbg sr vdi' vm' = + let vdi_epoch_end_impl dbg sr vdi' _vm = (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = Storage_interface.Vm.string_of vm' in Attached_SRs.find sr >>>= fun sr -> (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, datapath, uri, _domain) -> + choose_datapath response >>>= fun (rpc, datapath, uri) -> if Datapath_plugins.supports_feature datapath _nonpersistent then return_data_rpc (fun () -> Datapath_client.close (rpc ~dbg) dbg uri) else @@ -1677,9 +1674,9 @@ let bind ~volume_script_dir = S.VDI.epoch_end vdi_epoch_end_impl ; let vdi_set_persistent_impl _dbg _sr _vdi _persistent = return () |> wrap in S.VDI.set_persistent vdi_set_persistent_impl ; - let dp_destroy2 dbg dp sr vdi' vm' _allow_leak = + let dp_destroy2 dbg dp sr vdi' vm _allow_leak = (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in + let domain = domain_of ~dp ~vm in Attached_SRs.find sr >>>= fun sr -> (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> @@ -1692,7 +1689,7 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi:temporary ) >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + choose_datapath response >>>= fun (rpc, _datapath, uri) -> return_data_rpc (fun () -> Datapath_client.deactivate (rpc ~dbg) dbg uri domain ) @@ -1814,7 +1811,7 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi:temporary ) >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, datapath, uri, domain) -> + choose_datapath response >>>= fun (rpc, datapath, uri) -> if Datapath_plugins.supports_feature datapath _vdi_mirror_in then return_data_rpc (fun () -> Datapath_client.import_activate (rpc ~dbg) dbg uri domain @@ -1910,8 +1907,6 @@ let rec diff a b = | a :: aa -> if List.mem a b then diff aa b else a :: diff aa b -(* default false due to bugs in SMAPIv3 plugins, - once they are fixed this should be set to true *) let concurrent = ref true type reload = All | Files of string list | Nothing diff --git a/ocaml/xapi-types/event_types.ml b/ocaml/xapi-types/event_types.ml index fcd8840e59f..83c82b0bc8d 100644 --- a/ocaml/xapi-types/event_types.ml +++ b/ocaml/xapi-types/event_types.ml @@ -77,6 +77,24 @@ let rec rpc_of_event_from e = ; ("token", rpc_of_token e.token) ] +(* xmlrpc and jsonrpc would map Int32 to Int, but int32_of_rpc can't actually parse + an Int32 back as an int32... this is a bug in ocaml-rpc that should be fixed. + meanwhile work it around by mapping Rpc.Int32 to Rpc.Int upon receiving the message + (it is only Rpc.Int32 for backward compat with non-XAPI Xmlrpc clients) +*) + +let rec fixup_int32 = function + | Rpc.Dict dict -> + Rpc.Dict (List.map fixup_kv dict) + | Rpc.Int32 i -> + Rpc.Int (Int64.of_int32 i) + | rpc -> + rpc + +and fixup_kv (k, v) = (k, fixup_int32 v) + +let event_from_of_rpc rpc = rpc |> fixup_int32 |> event_from_of_rpc + (** Return result of an events.from call *) open Printf diff --git a/ocaml/xapi/dune b/ocaml/xapi/dune index b0a5c6efc31..3088a830a63 100644 --- a/ocaml/xapi/dune +++ b/ocaml/xapi/dune @@ -79,6 +79,7 @@ sexplib0 sexplib sexpr + tgroup forkexec xapi-idl xapi_aux @@ -152,6 +153,7 @@ tapctl tar tar-unix + tgroup threads.posix tracing tracing_propagator @@ -242,6 +244,7 @@ rpclib.json rpclib.xml stunnel + tgroup threads.posix tracing tracing_propagator diff --git a/ocaml/xapi/helpers.ml b/ocaml/xapi/helpers.ml index 4d1ede48abd..3802d78e0b6 100644 --- a/ocaml/xapi/helpers.ml +++ b/ocaml/xapi/helpers.ml @@ -410,7 +410,13 @@ let make_rpc ~__context rpc : Rpc.response = let subtask_of = Ref.string_of (Context.get_task_id __context) in let open Xmlrpc_client in let tracing = Context.set_client_span __context in - let http = xmlrpc ~subtask_of ~version:"1.1" "/" in + let dorpc, path = + if !Xapi_globs.use_xmlrpc then + (XMLRPC_protocol.rpc, "/") + else + (JSONRPC_protocol.rpc, "/jsonrpc") + in + let http = xmlrpc ~subtask_of ~version:"1.1" path in let http = TraceHelper.inject_span_into_req tracing http in let transport = if Pool_role.is_master () then @@ -423,7 +429,7 @@ let make_rpc ~__context rpc : Rpc.response = , !Constants.https_port ) in - XMLRPC_protocol.rpc ~srcstr:"xapi" ~dststr:"xapi" ~transport ~http rpc + dorpc ~srcstr:"xapi" ~dststr:"xapi" ~transport ~http rpc let make_timeboxed_rpc ~__context timeout rpc : Rpc.response = let subtask_of = Ref.string_of (Context.get_task_id __context) in diff --git a/ocaml/xapi/server_helpers.ml b/ocaml/xapi/server_helpers.ml index 1e8261b38f1..b424036c0bf 100644 --- a/ocaml/xapi/server_helpers.ml +++ b/ocaml/xapi/server_helpers.ml @@ -141,6 +141,19 @@ let do_dispatch ?session_id ?forward_op ?self:_ supports_async called_fn_name Context.of_http_req ?session_id ~internal_async_subtask ~generate_task_for ~supports_async ~label ~http_req ~fd () in + let identity = + try + Option.map + (fun session_id -> + let subject = + Db.Session.get_auth_user_sid ~__context ~self:session_id + in + Tgroup.Group.Identity.make ?user_agent:http_req.user_agent subject + ) + session_id + with _ -> None + in + Tgroup.of_creator (Tgroup.Group.Creator.make ?identity ()) ; let sync () = let need_complete = not (Context.forwarded_task __context) in exec_with_context ~__context ~need_complete ~called_async diff --git a/ocaml/xapi/sm_exec.ml b/ocaml/xapi/sm_exec.ml index d689b019bcc..1da0c6c7e83 100644 --- a/ocaml/xapi/sm_exec.ml +++ b/ocaml/xapi/sm_exec.ml @@ -38,6 +38,13 @@ let with_dbg ~name ~dbg f = (*********************************************************************************************) (* Random utility functions *) +let env_vars = + Array.concat + [ + Forkhelpers.default_path_env_pair + ; Env_record.to_string_array [Env_record.pair ("ORIGINATOR", "SM")] + ] + type call = { (* All calls are performed by a specific Host with a special Session and device_config *) host_ref: API.ref_host @@ -355,9 +362,9 @@ let exec_xmlrpc ~dbg ?context:_ ?(needs_session = true) (driver : string) let env, exe, args = match Xapi_observer_components.is_smapi_enabled () with | false -> - (None, exe, args) + (Some env_vars, exe, args) | true -> - Xapi_observer_components.env_exe_args_of + Xapi_observer_components.env_exe_args_of ~env_vars ~component:Xapi_observer_components.SMApi ~exe ~args in Forkhelpers.execute_command_get_output ?tracing:di.tracing ?env diff --git a/ocaml/xapi/taskHelper.mli b/ocaml/xapi/taskHelper.mli index dc5d76cf65b..1c4d5381586 100644 --- a/ocaml/xapi/taskHelper.mli +++ b/ocaml/xapi/taskHelper.mli @@ -36,6 +36,9 @@ val set_result : __context:Context.t -> Rpc.t option -> unit val status_is_completed : [> `cancelled | `failure | `success] -> bool +val status_to_string : + [< `pending | `success | `failure | `cancelling | `cancelled] -> string + val complete : __context:Context.t -> Rpc.t option -> unit val set_cancellable : __context:Context.t -> unit diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml index f4e9a6f0f68..2834cd15b3c 100644 --- a/ocaml/xapi/xapi.ml +++ b/ocaml/xapi/xapi.ml @@ -1058,6 +1058,12 @@ let server_init () = ; ("Initialising random number generator", [], random_setup) ; ("Initialise TLS verification", [], init_tls_verification) ; ("Running startup check", [], startup_check) + ; ( "Initialize cgroups via tgroup" + , [] + , fun () -> + if !Xapi_globs.tgroups_enabled then + Tgroup.Cgroup.init Xapi_globs.xapi_requests_cgroup + ) ; ( "Registering SMAPIv1 plugins" , [Startup.OnlyMaster] , Sm.register ~__context @@ -1137,6 +1143,8 @@ let server_init () = ] ; ( match Pool_role.get_role () with | Pool_role.Master -> + Stunnel_cache.set_max_stunnel + !Xapi_globs.coordinator_max_stunnel_cache ; () | Pool_role.Broken -> info "This node is broken; moving straight to emergency mode" ; @@ -1145,6 +1153,7 @@ let server_init () = server_run_in_emergency_mode () | Pool_role.Slave _ -> info "Running in 'Pool Slave' mode" ; + Stunnel_cache.set_max_stunnel !Xapi_globs.member_max_stunnel_cache ; (* Set emergency mode until we actually talk to the master *) Xapi_globs.slave_emergency_mode := true ; (* signal the init script that it should succeed even though we're bust *) diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index cdc82ca20d8..a7412790019 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -49,9 +49,19 @@ module Token = struct Printf.sprintf "%020Ld,%020Ld" last last_t end +let is_lowercase_char c = Char.equal (Char.lowercase_ascii c) c + +let is_lowercase str = String.for_all is_lowercase_char str + module Subscription = struct type t = Class of string | Object of string * string | All + let is_task_only = function + | Class "task" | Object ("task", _) -> + true + | Class _ | Object _ | All -> + false + let of_string x = if x = "*" then All @@ -67,11 +77,9 @@ module Subscription = struct (Api_errors.event_subscription_parse_failure, [x]) ) - let any = List.fold_left (fun acc x -> acc || x) false - (** [table_matches subs tbl]: true if at least one subscription from [subs] would select some events from [tbl] *) let table_matches subs tbl = - let tbl = String.lowercase_ascii tbl in + let tbl = if is_lowercase tbl then tbl else String.lowercase_ascii tbl in let matches = function | All -> true @@ -80,11 +88,11 @@ module Subscription = struct | Object (x, _) -> x = tbl in - any (List.map matches subs) + List.exists matches subs (** [event_matches subs ev]: true if at least one subscription from [subs] selects for specified class and object *) let object_matches subs ty _ref = - let tbl = String.lowercase_ascii ty in + let tbl = if is_lowercase ty then ty else String.lowercase_ascii ty in let matches = function | All -> true @@ -93,7 +101,7 @@ module Subscription = struct | Object (x, y) -> x = tbl && y = _ref in - any (List.map matches subs) + List.exists matches subs (** [event_matches subs ev]: true if at least one subscription from [subs] selects for event [ev] *) let event_matches subs ev = object_matches subs ev.ty ev.reference @@ -417,20 +425,25 @@ module From = struct let session_is_invalid call = with_lock call.m (fun () -> call.session_invalid) - let wait2 call from_id deadline = + let wait2 call from_id timer = let timeoutname = Printf.sprintf "event_from_timeout_%Ld" call.index in with_lock m (fun () -> while from_id = call.cur_id && (not (session_is_invalid call)) - && Unix.gettimeofday () < deadline + && not (Clock.Timer.has_expired timer) do - Xapi_stdext_threads_scheduler.Scheduler.add_to_queue timeoutname - Xapi_stdext_threads_scheduler.Scheduler.OneShot - (deadline -. Unix.gettimeofday () +. 0.5) - (fun () -> Condition.broadcast c) ; - Condition.wait c m ; - Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue timeoutname + match Clock.Timer.remaining timer with + | Expired _ -> + () + | Remaining delta -> + Xapi_stdext_threads_scheduler.Scheduler.add_to_queue_span + timeoutname Xapi_stdext_threads_scheduler.Scheduler.OneShot + delta (fun () -> Condition.broadcast c + ) ; + Condition.wait c m ; + Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue + timeoutname done ) ; if session_is_invalid call then ( @@ -463,6 +476,13 @@ let unregister ~__context ~classes = (** Blocking call which returns the next set of events relevant to this session. *) let rec next ~__context = + let batching = + if !Constants.use_event_next then + Throttle.Batching.make ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.zero + else + !Xapi_globs.event_next_delay + in let session = Context.get_session_id __context in let open Next in assert_subscribed session ; @@ -482,11 +502,12 @@ let rec next ~__context = ) in (* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *) - let rec grab_nonempty_range () = + let grab_nonempty_range = + Throttle.Batching.with_recursive_loop batching @@ fun self arg -> let last_id, end_id = grab_range () in if last_id = end_id then let (_ : int64) = wait subscription end_id in - grab_nonempty_range () + (self [@tailcall]) arg else (last_id, end_id) in @@ -504,7 +525,7 @@ let rec next ~__context = else rpc_of_events relevant -let from_inner __context session subs from from_t deadline = +let from_inner __context session subs from from_t timer batching = let open Xapi_database in let open From in (* The database tables involved in our subscription *) @@ -540,11 +561,7 @@ let from_inner __context session subs from from_t deadline = Db_cache_types.Table.fold_over_recent !last_generation (fun objref {Db_cache_types.Stat.created; modified; deleted} _ (creates, mods, deletes, last) -> - if - Subscription.object_matches subs - (String.lowercase_ascii table) - objref - then + if Subscription.object_matches subs table objref then let last = max last (max modified deleted) in (* mtime guaranteed to always be larger than ctime *) ( ( if created > !last_generation then @@ -574,11 +591,7 @@ let from_inner __context session subs from from_t deadline = Db_cache_types.Table.fold_over_deleted !last_generation (fun objref {Db_cache_types.Stat.created; modified; deleted} (creates, mods, deletes, last) -> - if - Subscription.object_matches subs - (String.lowercase_ascii table) - objref - then + if Subscription.object_matches subs table objref then let last = max last (max modified deleted) in (* mtime guaranteed to always be larger than ctime *) if created > !last_generation then @@ -600,7 +613,8 @@ let from_inner __context session subs from from_t deadline = (* Each event.from should have an independent subscription record *) let msg_gen, messages, tableset, (creates, mods, deletes, last) = with_call session subs (fun sub -> - let rec grab_nonempty_range () = + let grab_nonempty_range = + Throttle.Batching.with_recursive_loop batching @@ fun self arg -> let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last)) as result ) = @@ -611,16 +625,15 @@ let from_inner __context session subs from from_t deadline = && mods = [] && deletes = [] && messages = [] - && Unix.gettimeofday () < deadline + && not (Clock.Timer.has_expired timer) then ( last_generation := last ; (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *) sub.cur_id <- last ; (* last id the client got is equivalent to the current one *) last_msg_gen := msg_gen ; - wait2 sub last deadline ; - Thread.delay 0.05 ; - grab_nonempty_range () + wait2 sub last timer ; + (self [@tailcall]) arg ) else result in @@ -699,6 +712,19 @@ let from_inner __context session subs from from_t deadline = {events; valid_ref_counts; token= Token.to_string (last, msg_gen)} let from ~__context ~classes ~token ~timeout = + let duration = + timeout + |> Clock.Timer.s_to_span + |> Option.value ~default:Mtime.Span.(24 * hour) + in + let timer = Clock.Timer.start ~duration in + let subs = List.map Subscription.of_string classes in + let batching = + if List.for_all Subscription.is_task_only subs then + !Xapi_globs.event_from_task_delay + else + !Xapi_globs.event_from_delay + in let session = Context.get_session_id __context in let from, from_t = try Token.of_string token @@ -710,15 +736,15 @@ let from ~__context ~classes ~token ~timeout = (Api_errors.event_from_token_parse_failure, [token]) ) in - let subs = List.map Subscription.of_string classes in - let deadline = Unix.gettimeofday () +. timeout in (* We need to iterate because it's possible for an empty event set to be generated if we peek in-between a Modify and a Delete; we'll miss the Delete event and fail to generate the Modify because the snapshot can't be taken. *) let rec loop () = - let event_from = from_inner __context session subs from from_t deadline in - if event_from.events = [] && Unix.gettimeofday () < deadline then ( + let event_from = + from_inner __context session subs from from_t timer batching + in + if event_from.events = [] && not (Clock.Timer.has_expired timer) then ( debug "suppressing empty event.from" ; loop () ) else diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index 29075949474..c07c3d9b739 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1021,6 +1021,10 @@ let header_total_timeout_tcp = ref 60. let max_header_length_tcp = ref 1024 (* Maximum accepted size of HTTP headers in bytes (on TCP only) *) +let coordinator_max_stunnel_cache = ref 70 + +let member_max_stunnel_cache = ref 70 + let conn_limit_tcp = ref 800 let conn_limit_unix = ref 1024 @@ -1035,6 +1039,8 @@ let max_spans = ref 10000 let max_traces = ref 10000 +let use_xmlrpc = ref true + let compress_tracing_files = ref true let prefer_nbd_attach = ref false @@ -1067,6 +1073,52 @@ let disable_webserver = ref false let test_open = ref 0 +let tgroups_enabled = ref false + +let xapi_requests_cgroup = + "/sys/fs/cgroup/cpu/control.slice/xapi.service/request" + +(* Event.{from,next} batching delays *) +let make_batching name ~delay_before ~delay_between = + let name = Printf.sprintf "%s_delay" name in + let config = ref (Throttle.Batching.make ~delay_before ~delay_between) + and config_vals = ref (delay_before, delay_between) in + let set str = + Scanf.sscanf str "%f,%f" @@ fun delay_before delay_between -> + match + (Clock.Timer.s_to_span delay_before, Clock.Timer.s_to_span delay_between) + with + | Some delay_before, Some delay_between -> + config_vals := (delay_before, delay_between) ; + config := Throttle.Batching.make ~delay_before ~delay_between + | _ -> + D.warn + "Ignoring argument '%s'. (it only allows durations of less than 104 \ + days)" + str + and get () = + let d1, d2 = !config_vals in + Printf.sprintf "%f,%f" (Clock.Timer.span_to_s d1) (Clock.Timer.span_to_s d2) + and desc = + Printf.sprintf + "delays in seconds before the API call, and between internal recursive \ + calls, separated with a comma" + in + (config, (name, Arg.String set, get, desc)) + +let event_from_delay, event_from_entry = + make_batching "event_from" ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.(50 * ms) + +let event_from_task_delay, event_from_task_entry = + make_batching "event_from_task" ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.(50 * ms) + +let event_next_delay, event_next_entry = + make_batching "event_next" + ~delay_before:Mtime.Span.(200 * ms) + ~delay_between:Mtime.Span.(50 * ms) + let xapi_globs_spec = [ ( "master_connection_reset_timeout" @@ -1145,9 +1197,13 @@ let xapi_globs_spec = ; ("header_read_timeout_tcp", Float header_read_timeout_tcp) ; ("header_total_timeout_tcp", Float header_total_timeout_tcp) ; ("max_header_length_tcp", Int max_header_length_tcp) + ; ("coordinator_max_stunnel_cache", Int coordinator_max_stunnel_cache) + ; ("member_max_stunnel_cache", Int member_max_stunnel_cache) ; ("conn_limit_tcp", Int conn_limit_tcp) ; ("conn_limit_unix", Int conn_limit_unix) ; ("conn_limit_clientcert", Int conn_limit_clientcert) + ; ("stunnel_cache_max_age", Float Stunnel_cache.max_age) + ; ("stunnel_cache_max_idle", Float Stunnel_cache.max_idle) ; ("export_interval", Float export_interval) ; ("max_spans", Int max_spans) ; ("max_traces", Int max_traces) @@ -1412,6 +1468,11 @@ let other_options = , (fun () -> string_of_bool !Db_globs.idempotent_map) , "True if the add_to_ API calls should be idempotent" ) + ; ( "use-event-next" + , Arg.Set Constants.use_event_next + , (fun () -> string_of_bool !Constants.use_event_next) + , "Use deprecated Event.next instead of Event.from" + ) ; ( "nvidia_multi_vgpu_enabled_driver_versions" , Arg.String (fun x -> @@ -1453,6 +1514,11 @@ let other_options = , (fun () -> string_of_bool !allow_host_sched_gran_modification) , "Allows to modify the host's scheduler granularity" ) + ; ( "use-xmlrpc" + , Arg.Set use_xmlrpc + , (fun () -> string_of_bool !use_xmlrpc) + , "Use XMLRPC (deprecated) for internal communication or JSONRPC" + ) ; ( "extauth_ad_backend" , Arg.Set_string extauth_ad_backend , (fun () -> !extauth_ad_backend) @@ -1626,6 +1692,14 @@ let other_options = , (fun () -> string_of_bool !disable_webserver) , "Disable the host webserver" ) + ; ( "tgroups-enabled" + , Arg.Set tgroups_enabled + , (fun () -> string_of_bool !tgroups_enabled) + , "Turn on tgroups classification" + ) + ; event_from_entry + ; event_from_task_entry + ; event_next_entry ; ( "drivertool" , Arg.Set_string driver_tool , (fun () -> !driver_tool) diff --git a/ocaml/xapi/xapi_observer_components.ml b/ocaml/xapi/xapi_observer_components.ml index d3e0587b143..0b3b884f465 100644 --- a/ocaml/xapi/xapi_observer_components.ml +++ b/ocaml/xapi/xapi_observer_components.ml @@ -101,12 +101,12 @@ let ( // ) = Filename.concat let dir_name_of_component component = Xapi_globs.observer_config_dir // to_string component // "enabled" -let env_exe_args_of ~component ~exe ~args = +let env_exe_args_of ~env_vars ~component ~exe ~args = let dir_name_value = Filename.quote (dir_name_of_component component) in - let env_vars = + let new_env_vars = Array.concat [ - Forkhelpers.default_path_env_pair + env_vars ; Env_record.to_string_array [ Env_record.pair ("OBSERVER_CONFIG_DIR", dir_name_value) @@ -116,4 +116,4 @@ let env_exe_args_of ~component ~exe ~args = in let args = "-m" :: "observer" :: exe :: args in let new_exe = Xapi_globs.python3_path in - (Some env_vars, new_exe, args) + (Some new_env_vars, new_exe, args) diff --git a/ocaml/xapi/xapi_observer_components.mli b/ocaml/xapi/xapi_observer_components.mli index 55bdf7e7f05..9e046dddaf3 100644 --- a/ocaml/xapi/xapi_observer_components.mli +++ b/ocaml/xapi/xapi_observer_components.mli @@ -63,7 +63,8 @@ val dir_name_of_component : t -> string *) val env_exe_args_of : - component:t + env_vars:string array + -> component:t -> exe:string -> args:string list -> string array option * string * string list diff --git a/ocaml/xapi/xapi_periodic_scheduler_init.ml b/ocaml/xapi/xapi_periodic_scheduler_init.ml index 39866292460..1bd13d5f6d6 100644 --- a/ocaml/xapi/xapi_periodic_scheduler_init.ml +++ b/ocaml/xapi/xapi_periodic_scheduler_init.ml @@ -128,6 +128,11 @@ let register ~__context = Xapi_host.alert_if_tls_verification_was_emergency_disabled ~__context ) ) ; + let stunnel_period = !Stunnel_cache.max_idle /. 2. in + Xapi_stdext_threads_scheduler.Scheduler.add_to_queue + "Check stunnel cache expiry" + (Xapi_stdext_threads_scheduler.Scheduler.Periodic stunnel_period) + stunnel_period Stunnel_cache.gc ; if master && Db.Pool.get_update_sync_enabled ~__context diff --git a/ocaml/xapi/xapi_session.ml b/ocaml/xapi/xapi_session.ml index 7e77def1f43..95d310a085e 100644 --- a/ocaml/xapi/xapi_session.ml +++ b/ocaml/xapi/xapi_session.ml @@ -686,6 +686,7 @@ let consider_touching_session rpc session_id = (* Make sure the pool secret matches *) let slave_login_common ~__context ~host_str ~psecret = Context.with_tracing ~__context __FUNCTION__ @@ fun __context -> + Tgroup.of_creator (Tgroup.Group.Creator.make ~intrapool:true ()) ; if not (Helpers.PoolSecret.is_authorized psecret) then ( let msg = "Pool credentials invalid" in debug "Failed to authenticate slave %s: %s" host_str msg ; @@ -881,6 +882,8 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = | Some `root -> (* in this case, the context origin of this login request is a unix socket bound locally to a filename *) (* we trust requests from local unix filename sockets, so no need to authenticate them before login *) + Tgroup.of_creator + Tgroup.Group.(Creator.make ~identity:Identity.root_identity ()) ; login_no_password_common ~__context ~uname:(Some uname) ~originator ~host:(Helpers.get_localhost ~__context) ~pool:false ~is_local_superuser:true ~subject:Ref.null ~auth_user_sid:"" @@ -929,6 +932,8 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = do_local_auth uname pwd ; debug "Success: local auth, user %s from %s" uname (Context.get_origin __context) ; + Tgroup.of_creator + Tgroup.Group.(Creator.make ~identity:Identity.root_identity ()) ; login_no_password_common ~__context ~uname:(Some uname) ~originator ~host:(Helpers.get_localhost ~__context) ~pool:false ~is_local_superuser:true ~subject:Ref.null @@ -1224,6 +1229,10 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = Caching.memoize ~__context uname pwd ~slow_path:query_external_auth in + Tgroup.of_creator + Tgroup.Group.( + Creator.make ~identity:(Identity.make subject_identifier) () + ) ; login_no_password_common ~__context ~uname:(Some uname) ~originator ~host:(Helpers.get_localhost ~__context) diff --git a/ocaml/xe-cli/newcli.ml b/ocaml/xe-cli/newcli.ml index bb3a40d74de..1b729cf6523 100644 --- a/ocaml/xe-cli/newcli.ml +++ b/ocaml/xe-cli/newcli.ml @@ -816,6 +816,7 @@ let main () = in let args = String.concat "\n" args in Printf.fprintf oc "User-agent: xe-cli/Unix/%d.%d\r\n" major minor ; + Printf.fprintf oc "originator: cli\r\n" ; Option.iter (Printf.fprintf oc "traceparent: %s\r\n") traceparent ; Option.iter (Printf.fprintf oc "baggage: %s\r\n") diff --git a/ocaml/xenopsd/scripts/qemu-wrapper b/ocaml/xenopsd/scripts/qemu-wrapper index 93f5c685eac..03312651075 100644 --- a/ocaml/xenopsd/scripts/qemu-wrapper +++ b/ocaml/xenopsd/scripts/qemu-wrapper @@ -47,7 +47,7 @@ xenstore = xs.xs() # - 'system.slice' means move it into the system slice, etc. # If the nominated slice does not already exist, the process will be # left in its parent's slice. -cgroup_slice = '' +cgroup_slice = 'vm.slice' CLONE_NEWNS = 0x00020000 # mount namespace CLONE_NEWNET = 0x40000000 # network namespace diff --git a/python3/examples/XenAPI/XenAPI.py b/python3/examples/XenAPI/XenAPI.py index e37f8813b6e..012dcf40de7 100644 --- a/python3/examples/XenAPI/XenAPI.py +++ b/python3/examples/XenAPI/XenAPI.py @@ -106,6 +106,7 @@ def connect(self): class UDSTransport(xmlrpclib.Transport): def add_extra_header(self, key, value): self._extra_headers += [ (key,value) ] + def with_tracecontext(self): if otel: headers = {} @@ -114,11 +115,22 @@ def with_tracecontext(self): # pylint: disable=possibly-used-before-assignment propagators = propagate.get_global_textmap() propagators.inject(headers, ctx) - self._extra_headers = [] + for k, v in headers.items(): self.add_extra_header(k, v) + + def with_originator(self): + originator_k = "ORIGINATOR" + originator_v = os.getenv(originator_k, None) + if originator_v: + self.add_extra_header(originator_k.lower(), originator_v) + def make_connection(self, host): + # clear the extra headers when making a new connection. This makes sure + # headers such as "traceparent" do not get duplicated. + self._extra_headers = [] self.with_tracecontext() + self.with_originator() # compatibility with parent xmlrpclib.Transport HTTP/1.1 support if self._connection and host == self._connection[0]: diff --git a/tgroup.opam b/tgroup.opam new file mode 100644 index 00000000000..423b4628877 --- /dev/null +++ b/tgroup.opam @@ -0,0 +1,28 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +maintainer: ["Xapi project maintainers"] +authors: ["xen-api@lists.xen.org"] +license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception" +homepage: "https://xapi-project.github.io/" +bug-reports: "https://github.com/xapi-project/xen-api/issues" +depends: [ + "dune" {>= "3.15"} + "xapi-log" + "xapi-stdext-unix" + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/xapi-project/xen-api.git" diff --git a/xapi.opam b/xapi.opam index 21d157b2161..7cfbd383bd5 100644 --- a/xapi.opam +++ b/xapi.opam @@ -10,6 +10,7 @@ homepage: "https://xapi-project.github.io/" bug-reports: "https://github.com/xapi-project/xen-api/issues" depends: [ "dune" {>= "3.15"} + "ocaml" {>= "4.09"} "alcotest" {with-test} "angstrom" "astring" @@ -63,6 +64,7 @@ depends: [ "tar" "tar-unix" "uri" + "tgroup" "uuid" {= version} "uutf" "uuidm"