Skip to content

[epoll] Unix.select conversion: replace with stdext modules/calls or polly calls #5705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 27 additions & 19 deletions ocaml/database/block_device_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,20 @@ let listen_on sock =
s

let accept_conn s latest_response_time =
let now = Unix.gettimeofday () in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit contains:

TODO: needs some extra testing of block_device_io, which doesn't have many suitable tests.

Has this been done in some way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is 1 XenRT test capable of detecting breakages in the redolog, the one that got added for HA vTPM testing.
(all other HA XenRT tests used to pass with a completely broken redolog in the past).

There is a test for epoll on block devices as part of the stdext quickcheck tests, although it requires root, so it is not run in the CI.
I'll add some code to the XAPI quicktests so we can run some of these fuzzers in Dom0.

I think it is still worthwhile trying to add a separate test for the redolog itself that runs as part of the quicktests (it needs a block device, and setting one up, even in loopback mode, requires root, so it can't be done in the CI).

let timeout = latest_response_time -. now in
(* Await an incoming connection... *)
let ready_to_read, _, _ = Unix.select [s] [] [] timeout in
R.info "Finished selecting" ;
if List.mem s ready_to_read then
(* We've received a connection. Accept it and return the socket. *)
fst (Unix.accept s)
else (* We must have timed out *)
raise Unixext.Timeout
match Clock.Timer.remaining latest_response_time with
| Expired _ ->
raise Unixext.Timeout
| Remaining timeout ->
(* Await an incoming connection... *)
let ready_to_read, _, _ =
Unix.select [s] [] [] (Clock.Timer.span_to_s timeout)
in
R.info "Finished selecting" ;
if List.mem s ready_to_read then
(* We've received a connection. Accept it and return the socket. *)
fst (Unix.accept s)
else (* We must have timed out *)
raise Unixext.Timeout

(* Listen on a given socket. Accept a single connection and transfer all the data from it to dest_fd, or raise Timeout if target_response_time happens first. *)
(* Raises NotEnoughSpace if the next write would exceed the available_space. *)
Expand Down Expand Up @@ -731,6 +735,8 @@ let dump = ref false

let empty = ref false

let target_response_time delta = Clock.Timer.start ~duration:delta

let _ =
(* Initialise debug logging *)
initialise_logging () ;
Expand Down Expand Up @@ -770,11 +776,12 @@ let _ =
(* Open the block device *)
let block_dev_fd =
open_block_device !block_dev
(Unix.gettimeofday () +. !Db_globs.redo_log_max_startup_time)
(target_response_time !Db_globs.redo_log_max_startup_time)
in
R.info "Opened block device." ;
let target_response_time = Unix.gettimeofday () +. 3600. in
(* Read the validity byte *)
let target_response_time =
target_response_time !Db_globs.redo_log_max_connect_time
in
try
let validity = read_validity_byte block_dev_fd target_response_time in
Printf.printf "*** Validity byte: [%s]\n" validity ;
Expand Down Expand Up @@ -841,10 +848,12 @@ let _ =
(* Open the block device *)
let block_dev_fd =
open_block_device !block_dev
(Unix.gettimeofday () +. !Db_globs.redo_log_max_startup_time)
(target_response_time !Db_globs.redo_log_max_startup_time)
in
R.info "Opened block device." ;
let target_response_time = Unix.gettimeofday () +. 3600. in
let target_response_time =
target_response_time !Db_globs.redo_log_max_connect_time
in
initialise_redo_log block_dev_fd target_response_time ;
Printf.printf "Block device initialised.\n"
) ;
Expand All @@ -855,9 +864,8 @@ let _ =
let s = listen_on !ctrlsock in
(* Main loop: accept a new client, communicate with it until it stops sending commands, repeat. *)
while true do
let start_of_startup = Unix.gettimeofday () in
let target_startup_response_time =
start_of_startup +. !Db_globs.redo_log_max_startup_time
target_response_time !Db_globs.redo_log_max_startup_time
in
R.info "Awaiting incoming connections on %s..." !ctrlsock ;
let client = accept_conn s target_startup_response_time in
Expand Down Expand Up @@ -898,11 +906,11 @@ let _ =
send_failure client (str ^ "|nack")
("Unknown command " ^ str)
)
, 0.
, Mtime.Span.zero
)
in
(* "Start the clock!" -- set the latest time by which we need to have responded to the client. *)
let target_response_time = Unix.gettimeofday () +. block_time in
let target_response_time = target_response_time block_time in
action_fn block_dev_fd client !datasock target_response_time
with
(* this must be an exception in Unixext.really_read because action_fn doesn't throw exceptions *)
Expand Down
15 changes: 9 additions & 6 deletions ocaml/database/db_globs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
let redo_log_block_device_io = ref "block_device_io"

(** The delay between each attempt to connect to the block device I/O process *)
let redo_log_connect_delay = ref 0.1
let redo_log_connect_delay = ref Mtime.Span.(100 * ms)

(** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while emptying *)
let redo_log_max_block_time_empty = ref 2.
let redo_log_max_block_time_empty = ref Mtime.Span.(2 * s)

(** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while reading *)
let redo_log_max_block_time_read = ref 30.
let redo_log_max_block_time_read = ref Mtime.Span.(30 * s)

(** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while writing a delta *)
let redo_log_max_block_time_writedelta = ref 2.
let redo_log_max_block_time_writedelta = ref Mtime.Span.(2 * s)

(** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while writing a database *)
let redo_log_max_block_time_writedb = ref 30.
let redo_log_max_block_time_writedb = ref Mtime.Span.(30 * s)

(** The maximum amount of time to wait for a connection (used to be hardcoded to 3600s) *)
let redo_log_max_connect_time = ref Mtime.Span.(1 * hour)

(** {3 Settings related to the exponential back-off of repeated attempts to reconnect after failure} *)

Expand All @@ -35,7 +38,7 @@ let redo_log_max_dying_processes = 2
let redo_log_comms_socket_stem = "sock-blkdev-io"

(** The maximum time, in seconds, for which we are prepared to wait for a response from the block device I/O process before assuming that it has died while initially connecting to it *)
let redo_log_max_startup_time = ref 5.
let redo_log_max_startup_time = ref Mtime.Span.(5 * s)

(** The length, in bytes, of one redo log which constitutes half of the VDI *)
let redo_log_length_of_half = 60 * 1024 * 1024
Expand Down
7 changes: 7 additions & 0 deletions ocaml/database/dune
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
test_schemas unit_test_marshall unit_test_sql))
(libraries
forkexec
fmt
gzip
mtime
mtime.clock
mtime.clock.os
rpclib.core
rpclib.json
safe-resources
Expand Down Expand Up @@ -61,6 +65,9 @@
(modules block_device_io)
(libraries
dune-build-info
mtime
mtime.clock
mtime.clock.os
xapi_database
xapi-log
xapi-stdext-pervasives
Expand Down
6 changes: 5 additions & 1 deletion ocaml/database/master_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ let open_secure_connection () =
~write_to_log:(fun x -> debug "stunnel: %s\n" x)
~verify_cert host port
@@ fun st_proc ->
let fd_closed = Thread.wait_timed_read Unixfd.(!(st_proc.Stunnel.fd)) 5. in
let fd_closed =
Xapi_stdext_unix.Unixext.wait_timed_read
Unixfd.(!(st_proc.Stunnel.fd))
Mtime.Span.(5 * s)
in
let proc_quit =
try
Unix.kill (Stunnel.getpid st_proc.Stunnel.pid) 0 ;
Expand Down
39 changes: 20 additions & 19 deletions ocaml/database/redo_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ let generation_size = 16

let length_size = 16

let get_latest_response_time block_time =
let now = Unix.gettimeofday () in
now +. block_time
let get_latest_response_time block_time = Clock.Timer.start ~duration:block_time

(* Returns the PID of the process *)
let start_io_process block_dev ctrlsockpath datasockpath =
Expand All @@ -265,26 +263,29 @@ let connect sockpath latest_response_time =
Unix.connect s (Unix.ADDR_UNIX sockpath) ;
D.debug "Connected to I/O process via socket %s" sockpath ;
s
with Unix.Unix_error (a, b, _) ->
with Unix.Unix_error (a, b, _) -> (
(* It's probably the case that the process hasn't started yet. *)
(* See if we can afford to wait and try again *)
Unix.close s ;
let attempt_delay = !Db_globs.redo_log_connect_delay in
let now = Unix.gettimeofday () in
let remaining = latest_response_time -. now in
if attempt_delay < remaining then (
(* Wait for a while then try again *)
D.debug
"Waiting to connect to I/O process via socket %s (error was %s: \
%s)..."
sockpath b (Unix.error_message a) ;
D.debug "Remaining time for retries: %f" remaining ;
Thread.delay attempt_delay ;
attempt ()
) else (
D.debug "%s timeout reached" __FUNCTION__ ;
raise Unixext.Timeout
)
match
Clock.Timer.remaining
(Clock.Timer.shorten_by attempt_delay latest_response_time)
with
| Remaining remaining ->
(* Wait for a while then try again *)
D.debug
"Waiting to connect to I/O process via socket %s (error was %s: \
%s)..."
sockpath b (Unix.error_message a) ;
D.debug "Remaining time for retries: %a" Debug.Pp.mtime_span remaining ;
Unixext.delay_span attempt_delay ;
attempt ()
| Expired overrun ->
D.debug "%s timeout reached (%a overrun)" __FUNCTION__
Debug.Pp.mtime_span overrun ;
raise Unixext.Timeout
)
in
attempt ()

Expand Down
2 changes: 1 addition & 1 deletion ocaml/database/redo_log.mli
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type t =
represents the write to the field with name [fldname] of a row in table [tblname] with key [objref], overwriting its value with [newval]. *)

val apply :
(Generation.t -> Unix.file_descr -> int -> float -> unit)
(Generation.t -> Unix.file_descr -> int -> Clock.Timer.t -> unit)
-> (Generation.t -> t -> unit)
-> [< `RO | `RW] redo_log
-> unit
Expand Down
9 changes: 1 addition & 8 deletions ocaml/forkexecd/src/child.ml
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,7 @@ let handle_comms comms_sock fd_sock state =
let log_failure args child_pid reason =
(* The commandline might be too long to clip it *)
let cmdline = String.concat " " args in
let limit = 80 - 3 in
let cmdline' =
if String.length cmdline > limit then
String.sub cmdline 0 limit ^ "..."
else
cmdline
in
Fe_debug.error "%d (%s) %s" child_pid cmdline' reason
Fe_debug.error "%d (%s) %s" child_pid cmdline reason

let report_child_exit comms_sock args child_pid status =
let module Unixext = Xapi_stdext_unix.Unixext in
Expand Down
1 change: 1 addition & 0 deletions ocaml/libs/ezxenstore/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
(re_export xenstore)
(re_export xenstore_transport)
threads.posix
xapi-stdext-threads
(re_export xenstore.unix))
)
46 changes: 15 additions & 31 deletions ocaml/libs/ezxenstore/core/watch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,25 @@ let has_fired ~xs x =
true
with Xs_protocol.Eagain -> false

module Delay = Xapi_stdext_threads.Threadext.Delay

(** Block waiting for a result *)
let wait_for ~xs ?(timeout = 300.) (x : 'a t) =
let _ = ignore xs in
let with_pipe f =
let p1, p2 = Unix.pipe () in
let close_all () =
let close p = try Unix.close p with _ -> () in
close p1 ; close p2
in
try
let result = f (p1, p2) in
close_all () ; result
with e -> close_all () ; raise e
in
let task = Xs.wait x.evaluate in
with_pipe (fun (p1, p2) ->
let thread =
Thread.create
(fun () ->
let r, _, _ = Unix.select [p1] [] [] timeout in
if r <> [] then
()
else
try Xs_client_unix.Task.cancel task with _ -> ()
)
()
in
try
let result = Xs_client_unix.Task.wait task in
ignore (Unix.write p2 (Bytes.of_string "x") 0 1) ;
Thread.join thread ;
result
with Xs_client_unix.Cancelled ->
Thread.join thread ; raise (Timeout timeout)
)
let delay = Delay.make () in
let thread =
Thread.create
(fun () ->
if Delay.wait delay timeout then
try Xs_client_unix.Task.cancel task with _ -> ()
)
()
in
try
let result = Xs_client_unix.Task.wait task in
Delay.signal delay ; Thread.join thread ; result
with Xs_client_unix.Cancelled -> Thread.join thread ; raise (Timeout timeout)

(** Wait for a node to appear in the store and return its value *)
let value_to_appear (path : path) : string t =
Expand Down
16 changes: 10 additions & 6 deletions ocaml/libs/http-lib/buf_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,21 @@ let is_full ic = ic.cur = 0 && ic.max = Bytes.length ic.buf
let fill_buf ~buffered ic timeout =
let buf_size = Bytes.length ic.buf in
let fill_no_exc timeout len =
let l, _, _ = Unix.select [ic.fd] [] [] timeout in
if l <> [] then (
Xapi_stdext_unix.Unixext.with_socket_timeout ic.fd timeout @@ fun () ->
try
let n = Unix.read ic.fd ic.buf ic.max len in
ic.max <- n + ic.max ;
if n = 0 && len <> 0 then raise Eof ;
n
) else
-1
with Unix.Unix_error (Unix.(EAGAIN | EWOULDBLOCK), _, _) -> -1
in
(* If there's no space to read, shift *)
if ic.max = buf_size then shift ic ;
let space_left = buf_size - ic.max in
(* Read bytes one by one just to make sure we don't buffer too many chars *)
let n =
fill_no_exc timeout (if buffered then space_left else min space_left 1)
fill_no_exc (Some timeout)
(if buffered then space_left else min space_left 1)
in
(* Select returned nothing to read *)
if n = -1 then raise Timeout ;
Expand All @@ -102,7 +102,11 @@ let fill_buf ~buffered ic timeout =
let tofillsz =
if buffered then buf_size - ic.max else min (buf_size - ic.max) 1
in
ignore (fill_no_exc 0.0 tofillsz)
(* cannot use 0 here, for select that'd mean timeout immediately, for
setsockopt it would mean no timeout.
So use a very short timeout instead
*)
ignore (fill_no_exc (Some 1e-6) tofillsz)
)

(** Input one line terminated by \n *)
Expand Down
11 changes: 4 additions & 7 deletions ocaml/libs/http-lib/bufio_test.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
open QCheck2
open Xapi_fd_test

let print_timeout = string_of_float
let print_timeout = Fmt.to_to_string Mtime.Span.pp

let expect_string ~expected ~actual =
if not (String.equal expected actual) then
Expand All @@ -20,7 +20,7 @@ let test_buf_io =
let timeouts = Generate.timeouts in
let gen = Gen.tup2 Generate.t timeouts
and print = Print.tup2 Generate.print print_timeout in
Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout) ->
Test.make ~name:__FUNCTION__ ~print gen @@ fun (behaviour, timeout_span) ->
let every_bytes =
Int.min
(Option.map Observations.Delay.every_bytes behaviour.delay_read
Expand All @@ -38,11 +38,8 @@ let test_buf_io =
timeout_span remains the span for the entire function,
and timeout the per operation timeout that we'll pass to the function under test.
*)
let timeout_span = Mtime.Span.of_float_ns (timeout *. 1e9) |> Option.get in
let timeout = timeout /. float operations in
let timeout_operation_span =
Mtime.Span.of_float_ns (timeout *. 1e9) |> Option.get
in
let timeout = Clock.Timer.span_to_s timeout_span /. float operations in
let timeout_operation_span = Clock.Timer.s_to_span timeout |> Option.get in
(* timeout < 1us would get truncated to 0 *)
QCheck2.assume (timeout > 1e-6) ;
(* Format.eprintf "Testing %s@." (print (behaviour, timeout)); *)
Expand Down
1 change: 1 addition & 0 deletions ocaml/libs/http-lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
(modes (best exe))
(modules bufio_test)
(libraries
clock
fmt
mtime
mtime.clock
Expand Down
Loading
Loading