diff --git a/ocaml/database/block_device_io.ml b/ocaml/database/block_device_io.ml index 7587a34d5d5..d218f30a030 100644 --- a/ocaml/database/block_device_io.ml +++ b/ocaml/database/block_device_io.ml @@ -325,16 +325,20 @@ let listen_on sock = s let accept_conn s latest_response_time = - let now = Unix.gettimeofday () in - let timeout = latest_response_time -. now in - (* Await an incoming connection... *) - let ready_to_read, _, _ = Unix.select [s] [] [] timeout in - R.info "Finished selecting" ; - if List.mem s ready_to_read then - (* We've received a connection. Accept it and return the socket. *) - fst (Unix.accept s) - else (* We must have timed out *) - raise Unixext.Timeout + match Clock.Timer.remaining latest_response_time with + | Expired _ -> + raise Unixext.Timeout + | Remaining timeout -> + (* Await an incoming connection... *) + let ready_to_read, _, _ = + Unix.select [s] [] [] (Clock.Timer.span_to_s timeout) + in + R.info "Finished selecting" ; + if List.mem s ready_to_read then + (* We've received a connection. Accept it and return the socket. *) + fst (Unix.accept s) + else (* We must have timed out *) + raise Unixext.Timeout (* Listen on a given socket. Accept a single connection and transfer all the data from it to dest_fd, or raise Timeout if target_response_time happens first. *) (* Raises NotEnoughSpace if the next write would exceed the available_space. *) @@ -731,6 +735,8 @@ let dump = ref false let empty = ref false +let target_response_time delta = Clock.Timer.start ~duration:delta + let _ = (* Initialise debug logging *) initialise_logging () ; @@ -770,11 +776,12 @@ let _ = (* Open the block device *) let block_dev_fd = open_block_device !block_dev - (Unix.gettimeofday () +. !Db_globs.redo_log_max_startup_time) + (target_response_time !Db_globs.redo_log_max_startup_time) in R.info "Opened block device." ; - let target_response_time = Unix.gettimeofday () +. 3600. in - (* Read the validity byte *) + let target_response_time = + target_response_time !Db_globs.redo_log_max_connect_time + in try let validity = read_validity_byte block_dev_fd target_response_time in Printf.printf "*** Validity byte: [%s]\n" validity ; @@ -841,10 +848,12 @@ let _ = (* Open the block device *) let block_dev_fd = open_block_device !block_dev - (Unix.gettimeofday () +. !Db_globs.redo_log_max_startup_time) + (target_response_time !Db_globs.redo_log_max_startup_time) in R.info "Opened block device." ; - let target_response_time = Unix.gettimeofday () +. 3600. in + let target_response_time = + target_response_time !Db_globs.redo_log_max_connect_time + in initialise_redo_log block_dev_fd target_response_time ; Printf.printf "Block device initialised.\n" ) ; @@ -855,9 +864,8 @@ let _ = let s = listen_on !ctrlsock in (* Main loop: accept a new client, communicate with it until it stops sending commands, repeat. *) while true do - let start_of_startup = Unix.gettimeofday () in let target_startup_response_time = - start_of_startup +. !Db_globs.redo_log_max_startup_time + target_response_time !Db_globs.redo_log_max_startup_time in R.info "Awaiting incoming connections on %s..." !ctrlsock ; let client = accept_conn s target_startup_response_time in @@ -898,11 +906,11 @@ let _ = send_failure client (str ^ "|nack") ("Unknown command " ^ str) ) - , 0. + , Mtime.Span.zero ) in (* "Start the clock!" -- set the latest time by which we need to have responded to the client. *) - let target_response_time = Unix.gettimeofday () +. block_time in + let target_response_time = target_response_time block_time in action_fn block_dev_fd client !datasock target_response_time with (* this must be an exception in Unixext.really_read because action_fn doesn't throw exceptions *) diff --git a/ocaml/database/db_globs.ml b/ocaml/database/db_globs.ml index d4fa5826629..b09d407ded6 100644 --- a/ocaml/database/db_globs.ml +++ b/ocaml/database/db_globs.ml @@ -3,19 +3,22 @@ let redo_log_block_device_io = ref "block_device_io" (** The delay between each attempt to connect to the block device I/O process *) -let redo_log_connect_delay = ref 0.1 +let redo_log_connect_delay = ref Mtime.Span.(100 * ms) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while emptying *) -let redo_log_max_block_time_empty = ref 2. +let redo_log_max_block_time_empty = ref Mtime.Span.(2 * s) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while reading *) -let redo_log_max_block_time_read = ref 30. +let redo_log_max_block_time_read = ref Mtime.Span.(30 * s) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while writing a delta *) -let redo_log_max_block_time_writedelta = ref 2. +let redo_log_max_block_time_writedelta = ref Mtime.Span.(2 * s) (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while writing a database *) -let redo_log_max_block_time_writedb = ref 30. +let redo_log_max_block_time_writedb = ref Mtime.Span.(30 * s) + +(** The maximum amount of time to wait for a connection (used to be hardcoded to 3600s) *) +let redo_log_max_connect_time = ref Mtime.Span.(1 * hour) (** {3 Settings related to the exponential back-off of repeated attempts to reconnect after failure} *) @@ -35,7 +38,7 @@ let redo_log_max_dying_processes = 2 let redo_log_comms_socket_stem = "sock-blkdev-io" (** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while initially connecting to it *) -let redo_log_max_startup_time = ref 5. +let redo_log_max_startup_time = ref Mtime.Span.(5 * s) (** The length, in bytes, of one redo log which constitutes half of the VDI *) let redo_log_length_of_half = 60 * 1024 * 1024 diff --git a/ocaml/database/dune b/ocaml/database/dune index 08108ad6c55..db4b431749f 100644 --- a/ocaml/database/dune +++ b/ocaml/database/dune @@ -28,7 +28,11 @@ test_schemas unit_test_marshall unit_test_sql)) (libraries forkexec + fmt gzip + mtime + mtime.clock + mtime.clock.os rpclib.core rpclib.json safe-resources @@ -61,6 +65,9 @@ (modules block_device_io) (libraries dune-build-info + mtime + mtime.clock + mtime.clock.os xapi_database xapi-log xapi-stdext-pervasives diff --git a/ocaml/database/master_connection.ml b/ocaml/database/master_connection.ml index 2547ae53182..d6e9e228c64 100644 --- a/ocaml/database/master_connection.ml +++ b/ocaml/database/master_connection.ml @@ -171,7 +171,11 @@ let open_secure_connection () = ~write_to_log:(fun x -> debug "stunnel: %s\n" x) ~verify_cert host port @@ fun st_proc -> - let fd_closed = Thread.wait_timed_read Unixfd.(!(st_proc.Stunnel.fd)) 5. in + let fd_closed = + Xapi_stdext_unix.Unixext.wait_timed_read + Unixfd.(!(st_proc.Stunnel.fd)) + Mtime.Span.(5 * s) + in let proc_quit = try Unix.kill (Stunnel.getpid st_proc.Stunnel.pid) 0 ; diff --git a/ocaml/database/redo_log.ml b/ocaml/database/redo_log.ml index 429646dcce7..506add08701 100644 --- a/ocaml/database/redo_log.ml +++ b/ocaml/database/redo_log.ml @@ -244,9 +244,7 @@ let generation_size = 16 let length_size = 16 -let get_latest_response_time block_time = - let now = Unix.gettimeofday () in - now +. block_time +let get_latest_response_time block_time = Clock.Timer.start ~duration:block_time (* Returns the PID of the process *) let start_io_process block_dev ctrlsockpath datasockpath = @@ -265,26 +263,29 @@ let connect sockpath latest_response_time = Unix.connect s (Unix.ADDR_UNIX sockpath) ; D.debug "Connected to I/O process via socket %s" sockpath ; s - with Unix.Unix_error (a, b, _) -> + with Unix.Unix_error (a, b, _) -> ( (* It's probably the case that the process hasn't started yet. *) (* See if we can afford to wait and try again *) Unix.close s ; let attempt_delay = !Db_globs.redo_log_connect_delay in - let now = Unix.gettimeofday () in - let remaining = latest_response_time -. now in - if attempt_delay < remaining then ( - (* Wait for a while then try again *) - D.debug - "Waiting to connect to I/O process via socket %s (error was %s: \ - %s)..." - sockpath b (Unix.error_message a) ; - D.debug "Remaining time for retries: %f" remaining ; - Thread.delay attempt_delay ; - attempt () - ) else ( - D.debug "%s timeout reached" __FUNCTION__ ; - raise Unixext.Timeout - ) + match + Clock.Timer.remaining + (Clock.Timer.shorten_by attempt_delay latest_response_time) + with + | Remaining remaining -> + (* Wait for a while then try again *) + D.debug + "Waiting to connect to I/O process via socket %s (error was %s: \ + %s)..." + sockpath b (Unix.error_message a) ; + D.debug "Remaining time for retries: %a" Debug.Pp.mtime_span remaining ; + Unixext.delay_span attempt_delay ; + attempt () + | Expired overrun -> + D.debug "%s timeout reached (%a overrun)" __FUNCTION__ + Debug.Pp.mtime_span overrun ; + raise Unixext.Timeout + ) in attempt () diff --git a/ocaml/database/redo_log.mli b/ocaml/database/redo_log.mli index 710612fe9fa..0af623523de 100644 --- a/ocaml/database/redo_log.mli +++ b/ocaml/database/redo_log.mli @@ -99,7 +99,7 @@ type t = represents the write to the field with name [fldname] of a row in table [tblname] with key [objref], overwriting its value with [newval]. *) val apply : - (Generation.t -> Unix.file_descr -> int -> float -> unit) + (Generation.t -> Unix.file_descr -> int -> Clock.Timer.t -> unit) -> (Generation.t -> t -> unit) -> [< `RO | `RW] redo_log -> unit diff --git a/ocaml/forkexecd/src/child.ml b/ocaml/forkexecd/src/child.ml index ef4ad887f31..e800e8bf95f 100644 --- a/ocaml/forkexecd/src/child.ml +++ b/ocaml/forkexecd/src/child.ml @@ -94,14 +94,7 @@ let handle_comms comms_sock fd_sock state = let log_failure args child_pid reason = (* The commandline might be too long to clip it *) let cmdline = String.concat " " args in - let limit = 80 - 3 in - let cmdline' = - if String.length cmdline > limit then - String.sub cmdline 0 limit ^ "..." - else - cmdline - in - Fe_debug.error "%d (%s) %s" child_pid cmdline' reason + Fe_debug.error "%d (%s) %s" child_pid cmdline reason let report_child_exit comms_sock args child_pid status = let module Unixext = Xapi_stdext_unix.Unixext in diff --git a/ocaml/libs/ezxenstore/core/dune b/ocaml/libs/ezxenstore/core/dune index 53e812032f7..84e96b4c46c 100644 --- a/ocaml/libs/ezxenstore/core/dune +++ b/ocaml/libs/ezxenstore/core/dune @@ -9,5 +9,6 @@ (re_export xenstore) (re_export xenstore_transport) threads.posix + xapi-stdext-threads (re_export xenstore.unix)) ) diff --git a/ocaml/libs/ezxenstore/core/watch.ml b/ocaml/libs/ezxenstore/core/watch.ml index 35f3aee0b5e..36508767347 100644 --- a/ocaml/libs/ezxenstore/core/watch.ml +++ b/ocaml/libs/ezxenstore/core/watch.ml @@ -31,41 +31,25 @@ let has_fired ~xs x = true with Xs_protocol.Eagain -> false +module Delay = Xapi_stdext_threads.Threadext.Delay + (** Block waiting for a result *) let wait_for ~xs ?(timeout = 300.) (x : 'a t) = let _ = ignore xs in - let with_pipe f = - let p1, p2 = Unix.pipe () in - let close_all () = - let close p = try Unix.close p with _ -> () in - close p1 ; close p2 - in - try - let result = f (p1, p2) in - close_all () ; result - with e -> close_all () ; raise e - in let task = Xs.wait x.evaluate in - with_pipe (fun (p1, p2) -> - let thread = - Thread.create - (fun () -> - let r, _, _ = Unix.select [p1] [] [] timeout in - if r <> [] then - () - else - try Xs_client_unix.Task.cancel task with _ -> () - ) - () - in - try - let result = Xs_client_unix.Task.wait task in - ignore (Unix.write p2 (Bytes.of_string "x") 0 1) ; - Thread.join thread ; - result - with Xs_client_unix.Cancelled -> - Thread.join thread ; raise (Timeout timeout) - ) + let delay = Delay.make () in + let thread = + Thread.create + (fun () -> + if Delay.wait delay timeout then + try Xs_client_unix.Task.cancel task with _ -> () + ) + () + in + try + let result = Xs_client_unix.Task.wait task in + Delay.signal delay ; Thread.join thread ; result + with Xs_client_unix.Cancelled -> Thread.join thread ; raise (Timeout timeout) (** Wait for a node to appear in the store and return its value *) let value_to_appear (path : path) : string t = diff --git a/ocaml/libs/http-lib/buf_io.ml b/ocaml/libs/http-lib/buf_io.ml index 6a6397a614c..b5d41008c4a 100644 --- a/ocaml/libs/http-lib/buf_io.ml +++ b/ocaml/libs/http-lib/buf_io.ml @@ -79,21 +79,21 @@ let is_full ic = ic.cur = 0 && ic.max = Bytes.length ic.buf let fill_buf ~buffered ic timeout = let buf_size = Bytes.length ic.buf in let fill_no_exc timeout len = - let l, _, _ = Unix.select [ic.fd] [] [] timeout in - if l <> [] then ( + Xapi_stdext_unix.Unixext.with_socket_timeout ic.fd timeout @@ fun () -> + try let n = Unix.read ic.fd ic.buf ic.max len in ic.max <- n + ic.max ; if n = 0 && len <> 0 then raise Eof ; n - ) else - -1 + with Unix.Unix_error (Unix.(EAGAIN | EWOULDBLOCK), _, _) -> -1 in (* If there's no space to read, shift *) if ic.max = buf_size then shift ic ; let space_left = buf_size - ic.max in (* Read byte one by one just do make sure we don't buffer too many chars *) let n = - fill_no_exc timeout (if buffered then space_left else min space_left 1) + fill_no_exc (Some timeout) + (if buffered then space_left else min space_left 1) in (* Select returned nothing to read *) if n = -1 then raise Timeout ; @@ -102,7 +102,11 @@ let fill_buf ~buffered ic timeout = let tofillsz = if buffered then buf_size - ic.max else min (buf_size - ic.max) 1 in - ignore (fill_no_exc 0.0 tofillsz) + (* cannot use 0 here, for select that'd mean timeout immediately, for + setsockopt it would mean no timeout. + So use a very short timeout instead + *) + ignore (fill_no_exc (Some 1e-6) tofillsz) ) (** Input one line terminated by \n *) diff --git a/ocaml/libs/http-lib/bufio_test.ml b/ocaml/libs/http-lib/bufio_test.ml index 7937adc73ea..b4184ff8121 100644 --- a/ocaml/libs/http-lib/bufio_test.ml +++ b/ocaml/libs/http-lib/bufio_test.ml @@ -1,7 +1,7 @@ open QCheck2 open Xapi_fd_test -let print_timeout = string_of_float +let print_timeout = Fmt.to_to_string Mtime.Span.pp let expect_string ~expected ~actual = if not (String.equal expected actual) then @@ -20,7 +20,7 @@ let test_buf_io = let timeouts = Generate.timeouts in let gen = Gen.tup2 Generate.t timeouts and print = Print.tup2 Generate.print print_timeout in - Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout) -> + Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout_span) -> let every_bytes = Int.min (Option.map Observations.Delay.every_bytes behaviour.delay_read @@ -38,11 +38,8 @@ let test_buf_io = timeout_span remains the span for the entire function, and timeout the per operation timeout that we'll pass to the function under test. *) - let timeout_span = Mtime.Span.of_float_ns (timeout *. 1e9) |> Option.get in - let timeout = timeout /. float operations in - let timeout_operation_span = - Mtime.Span.of_float_ns (timeout *. 1e9) |> Option.get - in + let timeout = Clock.Timer.span_to_s timeout_span /. float operations in + let timeout_operation_span = Clock.Timer.s_to_span timeout |> Option.get in (* timeout < 1us would get truncated to 0 *) QCheck2.assume (timeout > 1e-6) ; (* Format.eprintf "Testing %s@." (print (behaviour, timeout)); *) diff --git a/ocaml/libs/http-lib/dune b/ocaml/libs/http-lib/dune index ee510d7fc42..27a3f53a9f8 100644 --- a/ocaml/libs/http-lib/dune +++ b/ocaml/libs/http-lib/dune @@ -72,6 +72,7 @@ (modes (best exe)) (modules bufio_test) (libraries + clock fmt mtime mtime.clock diff --git a/ocaml/libs/http-lib/http.ml b/ocaml/libs/http-lib/http.ml index c6ff41be709..751ce7208fd 100644 --- a/ocaml/libs/http-lib/http.ml +++ b/ocaml/libs/http-lib/http.ml @@ -368,14 +368,8 @@ let read_frame_header buf = let prefix = Bytes.sub_string buf 0 frame_header_length in try Scanf.sscanf prefix "FRAME %012d" (fun x -> Some x) with _ -> None -let set_socket_timeout fd t = - try Unix.(setsockopt_float fd SO_RCVTIMEO t) - with Unix.Unix_error (Unix.ENOTSOCK, _, _) -> - (* In the unit tests, the fd comes from a pipe... ignore *) - () - let read_http_request_header ~read_timeout ~total_timeout ~max_length fd = - Option.iter (fun t -> set_socket_timeout fd t) read_timeout ; + Unixext.with_socket_timeout fd read_timeout @@ fun () -> let buf = Bytes.create (Option.value ~default:1024 max_length) in let deadline = Option.map @@ -420,7 +414,6 @@ let read_http_request_header ~read_timeout ~total_timeout ~max_length fd = check_timeout_and_read 0 length ; (true, length) in - set_socket_timeout fd 0. ; (frame, Bytes.sub_string buf 0 headers_length, proxy) let read_http_response_header buf fd = diff --git a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml index 96cd2a897e6..23a07961980 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.ml @@ -56,7 +56,7 @@ let total_delays = Gen.oneofa [|0.001; 0.01; 0.1; 0.4|] let span_of_s s = s *. 1e9 |> Mtime.Span.of_float_ns |> Option.get (* keep these short *) -let timeouts = Gen.oneofa [|0.0; 0.001; 0.1; 0.3|] +let timeouts = Gen.oneofa Mtime.Span.[|zero; 1 * ms; 100 * ms; 300 * ms|] let delay_of_size total_delay size = let open Gen in diff --git a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli index 6aba67c7a6d..bb500d46745 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-fd-test/generate.mli @@ -24,7 +24,7 @@ type t = { ; kind: Unix.file_kind } -val timeouts : float QCheck2.Gen.t +val timeouts : Mtime.Span.t QCheck2.Gen.t (** [timeouts] is a generator for small timeouts *) val make : diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index f7e9141c3a9..51ebe3898d0 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -6,6 +6,7 @@ threads.posix unix xapi-stdext-unix + mtime xapi-stdext-pervasives) ) (test diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml index ef30cfb5ba4..28e9ea46433 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml @@ -62,6 +62,13 @@ module Delay = struct exception Pre_signalled let wait (x : t) (seconds : float) = + let max_wait = + match Mtime.Span.of_float_ns (seconds *. 1e9) with + | None -> + invalid_arg (Printf.sprintf "wait %g" seconds) + | Some max_wait -> + max_wait + in let finally = Xapi_stdext_pervasives.Pervasiveext.finally in let to_close = ref [] in let close' fd = @@ -90,7 +97,8 @@ module Delay = struct (* flush the single byte from the pipe *) try let (_ : string) = - time_limited_single_read pipe_out 1 ~max_wait:seconds + time_limited_single_read pipe_out 1 + (Clock.Timer.start ~duration:max_wait) in false with Timeout -> true diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune index 92b77753a86..9f675e9a03e 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune @@ -2,8 +2,13 @@ (name xapi_stdext_unix) (public_name xapi-stdext-unix) (libraries + (re_export clock) fd-send-recv integers + fmt + mtime + mtime.clock + mtime.clock.os polly unix xapi-backtrace diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune index 407d025a8a8..e1776e81622 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/dune @@ -2,7 +2,7 @@ (name unixext_test) (package xapi-stdext-unix) (modules unixext_test) - (libraries xapi_stdext_unix qcheck-core mtime.clock.os qcheck-core.runner fmt xapi_fd_test mtime threads.posix rresult) + (libraries xapi_stdext_unix qcheck-core mtime.clock.os qcheck-core.runner fmt xapi_fd_test mtime threads.posix rresult clock) ; use fixed seed to avoid causing random failures in CI and package builds (action (run %{test} -v -bt --seed 42)) ) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml index e0f2726f303..0628bd2dc4e 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/test/unixext_test.ml @@ -39,9 +39,12 @@ let pp_pair = ) *) +let print_timeout = Fmt.to_to_string Mtime.Span.pp + let test_time_limited_write = - let gen = Gen.tup2 Generate.t Generate.timeouts - and print = Print.tup2 Generate.print Print.float in + let timeouts = Generate.timeouts in + let gen = Gen.tup2 Generate.t timeouts + and print = Print.tup2 Generate.print print_timeout in Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout) -> skip_blk behaviour.kind ; skip_dirlnk behaviour.kind ; @@ -53,7 +56,7 @@ let test_time_limited_write = let fd = Xapi_fdcaps.Operations.For_test.unsafe_fd_exn wrapped_fd in Unix.set_nonblock fd ; let dt = Mtime_clock.counter () in - let deadline = Unix.gettimeofday () +. timeout in + let deadline = Clock.Timer.start ~duration:timeout in let finally () = test_elapsed := Mtime_clock.count dt in Fun.protect ~finally (fun () -> Unixext.time_limited_write_substring fd len buf deadline @@ -64,11 +67,14 @@ let test_time_limited_write = let observations, result = Generate.run_wo behaviour ~f:test in let () = let open Observations in - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 1e-9 in - if elapsed_s > timeout +. 0.5 then + let elapsed = !test_elapsed in + let timeout_extra = + Mtime.Span.(add (timeout :> Mtime.Span.t) @@ (500 * ms)) + in + if Mtime.Span.compare elapsed timeout_extra > 0 then Test.fail_reportf - "Function duration significantly exceeds timeout: %f > %f; %s" - elapsed_s timeout + "Function duration significantly exceeds timeout: %a > %a; %s" + Mtime.Span.pp elapsed Mtime.Span.pp timeout (Fmt.to_to_string Fmt.(option pp) observations.Observations.read) ; match (observations, result) with | {read= Some read; _}, Ok expected -> @@ -76,10 +82,10 @@ let test_time_limited_write = expect_amount ~expected:(String.length expected) read ; expect_string ~expected ~actual:read.data | {read= Some read; _}, Error (`Exn_trap (Unixext.Timeout, _)) -> - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 1e-9 in - if elapsed_s < timeout then - Test.fail_reportf "Timed out earlier than requested: %f < %f" - elapsed_s timeout ; + let elapsed = !test_elapsed in + if Mtime.Span.compare elapsed timeout < 0 then + Test.fail_reportf "Timed out earlier than requested: %a < %a" + Mtime.Span.pp elapsed Mtime.Span.pp timeout ; let actual = String.length read.data in if actual >= behaviour.size then Test.fail_reportf "Timed out, but transferred enough data: %d >= %d" @@ -103,7 +109,7 @@ let test_time_limited_write = let test_time_limited_read = let gen = Gen.tup2 Generate.t Generate.timeouts - and print = Print.tup2 Generate.print Print.float in + and print = Print.tup2 Generate.print print_timeout in Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout) -> (* Format.eprintf "Testing %s@." (print (behaviour, timeout)); *) skip_blk behaviour.kind ; @@ -113,7 +119,7 @@ let test_time_limited_read = let fd = Xapi_fdcaps.Operations.For_test.unsafe_fd_exn wrapped_fd in Unix.set_nonblock fd ; let dt = Mtime_clock.counter () in - let deadline = Unix.gettimeofday () +. timeout in + let deadline = Clock.Timer.start ~duration:timeout in let finally () = test_elapsed := Mtime_clock.count dt in Fun.protect ~finally (fun () -> Unixext.time_limited_read fd behaviour.size deadline @@ -126,11 +132,14 @@ let test_time_limited_read = in let () = let open Observations in - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 1e-9 in - if elapsed_s > timeout +. 0.5 then + let elapsed = !test_elapsed in + let timeout_extra = + Mtime.Span.(add (timeout :> Mtime.Span.t) @@ (500 * ms)) + in + if Mtime.Span.compare elapsed timeout_extra > 0 then Test.fail_reportf - "Function duration significantly exceeds timeout: %f > %f; %s" elapsed_s - timeout + "Function duration significantly exceeds timeout: %a > %a; %s" + Mtime.Span.pp elapsed Mtime.Span.pp timeout (Fmt.to_to_string Fmt.(option pp) observations.Observations.write) ; (* Format.eprintf "Result: %a@." (Fmt.option Observations.pp) observations.write;*) match (observations, result) with @@ -138,10 +147,10 @@ let test_time_limited_read = expect_amount ~expected:(String.length actual) write ; expect_string ~expected:write.data ~actual | {write= Some _; _}, Error (`Exn_trap (Unixext.Timeout, _)) -> - let elapsed_s = Mtime.Span.to_float_ns !test_elapsed *. 1e-9 in - if elapsed_s < timeout then - Test.fail_reportf "Timed out earlier than requested: %f < %f" - elapsed_s timeout + let elapsed = !test_elapsed in + if Mtime.Span.compare elapsed timeout < 0 then + Test.fail_reportf "Timed out earlier than requested: %a < %a" + Mtime.Span.pp elapsed Mtime.Span.pp timeout | ( {write= Some write; _} , Error (`Exn_trap (Unix.Unix_error (Unix.EPIPE, _, _), _)) ) -> if String.length write.data = behaviour.size then @@ -187,7 +196,63 @@ let test_proxy = expect_string ~expected:write.data ~actual:read.data ; true -let tests = [test_proxy; test_time_limited_write; test_time_limited_read] +let mtime_span_gen = + let open Gen in + (* We can't use Mtime.Span.max_span, because that is too big for float, + only works for int64 conversion, and there is no Mtime.Span.max_span_float. + The documentation says that 2^53 is the maximum though, so use that. + Otherwise we'll fail later when converting to string and back goes through float. + + Use microseconds instead of nanoseconds, because nanoseconds have rounding + errors during conversion. + *) + let+ usval = 0 -- (((1 lsl 53) - 1) / 1_000_000) in + Mtime.Span.(usval * us) + +let test_timeout_string_conv = + let gen = mtime_span_gen and print = Fmt.to_to_string Mtime.Span.pp in + Test.make ~name:__FUNCTION__ ~print gen @@ fun timeout -> + let str = Clock.Timer.span_to_s timeout |> Printf.sprintf "%.6f" in + let timeout' = + str |> Float.of_string |> Clock.Timer.s_to_span |> Option.get + in + if not (Mtime.Span.equal timeout timeout') then + Test.fail_reportf + "timeout not equal after round-trip through %S: %Lu (%a) <> %Lu (%a)" str + (Mtime.Span.to_uint64_ns (timeout :> Mtime.Span.t)) + Mtime.Span.pp timeout + (Mtime.Span.to_uint64_ns (timeout' :> Mtime.Span.t)) + Mtime.Span.pp timeout' ; + true + +let delays = + Gen.oneofa ([|10; 100; 300|] |> Array.map (fun v -> Mtime.Span.(v * ms))) + +let test_delay = + let gen = delays and print = Fmt.to_to_string Mtime.Span.pp in + Test.make ~count:5 ~name:__FUNCTION__ ~print gen @@ fun timeout -> + let counter = Mtime_clock.counter () in + Unixext.delay_span timeout ; + let actual = Mtime_clock.count counter in + let expected_min = + Int64.div (timeout |> Mtime.Span.to_uint64_ns) 2L |> Mtime.Span.of_uint64_ns + and expected_max = Mtime.Span.(2 * timeout) in + if Clock.Timer.span_is_shorter ~than:expected_min actual then + Test.fail_reportf "Actual delay shorter than half expected: %a << %a" + Mtime.Span.pp actual Mtime.Span.pp timeout ; + if Clock.Timer.span_is_longer ~than:expected_max actual then + Test.fail_reportf "Actual delay longer than twice expected: %a >> %a" + Mtime.Span.pp actual Mtime.Span.pp timeout ; + true + +let tests = + [ + test_delay + ; test_timeout_string_conv + ; test_proxy + ; test_time_limited_write + ; test_time_limited_read + ] let () = (* avoid SIGPIPE *) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml index ae2c92dc87b..05b6bdf5954 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml @@ -547,7 +547,17 @@ let really_read_string fd length = exception Timeout -let to_milliseconds ms = ms *. 1000. |> ceil |> int_of_float +(* Need to round up when converting to milliseconds: + The span will have the amount of time already elapsed subtracted, so if we are asked to wait + for 1ms, the span will be slightly smaller than that. + If we truncate then all waits of 1ms will be 0ms instead, which would be wrong, as we wouldn't wait at all. + If we round to nearest rather than up, then waits of 1ms will be ~0.5ms instead, since values below 0.5ms will get + rounded to 0. + Rounding up works correctly, and makes the tests pass. +*) +let to_milliseconds span = + Int64.div (Int64.add 999_999L @@ Mtime.Span.to_uint64_ns span) 1_000_000L + |> Int64.to_int (* Allocating a new polly and waiting like this results in at least 3 syscalls. An alternative for sockets would be to use [setsockopt], @@ -570,19 +580,17 @@ let with_polly_wait kind fd f = and check the timeout after each chunk. select() would've silently succeeded here, whereas epoll() is stricted and returns EPERM *) - let wait remaining_time = if remaining_time < 0. then raise Timeout in - f wait fd + f (fun _ -> true) fd | S_CHR | S_FIFO | S_SOCK -> with_polly @@ fun polly -> Polly.add polly fd kind ; let wait remaining_time = let milliseconds = to_milliseconds remaining_time in - if milliseconds <= 0 then raise Timeout ; let ready = Polly.wait polly 1 milliseconds @@ fun _ event_on_fd _ -> assert (event_on_fd = fd) in - if ready = 0 then raise Timeout + ready > 0 in f wait fd @@ -595,22 +603,23 @@ let time_limited_write_internal with_polly_wait Polly.Events.out filedesc @@ fun wait filedesc -> let total_bytes_to_write = length in let bytes_written = ref 0 in - let now = ref (Unix.gettimeofday ()) in - while !bytes_written < total_bytes_to_write && !now < target_response_time do - let remaining_time = target_response_time -. !now in - wait remaining_time ; - let bytes_to_write = total_bytes_to_write - !bytes_written in - let bytes = - try write filedesc data !bytes_written bytes_to_write - with - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - -> - 0 - in - (* write from buffer=data from offset=bytes_written, length=bytes_to_write *) - bytes_written := bytes + !bytes_written ; - now := Unix.gettimeofday () + while !bytes_written < total_bytes_to_write do + match Clock.Timer.remaining target_response_time with + | Expired _ -> + raise Timeout + | Remaining remaining_time -> + if not (wait remaining_time) then raise Timeout ; + let bytes_to_write = total_bytes_to_write - !bytes_written in + let bytes = + try write filedesc data !bytes_written bytes_to_write + with + | Unix.Unix_error (Unix.EAGAIN, _, _) + | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) + -> + 0 + in + (* write from buffer=data from offset=bytes_written, length=bytes_to_write *) + bytes_written := bytes + !bytes_written done ; if !bytes_written = total_bytes_to_write then () @@ -633,44 +642,57 @@ let time_limited_read filedesc length target_response_time = let total_bytes_to_read = length in let bytes_read = ref 0 in let buf = Bytes.make total_bytes_to_read '\000' in - let now = ref (Unix.gettimeofday ()) in - while !bytes_read < total_bytes_to_read && !now < target_response_time do - let remaining_time = target_response_time -. !now in - wait remaining_time ; - let bytes_to_read = total_bytes_to_read - !bytes_read in - let bytes = - try Unix.read filedesc buf !bytes_read bytes_to_read - with - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - -> - 0 - in - (* read into buffer=buf from offset=bytes_read, length=bytes_to_read *) - if bytes = 0 then - raise End_of_file (* End of file has been reached *) - else - bytes_read := bytes + !bytes_read ; - now := Unix.gettimeofday () + while !bytes_read < total_bytes_to_read do + match Clock.Timer.remaining target_response_time with + | Expired _ -> + raise Timeout + | Remaining remaining_time -> + if not (wait remaining_time) then raise Timeout ; + let bytes_to_read = total_bytes_to_read - !bytes_read in + let bytes = + try Unix.read filedesc buf !bytes_read bytes_to_read + with + | Unix.Unix_error (Unix.EAGAIN, _, _) + | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) + -> + 0 + in + (* read into buffer=buf from offset=bytes_read, length=bytes_to_read *) + if bytes = 0 then + raise End_of_file (* End of file has been reached *) + else + bytes_read := bytes + !bytes_read done ; if !bytes_read = total_bytes_to_read then Bytes.unsafe_to_string buf else (* we ran out of time *) raise Timeout -let time_limited_single_read filedesc length ~max_wait = +let wait_timed_read filedesc span = + with_polly_wait Polly.Events.inp filedesc @@ fun wait _filedesc -> wait span + +let time_limited_single_read filedesc length target_response_time = let buf = Bytes.make length '\000' in - with_polly_wait Polly.Events.inp filedesc @@ fun wait filedesc -> - wait max_wait ; - let bytes = - try Unix.read filedesc buf 0 length - with - | Unix.Unix_error (Unix.EAGAIN, _, _) - | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) - -> - 0 - in - Bytes.sub_string buf 0 bytes + match Clock.Timer.remaining target_response_time with + | Expired _ -> + raise Timeout + | Remaining remaining_time -> + if not (wait_timed_read filedesc remaining_time) then raise Timeout ; + let bytes = + try Unix.read filedesc buf 0 length + with + | Unix.Unix_error (Unix.EAGAIN, _, _) + | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) + -> + 0 + in + Bytes.sub_string buf 0 bytes + +let wait_timed_read filedesc target_response_time = + with_polly_wait Polly.Events.inp filedesc @@ fun wait _fd -> + wait target_response_time + +let delay_span span = span |> Clock.Timer.span_to_s |> Unix.sleepf (* --------------------------------------------------------------------------------------- *) @@ -991,3 +1013,18 @@ module Daemon = struct true with Unix.Unix_error _ -> false end + +let set_socket_timeout fd t = + try Unix.(setsockopt_float fd SO_RCVTIMEO t) + with Unix.Unix_error (Unix.ENOTSOCK, _, _) -> + (* In the unit tests, the fd comes from a pipe... ignore *) + () + +let with_socket_timeout fd timeout_opt f = + match timeout_opt with + | Some t -> + if t < 1e-6 then invalid_arg (Printf.sprintf "Timeout too short: %g" t) ; + let finally () = set_socket_timeout fd 0. in + set_socket_timeout fd t ; Fun.protect ~finally f + | None -> + f () diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli index 176adc94cf8..c0678733a42 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli @@ -146,15 +146,24 @@ val try_read_string : ?limit:int -> Unix.file_descr -> string exception Timeout -val time_limited_write : Unix.file_descr -> int -> bytes -> float -> unit +val with_socket_timeout : Unix.file_descr -> float option -> (unit -> 'a) -> 'a + +val time_limited_write : + Unix.file_descr -> int -> bytes -> Clock.Timer.t -> unit val time_limited_write_substring : - Unix.file_descr -> int -> string -> float -> unit + Unix.file_descr -> int -> string -> Clock.Timer.t -> unit + +val time_limited_read : Unix.file_descr -> int -> Clock.Timer.t -> string + +val time_limited_single_read : Unix.file_descr -> int -> Clock.Timer.t -> string -val time_limited_read : Unix.file_descr -> int -> float -> string +val delay_span : Mtime.Span.t -> unit -val time_limited_single_read : - Unix.file_descr -> int -> max_wait:float -> string +val wait_timed_read : Unix.file_descr -> Mtime.Span.t -> bool +(** [wait_timed_read fd span] waits at most [span] seconds until [fd] contains at least one byte to read, + or and EOF. Returns [true] if the [fd] is ready, or [false] if the timeout has expired. + See {!Thread.wait_timed_read}. *) val read_data_in_string_chunks : (string -> int -> unit) diff --git a/ocaml/message-switch/unix/dune b/ocaml/message-switch/unix/dune index 54b6c0e77bf..3e088a12556 100644 --- a/ocaml/message-switch/unix/dune +++ b/ocaml/message-switch/unix/dune @@ -11,6 +11,7 @@ rpclib.core rpclib.json threads.posix + xapi-stdext-threads ) (preprocess (pps ppx_deriving_rpc)) ) diff --git a/ocaml/message-switch/unix/protocol_unix_scheduler.ml b/ocaml/message-switch/unix/protocol_unix_scheduler.ml index 92e6cdd3b1b..3eaeb83218c 100644 --- a/ocaml/message-switch/unix/protocol_unix_scheduler.ml +++ b/ocaml/message-switch/unix/protocol_unix_scheduler.ml @@ -34,71 +34,7 @@ module Int64Map = Map.Make (struct let compare = compare end) -module Delay = struct - (* Concrete type is the ends of a pipe *) - type t = { - (* A pipe is used to wake up a thread blocked in wait: *) - mutable pipe_out: Unix.file_descr option - ; mutable pipe_in: Unix.file_descr option - ; (* Indicates that a signal arrived before a wait: *) - mutable signalled: bool - ; m: Mutex.t - } - - let make () = - {pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()} - - exception Pre_signalled - - let wait (x : t) (seconds : float) = - let to_close = ref [] in - let close' fd = - if List.mem fd !to_close then Unix.close fd ; - to_close := List.filter (fun x -> fd <> x) !to_close - in - finally' - (fun () -> - try - let pipe_out = - Mutex.execute x.m (fun () -> - if x.signalled then ( - x.signalled <- false ; - raise Pre_signalled - ) ; - let pipe_out, pipe_in = Unix.pipe () in - (* these will be unconditionally closed on exit *) - to_close := [pipe_out; pipe_in] ; - x.pipe_out <- Some pipe_out ; - x.pipe_in <- Some pipe_in ; - x.signalled <- false ; - pipe_out - ) - in - let r, _, _ = Unix.select [pipe_out] [] [] seconds in - (* flush the single byte from the pipe *) - if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ; - (* return true if we waited the full length of time, false if we were woken *) - r = [] - with Pre_signalled -> false - ) - (fun () -> - Mutex.execute x.m (fun () -> - x.pipe_out <- None ; - x.pipe_in <- None ; - List.iter close' !to_close - ) - ) - - let signal (x : t) = - Mutex.execute x.m (fun () -> - match x.pipe_in with - | Some fd -> - ignore (Unix.write fd (Bytes.of_string "X") 0 1) - | None -> - x.signalled <- true - (* If the wait hasn't happened yet then store up the signal *) - ) -end +module Delay = Xapi_stdext_threads.Threadext.Delay type item = {id: int; name: string; fn: unit -> unit} diff --git a/ocaml/xapi-idl/README.md b/ocaml/xapi-idl/README.md index 3b34349a152..2da87aa0c20 100644 --- a/ocaml/xapi-idl/README.md +++ b/ocaml/xapi-idl/README.md @@ -10,7 +10,6 @@ This repository contains * argument parsing * RPCs 3. The following CLI tools for debugging: - * lib/channel_helper.exe -- a channel passing helper CLI * memory/memory_cli.exe -- a squeezed debugging CLI * v6/v6_cli.exe -- a V6d debugging CLI * cluster/cluster_cli.exe -- a xapi-clusterd debugging CLI diff --git a/ocaml/xapi-idl/lib/posix_channel.ml b/ocaml/xapi-idl/lib/posix_channel.ml deleted file mode 100644 index 06708561011..00000000000 --- a/ocaml/xapi-idl/lib/posix_channel.ml +++ /dev/null @@ -1,234 +0,0 @@ -let my_domid = 0 (* TODO: figure this out *) - -exception End_of_file - -exception Channel_setup_failed - -module CBuf = struct - (** A circular buffer constructed from a string *) - type t = { - mutable buffer: bytes - ; mutable len: int (** bytes of valid data in [buffer] *) - ; mutable start: int (** index of first valid byte in [buffer] *) - ; mutable r_closed: bool (** true if no more data can be read due to EOF *) - ; mutable w_closed: bool - (** true if no more data can be written due to EOF *) - } - - let empty length = - { - buffer= Bytes.create length - ; len= 0 - ; start= 0 - ; r_closed= false - ; w_closed= false - } - - let drop (x : t) n = - if n > x.len then failwith (Printf.sprintf "drop %d > %d" n x.len) ; - x.start <- (x.start + n) mod Bytes.length x.buffer ; - x.len <- x.len - n - - let should_read (x : t) = - (not x.r_closed) && x.len < Bytes.length x.buffer - 1 - - let should_write (x : t) = (not x.w_closed) && x.len > 0 - - let end_of_reads (x : t) = x.r_closed && x.len = 0 - - let end_of_writes (x : t) = x.w_closed - - let write (x : t) fd = - (* Offset of the character after the substring *) - let next = min (Bytes.length x.buffer) (x.start + x.len) in - let len = next - x.start in - let written = - try Unix.single_write fd x.buffer x.start len - with _e -> - x.w_closed <- true ; - len - in - drop x written - - let read (x : t) fd = - (* Offset of the next empty character *) - let next = (x.start + x.len) mod Bytes.length x.buffer in - let len = - min (Bytes.length x.buffer - next) (Bytes.length x.buffer - x.len) - in - let read = Unix.read fd x.buffer next len in - if read = 0 then x.r_closed <- true ; - x.len <- x.len + read -end - -let proxy (a : Unix.file_descr) (b : Unix.file_descr) = - let size = 64 * 1024 in - (* [a'] is read from [a] and will be written to [b] *) - (* [b'] is read from [b] and will be written to [a] *) - let a' = CBuf.empty size and b' = CBuf.empty size in - Unix.set_nonblock a ; - Unix.set_nonblock b ; - try - while true do - let r = - (if CBuf.should_read a' then [a] else []) - @ if CBuf.should_read b' then [b] else [] - in - let w = - (if CBuf.should_write a' then [b] else []) - @ if CBuf.should_write b' then [a] else [] - in - (* If we can't make any progress (because fds have been closed), then stop *) - if r = [] && w = [] then raise End_of_file ; - let r, w, _ = Unix.select r w [] (-1.0) in - (* Do the writing before the reading *) - List.iter - (fun fd -> if a = fd then CBuf.write b' a else CBuf.write a' b) - w ; - List.iter (fun fd -> if a = fd then CBuf.read a' a else CBuf.read b' b) r ; - (* If there's nothing else to read or write then signal the other end *) - List.iter - (fun (buf, fd) -> - if CBuf.end_of_reads buf then Unix.shutdown fd Unix.SHUTDOWN_SEND ; - if CBuf.end_of_writes buf then Unix.shutdown fd Unix.SHUTDOWN_RECEIVE - ) - [(a', b); (b', a)] - done - with _ -> ( - (try Unix.clear_nonblock a with _ -> ()) ; - try Unix.clear_nonblock b with _ -> () - ) - -let finally f g = - try - let result = f () in - g () ; result - with e -> g () ; raise e - -let ip = ref "127.0.0.1" - -let send proxy_socket = - let to_close = ref [] in - let to_unlink = ref [] in - finally - (fun () -> - let s_ip = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - to_close := s_ip :: !to_close ; - Unix.bind s_ip (Unix.ADDR_INET (Unix.inet_addr_of_string !ip, 0)) ; - Unix.listen s_ip 5 ; - let port = - match Unix.getsockname s_ip with - | Unix.ADDR_INET (_, port) -> - port - | _ -> - assert false - in - let s_unix = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - to_close := s_unix :: !to_close ; - let path = Filename.temp_file "channel" "" in - to_unlink := path :: !to_unlink ; - if Sys.file_exists path then Unix.unlink path ; - Unix.bind s_unix (Unix.ADDR_UNIX path) ; - Unix.listen s_unix 5 ; - let token = "token" in - let protocols = - let open Xcp_channel_protocol in - [TCP_proxy (!ip, port); Unix_sendmsg (my_domid, path, token)] - in - (* We need to hang onto a copy of the proxy_socket so we can run a proxy - in a background thread, allowing the caller to close their copy. *) - let proxy_socket = Unix.dup proxy_socket in - to_close := proxy_socket :: !to_close ; - let (_ : Thread.t) = - Thread.create - (fun (fds, paths) -> - (* The thread takes over management of the listening sockets *) - let to_close = ref fds in - let to_unlink = ref paths in - let close fd = - if List.mem fd !to_close then ( - to_close := List.filter (fun x -> x <> fd) !to_close ; - Unix.close fd - ) - in - finally - (fun () -> - let readable, _, _ = Unix.select [s_ip; s_unix] [] [] (-1.0) in - if List.mem s_unix readable then ( - let fd, _peer = Unix.accept s_unix in - to_close := fd :: !to_close ; - let buffer = Bytes.make (String.length token) '\000' in - let n = Unix.recv fd buffer 0 (Bytes.length buffer) [] in - let token' = Bytes.sub_string buffer 0 n in - if token = token' then - let (_ : int) = - Fd_send_recv.send_fd_substring fd token 0 - (String.length token) [] proxy_socket - in - () - ) else if List.mem s_ip readable then ( - let fd, _peer = Unix.accept s_ip in - List.iter close !to_close ; - to_close := fd :: !to_close ; - proxy fd proxy_socket - ) else - assert false - (* can never happen *) - ) - (fun () -> - List.iter close !to_close ; - List.iter Unix.unlink !to_unlink - ) - ) - (!to_close, !to_unlink) - in - (* Handover of listening sockets successful *) - to_close := [] ; - to_unlink := [] ; - protocols - ) - (fun () -> - List.iter Unix.close !to_close ; - List.iter Unix.unlink !to_unlink - ) - -let receive protocols = - let open Xcp_channel_protocol in - let weight = function - | TCP_proxy (_, _) -> - 2 - | Unix_sendmsg (domid, _, _) -> - if my_domid = domid then 3 else 0 - | V4V_proxy (_, _) -> - 0 - in - let protocol = - match List.sort (fun a b -> compare (weight b) (weight a)) protocols with - | [] -> - raise Channel_setup_failed - | best :: _ -> - if weight best = 0 then raise Channel_setup_failed else best - in - match protocol with - | V4V_proxy (_, _) -> - assert false (* weight is 0 above *) - | TCP_proxy (ip, port) -> ( - let unwrapped_ip = Scanf.ksscanf ip (fun _ _ -> ip) "[%s@]" Fun.id in - let addr = Unix.ADDR_INET (Unix.inet_addr_of_string unwrapped_ip, port) in - let family = Unix.domain_of_sockaddr addr in - let s = Unix.socket family Unix.SOCK_STREAM 0 in - try Unix.connect s addr ; s with e -> Unix.close s ; raise e - ) - | Unix_sendmsg (_, path, token) -> - let s = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in - finally - (fun () -> - Unix.connect s (Unix.ADDR_UNIX path) ; - let (_ : int) = - Unix.send_substring s token 0 (String.length token) [] - in - let buf = Bytes.create (String.length token) in - let _, _, fd = Fd_send_recv.recv_fd s buf 0 (Bytes.length buf) [] in - fd - ) - (fun () -> Unix.close s) diff --git a/ocaml/xapi-idl/lib/posix_channel.mli b/ocaml/xapi-idl/lib/posix_channel.mli deleted file mode 100644 index 8610f27a86d..00000000000 --- a/ocaml/xapi-idl/lib/posix_channel.mli +++ /dev/null @@ -1,21 +0,0 @@ -(* - * Copyright (C) Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -val send : Unix.file_descr -> Xcp_channel_protocol.t list -(** [send fd] attempts to send the channel represented by [fd] to a remote - process. Note the file descriptor remains open in the original process and - should still be closed normally. *) - -val receive : Xcp_channel_protocol.t list -> Unix.file_descr -(** [receive protocols] receives a channel from a remote. *) diff --git a/ocaml/xapi-idl/lib/scheduler.ml b/ocaml/xapi-idl/lib/scheduler.ml index 407120c9fc6..d4d5c7c5cca 100644 --- a/ocaml/xapi-idl/lib/scheduler.ml +++ b/ocaml/xapi-idl/lib/scheduler.ml @@ -18,33 +18,7 @@ open D let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute -module PipeDelay = struct - (* Concrete type is the ends of a pipe *) - type t = { - (* A pipe is used to wake up a thread blocked in wait: *) - pipe_out: Unix.file_descr - ; pipe_in: Unix.file_descr - } - - let make () = - let pipe_out, pipe_in = Unix.pipe () in - {pipe_out; pipe_in} - - let wait (x : t) (seconds : float) = - let timeout = if seconds < 0.0 then 0.0 else seconds in - if Thread.wait_timed_read x.pipe_out timeout then - (* flush the single byte from the pipe *) - let (_ : int) = Unix.read x.pipe_out (Bytes.create 1) 0 1 in - (* return false if we were woken *) - false - else - (* return true if we waited the full length of time, false if we were woken *) - true - - let signal (x : t) = - let (_ : int) = Unix.write x.pipe_in (Bytes.of_string "X") 0 1 in - () -end +module PipeDelay = Xapi_stdext_threads.Threadext.Delay type handle = Mtime.span * int diff --git a/ocaml/xapi-idl/lib/xcp_channel.ml b/ocaml/xapi-idl/lib/xcp_channel.ml deleted file mode 100644 index 395da851a5f..00000000000 --- a/ocaml/xapi-idl/lib/xcp_channel.ml +++ /dev/null @@ -1,17 +0,0 @@ -type t = Unix.file_descr - -let file_descr_of_t t = t - -let t_of_file_descr t = t - -[@@@ocaml.warning "-34"] - -type protocols = Xcp_channel_protocol.t list [@@deriving rpc] - -let rpc_of_t fd = - let protocols = Posix_channel.send fd in - rpc_of_protocols protocols - -let t_of_rpc x = - let protocols = protocols_of_rpc x in - Posix_channel.receive protocols diff --git a/ocaml/xapi-idl/lib/xcp_channel.mli b/ocaml/xapi-idl/lib/xcp_channel.mli deleted file mode 100644 index 35849a1e5d4..00000000000 --- a/ocaml/xapi-idl/lib/xcp_channel.mli +++ /dev/null @@ -1,13 +0,0 @@ -type t - -val rpc_of_t : t -> Rpc.t - -val t_of_rpc : Rpc.t -> t - -val file_descr_of_t : t -> Unix.file_descr - -val t_of_file_descr : Unix.file_descr -> t - -val protocols_of_rpc : Rpc.t -> Xcp_channel_protocol.t list - -val rpc_of_protocols : Xcp_channel_protocol.t list -> Rpc.t diff --git a/ocaml/xapi-idl/lib_test/channel_test.ml b/ocaml/xapi-idl/lib_test/channel_test.ml deleted file mode 100644 index dd607935778..00000000000 --- a/ocaml/xapi-idl/lib_test/channel_test.ml +++ /dev/null @@ -1,77 +0,0 @@ -(* - * Copyright (C) 2011-2013 Citrix Inc - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -let dup_automatic x = - let x = Xcp_channel.t_of_file_descr x in - let y = Xcp_channel.rpc_of_t x in - let z = Xcp_channel.t_of_rpc y in - Xcp_channel.file_descr_of_t z - -let dup_sendmsg x = - let protos = Posix_channel.send x in - let proto = - List.find - (function - | Xcp_channel_protocol.Unix_sendmsg (_, _, _) -> true | _ -> false - ) - protos - in - Posix_channel.receive [proto] - -let count_fds () = Array.length (Sys.readdir "/proc/self/fd") - -(* dup stdout, check /proc/pid/fd *) -let check_for_leak dup_function () = - let before = count_fds () in - let stdout2 = dup_function Unix.stdout in - let after = count_fds () in - Alcotest.(check int) "fds" (before + 1) after ; - Unix.close stdout2 ; - let after' = count_fds () in - Alcotest.(check int) "fds" before after' - -let dup_proxy x = - let protos = Posix_channel.send x in - let proto = - List.find - (function - | Xcp_channel_protocol.TCP_proxy (_ip, _port) -> true | _ -> false - ) - protos - in - Posix_channel.receive [proto] - -let check_for_leak_proxy () = - let a, _b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in - let before = count_fds () in - let c = dup_proxy a in - (* background fd closing *) - Thread.delay 1.0 ; - let after = count_fds () in - Alcotest.(check int) "fds" (before + 2) after ; - Unix.close c ; - (* background fd closing *) - Thread.delay 1.0 ; - let after' = count_fds () in - Alcotest.(check int) "fds" before after' - -let tests = - [ - ( "check_for_leak with automatic selection" - , `Quick - , check_for_leak dup_automatic - ) - ; ("check_for_leak with sendmsg", `Quick, check_for_leak dup_sendmsg) - ; ("check_for_leak_proxy", `Quick, check_for_leak_proxy) - ] diff --git a/ocaml/xapi-idl/lib_test/dune b/ocaml/xapi-idl/lib_test/dune index 57c8c95e592..bb5a05933d5 100644 --- a/ocaml/xapi-idl/lib_test/dune +++ b/ocaml/xapi-idl/lib_test/dune @@ -35,6 +35,7 @@ rpclib.xml test_lib threads.posix + xapi-stdext-unix xapi-idl xapi-idl.cluster xapi-idl.rrd diff --git a/ocaml/xapi-idl/lib_test/scheduler_test.ml b/ocaml/xapi-idl/lib_test/scheduler_test.ml index 640ae938862..39a58f06b9a 100644 --- a/ocaml/xapi-idl/lib_test/scheduler_test.ml +++ b/ocaml/xapi-idl/lib_test/scheduler_test.ml @@ -25,7 +25,8 @@ let test_delay_cancel () = let elapsed = after -. before in assert_bool "elapsed_time1" (elapsed < 0.4) -let timed_wait_callback ~msg ?(time_min = 0.) ?(eps = 0.1) ?(time_max = 60.) f = +let timed_wait_callback ~msg ?(time_min = 0.) ?(eps = 0.1) + ?(time_max = Mtime.Span.(60 * s)) f = let rd, wr = Unix.pipe () in let finally () = Unix.close rd ; Unix.close wr in Fun.protect ~finally (fun () -> @@ -37,13 +38,13 @@ let timed_wait_callback ~msg ?(time_min = 0.) ?(eps = 0.1) ?(time_max = 60.) f = () in f callback ; - let ready = Thread.wait_timed_read rd time_max in + let ready = Xapi_stdext_unix.Unixext.wait_timed_read rd time_max in match (ready, !after) with | true, None -> Alcotest.fail "pipe ready to read, but after is not set" | false, None -> - Alcotest.fail - (Printf.sprintf "%s: callback not invoked within %gs" msg time_max) + Alcotest.failf "%s: callback not invoked within %a" msg Mtime.Span.pp + time_max | _, Some t -> let actual_minimum = min (t -. before) time_min in Alcotest.(check (float eps)) diff --git a/ocaml/xapi-idl/misc/channel_helper.ml b/ocaml/xapi-idl/misc/channel_helper.ml deleted file mode 100644 index 1485e6a5ead..00000000000 --- a/ocaml/xapi-idl/misc/channel_helper.ml +++ /dev/null @@ -1,221 +0,0 @@ -let project_url = "https://github.com/xen-org/xcp-idl" - -open Lwt - -let my_domid = 0 (* TODO: figure this out *) - -exception Short_write of int * int - -exception End_of_file - -let copy_all src dst = - let buffer = Bytes.make 16384 '\000' in - let rec loop () = - Lwt_unix.read src buffer 0 (Bytes.length buffer) >>= fun n -> - if n = 0 then - Lwt.fail End_of_file - else - Lwt_unix.write dst buffer 0 n >>= fun m -> - if n <> m then Lwt.fail (Short_write (m, n)) else loop () - in - loop () - -let proxy a b = - let copy _id src dst = - Lwt.catch - (fun () -> copy_all src dst) - (fun _e -> - (try Lwt_unix.shutdown src Lwt_unix.SHUTDOWN_RECEIVE with _ -> ()) ; - (try Lwt_unix.shutdown dst Lwt_unix.SHUTDOWN_SEND with _ -> ()) ; - return () - ) - in - let ts = [copy "ab" a b; copy "ba" b a] in - Lwt.join ts - -let file_descr_of_int (x : int) : Unix.file_descr = Obj.magic x - -(* Keep this in sync with ocaml's file_descr type *) - -let ip = ref "127.0.0.1" - -let unix = ref "/tmp" - -module Common = struct - type t = {verbose: bool; debug: bool; port: int} [@@deriving rpc] - - let make verbose debug port = {verbose; debug; port} -end - -let _common_options = "COMMON OPTIONS" - -open Cmdliner - -(* Options common to all commands *) -let common_options_t = - let docs = _common_options in - let debug = - let doc = "Give only debug output." in - Arg.(value & flag & info ["debug"] ~docs ~doc) - in - let verb = - let doc = "Give verbose output." in - let verbose = (true, Arg.info ["v"; "verbose"] ~docs ~doc) in - Arg.(last & vflag_all [false] [verbose]) - in - let port = - let doc = Printf.sprintf "Specify port to connect to the message switch." in - Arg.(value & opt int 8080 & info ["port"] ~docs ~doc) - in - Term.(const Common.make $ debug $ verb $ port) - -(* Help sections common to all commands *) -let help = - [ - `S _common_options - ; `P "These options are common to all commands." - ; `S "MORE HELP" - ; `P "Use `$(mname) $(i,COMMAND) --help' for help on a single command." - ; `Noblank - ; `S "BUGS" - ; `P (Printf.sprintf "Check bug reports at %s" project_url) - ] - -(* Commands *) -let advertise_t _common_options_t proxy_socket = - let unwrapped_ip = Scanf.ksscanf !ip (fun _ _ -> !ip) "[%s@]" Fun.id in - let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_of_string unwrapped_ip, 0) in - let family = Unix.domain_of_sockaddr addr in - let s_ip = Lwt_unix.socket family Lwt_unix.SOCK_STREAM 0 in - (* INET socket, can't block *) - Lwt_unix.bind s_ip addr >>= fun () -> - Lwt_unix.listen s_ip 5 ; - let port = - match Lwt_unix.getsockname s_ip with - | Unix.ADDR_INET (_, port) -> - port - | _ -> - assert false - in - let s_unix = Lwt_unix.socket Lwt_unix.PF_UNIX Lwt_unix.SOCK_STREAM 0 in - (* Try to avoid polluting the filesystem with unused unix domain sockets *) - let path = - Printf.sprintf "%s/%s.%d" !unix - (Filename.basename Sys.argv.(0)) - (Unix.getpid ()) - in - if Sys.file_exists path then Unix.unlink path ; - Lwt_unix.bind s_unix (Lwt_unix.ADDR_UNIX path) >>= fun () -> - List.iter - (fun signal -> - ignore (Lwt_unix.on_signal signal (fun _ -> Unix.unlink path ; exit 1)) - ) - [Sys.sigterm; Sys.sigint] ; - Lwt_unix.listen s_unix 5 ; - let token = "token" in - let protocols = - let open Xcp_channel_protocol in - [TCP_proxy (!ip, port); Unix_sendmsg (my_domid, path, token)] - in - Printf.fprintf stdout "%s\n%!" - (Jsonrpc.to_string (Xcp_channel.rpc_of_protocols protocols)) ; - let t_ip = - Lwt_unix.accept s_ip >>= fun (fd, _peer) -> - Lwt_unix.close s_ip >>= fun () -> - proxy fd (Lwt_unix.of_unix_file_descr proxy_socket) - in - let t_unix = - Lwt_unix.accept s_unix >>= fun (fd, _peer) -> - let buffer = Bytes.make (String.length token) '\000' in - let io_vector = Lwt_unix.IO_vectors.create () in - Lwt_unix.IO_vectors.append_bytes io_vector buffer 0 (Bytes.length buffer) ; - Lwt_unix.recv_msg ~socket:fd ~io_vectors:io_vector >>= fun (n, fds) -> - List.iter Unix.close fds ; - let token' = Bytes.sub buffer 0 n in - let io_vector' = Lwt_unix.IO_vectors.create () in - Lwt_unix.IO_vectors.append_bytes io_vector' token' 0 (Bytes.length token') ; - if token = Bytes.to_string token' then - Lwt_unix.send_msg ~socket:fd ~io_vectors:io_vector' ~fds:[proxy_socket] - >>= fun _ -> return () - else - return () - in - Lwt.pick [t_ip; t_unix] >>= fun () -> Unix.unlink path ; return () - -let advertise common_options_t fd = - match fd with - | Some x -> - Lwt_main.run (advertise_t common_options_t (file_descr_of_int x)) ; - `Ok () - | None -> - `Error (true, "you must provide a file descriptor to proxy") - -let advertise_cmd = - let doc = "advertise a given channel represented as a file-descriptor" in - let man = - [ - `S "DESCRIPTION" - ; `P - "Advertises a given channel over as many protocols as possible, and \ - waits for someone to connect." - ] - @ help - in - let fd = - let doc = Printf.sprintf "File descriptor to advertise" in - Arg.(value & pos 0 (some int) None & info [] ~docv:"FD" ~doc) - in - Cmd.v - (Cmd.info "advertise" ~sdocs:_common_options ~doc ~man) - Term.(ret (const advertise $ common_options_t $ fd)) - -let connect_t _common_options_t = - (Lwt_io.read_line_opt Lwt_io.stdin >>= function - | None -> - return "" - | Some x -> - return x - ) - >>= fun advertisement -> - let open Xcp_channel in - let fd = - Lwt_unix.of_unix_file_descr - (file_descr_of_t (t_of_rpc (Jsonrpc.of_string advertisement))) - in - let a = copy_all Lwt_unix.stdin fd in - let b = copy_all fd Lwt_unix.stdout in - Lwt.join [a; b] - -let connect common_options_t = - Lwt_main.run (connect_t common_options_t) ; - `Ok () - -let connect_cmd = - let doc = "connect to a channel and proxy to the terminal" in - let man = - [ - `S "DESCRIPTION" - ; `P - "Connect to a channel which has been advertised and proxy I/O to the \ - console. The advertisement will be read from stdin as a single line \ - of text." - ] - @ help - in - Cmd.v - (Cmd.info "connect" ~sdocs:_common_options ~doc ~man) - Term.(ret (const connect $ common_options_t)) - -let cmds = [advertise_cmd; connect_cmd] - -let () = - let default = - Term.(ret (const (fun _ -> `Help (`Pager, None)) $ common_options_t)) - in - let info = - let doc = "channel (file-descriptor) passing helper program" in - let man = help in - Cmd.info "proxy" ~version:"1.0.0" ~sdocs:_common_options ~doc ~man - in - let cmd = Cmd.group ~default info cmds in - exit (Cmd.eval cmd) diff --git a/ocaml/xapi-idl/misc/dune b/ocaml/xapi-idl/misc/dune deleted file mode 100644 index 9d009d01260..00000000000 --- a/ocaml/xapi-idl/misc/dune +++ /dev/null @@ -1,16 +0,0 @@ -(executable - (name channel_helper) - (public_name xcp-idl-debugger) - (modules channel_helper) - (package xapi-idl) - (libraries - cmdliner - dune-build-info - lwt - lwt.unix - rpclib.core - rpclib.json - xapi-idl - xapi-log - ) - (preprocess (pps ppx_deriving_rpc))) diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index 0c2417bb829..82b062eb2da 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1093,17 +1093,23 @@ let xapi_globs_spec = , Float Db_globs.permanent_master_failure_retry_interval ) ; ( "redo_log_max_block_time_empty" - , Float Db_globs.redo_log_max_block_time_empty + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_empty + ) + ; ( "redo_log_max_block_time_read" + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_read ) - ; ("redo_log_max_block_time_read", Float Db_globs.redo_log_max_block_time_read) ; ( "redo_log_max_block_time_writedelta" - , Float Db_globs.redo_log_max_block_time_writedelta + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_writedelta ) ; ( "redo_log_max_block_time_writedb" - , Float Db_globs.redo_log_max_block_time_writedb + , ShortDurationFromSeconds Db_globs.redo_log_max_block_time_writedb + ) + ; ( "redo_log_max_startup_time" + , ShortDurationFromSeconds Db_globs.redo_log_max_startup_time + ) + ; ( "redo_log_connect_delay" + , ShortDurationFromSeconds Db_globs.redo_log_connect_delay ) - ; ("redo_log_max_startup_time", Float Db_globs.redo_log_max_startup_time) - ; ("redo_log_connect_delay", Float Db_globs.redo_log_connect_delay) ; ("default-vbd3-polling-duration", Int default_vbd3_polling_duration) ; ( "default-vbd3-polling-idle-threshold" , Int default_vbd3_polling_idle_threshold @@ -1160,15 +1166,8 @@ let options_of_xapi_globs_spec = string_of_float !x | Int x -> string_of_int !x - | ShortDurationFromSeconds x -> - let literal = - Mtime.Span.to_uint64_ns !x |> fun ns -> - Int64.div ns 1_000_000_000L |> Int64.to_int |> string_of_int - in - Fmt.str "%s (%a)" literal Mtime.Span.pp !x - | LongDurationFromSeconds x -> - let literal = Clock.Timer.span_to_s !x |> string_of_float in - Fmt.str "%s (%a)" literal Mtime.Span.pp !x + | ShortDurationFromSeconds x | LongDurationFromSeconds x -> + Fmt.str "%Luns (%a)" (Mtime.Span.to_uint64_ns !x) Mtime.Span.pp !x ) , Printf.sprintf "Set the value of '%s'" name ) diff --git a/ocaml/xenopsd/cli/dune b/ocaml/xenopsd/cli/dune index b194b10323c..aacf70d07ca 100644 --- a/ocaml/xenopsd/cli/dune +++ b/ocaml/xenopsd/cli/dune @@ -9,6 +9,7 @@ astring cmdliner dune-build-info + polly re result rpclib.core diff --git a/ocaml/xenopsd/cli/xn.ml b/ocaml/xenopsd/cli/xn.ml index 5ac6100669c..186fb47db13 100644 --- a/ocaml/xenopsd/cli/xn.ml +++ b/ocaml/xenopsd/cli/xn.ml @@ -1009,27 +1009,38 @@ let raw_console_proxy sockaddr = ) else if !final then finished := true else - let r, _, _ = Unix.select [Unix.stdin; fd] [] [] (-1.) in - if List.mem Unix.stdin r then ( - let b = - Unix.read Unix.stdin buf_remote !buf_remote_end - (block - !buf_remote_end) - in - let i = ref !buf_remote_end in - while - !i < !buf_remote_end + b - && Char.code (Bytes.get buf_remote !i) <> 0x1d - do - incr i - done ; - if !i < !buf_remote_end + b then final := true ; - buf_remote_end := !i - ) ; - if List.mem fd r then - let b = - Unix.read fd buf_local !buf_local_end (block - !buf_local_end) - in - buf_local_end := !buf_local_end + b + let epoll = Polly.create () in + List.iter + (fun fd -> Polly.add epoll fd Polly.Events.inp) + [Unix.stdin; fd] ; + Fun.protect + ~finally:(fun () -> Polly.close epoll) + (fun () -> + ignore + @@ Polly.wait epoll 2 (-1) (fun _ file_desc _ -> + if Unix.stdin = file_desc then ( + let b = + Unix.read Unix.stdin buf_remote !buf_remote_end + (block - !buf_remote_end) + in + let i = ref !buf_remote_end in + while + !i < !buf_remote_end + b + && Char.code (Bytes.get buf_remote !i) <> 0x1d + do + incr i + done ; + if !i < !buf_remote_end + b then final := true ; + buf_remote_end := !i + ) ; + if fd = file_desc then + let b = + Unix.read fd buf_local !buf_local_end + (block - !buf_local_end) + in + buf_local_end := !buf_local_end + b + ) + ) done in let delay = ref 0.1 in diff --git a/ocaml/xsh/dune b/ocaml/xsh/dune index 13fc1e74c46..541b4f2d20a 100644 --- a/ocaml/xsh/dune +++ b/ocaml/xsh/dune @@ -5,6 +5,7 @@ (package xapi) (libraries dune-build-info + polly stunnel safe-resources xapi-consts diff --git a/ocaml/xsh/xsh.ml b/ocaml/xsh/xsh.ml index 982ff6c346f..990e61a1113 100644 --- a/ocaml/xsh/xsh.ml +++ b/ocaml/xsh/xsh.ml @@ -60,12 +60,25 @@ let proxy (ain : Unix.file_descr) (aout : Unix.file_descr) (bin : Unixfd.t) (if can_write a' then [bout] else []) @ if can_write b' then [aout] else [] in - let r, w, _ = Unix.select r w [] (-1.0) in - (* Do the writing before the reading *) - List.iter - (fun fd -> if aout = fd then write_from b' a' else write_from a' b') - w ; - List.iter (fun fd -> if ain = fd then read_into a' else read_into b') r + let epoll = Polly.create () in + List.iter (fun fd -> Polly.add epoll fd Polly.Events.inp) r ; + List.iter (fun fd -> Polly.add epoll fd Polly.Events.out) w ; + Fun.protect + ~finally:(fun () -> Polly.close epoll) + (fun () -> + ignore + @@ Polly.wait epoll 4 (-1) (fun _ fd _ -> + (* Note: only one fd is handled *) + if aout = fd then + write_from b' a' + else if bout = fd then + write_from a' b' + else if ain = fd then + read_into a' + else + read_into b' + ) + ) done with _ -> ( (try Unix.clear_nonblock ain with _ -> ()) ; diff --git a/quality-gate.sh b/quality-gate.sh index b3cd2e67813..f9c644467f5 100755 --- a/quality-gate.sh +++ b/quality-gate.sh @@ -25,7 +25,7 @@ verify-cert () { } mli-files () { - N=515 + N=513 # do not count ml files from the tests in ocaml/{tests/perftest/quicktest} MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;) MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)