From ff6f3f83661927049d97d07248e9d75c151b7c45 Mon Sep 17 00:00:00 2001 From: Steven Woods Date: Tue, 10 Jan 2023 15:52:56 +0000 Subject: [PATCH 01/13] CP-32622: Replace select in buf_io MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use the SO_RCVTIMEO socket option instead. When the timeout expires, the read fails with EAGAIN or EWOULDBLOCK, as documented in `socket(7)`. Care is needed because a zero timeout means opposite things: select with a timeout of 0 returns immediately, whereas SO_RCVTIMEO set to 0 disables the timeout, so the read never times out. Signed-off-by: Steven Woods Signed-off-by: Edwin Török --- ocaml/libs/http-lib/buf_io.ml | 16 ++++++++++------ ocaml/libs/http-lib/http.ml | 9 +-------- .../xapi-stdext/lib/xapi-stdext-unix/unixext.ml | 15 +++++++++++++++ .../xapi-stdext/lib/xapi-stdext-unix/unixext.mli | 2 ++ 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/ocaml/libs/http-lib/buf_io.ml b/ocaml/libs/http-lib/buf_io.ml index 6a6397a614c..b5d41008c4a 100644 --- a/ocaml/libs/http-lib/buf_io.ml +++ b/ocaml/libs/http-lib/buf_io.ml @@ -79,21 +79,21 @@ let is_full ic = ic.cur = 0 && ic.max = Bytes.length ic.buf let fill_buf ~buffered ic timeout = let buf_size = Bytes.length ic.buf in let fill_no_exc timeout len = - let l, _, _ = Unix.select [ic.fd] [] [] timeout in - if l <> [] then ( + Xapi_stdext_unix.Unixext.with_socket_timeout ic.fd timeout @@ fun () -> + try let n = Unix.read ic.fd ic.buf ic.max len in ic.max <- n + ic.max ; if n = 0 && len <> 0 then raise Eof ; n - ) else - -1 + with Unix.Unix_error (Unix.(EAGAIN | EWOULDBLOCK), _, _) -> -1 in (* If there's no space to read, shift *) if ic.max = buf_size then shift ic ; let space_left = buf_size - ic.max in (* Read byte one by one just do make sure we don't buffer too many chars *) let n = - fill_no_exc timeout (if buffered then space_left else min space_left 1) + fill_no_exc (Some timeout) + (if buffered then space_left else min space_left 1) in (* Select returned nothing to read *) if n = -1 then 
raise Timeout ; @@ -102,7 +102,11 @@ let fill_buf ~buffered ic timeout = let tofillsz = if buffered then buf_size - ic.max else min (buf_size - ic.max) 1 in - ignore (fill_no_exc 0.0 tofillsz) + (* cannot use 0 here, for select that'd mean timeout immediately, for + setsockopt it would mean no timeout. + So use a very short timeout instead + *) + ignore (fill_no_exc (Some 1e-6) tofillsz) ) (** Input one line terminated by \n *) diff --git a/ocaml/libs/http-lib/http.ml b/ocaml/libs/http-lib/http.ml index c6ff41be709..751ce7208fd 100644 --- a/ocaml/libs/http-lib/http.ml +++ b/ocaml/libs/http-lib/http.ml @@ -368,14 +368,8 @@ let read_frame_header buf = let prefix = Bytes.sub_string buf 0 frame_header_length in try Scanf.sscanf prefix "FRAME %012d" (fun x -> Some x) with _ -> None -let set_socket_timeout fd t = - try Unix.(setsockopt_float fd SO_RCVTIMEO t) - with Unix.Unix_error (Unix.ENOTSOCK, _, _) -> - (* In the unit tests, the fd comes from a pipe... ignore *) - () - let read_http_request_header ~read_timeout ~total_timeout ~max_length fd = - Option.iter (fun t -> set_socket_timeout fd t) read_timeout ; + Unixext.with_socket_timeout fd read_timeout @@ fun () -> let buf = Bytes.create (Option.value ~default:1024 max_length) in let deadline = Option.map @@ -420,7 +414,6 @@ let read_http_request_header ~read_timeout ~total_timeout ~max_length fd = check_timeout_and_read 0 length ; (true, length) in - set_socket_timeout fd 0. 
; (frame, Bytes.sub_string buf 0 headers_length, proxy) let read_http_response_header buf fd = diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml index ae2c92dc87b..b033162d332 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml @@ -991,3 +991,18 @@ module Daemon = struct true with Unix.Unix_error _ -> false end + +let set_socket_timeout fd t = + try Unix.(setsockopt_float fd SO_RCVTIMEO t) + with Unix.Unix_error (Unix.ENOTSOCK, _, _) -> + (* In the unit tests, the fd comes from a pipe... ignore *) + () + +let with_socket_timeout fd timeout_opt f = + match timeout_opt with + | Some t -> + if t < 1e-6 then invalid_arg (Printf.sprintf "Timeout too short: %g" t) ; + let finally () = set_socket_timeout fd 0. in + set_socket_timeout fd t ; Fun.protect ~finally f + | None -> + f () diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli index 176adc94cf8..b2e76069aef 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli @@ -146,6 +146,8 @@ val try_read_string : ?limit:int -> Unix.file_descr -> string exception Timeout +val with_socket_timeout : Unix.file_descr -> float option -> (unit -> 'a) -> 'a + val time_limited_write : Unix.file_descr -> int -> bytes -> float -> unit val time_limited_write_substring : From 368e8fb861484928b85189fed1559c504a50b995 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 23 May 2024 17:54:08 +0100 Subject: [PATCH 02/13] CP-47536: replace select in ezxenstore with Delay module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This introduces a dependency between 'ezxenstore' and 'xapi-stdext-threads'. 
Signed-off-by: Edwin Török --- ocaml/libs/ezxenstore/core/dune | 1 + ocaml/libs/ezxenstore/core/watch.ml | 46 ++++++++++------------------- 2 files changed, 16 insertions(+), 31 deletions(-) diff --git a/ocaml/libs/ezxenstore/core/dune b/ocaml/libs/ezxenstore/core/dune index 53e812032f7..84e96b4c46c 100644 --- a/ocaml/libs/ezxenstore/core/dune +++ b/ocaml/libs/ezxenstore/core/dune @@ -9,5 +9,6 @@ (re_export xenstore) (re_export xenstore_transport) threads.posix + xapi-stdext-threads (re_export xenstore.unix)) ) diff --git a/ocaml/libs/ezxenstore/core/watch.ml b/ocaml/libs/ezxenstore/core/watch.ml index 35f3aee0b5e..36508767347 100644 --- a/ocaml/libs/ezxenstore/core/watch.ml +++ b/ocaml/libs/ezxenstore/core/watch.ml @@ -31,41 +31,25 @@ let has_fired ~xs x = true with Xs_protocol.Eagain -> false +module Delay = Xapi_stdext_threads.Threadext.Delay + (** Block waiting for a result *) let wait_for ~xs ?(timeout = 300.) (x : 'a t) = let _ = ignore xs in - let with_pipe f = - let p1, p2 = Unix.pipe () in - let close_all () = - let close p = try Unix.close p with _ -> () in - close p1 ; close p2 - in - try - let result = f (p1, p2) in - close_all () ; result - with e -> close_all () ; raise e - in let task = Xs.wait x.evaluate in - with_pipe (fun (p1, p2) -> - let thread = - Thread.create - (fun () -> - let r, _, _ = Unix.select [p1] [] [] timeout in - if r <> [] then - () - else - try Xs_client_unix.Task.cancel task with _ -> () - ) - () - in - try - let result = Xs_client_unix.Task.wait task in - ignore (Unix.write p2 (Bytes.of_string "x") 0 1) ; - Thread.join thread ; - result - with Xs_client_unix.Cancelled -> - Thread.join thread ; raise (Timeout timeout) - ) + let delay = Delay.make () in + let thread = + Thread.create + (fun () -> + if Delay.wait delay timeout then + try Xs_client_unix.Task.cancel task with _ -> () + ) + () + in + try + let result = Xs_client_unix.Task.wait task in + Delay.signal delay ; Thread.join thread ; result + with Xs_client_unix.Cancelled 
-> Thread.join thread ; raise (Timeout timeout) (** Wait for a node to appear in the store and return its value *) let value_to_appear (path : path) : string t = From 5bd3d35214968dd9e62b1e256fc11f202fa499e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 23 May 2024 16:13:24 +0100 Subject: [PATCH 03/13] CP-47536: Drop posix_channel and channel_helper: unused and a mix of Unix/Lwt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It was a mix of Lwt and Unix code, which means that if the Unix call blocks the entire Lwt code blocks too. This was only installed by the specfile in a -devel package. `message-cli tail --follow` can be used to debug the IDL protocol instead. Signed-off-by: Edwin Török --- ocaml/xapi-idl/README.md | 1 - ocaml/xapi-idl/lib/posix_channel.ml | 234 ------------------------ ocaml/xapi-idl/lib/posix_channel.mli | 21 --- ocaml/xapi-idl/lib/xcp_channel.ml | 17 -- ocaml/xapi-idl/lib/xcp_channel.mli | 13 -- ocaml/xapi-idl/lib_test/channel_test.ml | 77 -------- ocaml/xapi-idl/misc/channel_helper.ml | 221 ---------------------- ocaml/xapi-idl/misc/dune | 16 -- quality-gate.sh | 2 +- 9 files changed, 1 insertion(+), 601 deletions(-) delete mode 100644 ocaml/xapi-idl/lib/posix_channel.ml delete mode 100644 ocaml/xapi-idl/lib/posix_channel.mli delete mode 100644 ocaml/xapi-idl/lib/xcp_channel.ml delete mode 100644 ocaml/xapi-idl/lib/xcp_channel.mli delete mode 100644 ocaml/xapi-idl/lib_test/channel_test.ml delete mode 100644 ocaml/xapi-idl/misc/channel_helper.ml delete mode 100644 ocaml/xapi-idl/misc/dune diff --git a/ocaml/xapi-idl/README.md b/ocaml/xapi-idl/README.md index 3b34349a152..2da87aa0c20 100644 --- a/ocaml/xapi-idl/README.md +++ b/ocaml/xapi-idl/README.md @@ -10,7 +10,6 @@ This repository contains * argument parsing * RPCs 3. 
The following CLI tools for debugging: - * lib/channel_helper.exe -- a channel passing helper CLI * memory/memory_cli.exe -- a squeezed debugging CLI * v6/v6_cli.exe -- a V6d debugging CLI * cluster/cluster_cli.exe -- a xapi-clusterd debugging CLI diff --git a/ocaml/xapi-idl/lib/posix_channel.ml b/ocaml/xapi-idl/lib/posix_channel.ml deleted file mode 100644 index 06708561011..00000000000 --- a/ocaml/xapi-idl/lib/posix_channel.ml +++ /dev/null @@ -1,234 +0,0 @@ -let my_domid = 0 (* TODO: figure this out *) - -exception End_of_file - -exception Channel_setup_failed - -module CBuf = struct - (** A circular buffer constructed from a string *) - type t = { - mutable buffer: bytes - ; mutable len: int (** bytes of valid data in [buffer] *) - ; mutable start: int (** index of first valid byte in [buffer] *) - ; mutable r_closed: bool (** true if no more data can be read due to EOF *) - ; mutable w_closed: bool - (** true if no more data can be written due to EOF *) - } - - let empty length = - { - buffer= Bytes.create length - ; len= 0 - ; start= 0 - ; r_closed= false - ; w_closed= false - } - - let drop (x : t) n = - if n > x.len then failwith (Printf.sprintf "drop %d > %d" n x.len) ; - x.start <- (x.start + n) mod Bytes.length x.buffer ; - x.len <- x.len - n - - let should_read (x : t) = - (not x.r_closed) && x.len < Bytes.length x.buffer - 1 - - let should_write (x : t) = (not x.w_closed) && x.len > 0 - - let end_of_reads (x : t) = x.r_closed && x.len = 0 - - let end_of_writes (x : t) = x.w_closed - - let write (x : t) fd = - (* Offset of the character after the substring *) - let next = min (Bytes.length x.buffer) (x.start + x.len) in - let len = next - x.start in - let written = - try Unix.single_write fd x.buffer x.start len - with _e -> - x.w_closed <- true ; - len - in - drop x written - - let read (x : t) fd = - (* Offset of the next empty character *) - let next = (x.start + x.len) mod Bytes.length x.buffer in - let len = - min (Bytes.length x.buffer - next) 
(Bytes.length x.buffer - x.len) - in - let read = Unix.read fd x.buffer next len in - if read = 0 then x.r_closed <- true ; - x.len <- x.len + read -end - -let proxy (a : Unix.file_descr) (b : Unix.file_descr) = - let size = 64 * 1024 in - (* [a'] is read from [a] and will be written to [b] *) - (* [b'] is read from [b] and will be written to [a] *) - let a' = CBuf.empty size and b' = CBuf.empty size in - Unix.set_nonblock a ; - Unix.set_nonblock b ; - try - while true do - let r = - (if CBuf.should_read a' then [a] else []) - @ if CBuf.should_read b' then [b] else [] - in - let w = - (if CBuf.should_write a' then [b] else []) - @ if CBuf.should_write b' then [a] else [] - in - (* If we can't make any progress (because fds have been closed), then stop *) - if r = [] && w = [] then raise End_of_file ; - let r, w, _ = Unix.select r w [] (-1.0) in - (* Do the writing before the reading *) - List.iter - (fun fd -> if a = fd then CBuf.write b' a else CBuf.write a' b) - w ; - List.iter (fun fd -> if a = fd then CBuf.read a' a else CBuf.read b' b) r ; - (* If there's nothing else to read or write then signal the other end *) - List.iter - (fun (buf, fd) -> - if CBuf.end_of_reads buf then Unix.shutdown fd Unix.SHUTDOWN_SEND ; - if CBuf.end_of_writes buf then Unix.shutdown fd Unix.SHUTDOWN_RECEIVE - ) - [(a', b); (b', a)] - done - with _ -> ( - (try Unix.clear_nonblock a with _ -> ()) ; - try Unix.clear_nonblock b with _ -> () - ) - -let finally f g = - try - let result = f () in - g () ; result - with e -> g () ; raise e - -let ip = ref "127.0.0.1" - -let send proxy_socket = - let to_close = ref [] in - let to_unlink = ref [] in - finally - (fun () -> - let s_ip = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - to_close := s_ip :: !to_close ; - Unix.bind s_ip (Unix.ADDR_INET (Unix.inet_addr_of_string !ip, 0)) ; - Unix.listen s_ip 5 ; - let port = - match Unix.getsockname s_ip with - | Unix.ADDR_INET (_, port) -> - port - | _ -> - assert false - in - let s_unix = 
Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - to_close := s_unix :: !to_close ; - let path = Filename.temp_file "channel" "" in - to_unlink := path :: !to_unlink ; - if Sys.file_exists path then Unix.unlink path ; - Unix.bind s_unix (Unix.ADDR_UNIX path) ; - Unix.listen s_unix 5 ; - let token = "token" in - let protocols = - let open Xcp_channel_protocol in - [TCP_proxy (!ip, port); Unix_sendmsg (my_domid, path, token)] - in - (* We need to hang onto a copy of the proxy_socket so we can run a proxy - in a background thread, allowing the caller to close their copy. *) - let proxy_socket = Unix.dup proxy_socket in - to_close := proxy_socket :: !to_close ; - let (_ : Thread.t) = - Thread.create - (fun (fds, paths) -> - (* The thread takes over management of the listening sockets *) - let to_close = ref fds in - let to_unlink = ref paths in - let close fd = - if List.mem fd !to_close then ( - to_close := List.filter (fun x -> x <> fd) !to_close ; - Unix.close fd - ) - in - finally - (fun () -> - let readable, _, _ = Unix.select [s_ip; s_unix] [] [] (-1.0) in - if List.mem s_unix readable then ( - let fd, _peer = Unix.accept s_unix in - to_close := fd :: !to_close ; - let buffer = Bytes.make (String.length token) '\000' in - let n = Unix.recv fd buffer 0 (Bytes.length buffer) [] in - let token' = Bytes.sub_string buffer 0 n in - if token = token' then - let (_ : int) = - Fd_send_recv.send_fd_substring fd token 0 - (String.length token) [] proxy_socket - in - () - ) else if List.mem s_ip readable then ( - let fd, _peer = Unix.accept s_ip in - List.iter close !to_close ; - to_close := fd :: !to_close ; - proxy fd proxy_socket - ) else - assert false - (* can never happen *) - ) - (fun () -> - List.iter close !to_close ; - List.iter Unix.unlink !to_unlink - ) - ) - (!to_close, !to_unlink) - in - (* Handover of listening sockets successful *) - to_close := [] ; - to_unlink := [] ; - protocols - ) - (fun () -> - List.iter Unix.close !to_close ; - List.iter Unix.unlink 
!to_unlink - ) - -let receive protocols = - let open Xcp_channel_protocol in - let weight = function - | TCP_proxy (_, _) -> - 2 - | Unix_sendmsg (domid, _, _) -> - if my_domid = domid then 3 else 0 - | V4V_proxy (_, _) -> - 0 - in - let protocol = - match List.sort (fun a b -> compare (weight b) (weight a)) protocols with - | [] -> - raise Channel_setup_failed - | best :: _ -> - if weight best = 0 then raise Channel_setup_failed else best - in - match protocol with - | V4V_proxy (_, _) -> - assert false (* weight is 0 above *) - | TCP_proxy (ip, port) -> ( - let unwrapped_ip = Scanf.ksscanf ip (fun _ _ -> ip) "[%s@]" Fun.id in - let addr = Unix.ADDR_INET (Unix.inet_addr_of_string unwrapped_ip, port) in - let family = Unix.domain_of_sockaddr addr in - let s = Unix.socket family Unix.SOCK_STREAM 0 in - try Unix.connect s addr ; s with e -> Unix.close s ; raise e - ) - | Unix_sendmsg (_, path, token) -> - let s = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - finally - (fun () -> - Unix.connect s (Unix.ADDR_UNIX path) ; - let (_ : int) = - Unix.send_substring s token 0 (String.length token) [] - in - let buf = Bytes.create (String.length token) in - let _, _, fd = Fd_send_recv.recv_fd s buf 0 (Bytes.length buf) [] in - fd - ) - (fun () -> Unix.close s) diff --git a/ocaml/xapi-idl/lib/posix_channel.mli b/ocaml/xapi-idl/lib/posix_channel.mli deleted file mode 100644 index 8610f27a86d..00000000000 --- a/ocaml/xapi-idl/lib/posix_channel.mli +++ /dev/null @@ -1,21 +0,0 @@ -(* - * Copyright (C) Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -val send : Unix.file_descr -> Xcp_channel_protocol.t list -(** [send fd] attempts to send the channel represented by [fd] to a remote - process. Note the file descriptor remains open in the original process and - should still be closed normally. *) - -val receive : Xcp_channel_protocol.t list -> Unix.file_descr -(** [receive protocols] receives a channel from a remote. *) diff --git a/ocaml/xapi-idl/lib/xcp_channel.ml b/ocaml/xapi-idl/lib/xcp_channel.ml deleted file mode 100644 index 395da851a5f..00000000000 --- a/ocaml/xapi-idl/lib/xcp_channel.ml +++ /dev/null @@ -1,17 +0,0 @@ -type t = Unix.file_descr - -let file_descr_of_t t = t - -let t_of_file_descr t = t - -[@@@ocaml.warning "-34"] - -type protocols = Xcp_channel_protocol.t list [@@deriving rpc] - -let rpc_of_t fd = - let protocols = Posix_channel.send fd in - rpc_of_protocols protocols - -let t_of_rpc x = - let protocols = protocols_of_rpc x in - Posix_channel.receive protocols diff --git a/ocaml/xapi-idl/lib/xcp_channel.mli b/ocaml/xapi-idl/lib/xcp_channel.mli deleted file mode 100644 index 35849a1e5d4..00000000000 --- a/ocaml/xapi-idl/lib/xcp_channel.mli +++ /dev/null @@ -1,13 +0,0 @@ -type t - -val rpc_of_t : t -> Rpc.t - -val t_of_rpc : Rpc.t -> t - -val file_descr_of_t : t -> Unix.file_descr - -val t_of_file_descr : Unix.file_descr -> t - -val protocols_of_rpc : Rpc.t -> Xcp_channel_protocol.t list - -val rpc_of_protocols : Xcp_channel_protocol.t list -> Rpc.t diff --git a/ocaml/xapi-idl/lib_test/channel_test.ml b/ocaml/xapi-idl/lib_test/channel_test.ml deleted file mode 100644 index dd607935778..00000000000 --- a/ocaml/xapi-idl/lib_test/channel_test.ml +++ /dev/null @@ -1,77 +0,0 @@ -(* - * Copyright (C) 2011-2013 Citrix Inc - * - * This 
program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -let dup_automatic x = - let x = Xcp_channel.t_of_file_descr x in - let y = Xcp_channel.rpc_of_t x in - let z = Xcp_channel.t_of_rpc y in - Xcp_channel.file_descr_of_t z - -let dup_sendmsg x = - let protos = Posix_channel.send x in - let proto = - List.find - (function - | Xcp_channel_protocol.Unix_sendmsg (_, _, _) -> true | _ -> false - ) - protos - in - Posix_channel.receive [proto] - -let count_fds () = Array.length (Sys.readdir "/proc/self/fd") - -(* dup stdout, check /proc/pid/fd *) -let check_for_leak dup_function () = - let before = count_fds () in - let stdout2 = dup_function Unix.stdout in - let after = count_fds () in - Alcotest.(check int) "fds" (before + 1) after ; - Unix.close stdout2 ; - let after' = count_fds () in - Alcotest.(check int) "fds" before after' - -let dup_proxy x = - let protos = Posix_channel.send x in - let proto = - List.find - (function - | Xcp_channel_protocol.TCP_proxy (_ip, _port) -> true | _ -> false - ) - protos - in - Posix_channel.receive [proto] - -let check_for_leak_proxy () = - let a, _b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in - let before = count_fds () in - let c = dup_proxy a in - (* background fd closing *) - Thread.delay 1.0 ; - let after = count_fds () in - Alcotest.(check int) "fds" (before + 2) after ; - Unix.close c ; - (* background fd closing *) - Thread.delay 1.0 ; - let after' = count_fds () in - Alcotest.(check int) "fds" before after' - -let tests = - [ - ( "check_for_leak 
with automatic selection" - , `Quick - , check_for_leak dup_automatic - ) - ; ("check_for_leak with sendmsg", `Quick, check_for_leak dup_sendmsg) - ; ("check_for_leak_proxy", `Quick, check_for_leak_proxy) - ] diff --git a/ocaml/xapi-idl/misc/channel_helper.ml b/ocaml/xapi-idl/misc/channel_helper.ml deleted file mode 100644 index 1485e6a5ead..00000000000 --- a/ocaml/xapi-idl/misc/channel_helper.ml +++ /dev/null @@ -1,221 +0,0 @@ -let project_url = "https://github.com/xen-org/xcp-idl" - -open Lwt - -let my_domid = 0 (* TODO: figure this out *) - -exception Short_write of int * int - -exception End_of_file - -let copy_all src dst = - let buffer = Bytes.make 16384 '\000' in - let rec loop () = - Lwt_unix.read src buffer 0 (Bytes.length buffer) >>= fun n -> - if n = 0 then - Lwt.fail End_of_file - else - Lwt_unix.write dst buffer 0 n >>= fun m -> - if n <> m then Lwt.fail (Short_write (m, n)) else loop () - in - loop () - -let proxy a b = - let copy _id src dst = - Lwt.catch - (fun () -> copy_all src dst) - (fun _e -> - (try Lwt_unix.shutdown src Lwt_unix.SHUTDOWN_RECEIVE with _ -> ()) ; - (try Lwt_unix.shutdown dst Lwt_unix.SHUTDOWN_SEND with _ -> ()) ; - return () - ) - in - let ts = [copy "ab" a b; copy "ba" b a] in - Lwt.join ts - -let file_descr_of_int (x : int) : Unix.file_descr = Obj.magic x - -(* Keep this in sync with ocaml's file_descr type *) - -let ip = ref "127.0.0.1" - -let unix = ref "/tmp" - -module Common = struct - type t = {verbose: bool; debug: bool; port: int} [@@deriving rpc] - - let make verbose debug port = {verbose; debug; port} -end - -let _common_options = "COMMON OPTIONS" - -open Cmdliner - -(* Options common to all commands *) -let common_options_t = - let docs = _common_options in - let debug = - let doc = "Give only debug output." in - Arg.(value & flag & info ["debug"] ~docs ~doc) - in - let verb = - let doc = "Give verbose output." 
in - let verbose = (true, Arg.info ["v"; "verbose"] ~docs ~doc) in - Arg.(last & vflag_all [false] [verbose]) - in - let port = - let doc = Printf.sprintf "Specify port to connect to the message switch." in - Arg.(value & opt int 8080 & info ["port"] ~docs ~doc) - in - Term.(const Common.make $ debug $ verb $ port) - -(* Help sections common to all commands *) -let help = - [ - `S _common_options - ; `P "These options are common to all commands." - ; `S "MORE HELP" - ; `P "Use `$(mname) $(i,COMMAND) --help' for help on a single command." - ; `Noblank - ; `S "BUGS" - ; `P (Printf.sprintf "Check bug reports at %s" project_url) - ] - -(* Commands *) -let advertise_t _common_options_t proxy_socket = - let unwrapped_ip = Scanf.ksscanf !ip (fun _ _ -> !ip) "[%s@]" Fun.id in - let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_of_string unwrapped_ip, 0) in - let family = Unix.domain_of_sockaddr addr in - let s_ip = Lwt_unix.socket family Lwt_unix.SOCK_STREAM 0 in - (* INET socket, can't block *) - Lwt_unix.bind s_ip addr >>= fun () -> - Lwt_unix.listen s_ip 5 ; - let port = - match Lwt_unix.getsockname s_ip with - | Unix.ADDR_INET (_, port) -> - port - | _ -> - assert false - in - let s_unix = Lwt_unix.socket Lwt_unix.PF_UNIX Lwt_unix.SOCK_STREAM 0 in - (* Try to avoid polluting the filesystem with unused unix domain sockets *) - let path = - Printf.sprintf "%s/%s.%d" !unix - (Filename.basename Sys.argv.(0)) - (Unix.getpid ()) - in - if Sys.file_exists path then Unix.unlink path ; - Lwt_unix.bind s_unix (Lwt_unix.ADDR_UNIX path) >>= fun () -> - List.iter - (fun signal -> - ignore (Lwt_unix.on_signal signal (fun _ -> Unix.unlink path ; exit 1)) - ) - [Sys.sigterm; Sys.sigint] ; - Lwt_unix.listen s_unix 5 ; - let token = "token" in - let protocols = - let open Xcp_channel_protocol in - [TCP_proxy (!ip, port); Unix_sendmsg (my_domid, path, token)] - in - Printf.fprintf stdout "%s\n%!" 
- (Jsonrpc.to_string (Xcp_channel.rpc_of_protocols protocols)) ; - let t_ip = - Lwt_unix.accept s_ip >>= fun (fd, _peer) -> - Lwt_unix.close s_ip >>= fun () -> - proxy fd (Lwt_unix.of_unix_file_descr proxy_socket) - in - let t_unix = - Lwt_unix.accept s_unix >>= fun (fd, _peer) -> - let buffer = Bytes.make (String.length token) '\000' in - let io_vector = Lwt_unix.IO_vectors.create () in - Lwt_unix.IO_vectors.append_bytes io_vector buffer 0 (Bytes.length buffer) ; - Lwt_unix.recv_msg ~socket:fd ~io_vectors:io_vector >>= fun (n, fds) -> - List.iter Unix.close fds ; - let token' = Bytes.sub buffer 0 n in - let io_vector' = Lwt_unix.IO_vectors.create () in - Lwt_unix.IO_vectors.append_bytes io_vector' token' 0 (Bytes.length token') ; - if token = Bytes.to_string token' then - Lwt_unix.send_msg ~socket:fd ~io_vectors:io_vector' ~fds:[proxy_socket] - >>= fun _ -> return () - else - return () - in - Lwt.pick [t_ip; t_unix] >>= fun () -> Unix.unlink path ; return () - -let advertise common_options_t fd = - match fd with - | Some x -> - Lwt_main.run (advertise_t common_options_t (file_descr_of_int x)) ; - `Ok () - | None -> - `Error (true, "you must provide a file descriptor to proxy") - -let advertise_cmd = - let doc = "advertise a given channel represented as a file-descriptor" in - let man = - [ - `S "DESCRIPTION" - ; `P - "Advertises a given channel over as many protocols as possible, and \ - waits for someone to connect." 
- ] - @ help - in - let fd = - let doc = Printf.sprintf "File descriptor to advertise" in - Arg.(value & pos 0 (some int) None & info [] ~docv:"FD" ~doc) - in - Cmd.v - (Cmd.info "advertise" ~sdocs:_common_options ~doc ~man) - Term.(ret (const advertise $ common_options_t $ fd)) - -let connect_t _common_options_t = - (Lwt_io.read_line_opt Lwt_io.stdin >>= function - | None -> - return "" - | Some x -> - return x - ) - >>= fun advertisement -> - let open Xcp_channel in - let fd = - Lwt_unix.of_unix_file_descr - (file_descr_of_t (t_of_rpc (Jsonrpc.of_string advertisement))) - in - let a = copy_all Lwt_unix.stdin fd in - let b = copy_all fd Lwt_unix.stdout in - Lwt.join [a; b] - -let connect common_options_t = - Lwt_main.run (connect_t common_options_t) ; - `Ok () - -let connect_cmd = - let doc = "connect to a channel and proxy to the terminal" in - let man = - [ - `S "DESCRIPTION" - ; `P - "Connect to a channel which has been advertised and proxy I/O to the \ - console. The advertisement will be read from stdin as a single line \ - of text." 
- ] - @ help - in - Cmd.v - (Cmd.info "connect" ~sdocs:_common_options ~doc ~man) - Term.(ret (const connect $ common_options_t)) - -let cmds = [advertise_cmd; connect_cmd] - -let () = - let default = - Term.(ret (const (fun _ -> `Help (`Pager, None)) $ common_options_t)) - in - let info = - let doc = "channel (file-descriptor) passing helper program" in - let man = help in - Cmd.info "proxy" ~version:"1.0.0" ~sdocs:_common_options ~doc ~man - in - let cmd = Cmd.group ~default info cmds in - exit (Cmd.eval cmd) diff --git a/ocaml/xapi-idl/misc/dune b/ocaml/xapi-idl/misc/dune deleted file mode 100644 index 9d009d01260..00000000000 --- a/ocaml/xapi-idl/misc/dune +++ /dev/null @@ -1,16 +0,0 @@ -(executable - (name channel_helper) - (public_name xcp-idl-debugger) - (modules channel_helper) - (package xapi-idl) - (libraries - cmdliner - dune-build-info - lwt - lwt.unix - rpclib.core - rpclib.json - xapi-idl - xapi-log - ) - (preprocess (pps ppx_deriving_rpc))) diff --git a/quality-gate.sh b/quality-gate.sh index b3cd2e67813..f9c644467f5 100755 --- a/quality-gate.sh +++ b/quality-gate.sh @@ -25,7 +25,7 @@ verify-cert () { } mli-files () { - N=515 + N=513 # do not count ml files from the tests in ocaml/{tests/perftest/quicktest} MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;) MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;) From 007df282b116f8ecc3db102b585c6ab164ea56ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 10 May 2024 11:47:50 +0100 Subject: [PATCH 04/13] CP-47536: replace Protocol_unix.scheduler.Delay with Threadext.Delay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Its implementation was identical, except for the use of time_limited_read in 
Threadext, but the semantics is identical. Use one well tested implementation instead of duplicating code. One less function to convert to epoll. Signed-off-by: Edwin Török --- ocaml/message-switch/unix/dune | 1 + .../unix/protocol_unix_scheduler.ml | 66 +------------------ 2 files changed, 2 insertions(+), 65 deletions(-) diff --git a/ocaml/message-switch/unix/dune b/ocaml/message-switch/unix/dune index 54b6c0e77bf..3e088a12556 100644 --- a/ocaml/message-switch/unix/dune +++ b/ocaml/message-switch/unix/dune @@ -11,6 +11,7 @@ rpclib.core rpclib.json threads.posix + xapi-stdext-threads ) (preprocess (pps ppx_deriving_rpc)) ) diff --git a/ocaml/message-switch/unix/protocol_unix_scheduler.ml b/ocaml/message-switch/unix/protocol_unix_scheduler.ml index 92e6cdd3b1b..3eaeb83218c 100644 --- a/ocaml/message-switch/unix/protocol_unix_scheduler.ml +++ b/ocaml/message-switch/unix/protocol_unix_scheduler.ml @@ -34,71 +34,7 @@ module Int64Map = Map.Make (struct let compare = compare end) -module Delay = struct - (* Concrete type is the ends of a pipe *) - type t = { - (* A pipe is used to wake up a thread blocked in wait: *) - mutable pipe_out: Unix.file_descr option - ; mutable pipe_in: Unix.file_descr option - ; (* Indicates that a signal arrived before a wait: *) - mutable signalled: bool - ; m: Mutex.t - } - - let make () = - {pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()} - - exception Pre_signalled - - let wait (x : t) (seconds : float) = - let to_close = ref [] in - let close' fd = - if List.mem fd !to_close then Unix.close fd ; - to_close := List.filter (fun x -> fd <> x) !to_close - in - finally' - (fun () -> - try - let pipe_out = - Mutex.execute x.m (fun () -> - if x.signalled then ( - x.signalled <- false ; - raise Pre_signalled - ) ; - let pipe_out, pipe_in = Unix.pipe () in - (* these will be unconditionally closed on exit *) - to_close := [pipe_out; pipe_in] ; - x.pipe_out <- Some pipe_out ; - x.pipe_in <- Some pipe_in ; - x.signalled <- 
false ; - pipe_out - ) - in - let r, _, _ = Unix.select [pipe_out] [] [] seconds in - (* flush the single byte from the pipe *) - if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ; - (* return true if we waited the full length of time, false if we were woken *) - r = [] - with Pre_signalled -> false - ) - (fun () -> - Mutex.execute x.m (fun () -> - x.pipe_out <- None ; - x.pipe_in <- None ; - List.iter close' !to_close - ) - ) - - let signal (x : t) = - Mutex.execute x.m (fun () -> - match x.pipe_in with - | Some fd -> - ignore (Unix.write fd (Bytes.of_string "X") 0 1) - | None -> - x.signalled <- true - (* If the wait hasn't happened yet then store up the signal *) - ) -end +module Delay = Xapi_stdext_threads.Threadext.Delay type item = {id: int; name: string; fn: unit -> unit} From ca910c218bcc406959edc33d46e42a2827f89d3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 13 May 2024 16:36:05 +0100 Subject: [PATCH 05/13] CP-49499: Introduce 'Timer' and 'Timeout' modules in Unixext. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Absolute deadlines can be represented using `Mtime.t`, or `Mtime_clock.counter` and comparing against a deadline. We hide this as an implementation detail in `Timer.t` and use counters (although an implementation using `Mtime.t` would work just as well). Relative deadlines are represented using `Mtime.Span.t`. We wrap it in `Timeout.t` to avoid accidentally mistaking it for a time duration calculated elsewhere. These helper modules correctly convert from time represented as strings (e.g. in configuration files), and calculate the remaining time until the deadline. `Mtime.Span.t` expects `float`s as nanoseconds, which can be quite error prone when converting existing code if the conversion factor is forgotten. The various `time_limited*` functions will be changed to take a `Timer.t` as argument in a followup commit. 
Signed-off-by: Edwin Török --- .../xapi-stdext/lib/xapi-stdext-unix/dune | 4 ++ .../lib/xapi-stdext-unix/test/dune | 2 +- .../lib/xapi-stdext-unix/test/unixext_test.ml | 58 ++++++++++++++++++- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune index 92b77753a86..218233f6dd0 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune @@ -4,6 +4,10 @@ (libraries fd-send-recv integers + fmt + mtime + mtime.clock + mtime.clock.os polly unix xapi-backtrace diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune index 407d025a8a8..e1776e81622 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune @@ -2,7 +2,7 @@ (name unixext_test) (package xapi-stdext-unix) (modules unixext_test) - (libraries xapi_stdext_unix qcheck-core mtime.clock.os qcheck-core.runner fmt xapi_fd_test mtime threads.posix rresult) + (libraries xapi_stdext_unix qcheck-core mtime.clock.os qcheck-core.runner fmt xapi_fd_test mtime threads.posix rresult clock) ; use fixed seed to avoid causing random failures in CI and package builds (action (run %{test} -v -bt --seed 42)) ) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml index e0f2726f303..547ba30b46a 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml @@ -187,7 +187,63 @@ let test_proxy = expect_string ~expected:write.data ~actual:read.data ; true -let tests = [test_proxy; test_time_limited_write; test_time_limited_read] +let mtime_span_gen = + let open Gen in + (* We can't use Mtime.Span.max_span, because that is too big for float, + only works for int64 
conversion, and there is no Mtime.Span.max_span_float. + The documentation says that 2^53 is the maximum though, so use that. + Otherwise we'll fail later when converting to string and back goes through float. + + Use microseconds instead of nanoseconds, because nanoseconds have rounding + errors during conversion. + *) + let+ usval = 0 -- (((1 lsl 53) - 1) / 1_000_000) in + Mtime.Span.(usval * us) + +let test_timeout_string_conv = + let gen = mtime_span_gen and print = Fmt.to_to_string Mtime.Span.pp in + Test.make ~name:__FUNCTION__ ~print gen @@ fun timeout -> + let str = Clock.Timer.span_to_s timeout |> Printf.sprintf "%.6f" in + let timeout' = + str |> Float.of_string |> Clock.Timer.s_to_span |> Option.get + in + if not (Mtime.Span.equal timeout timeout') then + Test.fail_reportf + "timeout not equal after round-trip through %S: %Lu (%a) <> %Lu (%a)" str + (Mtime.Span.to_uint64_ns (timeout :> Mtime.Span.t)) + Mtime.Span.pp timeout + (Mtime.Span.to_uint64_ns (timeout' :> Mtime.Span.t)) + Mtime.Span.pp timeout' ; + true + +let delays = + Gen.oneofa ([|10; 100; 300|] |> Array.map (fun v -> Mtime.Span.(v * ms))) + +let test_delay = + let gen = delays and print = Fmt.to_to_string Mtime.Span.pp in + Test.make ~count:5 ~name:__FUNCTION__ ~print gen @@ fun timeout -> + let counter = Mtime_clock.counter () in + Unixext.delay_span timeout ; + let actual = Mtime_clock.count counter in + let expected_min = + Int64.div (timeout |> Mtime.Span.to_uint64_ns) 2L |> Mtime.Span.of_uint64_ns + and expected_max = Mtime.Span.(2 * timeout) in + if Clock.Timer.span_is_shorter ~than:expected_min actual then + Test.fail_reportf "Actual delay shorter than half expected: %a << %a" + Mtime.Span.pp actual Mtime.Span.pp timeout ; + if Clock.Timer.span_is_longer ~than:expected_max actual then + Test.fail_reportf "Actual delay longer than twice expected: %a >> %a" + Mtime.Span.pp actual Mtime.Span.pp timeout ; + true + +let tests = + [ + test_delay + ; test_timeout_string_conv + ; test_proxy + 
; test_time_limited_write + ; test_time_limited_read + ] let () = (* avoid SIGPIPE *) From ff8cef1467999fd035f4000171e95d602b89f447 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 1 Jul 2024 17:19:55 +0100 Subject: [PATCH 06/13] Fix Short/Long duration printing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- ocaml/xapi/xapi_globs.ml | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index 0c2417bb829..8b899e6d054 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1160,15 +1160,8 @@ let options_of_xapi_globs_spec = string_of_float !x | Int x -> string_of_int !x - | ShortDurationFromSeconds x -> - let literal = - Mtime.Span.to_uint64_ns !x |> fun ns -> - Int64.div ns 1_000_000_000L |> Int64.to_int |> string_of_int - in - Fmt.str "%s (%a)" literal Mtime.Span.pp !x - | LongDurationFromSeconds x -> - let literal = Clock.Timer.span_to_s !x |> string_of_float in - Fmt.str "%s (%a)" literal Mtime.Span.pp !x + | ShortDurationFromSeconds x | LongDurationFromSeconds x -> + Fmt.str "%Luns (%a)" (Mtime.Span.to_uint64_ns !x) Mtime.Span.pp !x ) , Printf.sprintf "Set the value of '%s'" name ) From b4cb43659dc8d9d2cf0bfbf9e833217977e55208 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 10 May 2024 16:44:07 +0100 Subject: [PATCH 07/13] CP-49499: Unixext.time_limited_{read,write}: use Timer.t for deadlines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of Unix.gettimeofday. This is required to start using these functions in Jsonrpclient, which already uses Mtime. Mtime should also be more robust against clock changes on the host. TODO: needs some extra testing of block_device_io, which doesn't have many suitable tests.
Signed-off-by: Edwin Török --- ocaml/database/block_device_io.ml | 46 ++++--- ocaml/database/db_globs.ml | 15 ++- ocaml/database/dune | 7 + ocaml/database/redo_log.ml | 39 +++--- ocaml/database/redo_log.mli | 2 +- ocaml/libs/http-lib/bufio_test.ml | 11 +- ocaml/libs/http-lib/dune | 1 + .../xapi-stdext/lib/xapi-fd-test/generate.ml | 2 +- .../xapi-stdext/lib/xapi-fd-test/generate.mli | 2 +- .../xapi-stdext/lib/xapi-stdext-threads/dune | 1 + .../lib/xapi-stdext-threads/threadext.ml | 10 +- .../xapi-stdext/lib/xapi-stdext-unix/dune | 1 + .../lib/xapi-stdext-unix/test/unixext_test.ml | 51 ++++--- .../lib/xapi-stdext-unix/unixext.ml | 126 ++++++++++-------- .../lib/xapi-stdext-unix/unixext.mli | 17 ++- ocaml/xapi/xapi_globs.ml | 18 ++- 16 files changed, 210 insertions(+), 139 deletions(-) diff --git a/ocaml/database/block_device_io.ml b/ocaml/database/block_device_io.ml index 7587a34d5d5..d218f30a030 100644 --- a/ocaml/database/block_device_io.ml +++ b/ocaml/database/block_device_io.ml @@ -325,16 +325,20 @@ let listen_on sock = s let accept_conn s latest_response_time = - let now = Unix.gettimeofday () in - let timeout = latest_response_time -. now in - (* Await an incoming connection... *) - let ready_to_read, _, _ = Unix.select [s] [] [] timeout in - R.info "Finished selecting" ; - if List.mem s ready_to_read then - (* We've received a connection. Accept it and return the socket. *) - fst (Unix.accept s) - else (* We must have timed out *) - raise Unixext.Timeout + match Clock.Timer.remaining latest_response_time with + | Expired _ -> + raise Unixext.Timeout + | Remaining timeout -> + (* Await an incoming connection... *) + let ready_to_read, _, _ = + Unix.select [s] [] [] (Clock.Timer.span_to_s timeout) + in + R.info "Finished selecting" ; + if List.mem s ready_to_read then + (* We've received a connection. Accept it and return the socket. *) + fst (Unix.accept s) + else (* We must have timed out *) + raise Unixext.Timeout (* Listen on a given socket. 
Accept a single connection and transfer all the data from it to dest_fd, or raise Timeout if target_response_time happens first. *) (* Raises NotEnoughSpace if the next write would exceed the available_space. *) @@ -731,6 +735,8 @@ let dump = ref false let empty = ref false +let target_response_time delta = Clock.Timer.start ~duration:delta + let _ = (* Initialise debug logging *) initialise_logging () ; @@ -770,11 +776,12 @@ let _ = (* Open the block device *) let block_dev_fd = open_block_device !block_dev - (Unix.gettimeofday () +. !Db_globs.redo_log_max_startup_time) + (target_response_time !Db_globs.redo_log_max_startup_time) in R.info "Opened block device." ; - let target_response_time = Unix.gettimeofday () +. 3600. in - (* Read the validity byte *) + let target_response_time = + target_response_time !Db_globs.redo_log_max_connect_time + in try let validity = read_validity_byte block_dev_fd target_response_time in Printf.printf "*** Validity byte: [%s]\n" validity ; @@ -841,10 +848,12 @@ let _ = (* Open the block device *) let block_dev_fd = open_block_device !block_dev - (Unix.gettimeofday () +. !Db_globs.redo_log_max_startup_time) + (target_response_time !Db_globs.redo_log_max_startup_time) in R.info "Opened block device." ; - let target_response_time = Unix.gettimeofday () +. 3600. in + let target_response_time = + target_response_time !Db_globs.redo_log_max_connect_time + in initialise_redo_log block_dev_fd target_response_time ; Printf.printf "Block device initialised.\n" ) ; @@ -855,9 +864,8 @@ let _ = let s = listen_on !ctrlsock in (* Main loop: accept a new client, communicate with it until it stops sending commands, repeat. *) while true do - let start_of_startup = Unix.gettimeofday () in let target_startup_response_time = - start_of_startup +. !Db_globs.redo_log_max_startup_time + target_response_time !Db_globs.redo_log_max_startup_time in R.info "Awaiting incoming connections on %s..." 
!ctrlsock ; let client = accept_conn s target_startup_response_time in @@ -898,11 +906,11 @@ let _ = send_failure client (str ^ "|nack") ("Unknown command " ^ str) ) - , 0. + , Mtime.Span.zero ) in (* "Start the clock!" -- set the latest time by which we need to have responded to the client. *) - let target_response_time = Unix.gettimeofday () +. block_time in + let target_response_time = target_response_time block_time in action_fn block_dev_fd client !datasock target_response_time with (* this must be an exception in Unixext.really_read because action_fn doesn't throw exceptions *) diff --git a/ocaml/database/db_globs.ml b/ocaml/database/db_globs.ml index d4fa5826629..b09d407ded6 100644 --- a/ocaml/database/db_globs.ml +++ b/ocaml/database/db_globs.ml @@ -3,19 +3,22 @@ let redo_log_block_device_io = ref "block_device_io" (** The delay between each attempt to connect to the block device I/O process *) -let redo_log_connect_delay = ref 0.1 +let redo_log_connect_delay = ref Mtime.Span.(100 * ms) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while emptying *) -let redo_log_max_block_time_empty = ref 2. +let redo_log_max_block_time_empty = ref Mtime.Span.(2 * s) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while reading *) -let redo_log_max_block_time_read = ref 30. +let redo_log_max_block_time_read = ref Mtime.Span.(30 * s) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while writing a delta *) -let redo_log_max_block_time_writedelta = ref 2. 
+let redo_log_max_block_time_writedelta = ref Mtime.Span.(2 * s) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while writing a database *) -let redo_log_max_block_time_writedb = ref 30. +let redo_log_max_block_time_writedb = ref Mtime.Span.(30 * s) + +(** The maximum amount of time to wait for a connection (used to be hardcoded to 3600s) *) +let redo_log_max_connect_time = ref Mtime.Span.(1 * hour) (** {3 Settings related to the exponential back-off of repeated attempts to reconnect after failure} *) @@ -35,7 +38,7 @@ let redo_log_max_dying_processes = 2 let redo_log_comms_socket_stem = "sock-blkdev-io" (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while initially connecting to it *) -let redo_log_max_startup_time = ref 5. +let redo_log_max_startup_time = ref Mtime.Span.(5 * s) (** The length, in bytes, of one redo log which constitutes half of the VDI *) let redo_log_length_of_half = 60 * 1024 * 1024 diff --git a/ocaml/database/dune b/ocaml/database/dune index 08108ad6c55..db4b431749f 100644 --- a/ocaml/database/dune +++ b/ocaml/database/dune @@ -28,7 +28,11 @@ test_schemas unit_test_marshall unit_test_sql)) (libraries forkexec + fmt gzip + mtime + mtime.clock + mtime.clock.os rpclib.core rpclib.json safe-resources @@ -61,6 +65,9 @@ (modules block_device_io) (libraries dune-build-info + mtime + mtime.clock + mtime.clock.os xapi_database xapi-log xapi-stdext-pervasives diff --git a/ocaml/database/redo_log.ml b/ocaml/database/redo_log.ml index 429646dcce7..506add08701 100644 --- a/ocaml/database/redo_log.ml +++ b/ocaml/database/redo_log.ml @@ -244,9 +244,7 @@ let generation_size = 16 let length_size = 16 -let get_latest_response_time block_time = - let now = Unix.gettimeofday () in - now +. 
block_time +let get_latest_response_time block_time = Clock.Timer.start ~duration:block_time (* Returns the PID of the process *) let start_io_process block_dev ctrlsockpath datasockpath = @@ -265,26 +263,29 @@ let connect sockpath latest_response_time = Unix.connect s (Unix.ADDR_UNIX sockpath) ; D.debug "Connected to I/O process via socket %s" sockpath ; s - with Unix.Unix_error (a, b, _) -> + with Unix.Unix_error (a, b, _) -> ( (* It's probably the case that the process hasn't started yet. *) (* See if we can afford to wait and try again *) Unix.close s ; let attempt_delay = !Db_globs.redo_log_connect_delay in - let now = Unix.gettimeofday () in - let remaining = latest_response_time -. now in - if attempt_delay < remaining then ( - (* Wait for a while then try again *) - D.debug - "Waiting to connect to I/O process via socket %s (error was %s: \ - %s)..." - sockpath b (Unix.error_message a) ; - D.debug "Remaining time for retries: %f" remaining ; - Thread.delay attempt_delay ; - attempt () - ) else ( - D.debug "%s timeout reached" __FUNCTION__ ; - raise Unixext.Timeout - ) + match + Clock.Timer.remaining + (Clock.Timer.shorten_by attempt_delay latest_response_time) + with + | Remaining remaining -> + (* Wait for a while then try again *) + D.debug + "Waiting to connect to I/O process via socket %s (error was %s: \ + %s)..." 
+ sockpath b (Unix.error_message a) ; + D.debug "Remaining time for retries: %a" Debug.Pp.mtime_span remaining ; + Unixext.delay_span attempt_delay ; + attempt () + | Expired overrun -> + D.debug "%s timeout reached (%a overrun)" __FUNCTION__ + Debug.Pp.mtime_span overrun ; + raise Unixext.Timeout + ) in attempt () diff --git a/ocaml/database/redo_log.mli b/ocaml/database/redo_log.mli index 710612fe9fa..0af623523de 100644 --- a/ocaml/database/redo_log.mli +++ b/ocaml/database/redo_log.mli @@ -99,7 +99,7 @@ type t = represents the write to the field with name [fldname] of a row in table [tblname] with key [objref], overwriting its value with [newval]. *) val apply : - (Generation.t -> Unix.file_descr -> int -> float -> unit) + (Generation.t -> Unix.file_descr -> int -> Clock.Timer.t -> unit) -> (Generation.t -> t -> unit) -> [< `RO | `RW] redo_log -> unit diff --git a/ocaml/libs/http-lib/bufio_test.ml b/ocaml/libs/http-lib/bufio_test.ml index 7937adc73ea..b4184ff8121 100644 --- a/ocaml/libs/http-lib/bufio_test.ml +++ b/ocaml/libs/http-lib/bufio_test.ml @@ -1,7 +1,7 @@ open QCheck2 open Xapi_fd_test -let print_timeout = string_of_float +let print_timeout = Fmt.to_to_string Mtime.Span.pp let expect_string ~expected ~actual = if not (String.equal expected actual) then @@ -20,7 +20,7 @@ let test_buf_io = let timeouts = Generate.timeouts in let gen = Gen.tup2 Generate.t timeouts and print = Print.tup2 Generate.print print_timeout in - Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout) -> + Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout_span) -> let every_bytes = Int.min (Option.map Observations.Delay.every_bytes behaviour.delay_read @@ -38,11 +38,8 @@ let test_buf_io = timeout_span remains the span for the entire function, and timeout the per operation timeout that we'll pass to the function under test. *) - let timeout_span = Mtime.Span.of_float_ns (timeout *. 1e9) |> Option.get in - let timeout = timeout /. 
float operations in - let timeout_operation_span = - Mtime.Span.of_float_ns (timeout *. 1e9) |> Option.get - in + let timeout = Clock.Timer.span_to_s timeout_span /. float operations in + let timeout_operation_span = Clock.Timer.s_to_span timeout |> Option.get in (* timeout < 1us would get truncated to 0 *) QCheck2.assume (timeout > 1e-6) ; (* Format.eprintf "Testing %s@." (print (behaviour, timeout)); *) diff --git a/ocaml/libs/http-lib/dune b/ocaml/libs/http-lib/dune index ee510d7fc42..27a3f53a9f8 100644 --- a/ocaml/libs/http-lib/dune +++ b/ocaml/libs/http-lib/dune @@ -72,6 +72,7 @@ (modes (best exe)) (modules bufio_test) (libraries + clock fmt mtime mtime.clock diff --git a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml index 96cd2a897e6..23a07961980 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml @@ -56,7 +56,7 @@ let total_delays = Gen.oneofa [|0.001; 0.01; 0.1; 0.4|] let span_of_s s = s *. 
1e9 |> Mtime.Span.of_float_ns |> Option.get (* keep these short *) -let timeouts = Gen.oneofa [|0.0; 0.001; 0.1; 0.3|] +let timeouts = Gen.oneofa Mtime.Span.[|zero; 1 * ms; 100 * ms; 300 * ms|] let delay_of_size total_delay size = let open Gen in diff --git a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli index 6aba67c7a6d..bb500d46745 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli @@ -24,7 +24,7 @@ type t = { ; kind: Unix.file_kind } -val timeouts : float QCheck2.Gen.t +val timeouts : Mtime.Span.t QCheck2.Gen.t (** [timeouts] is a generator for small timeouts *) val make : diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index f7e9141c3a9..51ebe3898d0 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -6,6 +6,7 @@ threads.posix unix xapi-stdext-unix + mtime xapi-stdext-pervasives) ) (test diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml index ef30cfb5ba4..28e9ea46433 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml @@ -62,6 +62,13 @@ module Delay = struct exception Pre_signalled let wait (x : t) (seconds : float) = + let max_wait = + match Mtime.Span.of_float_ns (seconds *. 
1e9) with + | None -> + invalid_arg (Printf.sprintf "wait %g" seconds) + | Some max_wait -> + max_wait + in let finally = Xapi_stdext_pervasives.Pervasiveext.finally in let to_close = ref [] in let close' fd = @@ -90,7 +97,8 @@ module Delay = struct (* flush the single byte from the pipe *) try let (_ : string) = - time_limited_single_read pipe_out 1 ~max_wait:seconds + time_limited_single_read pipe_out 1 + (Clock.Timer.start ~duration:max_wait) in false with Timeout -> true diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune index 218233f6dd0..9f675e9a03e 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune @@ -2,6 +2,7 @@ (name xapi_stdext_unix) (public_name xapi-stdext-unix) (libraries + (re_export clock) fd-send-recv integers fmt diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml index 547ba30b46a..0628bd2dc4e 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml @@ -39,9 +39,12 @@ let pp_pair = ) *) +let print_timeout = Fmt.to_to_string Mtime.Span.pp + let test_time_limited_write = - let gen = Gen.tup2 Generate.t Generate.timeouts - and print = Print.tup2 Generate.print Print.float in + let timeouts = Generate.timeouts in + let gen = Gen.tup2 Generate.t timeouts + and print = Print.tup2 Generate.print print_timeout in Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout) -> skip_blk behaviour.kind ; skip_dirlnk behaviour.kind ; @@ -53,7 +56,7 @@ let test_time_limited_write = let fd = Xapi_fdcaps.Operations.For_test.unsafe_fd_exn wrapped_fd in Unix.set_nonblock fd ; let dt = Mtime_clock.counter () in - let deadline = Unix.gettimeofday () +. 
timeout in + let deadline = Clock.Timer.start ~duration:timeout in let finally () = test_elapsed := Mtime_clock.count dt in Fun.protect ~finally (fun () -> Unixext.time_limited_write_substring fd len buf deadline @@ -64,11 +67,14 @@ let test_time_limited_write = let observations, result = Generate.run_wo behaviour ~f:test in let () = let open Observations in - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 1e-9 in - if elapsed_s > timeout +. 0.5 then + let elapsed = !test_elapsed in + let timeout_extra = + Mtime.Span.(add (timeout :> Mtime.Span.t) @@ (500 * ms)) + in + if Mtime.Span.compare elapsed timeout_extra > 0 then Test.fail_reportf - "Function duration significantly exceeds timeout: %f > %f; %s" - elapsed_s timeout + "Function duration significantly exceeds timeout: %a > %a; %s" + Mtime.Span.pp elapsed Mtime.Span.pp timeout (Fmt.to_to_string Fmt.(option pp) observations.Observations.read) ; match (observations, result) with | {read= Some read; _}, Ok expected -> @@ -76,10 +82,10 @@ let test_time_limited_write = expect_amount ~expected:(String.length expected) read ; expect_string ~expected ~actual:read.data | {read= Some read; _}, Error (`Exn_trap (Unixext.Timeout, _)) -> - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 
1e-9 in - if elapsed_s < timeout then - Test.fail_reportf "Timed out earlier than requested: %f < %f" - elapsed_s timeout ; + let elapsed = !test_elapsed in + if Mtime.Span.compare elapsed timeout < 0 then + Test.fail_reportf "Timed out earlier than requested: %a < %a" + Mtime.Span.pp elapsed Mtime.Span.pp timeout ; let actual = String.length read.data in if actual >= behaviour.size then Test.fail_reportf "Timed out, but transferred enough data: %d >= %d" @@ -103,7 +109,7 @@ let test_time_limited_write = let test_time_limited_read = let gen = Gen.tup2 Generate.t Generate.timeouts - and print = Print.tup2 Generate.print Print.float in + and print = Print.tup2 Generate.print print_timeout in Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout) -> (* Format.eprintf "Testing %s@." (print (behaviour, timeout)); *) skip_blk behaviour.kind ; @@ -113,7 +119,7 @@ let test_time_limited_read = let fd = Xapi_fdcaps.Operations.For_test.unsafe_fd_exn wrapped_fd in Unix.set_nonblock fd ; let dt = Mtime_clock.counter () in - let deadline = Unix.gettimeofday () +. timeout in + let deadline = Clock.Timer.start ~duration:timeout in let finally () = test_elapsed := Mtime_clock.count dt in Fun.protect ~finally (fun () -> Unixext.time_limited_read fd behaviour.size deadline @@ -126,11 +132,14 @@ let test_time_limited_read = in let () = let open Observations in - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 1e-9 in - if elapsed_s > timeout +. 0.5 then + let elapsed = !test_elapsed in + let timeout_extra = + Mtime.Span.(add (timeout :> Mtime.Span.t) @@ (500 * ms)) + in + if Mtime.Span.compare elapsed timeout_extra > 0 then Test.fail_reportf - "Function duration significantly exceeds timeout: %f > %f; %s" elapsed_s - timeout + "Function duration significantly exceeds timeout: %a > %a; %s" + Mtime.Span.pp elapsed Mtime.Span.pp timeout (Fmt.to_to_string Fmt.(option pp) observations.Observations.write) ; (* Format.eprintf "Result: %a@." 
(Fmt.option Observations.pp) observations.write;*) match (observations, result) with @@ -138,10 +147,10 @@ let test_time_limited_read = expect_amount ~expected:(String.length actual) write ; expect_string ~expected:write.data ~actual | {write= Some _; _}, Error (`Exn_trap (Unixext.Timeout, _)) -> - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 1e-9 in - if elapsed_s < timeout then - Test.fail_reportf "Timed out earlier than requested: %f < %f" - elapsed_s timeout + let elapsed = !test_elapsed in + if Mtime.Span.compare elapsed timeout < 0 then + Test.fail_reportf "Timed out earlier than requested: %a < %a" + Mtime.Span.pp elapsed Mtime.Span.pp timeout | ( {write= Some write; _} , Error (`Exn_trap (Unix.Unix_error (Unix.EPIPE, _, _), _)) ) -> if String.length write.data = behaviour.size then diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml index b033162d332..05b6bdf5954 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml @@ -547,7 +547,17 @@ let really_read_string fd length = exception Timeout -let to_milliseconds ms = ms *. 1000. |> ceil |> int_of_float +(* Need to round up when converting to milliseconds: + The span will have the amount of time already elapsed subtracted, so if we are asked to wait + for 1ms, the span will be slightly smaller than that. + If we truncate then all waits of 1ms will be 0ms instead, which would be wrong, as we wouldn't wait at all. + If we round to nearest rather than up, then waits of 1ms will be ~0.5ms instead, since values below 0.5ms will get + rounded to 0. + Rounding up works correctly, and makes the tests pass. +*) +let to_milliseconds span = + Int64.div (Int64.add 999_999L @@ Mtime.Span.to_uint64_ns span) 1_000_000L + |> Int64.to_int (* Allocating a new polly and waiting like this results in at least 3 syscalls. 
An alternative for sockets would be to use [setsockopt], @@ -570,19 +580,17 @@ let with_polly_wait kind fd f = and check the timeout after each chunk. select() would've silently succeeded here, whereas epoll() is stricted and returns EPERM *) - let wait remaining_time = if remaining_time < 0. then raise Timeout in - f wait fd + f (fun _ -> true) fd | S_CHR | S_FIFO | S_SOCK -> with_polly @@ fun polly -> Polly.add polly fd kind ; let wait remaining_time = let milliseconds = to_milliseconds remaining_time in - if milliseconds <= 0 then raise Timeout ; let ready = Polly.wait polly 1 milliseconds @@ fun _ event_on_fd _ -> assert (event_on_fd = fd) in - if ready = 0 then raise Timeout + ready > 0 in f wait fd @@ -595,22 +603,23 @@ let time_limited_write_internal with_polly_wait Polly.Events.out filedesc @@ fun wait filedesc -> let total_bytes_to_write = length in let bytes_written = ref 0 in - let now = ref (Unix.gettimeofday ()) in - while !bytes_written < total_bytes_to_write && !now < target_response_time do - let remaining_time = target_response_time -. 
!now in - wait remaining_time ; - let bytes_to_write = total_bytes_to_write - !bytes_written in - let bytes = - try write filedesc data !bytes_written bytes_to_write - with - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - -> - 0 - in - (* write from buffer=data from offset=bytes_written, length=bytes_to_write *) - bytes_written := bytes + !bytes_written ; - now := Unix.gettimeofday () + while !bytes_written < total_bytes_to_write do + match Clock.Timer.remaining target_response_time with + | Expired _ -> + raise Timeout + | Remaining remaining_time -> + if not (wait remaining_time) then raise Timeout ; + let bytes_to_write = total_bytes_to_write - !bytes_written in + let bytes = + try write filedesc data !bytes_written bytes_to_write + with + | Unix.Unix_error (Unix.EAGAIN, _, _) + | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) + -> + 0 + in + (* write from buffer=data from offset=bytes_written, length=bytes_to_write *) + bytes_written := bytes + !bytes_written done ; if !bytes_written = total_bytes_to_write then () @@ -633,44 +642,57 @@ let time_limited_read filedesc length target_response_time = let total_bytes_to_read = length in let bytes_read = ref 0 in let buf = Bytes.make total_bytes_to_read '\000' in - let now = ref (Unix.gettimeofday ()) in - while !bytes_read < total_bytes_to_read && !now < target_response_time do - let remaining_time = target_response_time -. 
!now in - wait remaining_time ; - let bytes_to_read = total_bytes_to_read - !bytes_read in - let bytes = - try Unix.read filedesc buf !bytes_read bytes_to_read - with - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - -> - 0 - in - (* read into buffer=buf from offset=bytes_read, length=bytes_to_read *) - if bytes = 0 then - raise End_of_file (* End of file has been reached *) - else - bytes_read := bytes + !bytes_read ; - now := Unix.gettimeofday () + while !bytes_read < total_bytes_to_read do + match Clock.Timer.remaining target_response_time with + | Expired _ -> + raise Timeout + | Remaining remaining_time -> + if not (wait remaining_time) then raise Timeout ; + let bytes_to_read = total_bytes_to_read - !bytes_read in + let bytes = + try Unix.read filedesc buf !bytes_read bytes_to_read + with + | Unix.Unix_error (Unix.EAGAIN, _, _) + | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) + -> + 0 + in + (* read into buffer=buf from offset=bytes_read, length=bytes_to_read *) + if bytes = 0 then + raise End_of_file (* End of file has been reached *) + else + bytes_read := bytes + !bytes_read done ; if !bytes_read = total_bytes_to_read then Bytes.unsafe_to_string buf else (* we ran out of time *) raise Timeout -let time_limited_single_read filedesc length ~max_wait = +let wait_timed_read filedesc span = + with_polly_wait Polly.Events.inp filedesc @@ fun wait _filedesc -> wait span + +let time_limited_single_read filedesc length target_response_time = let buf = Bytes.make length '\000' in - with_polly_wait Polly.Events.inp filedesc @@ fun wait filedesc -> - wait max_wait ; - let bytes = - try Unix.read filedesc buf 0 length - with - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - -> - 0 - in - Bytes.sub_string buf 0 bytes + match Clock.Timer.remaining target_response_time with + | Expired _ -> + raise Timeout + | Remaining remaining_time -> + if not (wait_timed_read filedesc remaining_time) then raise 
Timeout ; + let bytes = + try Unix.read filedesc buf 0 length + with + | Unix.Unix_error (Unix.EAGAIN, _, _) + | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) + -> + 0 + in + Bytes.sub_string buf 0 bytes + +let wait_timed_read filedesc target_response_time = + with_polly_wait Polly.Events.inp filedesc @@ fun wait _fd -> + wait target_response_time + +let delay_span span = span |> Clock.Timer.span_to_s |> Unix.sleepf (* --------------------------------------------------------------------------------------- *) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli index b2e76069aef..c0678733a42 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli @@ -148,15 +148,22 @@ exception Timeout val with_socket_timeout : Unix.file_descr -> float option -> (unit -> 'a) -> 'a -val time_limited_write : Unix.file_descr -> int -> bytes -> float -> unit +val time_limited_write : + Unix.file_descr -> int -> bytes -> Clock.Timer.t -> unit val time_limited_write_substring : - Unix.file_descr -> int -> string -> float -> unit + Unix.file_descr -> int -> string -> Clock.Timer.t -> unit -val time_limited_read : Unix.file_descr -> int -> float -> string +val time_limited_read : Unix.file_descr -> int -> Clock.Timer.t -> string -val time_limited_single_read : - Unix.file_descr -> int -> max_wait:float -> string +val time_limited_single_read : Unix.file_descr -> int -> Clock.Timer.t -> string + +val delay_span : Mtime.Span.t -> unit + +val wait_timed_read : Unix.file_descr -> Mtime.Span.t -> bool +(** [wait_timed_read fd span] waits at most [span] seconds until [fd] contains at least one byte to read, + or an EOF. Returns [true] if the [fd] is ready, or [false] if the timeout has expired. + See {!Thread.wait_timed_read}. 
*) val read_data_in_string_chunks : (string -> int -> unit) diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index 8b899e6d054..82b062eb2da 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1093,17 +1093,23 @@ let xapi_globs_spec = , Float Db_globs.permanent_master_failure_retry_interval ) ; ( "redo_log_max_block_time_empty" - , Float Db_globs.redo_log_max_block_time_empty + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_empty + ) + ; ( "redo_log_max_block_time_read" + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_read ) - ; ("redo_log_max_block_time_read", Float Db_globs.redo_log_max_block_time_read) ; ( "redo_log_max_block_time_writedelta" - , Float Db_globs.redo_log_max_block_time_writedelta + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_writedelta ) ; ( "redo_log_max_block_time_writedb" - , Float Db_globs.redo_log_max_block_time_writedb + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_writedb + ) + ; ( "redo_log_max_startup_time" + , ShortDurationFromSeconds Db_globs.redo_log_max_startup_time + ) + ; ( "redo_log_connect_delay" + , ShortDurationFromSeconds Db_globs.redo_log_connect_delay ) - ; ("redo_log_max_startup_time", Float Db_globs.redo_log_max_startup_time) - ; ("redo_log_connect_delay", Float Db_globs.redo_log_connect_delay) ; ("default-vbd3-polling-duration", Int default_vbd3_polling_duration) ; ( "default-vbd3-polling-idle-threshold" , Int default_vbd3_polling_idle_threshold From bb7ee94fafbaf4492fc1b1c864f354b743b4a814 Mon Sep 17 00:00:00 2001 From: Steven Woods Date: Wed, 4 Jan 2023 11:20:14 +0000 Subject: [PATCH 08/13] CP-32622: Replace select with polly in xn Signed-off-by: Steven Woods --- ocaml/xenopsd/cli/dune | 1 + ocaml/xenopsd/cli/xn.ml | 53 +++++++++++++++++++++++++---------------- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/ocaml/xenopsd/cli/dune b/ocaml/xenopsd/cli/dune index b194b10323c..aacf70d07ca 100644 --- 
a/ocaml/xenopsd/cli/dune +++ b/ocaml/xenopsd/cli/dune @@ -9,6 +9,7 @@ astring cmdliner dune-build-info + polly re result rpclib.core diff --git a/ocaml/xenopsd/cli/xn.ml b/ocaml/xenopsd/cli/xn.ml index 5ac6100669c..186fb47db13 100644 --- a/ocaml/xenopsd/cli/xn.ml +++ b/ocaml/xenopsd/cli/xn.ml @@ -1009,27 +1009,38 @@ let raw_console_proxy sockaddr = ) else if !final then finished := true else - let r, _, _ = Unix.select [Unix.stdin; fd] [] [] (-1.) in - if List.mem Unix.stdin r then ( - let b = - Unix.read Unix.stdin buf_remote !buf_remote_end - (block - !buf_remote_end) - in - let i = ref !buf_remote_end in - while - !i < !buf_remote_end + b - && Char.code (Bytes.get buf_remote !i) <> 0x1d - do - incr i - done ; - if !i < !buf_remote_end + b then final := true ; - buf_remote_end := !i - ) ; - if List.mem fd r then - let b = - Unix.read fd buf_local !buf_local_end (block - !buf_local_end) - in - buf_local_end := !buf_local_end + b + let epoll = Polly.create () in + List.iter + (fun fd -> Polly.add epoll fd Polly.Events.inp) + [Unix.stdin; fd] ; + Fun.protect + ~finally:(fun () -> Polly.close epoll) + (fun () -> + ignore + @@ Polly.wait epoll 2 (-1) (fun _ file_desc _ -> + if Unix.stdin = file_desc then ( + let b = + Unix.read Unix.stdin buf_remote !buf_remote_end + (block - !buf_remote_end) + in + let i = ref !buf_remote_end in + while + !i < !buf_remote_end + b + && Char.code (Bytes.get buf_remote !i) <> 0x1d + do + incr i + done ; + if !i < !buf_remote_end + b then final := true ; + buf_remote_end := !i + ) ; + if fd = file_desc then + let b = + Unix.read fd buf_local !buf_local_end + (block - !buf_local_end) + in + buf_local_end := !buf_local_end + b + ) + ) done in let delay = ref 0.1 in From 2a70dc9f7a16bff8d1464c67d51a45824583d16c Mon Sep 17 00:00:00 2001 From: Steven Woods Date: Wed, 4 Jan 2023 11:21:19 +0000 Subject: [PATCH 09/13] CP-32622: Replace select with polly in xsh Note: Unixext.proxy is more efficient and doesn't recreate the epoll instance many 
times. But xsh is not performance critical. Signed-off-by: Steven Woods --- ocaml/xsh/dune | 1 + ocaml/xsh/xsh.ml | 25 +++++++++++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/ocaml/xsh/dune b/ocaml/xsh/dune index 13fc1e74c46..541b4f2d20a 100644 --- a/ocaml/xsh/dune +++ b/ocaml/xsh/dune @@ -5,6 +5,7 @@ (package xapi) (libraries dune-build-info + polly stunnel safe-resources xapi-consts diff --git a/ocaml/xsh/xsh.ml b/ocaml/xsh/xsh.ml index 982ff6c346f..990e61a1113 100644 --- a/ocaml/xsh/xsh.ml +++ b/ocaml/xsh/xsh.ml @@ -60,12 +60,25 @@ let proxy (ain : Unix.file_descr) (aout : Unix.file_descr) (bin : Unixfd.t) (if can_write a' then [bout] else []) @ if can_write b' then [aout] else [] in - let r, w, _ = Unix.select r w [] (-1.0) in - (* Do the writing before the reading *) - List.iter - (fun fd -> if aout = fd then write_from b' a' else write_from a' b') - w ; - List.iter (fun fd -> if ain = fd then read_into a' else read_into b') r + let epoll = Polly.create () in + List.iter (fun fd -> Polly.add epoll fd Polly.Events.inp) r ; + List.iter (fun fd -> Polly.add epoll fd Polly.Events.out) w ; + Fun.protect + ~finally:(fun () -> Polly.close epoll) + (fun () -> + ignore + @@ Polly.wait epoll 4 (-1) (fun _ fd _ -> + (* Note: only one fd is handled *) + if aout = fd then + write_from b' a' + else if bout = fd then + write_from a' b' + else if ain = fd then + read_into a' + else + read_into b' + ) + ) done with _ -> ( (try Unix.clear_nonblock ain with _ -> ()) ; From 58bac7e9536e04d0a26f18ed2dad3d699d653f6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 10 Jun 2024 17:21:45 +0100 Subject: [PATCH 10/13] fix(xapi-idl): replace another Thread.wait_timed_read MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- ocaml/xapi-idl/lib_test/dune | 1 + ocaml/xapi-idl/lib_test/scheduler_test.ml | 9 +++++---- 2 files changed, 6 insertions(+), 4 
deletions(-) diff --git a/ocaml/xapi-idl/lib_test/dune b/ocaml/xapi-idl/lib_test/dune index 57c8c95e592..bb5a05933d5 100644 --- a/ocaml/xapi-idl/lib_test/dune +++ b/ocaml/xapi-idl/lib_test/dune @@ -35,6 +35,7 @@ rpclib.xml test_lib threads.posix + xapi-stdext-unix xapi-idl xapi-idl.cluster xapi-idl.rrd diff --git a/ocaml/xapi-idl/lib_test/scheduler_test.ml b/ocaml/xapi-idl/lib_test/scheduler_test.ml index 640ae938862..39a58f06b9a 100644 --- a/ocaml/xapi-idl/lib_test/scheduler_test.ml +++ b/ocaml/xapi-idl/lib_test/scheduler_test.ml @@ -25,7 +25,8 @@ let test_delay_cancel () = let elapsed = after -. before in assert_bool "elapsed_time1" (elapsed < 0.4) -let timed_wait_callback ~msg ?(time_min = 0.) ?(eps = 0.1) ?(time_max = 60.) f = +let timed_wait_callback ~msg ?(time_min = 0.) ?(eps = 0.1) + ?(time_max = Mtime.Span.(60 * s)) f = let rd, wr = Unix.pipe () in let finally () = Unix.close rd ; Unix.close wr in Fun.protect ~finally (fun () -> @@ -37,13 +38,13 @@ let timed_wait_callback ~msg ?(time_min = 0.) ?(eps = 0.1) ?(time_max = 60.) f = () in f callback ; - let ready = Thread.wait_timed_read rd time_max in + let ready = Xapi_stdext_unix.Unixext.wait_timed_read rd time_max in match (ready, !after) with | true, None -> Alcotest.fail "pipe ready to read, but after is not set" | false, None -> - Alcotest.fail - (Printf.sprintf "%s: callback not invoked within %gs" msg time_max) + Alcotest.failf "%s: callback not invoked within %a" msg Mtime.Span.pp + time_max | _, Some t -> let actual_minimum = min (t -. 
before) time_min in Alcotest.(check (float eps)) From fc821da591df188d407a27b95a3ccd12e12d1162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 10 Jun 2024 17:19:05 +0100 Subject: [PATCH 11/13] fix(xapi-idl): replace PipeDelay with Delay, avoid another Thread.wait_timed_read MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- ocaml/xapi-idl/lib/scheduler.ml | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/ocaml/xapi-idl/lib/scheduler.ml b/ocaml/xapi-idl/lib/scheduler.ml index 407120c9fc6..d4d5c7c5cca 100644 --- a/ocaml/xapi-idl/lib/scheduler.ml +++ b/ocaml/xapi-idl/lib/scheduler.ml @@ -18,33 +18,7 @@ open D let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute -module PipeDelay = struct - (* Concrete type is the ends of a pipe *) - type t = { - (* A pipe is used to wake up a thread blocked in wait: *) - pipe_out: Unix.file_descr - ; pipe_in: Unix.file_descr - } - - let make () = - let pipe_out, pipe_in = Unix.pipe () in - {pipe_out; pipe_in} - - let wait (x : t) (seconds : float) = - let timeout = if seconds < 0.0 then 0.0 else seconds in - if Thread.wait_timed_read x.pipe_out timeout then - (* flush the single byte from the pipe *) - let (_ : int) = Unix.read x.pipe_out (Bytes.create 1) 0 1 in - (* return false if we were woken *) - false - else - (* return true if we waited the full length of time, false if we were woken *) - true - - let signal (x : t) = - let (_ : int) = Unix.write x.pipe_in (Bytes.of_string "X") 0 1 in - () -end +module PipeDelay = Xapi_stdext_threads.Threadext.Delay type handle = Mtime.span * int From 198cd592391a368bd075f3f7dd7102f258fc212c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 10 Jun 2024 17:16:58 +0100 Subject: [PATCH 12/13] fix(master_connection): avoid Thread.wait_timed_read, just as bad as Unix.select MIME-Version: 1.0 Content-Type: text/plain; 
charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- ocaml/database/master_connection.ml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ocaml/database/master_connection.ml b/ocaml/database/master_connection.ml index 2547ae53182..d6e9e228c64 100644 --- a/ocaml/database/master_connection.ml +++ b/ocaml/database/master_connection.ml @@ -171,7 +171,11 @@ let open_secure_connection () = ~write_to_log:(fun x -> debug "stunnel: %s\n" x) ~verify_cert host port @@ fun st_proc -> - let fd_closed = Thread.wait_timed_read Unixfd.(!(st_proc.Stunnel.fd)) 5. in + let fd_closed = + Xapi_stdext_unix.Unixext.wait_timed_read + Unixfd.(!(st_proc.Stunnel.fd)) + Mtime.Span.(5 * s) + in let proc_quit = try Unix.kill (Stunnel.getpid st_proc.Stunnel.pid) 0 ; From ef13b446f17c0bda3cfc65579da77a8571f92f49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Wed, 3 Jul 2024 17:48:38 +0100 Subject: [PATCH 13/13] forkexecd: do not clip commandline in logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If we want to reproduce a failure we need to know the exact commandline that was used. Longer than 80 chars is not a problem, this is a logfile, and a truncated line is worse than a long line. Signed-off-by: Edwin Török --- ocaml/forkexecd/src/child.ml | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/ocaml/forkexecd/src/child.ml b/ocaml/forkexecd/src/child.ml index ef4ad887f31..e800e8bf95f 100644 --- a/ocaml/forkexecd/src/child.ml +++ b/ocaml/forkexecd/src/child.ml @@ -94,14 +94,7 @@ let handle_comms comms_sock fd_sock state = let log_failure args child_pid reason = (* The commandline might be too long to clip it *) let cmdline = String.concat " " args in - let limit = 80 - 3 in - let cmdline' = - if String.length cmdline > limit then - String.sub cmdline 0 limit ^ "..." 
- else - cmdline - in - Fe_debug.error "%d (%s) %s" child_pid cmdline' reason + Fe_debug.error "%d (%s) %s" child_pid cmdline reason let report_child_exit comms_sock args child_pid status = let module Unixext = Xapi_stdext_unix.Unixext in