diff --git a/ocaml/tests/bench/bench_throttle2.ml b/ocaml/tests/bench/bench_throttle2.ml new file mode 100644 index 00000000000..50582eff4cc --- /dev/null +++ b/ocaml/tests/bench/bench_throttle2.ml @@ -0,0 +1,86 @@ +open Bechamel + +let () = + Suite_init.harness_init () ; + Debug.set_level Syslog.Warning + +let __context, _ = Test_event_common.event_setup_common () + +let allocate_tasks n = + ( __context + , Array.init n @@ fun i -> + let label = Printf.sprintf "task %d" i in + Xapi_task.create ~__context ~label ~description:"test task" + ) + +let free_tasks (__context, tasks) = + let () = + tasks |> Array.iter @@ fun self -> Xapi_task.destroy ~__context ~self + in + () + +let set_pending tasks = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`pending + +let run_tasks _n (__context, tasks) = + set_pending tasks ; + let () = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success + in + tasks |> Array.iter @@ fun t -> Helpers.Task.wait_for ~__context ~tasks:[t] + +let run_tasks' _n (__context, tasks) = + set_pending tasks ; + let () = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success + in + Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) + +module D = Debug.Make (struct let name = __MODULE__ end) + +let run_tasks'' n (__context, tasks) = + set_pending tasks ; + let finished = Atomic.make 0 in + let (t : Thread.t) = + Thread.create + (fun () -> + for _ = 1 to 10 do + Thread.yield () + done ; + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success ; + Atomic.incr finished + ) + () + in + Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) ; + let f = Atomic.get finished in + assert (f = n || f = n - 1) ; + Thread.join t + +let benchmarks = + Test.make_grouped ~name:"Task latency" + [ + Test.make_indexed_with_resource ~name:"task complete+wait latency" + ~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks + ~free:free_tasks (fun n -> Staged.stage (run_tasks n) + ) + ; Test.make_indexed_with_resource ~name:"task complete+wait all latency" + ~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks + ~free:free_tasks (fun n -> Staged.stage (run_tasks' n) + ) + ; Test.make_indexed_with_resource + ~name:"task complete+wait all latency (thread)" ~args:[1; 10; 100] + Test.multiple ~allocate:allocate_tasks ~free:free_tasks (fun n -> + Staged.stage (run_tasks'' n) + ) + ] + +let () = Bechamel_simple_cli.cli benchmarks diff --git a/ocaml/tests/bench/dune b/ocaml/tests/bench/dune index dcd61813e1e..10cffadb857 100644 --- a/ocaml/tests/bench/dune +++ b/ocaml/tests/bench/dune @@ -1,4 +1,4 @@ (executables - (names bench_tracing bench_uuid) - (libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid) + (names bench_tracing bench_uuid bench_throttle2) + (libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid xapi_aux tests_common log xapi_internal) ) diff --git a/ocaml/xapi-cli-server/cli_operations.ml b/ocaml/xapi-cli-server/cli_operations.ml index 1e8ba0f3b37..4f61e843140 100644 --- a/ocaml/xapi-cli-server/cli_operations.ml +++ b/ocaml/xapi-cli-server/cli_operations.ml @@ -2848,8 +2848,6 @@ exception Finished let event_wait_gen rpc session_id classname record_matches = (* Immediately register *) let classes = [classname] in - Client.Event.register ~rpc ~session_id ~classes ; - debug "Registered for events" ; (* Check to see if the condition is already satisfied - get all objects of whatever class specified... *) let poll () = let current_tbls = @@ -2930,96 +2928,111 @@ let event_wait_gen rpc session_id classname record_matches = in List.exists record_matches all_recs in - finally - (fun () -> - if not (poll ()) then - try - while true do - try - let events = - Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) - in - let doevent event = - let tbl = - match Event_helper.record_of_event event with - | Event_helper.VM (r, Some x) -> - let record = vm_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VDI (r, Some x) -> - let record = vdi_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.SR (r, Some x) -> - let record = sr_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Host (r, Some x) -> - let record = host_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Network (r, Some x) -> - let record = net_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VIF (r, Some x) -> - let record = vif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PIF (r, Some x) -> - let record = pif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VBD (r, Some x) -> - let record = vbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PBD (r, Some x) -> - let record = pbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Pool (r, Some x) -> - let record = pool_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Task (r, Some x) -> - let record = task_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VMSS (r, Some x) -> - let record = vmss_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Secret (r, Some x) -> - let record = secret_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | _ -> - failwith - ("Cli listening for class '" - ^ classname - ^ "' not currently implemented" - ) - in - let record = - List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl - in - if record_matches record then raise Finished + let use_event_next = !Constants.use_event_next in + let run () = + if not (poll ()) then + try + let token = ref "" in + while true do + let events = + if use_event_next then + Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + else + let event_from = + Event_types.event_from_of_rpc + (Client.Event.from ~rpc ~session_id ~timeout:30. ~token:!token + ~classes + ) in - List.iter doevent - (List.filter (fun e -> e.Event_types.snapshot <> None) events) - with - | Api_errors.Server_error (code, _) - when code = Api_errors.events_lost - -> - debug "Got EVENTS_LOST; reregistering" ; - Client.Event.unregister ~rpc ~session_id ~classes ; - Client.Event.register ~rpc ~session_id ~classes ; - if poll () then raise Finished - done - with Finished -> () - ) - (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + token := event_from.token ; + event_from.events + in + let doevent event = + let tbl = + match Event_helper.record_of_event event with + | Event_helper.VM (r, Some x) -> + let record = vm_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VDI (r, Some x) -> + let record = vdi_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.SR (r, Some x) -> + let record = sr_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Host (r, Some x) -> + let record = host_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Network (r, Some x) -> + let record = net_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VIF (r, Some x) -> + let record = vif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PIF (r, Some x) -> + let record = pif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VBD (r, Some x) -> + let record = vbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PBD (r, Some x) -> + let record = pbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Pool (r, Some x) -> + let record = pool_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Task (r, Some x) -> + let record = task_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VMSS (r, Some x) -> + let record = vmss_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Secret (r, Some x) -> + let record = secret_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | _ -> + failwith + ("Cli listening for class '" + ^ classname + ^ "' not currently implemented" + ) + in + let record = + List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl + in + if record_matches record then raise_notrace Finished + in + List.iter doevent + (List.filter (fun e -> e.Event_types.snapshot <> None) events) + done + with + | Api_errors.Server_error (code, _) + when code = Api_errors.events_lost && use_event_next -> + debug "Got EVENTS_LOST; reregistering" ; + Client.Event.unregister ~rpc ~session_id ~classes ; + Client.Event.register ~rpc ~session_id ~classes ; + if poll () then raise Finished + | Finished -> + () + in + if use_event_next then ( + Client.Event.register ~rpc ~session_id ~classes ; + debug "Registered for events" ; + finally run (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + ) else + run () (* We're done. Unregister and finish *) diff --git a/ocaml/xapi-cli-server/cli_util.ml b/ocaml/xapi-cli-server/cli_util.ml index 48fd9392ef5..75c4f30360f 100644 --- a/ocaml/xapi-cli-server/cli_util.ml +++ b/ocaml/xapi-cli-server/cli_util.ml @@ -42,21 +42,41 @@ exception Cli_failure of string (** call [callback task_record] on every update to the task, until it completes or fails *) let track callback rpc (session_id : API.ref_session) task = - let classes = ["task"] in + let use_event_next = !Constants.use_event_next in + let classes = + if use_event_next then + ["task"] + else + [Printf.sprintf "task/%s" (Ref.string_of task)] + in finally (fun () -> let finished = ref false in while not !finished do - Client.Event.register ~rpc ~session_id ~classes ; + if use_event_next then + Client.Event.register ~rpc ~session_id ~classes ; try (* Need to check once after registering to avoid a race *) finished := Client.Task.get_status ~rpc ~session_id ~self:task <> `pending ; + let token = ref "" in while not !finished do let events = - Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + if use_event_next then + let events = + Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + in + List.map Event_helper.record_of_event events + else + let event_from = + Event_types.event_from_of_rpc + (Client.Event.from ~rpc ~session_id ~classes ~token:!token + ~timeout:30. + ) + in + token := event_from.token ; + List.map Event_helper.record_of_event event_from.events in - let events = List.map Event_helper.record_of_event events in List.iter (function | Event_helper.Task (t, Some t_rec) when t = task -> diff --git a/ocaml/xapi-consts/constants.ml b/ocaml/xapi-consts/constants.ml index 2c7fc49e179..d3ee0bf8531 100644 --- a/ocaml/xapi-consts/constants.ml +++ b/ocaml/xapi-consts/constants.ml @@ -275,6 +275,9 @@ let owner_key = "owner" (* set in VBD other-config to indicate that clients can delete the attached VDI on VM uninstall if they want.. *) +(* xapi-cli-server doesn't link xapi-globs *) +let use_event_next = ref true + (* the time taken to wait before restarting in a different mode for pool eject/join operations *) let fuse_time = ref 10. diff --git a/ocaml/xapi-types/event_types.ml b/ocaml/xapi-types/event_types.ml index fcd8840e59f..83c82b0bc8d 100644 --- a/ocaml/xapi-types/event_types.ml +++ b/ocaml/xapi-types/event_types.ml @@ -77,6 +77,24 @@ let rec rpc_of_event_from e = ; ("token", rpc_of_token e.token) ] +(* xmlrpc and jsonrpc would map Int32 to Int, but int32_of_rpc can't actually parse + an Int32 back as an int32... this is a bug in ocaml-rpc that should be fixed. + meanwhile work it around by mapping Rpc.Int32 to Rpc.Int upon receiving the message + (it is only Rpc.Int32 for backward compat with non-XAPI Xmlrpc clients) +*) + +let rec fixup_int32 = function + | Rpc.Dict dict -> + Rpc.Dict (List.map fixup_kv dict) + | Rpc.Int32 i -> + Rpc.Int (Int64.of_int32 i) + | rpc -> + rpc + +and fixup_kv (k, v) = (k, fixup_int32 v) + +let event_from_of_rpc rpc = rpc |> fixup_int32 |> event_from_of_rpc + (** Return result of an events.from call *) open Printf diff --git a/ocaml/xapi/taskHelper.mli b/ocaml/xapi/taskHelper.mli index dc5d76cf65b..1c4d5381586 100644 --- a/ocaml/xapi/taskHelper.mli +++ b/ocaml/xapi/taskHelper.mli @@ -36,6 +36,9 @@ val set_result : __context:Context.t -> Rpc.t option -> unit val status_is_completed : [> `cancelled | `failure | `success] -> bool +val status_to_string : + [< `pending | `success | `failure | `cancelling | `cancelled] -> string + val complete : __context:Context.t -> Rpc.t option -> unit val set_cancellable : __context:Context.t -> unit diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index efdcabfbdb6..0c061731924 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1395,6 +1395,11 @@ let other_options = , (fun () -> string_of_bool !Db_globs.idempotent_map) , "True if the add_to_ API calls should be idempotent" ) + ; ( "use-event-next" + , Arg.Set Constants.use_event_next + , (fun () -> string_of_bool !Constants.use_event_next) + , "Use deprecated Event.next instead of Event.from" + ) ; ( "nvidia_multi_vgpu_enabled_driver_versions" , Arg.String (fun x ->