From 0e909ec335b40655a44b52454ba742b4800cdeac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 18 Apr 2024 21:43:55 +0100 Subject: [PATCH 1/7] CP-49158: [prep] batching: add a helper for recursive, batched calls like Event.{from,next} MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No functional change Signed-off-by: Edwin Török --- ocaml/xapi-aux/dune | 1 + ocaml/xapi-aux/throttle.ml | 24 ++++++++++++++++++++++++ ocaml/xapi-aux/throttle.mli | 30 ++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/ocaml/xapi-aux/dune b/ocaml/xapi-aux/dune index 86fbd8647c9..d334769d655 100644 --- a/ocaml/xapi-aux/dune +++ b/ocaml/xapi-aux/dune @@ -3,6 +3,7 @@ (modes best) (libraries astring + clock cstruct forkexec ipaddr diff --git a/ocaml/xapi-aux/throttle.ml b/ocaml/xapi-aux/throttle.ml index a9dacf7f164..79761c1b762 100644 --- a/ocaml/xapi-aux/throttle.ml +++ b/ocaml/xapi-aux/throttle.ml @@ -39,3 +39,27 @@ module Make (Size : SIZE) = struct let execute f = execute (get_semaphore ()) f end + +module Batching = struct + type t = {delay_before: Mtime.span; delay_between: Mtime.span} + + let make ~delay_before ~delay_between = {delay_before; delay_between} + + (** [perform_delay delay] calls {!val:Thread.delay} when [delay] is non-zero. + + Thread.delay 0 provides no fairness guarantees, the current thread may actually be the one that gets the global lock again. + Instead {!val:Thread.yield} could be used, which does provide fairness guarantees, but it may also introduce large latencies + when there are lots of threads waiting for the OCaml runtime lock. 
+ *) + let perform_delay delay = + if Mtime.Span.is_longer delay ~than:Mtime.Span.zero then + Thread.delay (Clock.Timer.span_to_s delay) + + let with_recursive_loop config f arg = + let rec self arg = + perform_delay config.delay_between ; + (f [@tailcall]) self arg + in + perform_delay config.delay_before ; + f self arg +end diff --git a/ocaml/xapi-aux/throttle.mli b/ocaml/xapi-aux/throttle.mli index 897fe5ed6ce..fb4212b565b 100644 --- a/ocaml/xapi-aux/throttle.mli +++ b/ocaml/xapi-aux/throttle.mli @@ -22,3 +22,33 @@ module Make (_ : SIZE) : sig val execute : (unit -> 'a) -> 'a end + +module Batching : sig + (** batching delay configuration *) + type t + + val make : delay_before:Mtime.Span.t -> delay_between:Mtime.Span.t -> t + (** [make ~delay_before ~delay_between] creates a configuration, + where we delay the API call by [delay_before] once, + and then with [delay_between] between each recursive call. + *) + + val with_recursive_loop : t -> (('a -> 'b) -> 'a -> 'b) -> 'a -> 'b + (** [with_recursive_loop config f arg] calls [f self arg], where [self] can be used + for recursive calls. + + A [delay_before] amount of seconds is inserted once, and [delay_between] is inserted between recursive calls: + {v + delay_before + f ... + (self[@tailcall]) ... + delay_between + f ... + (self[@tailcall]) ... + delay_between + f ... + v} + + The delays are determined by [config] + *) +end From efaa6064b926d35195c91eb2bd3428f40f88ca04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 18 Apr 2024 21:47:38 +0100 Subject: [PATCH 2/7] CP-49158: [prep] Event.from: replace recursion with Batching.with_recursive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No functional change. 
Signed-off-by: Edwin Török --- ocaml/xapi/xapi_event.ml | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index 94b7c4bd9b7..388d6130820 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -511,7 +511,7 @@ let rec next ~__context = else rpc_of_events relevant -let from_inner __context session subs from from_t timer = +let from_inner __context session subs from from_t timer batching = let open Xapi_database in let open From in (* The database tables involved in our subscription *) @@ -599,7 +599,8 @@ let from_inner __context session subs from from_t timer = (* Each event.from should have an independent subscription record *) let msg_gen, messages, tableset, (creates, mods, deletes, last) = with_call session subs (fun sub -> - let rec grab_nonempty_range () = + let grab_nonempty_range = + Throttle.Batching.with_recursive_loop batching @@ fun self () -> let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last)) as result ) = @@ -618,8 +619,7 @@ let from_inner __context session subs from from_t timer = (* last id the client got is equivalent to the current one *) last_msg_gen := msg_gen ; wait2 sub last timer ; - Thread.delay 0.05 ; - grab_nonempty_range () + (self [@tailcall]) () ) else result in @@ -698,6 +698,10 @@ let from_inner __context session subs from from_t timer = {events; valid_ref_counts; token= Token.to_string (last, msg_gen)} let from ~__context ~classes ~token ~timeout = + let batching = + Throttle.Batching.make ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.(50 * ms) + in let session = Context.get_session_id __context in let from, from_t = try Token.of_string token @@ -721,7 +725,9 @@ let from ~__context ~classes ~token ~timeout = miss the Delete event and fail to generate the Modify because the snapshot can't be taken. 
*) let rec loop () = - let event_from = from_inner __context session subs from from_t timer in + let event_from = + from_inner __context session subs from from_t timer batching + in if event_from.events = [] && not (Clock.Timer.has_expired timer) then ( debug "suppressing empty event.from" ; loop () From 3e1d8a27a755597f5237dd74496cfe968da3321c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 18 Apr 2024 21:49:09 +0100 Subject: [PATCH 3/7] CP-51692: Event.next: use same batching as Event.from MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Event.next is deprecated, but was allowed to use a lot more CPU in a tight loop than Event.from. No feature flag for this one, because Event.next is deprecated. Signed-off-by: Edwin Török --- ocaml/xapi/xapi_event.ml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index 388d6130820..3563d01a5f2 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -470,6 +470,10 @@ let unregister ~__context ~classes = (** Blocking call which returns the next set of events relevant to this session. 
*) let rec next ~__context = + let batching = + Throttle.Batching.make ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.(50 * ms) + in let session = Context.get_session_id __context in let open Next in assert_subscribed session ; @@ -489,11 +493,12 @@ let rec next ~__context = ) in (* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *) - let rec grab_nonempty_range () = + let grab_nonempty_range = + Throttle.Batching.with_recursive_loop batching @@ fun self () -> let last_id, end_id = grab_range () in if last_id = end_id then let (_ : int64) = wait subscription end_id in - grab_nonempty_range () + (self [@tailcall]) () else (last_id, end_id) in From 2b4e0db649d0ccdc81196853cb2801bf0e9a13fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 00:14:11 +0100 Subject: [PATCH 4/7] CP-49158: [prep] Event.{from,next}: make delays configurable and prepare for task specific delays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tasks are very small, and we need to react to them more quickly, there is usually nothing to batch. Refactor code and prepare for using different delays for tasks. The delays are now configurable, but their default values are exactly the same as before. Also in the future we should probably use monotonic clocks here, but I've kept that code unchanged for now. No functional change (except for configurability). 
Signed-off-by: Edwin Török --- ocaml/xapi/xapi_event.ml | 31 +++++++++++++++++------------ ocaml/xapi/xapi_globs.ml | 43 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index 3563d01a5f2..39d87363df0 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -56,6 +56,12 @@ let is_lowercase str = String.for_all is_lowercase_char str module Subscription = struct type t = Class of string | Object of string * string | All + let is_task_only = function + | Class "task" | Object ("task", _) -> + true + | Class _ | Object _ | All -> + false + let of_string x = if x = "*" then All @@ -470,10 +476,7 @@ let unregister ~__context ~classes = (** Blocking call which returns the next set of events relevant to this session. *) let rec next ~__context = - let batching = - Throttle.Batching.make ~delay_before:Mtime.Span.zero - ~delay_between:Mtime.Span.(50 * ms) - in + let batching = !Xapi_globs.event_next_delay in let session = Context.get_session_id __context in let open Next in assert_subscribed session ; @@ -703,9 +706,18 @@ let from_inner __context session subs from from_t timer batching = {events; valid_ref_counts; token= Token.to_string (last, msg_gen)} let from ~__context ~classes ~token ~timeout = + let duration = + timeout + |> Clock.Timer.s_to_span + |> Option.value ~default:Mtime.Span.(24 * hour) + in + let timer = Clock.Timer.start ~duration in + let subs = List.map Subscription.of_string classes in let batching = - Throttle.Batching.make ~delay_before:Mtime.Span.zero - ~delay_between:Mtime.Span.(50 * ms) + if List.for_all Subscription.is_task_only subs then + !Xapi_globs.event_from_task_delay + else + !Xapi_globs.event_from_delay in let session = Context.get_session_id __context in let from, from_t = @@ -718,13 +730,6 @@ let from ~__context ~classes ~token ~timeout = (Api_errors.event_from_token_parse_failure, [token]) ) in - let subs = List.map 
Subscription.of_string classes in - let duration = - timeout - |> Clock.Timer.s_to_span - |> Option.value ~default:Mtime.Span.(24 * hour) - in - let timer = Clock.Timer.start ~duration in (* We need to iterate because it's possible for an empty event set to be generated if we peek in-between a Modify and a Delete; we'll miss the Delete event and fail to generate the Modify because the diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index ac498c60596..8f6544be663 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1068,6 +1068,46 @@ let tgroups_enabled = ref false let xapi_requests_cgroup = "/sys/fs/cgroup/cpu/control.slice/xapi.service/request" +(* Event.{from,next} batching delays *) +let make_batching name ~delay_before ~delay_between = + let name = Printf.sprintf "%s_delay" name in + let config = ref (Throttle.Batching.make ~delay_before ~delay_between) + and config_vals = ref (delay_before, delay_between) in + let set str = + Scanf.sscanf str "%f,%f" @@ fun delay_before delay_between -> + match + (Clock.Timer.s_to_span delay_before, Clock.Timer.s_to_span delay_between) + with + | Some delay_before, Some delay_between -> + config_vals := (delay_before, delay_between) ; + config := Throttle.Batching.make ~delay_before ~delay_between + | _ -> + D.warn + "Ignoring argument '%s'. 
(it only allows durations of less than 104 \ + days)" + str + and get () = + let d1, d2 = !config_vals in + Printf.sprintf "%f,%f" (Clock.Timer.span_to_s d1) (Clock.Timer.span_to_s d2) + and desc = + Printf.sprintf + "delays in seconds before the API call, and between internal recursive \ + calls, separated with a comma" + in + (config, (name, Arg.String set, get, desc)) + +let event_from_delay, event_from_entry = + make_batching "event_from" ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.(50 * ms) + +let event_from_task_delay, event_from_task_entry = + make_batching "event_from_task" ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.(50 * ms) + +let event_next_delay, event_next_entry = + make_batching "event_next" ~delay_before:Mtime.Span.zero + ~delay_between:Mtime.Span.(50 * ms) + let xapi_globs_spec = [ ( "master_connection_reset_timeout" @@ -1644,6 +1684,9 @@ let other_options = , (fun () -> string_of_bool !tgroups_enabled) , "Turn on tgroups classification" ) + ; event_from_entry + ; event_from_task_entry + ; event_next_entry ] (* The options can be set with the variable xapiflags in /etc/sysconfig/xapi. From 9435eea0b425b4b187b6d1f94c0877f4ccd15360 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 00:33:16 +0100 Subject: [PATCH 5/7] CP-49158: Event.next is deprecated: increase delays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No feature flag, because this is a deprecated API. Clients that want the best performance should've used Event.from. 
Signed-off-by: Edwin Török --- ocaml/xapi/xapi_globs.ml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index 8f6544be663..140f8d355e7 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1105,7 +1105,8 @@ let event_from_task_delay, event_from_task_entry = ~delay_between:Mtime.Span.(50 * ms) let event_next_delay, event_next_entry = - make_batching "event_next" ~delay_before:Mtime.Span.zero + make_batching "event_next" + ~delay_before:Mtime.Span.(200 * ms) ~delay_between:Mtime.Span.(50 * ms) let xapi_globs_spec = From 0beb5c194c1a1d23a88df81c12a9c452ae6f29a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 00:27:20 +0100 Subject: [PATCH 6/7] CP-49158: Use exponential backoff for delay between recursive calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This delay was right after we waited for a new event, delaying all event responses by 50ms (including task completions). Eliminate the first delay, so that if we find the event we're looking for after the DB update, then we can return immediately. On spurious wakeups (e.g. not the event we subscribed for) the delay is still useful, so keep it for recursive calls after the first one, and exponentially increase it up to the configured maximum. No feature flag, this is a relatively small change, and we use exponential backoffs elsewhere in XAPI already. 
Signed-off-by: Edwin Török --- ocaml/xapi-aux/throttle.ml | 29 ++++++++++++++++++++++------- ocaml/xapi-aux/throttle.mli | 14 ++++++++++---- ocaml/xapi/xapi_event.ml | 8 ++++---- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/ocaml/xapi-aux/throttle.ml b/ocaml/xapi-aux/throttle.ml index 79761c1b762..c4606d7abaf 100644 --- a/ocaml/xapi-aux/throttle.ml +++ b/ocaml/xapi-aux/throttle.ml @@ -41,9 +41,20 @@ module Make (Size : SIZE) = struct end module Batching = struct - type t = {delay_before: Mtime.span; delay_between: Mtime.span} + type t = { + delay_initial: Mtime.span + ; delay_before: Mtime.span + ; delay_between: Mtime.span + } - let make ~delay_before ~delay_between = {delay_before; delay_between} + let make ~delay_before ~delay_between = + (* we are dividing, cannot overflow *) + let delay_initial = + Mtime.Span.to_float_ns delay_between /. 16. + |> Mtime.Span.of_float_ns + |> Option.get + in + {delay_initial; delay_before; delay_between} (** [perform_delay delay] calls {!val:Thread.delay} when [delay] is non-zero. 
@@ -55,11 +66,15 @@ module Batching = struct if Mtime.Span.is_longer delay ~than:Mtime.Span.zero then Thread.delay (Clock.Timer.span_to_s delay) - let with_recursive_loop config f arg = - let rec self arg = - perform_delay config.delay_between ; - (f [@tailcall]) self arg + let span_min a b = if Mtime.Span.is_shorter a ~than:b then a else b + + let with_recursive_loop config f = + let rec self arg input = + let arg = span_min config.delay_between Mtime.Span.(2 * arg) in + perform_delay arg ; + (f [@tailcall]) (self arg) input in + let self0 arg input = (f [@tailcall]) (self arg) input in perform_delay config.delay_before ; - f self arg + f (self0 config.delay_initial) end diff --git a/ocaml/xapi-aux/throttle.mli b/ocaml/xapi-aux/throttle.mli index fb4212b565b..7c5ca1e916c 100644 --- a/ocaml/xapi-aux/throttle.mli +++ b/ocaml/xapi-aux/throttle.mli @@ -37,18 +37,24 @@ module Batching : sig (** [with_recursive_loop config f arg] calls [f self arg], where [self] can be used for recursive calls. - A [delay_before] amount of seconds is inserted once, and [delay_between] is inserted between recursive calls: + [arg] is an argument that the implementation of [f] can change between recursive calls for its own purposes, + otherwise [()] can be used. + + A [delay_before] amount of seconds is inserted once, and [delay_between/8] is inserted between recursive calls, + except the first one, and delays increase exponentially until [delay_between] is reached {v delay_before f ... (self[@tailcall]) ... - delay_between f ... (self[@tailcall]) ... - delay_between + delay_between/8 f ... + (self[@tailcall]) ... + delay_between/4 + f ... v} - The delays are determined by [config] + The delays are determined by [config], and [delay_between] uses an exponential backoff, up to [config.delay_between] delay. 
*) end diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index 39d87363df0..18195d0337e 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -497,11 +497,11 @@ let rec next ~__context = in (* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *) let grab_nonempty_range = - Throttle.Batching.with_recursive_loop batching @@ fun self () -> + Throttle.Batching.with_recursive_loop batching @@ fun self arg -> let last_id, end_id = grab_range () in if last_id = end_id then let (_ : int64) = wait subscription end_id in - (self [@tailcall]) () + (self [@tailcall]) arg else (last_id, end_id) in @@ -608,7 +608,7 @@ let from_inner __context session subs from from_t timer batching = let msg_gen, messages, tableset, (creates, mods, deletes, last) = with_call session subs (fun sub -> let grab_nonempty_range = - Throttle.Batching.with_recursive_loop batching @@ fun self () -> + Throttle.Batching.with_recursive_loop batching @@ fun self arg -> let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last)) as result ) = @@ -627,7 +627,7 @@ let from_inner __context session subs from from_t timer batching = (* last id the client got is equivalent to the current one *) last_msg_gen := msg_gen ; wait2 sub last timer ; - (self [@tailcall]) () + (self [@tailcall]) arg ) else result in From 257af948423d50721a0e749e8e47d1776d25356e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 19 Apr 2024 00:12:15 +0100 Subject: [PATCH 7/7] CP-49158: Throttle: add Thread.yield MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Give an opportunity for more fields to be filled, e.g. when waiting for a task to complete, give a chance for the task to actually run. No feature flag, it only changes timing. 
Signed-off-by: Edwin Török --- dune-project | 1 + ocaml/xapi-aux/throttle.ml | 22 ++++++++++++++-------- xapi.opam | 1 + 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/dune-project b/dune-project index e69a04e745a..651c039bc22 100644 --- a/dune-project +++ b/dune-project @@ -327,6 +327,7 @@ (synopsis "The toolstack daemon which implements the XenAPI") (description "This daemon exposes the XenAPI and is used by clients such as 'xe' and 'XenCenter' to manage clusters of Xen-enabled hosts.") (depends + (ocaml (>= 4.09)) (alcotest :with-test) angstrom astring diff --git a/ocaml/xapi-aux/throttle.ml b/ocaml/xapi-aux/throttle.ml index c4606d7abaf..26eeff877d1 100644 --- a/ocaml/xapi-aux/throttle.ml +++ b/ocaml/xapi-aux/throttle.ml @@ -56,25 +56,31 @@ module Batching = struct in {delay_initial; delay_before; delay_between} + let span_min a b = if Mtime.Span.is_shorter a ~than:b then a else b + (** [perform_delay delay] calls {!val:Thread.delay} when [delay] is non-zero. Thread.delay 0 provides no fairness guarantees, the current thread may actually be the one that gets the global lock again. Instead {!val:Thread.yield} could be used, which does provide fairness guarantees, but it may also introduce large latencies - when there are lots of threads waiting for the OCaml runtime lock. + when there are lots of threads waiting for the OCaml runtime lock. Only invoke this once, in the [delay_before] section. *) - let perform_delay delay = + let perform_delay ~yield delay = if Mtime.Span.is_longer delay ~than:Mtime.Span.zero then Thread.delay (Clock.Timer.span_to_s delay) - - let span_min a b = if Mtime.Span.is_shorter a ~than:b then a else b + else if yield then + (* this is a low-priority thread, if there are any other threads waiting, then run them now. + If there are no threads waiting then this is a noop. 
+ Requires OCaml >= 4.09 (older versions had fairness issues in Thread.yield) + *) + Thread.yield () let with_recursive_loop config f = let rec self arg input = let arg = span_min config.delay_between Mtime.Span.(2 * arg) in - perform_delay arg ; + perform_delay ~yield:false arg ; (f [@tailcall]) (self arg) input in - let self0 arg input = (f [@tailcall]) (self arg) input in - perform_delay config.delay_before ; - f (self0 config.delay_initial) + let self0 input = (f [@tailcall]) (self config.delay_initial) input in + perform_delay ~yield:true config.delay_before ; + f self0 end diff --git a/xapi.opam b/xapi.opam index e9dce9e47f5..915cc192de6 100644 --- a/xapi.opam +++ b/xapi.opam @@ -10,6 +10,7 @@ homepage: "https://xapi-project.github.io/" bug-reports: "https://github.com/xapi-project/xen-api/issues" depends: [ "dune" {>= "3.15"} + "ocaml" {>= "4.09"} "alcotest" {with-test} "angstrom" "astring"