Skip to content

Instrument task related functionality #5735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions ocaml/xapi/context.ml
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,23 @@ let get_user_agent context =
let with_tracing ?originator ~__context name f =
let open Tracing in
let parent = __context.tracing in
let span_attributes = Attributes.attr_of_originator originator in
let span_attributes =
Attributes.attr_of_originator originator
@ make_attributes ~task_id:__context.task_id
?session_id:__context.session_id ()
in
match start_tracing_helper ~span_attributes (fun _ -> parent) name with
| Some _ as span ->
| Some _ as span -> (
try
let new_context = {__context with tracing= span} in
let result = f new_context in
let _ = Tracer.finish span in
result
with exn ->
let backtrace = Printexc.get_raw_backtrace () in
let error = (exn, Printexc.raw_backtrace_to_string backtrace) in
ignore @@ Tracer.finish span ~error ;
Printexc.raise_with_backtrace exn backtrace
)
| None ->
f __context
3 changes: 3 additions & 0 deletions ocaml/xapi/helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ let call_api_functions_internal ~__context f =
)

let call_api_functions ~__context f =
Context.with_tracing ~__context __FUNCTION__ @@ fun __context ->
match Context.get_test_rpc __context with
| Some rpc ->
f rpc (Ref.of_string "fake_session")
Expand Down Expand Up @@ -1733,6 +1734,7 @@ module Task : sig
end = struct
(* can't place these functions in task helpers due to circular dependencies *)
let wait_for_ ~__context ~tasks ~propagate_cancel cb =
Context.with_tracing ~__context __FUNCTION__ @@ fun __context ->
let our_task = Context.get_task_id __context in
let classes =
List.map
Expand Down Expand Up @@ -1819,6 +1821,7 @@ end = struct
wait_for_ ~__context ~tasks:[t] mirror

let to_result ~__context ~of_rpc ~t =
Context.with_tracing ~__context __FUNCTION__ @@ fun __context ->
wait_for_mirror ~__context ~propagate_cancel:true ~t ;
let fail msg =
raise
Expand Down
2 changes: 2 additions & 0 deletions ocaml/xapi/message_forwarding.ml
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ functor
include Local.Task

let cancel ~__context ~task =
Context.with_tracing ~__context __FUNCTION__ @@ fun __context ->
TaskHelper.assert_op_valid ~__context task ;
let local_fn = cancel ~task in
let forwarded_to = Db.Task.get_forwarded_to ~__context ~self:task in
Expand Down Expand Up @@ -1185,6 +1186,7 @@ functor
with _ -> ()

let cancel ~__context ~vm ~ops =
Context.with_tracing ~__context __FUNCTION__ @@ fun __context ->
let cancelled =
List.filter_map
(fun (task, op) ->
Expand Down
160 changes: 91 additions & 69 deletions ocaml/xapi/taskHelper.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ module Date = Xapi_stdext_date.Date

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

let ( let@ ) f x = f x

let finally_complete_tracing ?error __context f =
Xapi_stdext_pervasives.Pervasiveext.finally f (fun () ->
Context.complete_tracing ?error __context
)

type t = API.ref_task

(* creates a new task *)
let make ~__context ~http_other_config ?(description = "") ?session_id
?subtask_of label : t * t Uuidx.t =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let uuid = Uuidx.make () in
let uuid_str = Uuidx.to_string uuid in
let ref = Ref.make () in
Expand All @@ -35,8 +43,7 @@ let make ~__context ~http_other_config ?(description = "") ?session_id
Ref.null
in
let (_ : unit) =
Db_actions.DB_Action.Task.create ~ref ~__context
~created:(Date.of_float (Unix.time ()))
Db_actions.DB_Action.Task.create ~ref ~__context ~created:(Date.now ())
~finished:(Date.of_float 0.0) ~current_operations:[] ~_type:"<none/>"
~session:(Option.value ~default:Ref.null session_id)
~resident_on:!Xapi_globs.localhost_ref ~status:`pending ~result:""
Expand All @@ -54,6 +61,7 @@ let rbac_assert_permission_fn = ref None
(* required to break dep-cycle with rbac.ml *)

let assert_op_valid ?(ok_if_no_session_in_context = false) ~__context task_id =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let assert_permission_task_op_any () =
match !rbac_assert_permission_fn with
| None ->
Expand Down Expand Up @@ -101,13 +109,15 @@ let assert_op_valid ?(ok_if_no_session_in_context = false) ~__context task_id =
assert_permission_task_op_any ()

let get_name ~__context =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let task_id = Context.get_task_id __context in
if Ref.is_dummy task_id then
Ref.name_of_dummy task_id
else
Db.Task.get_name_label ~__context ~self:task_id

let destroy ~__context task_id =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
if not (Ref.is_dummy task_id) then (
assert_op_valid ~ok_if_no_session_in_context:true ~__context task_id ;
Db_actions.DB_Action.Task.destroy ~__context ~self:task_id
Expand All @@ -123,34 +133,36 @@ let init () =
Context.__make_task := make

let operate_on_db_task ~__context f =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
if Context.task_in_database __context then
f (Context.get_task_id __context)

let set_description ~__context value =
operate_on_db_task ~__context (fun self ->
Db_actions.DB_Action.Task.set_name_description ~__context ~self ~value
)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
Db_actions.DB_Action.Task.set_name_description ~__context ~self ~value

let add_to_other_config ~__context key value =
operate_on_db_task ~__context (fun self ->
Db_actions.DB_Action.Task.remove_from_other_config ~__context ~self ~key ;
Db_actions.DB_Action.Task.add_to_other_config ~__context ~self ~key ~value
)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
Db_actions.DB_Action.Task.remove_from_other_config ~__context ~self ~key ;
Db_actions.DB_Action.Task.add_to_other_config ~__context ~self ~key ~value

let set_progress ~__context value =
operate_on_db_task ~__context (fun self ->
Db_actions.DB_Action.Task.set_progress ~__context ~self ~value
)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
Db_actions.DB_Action.Task.set_progress ~__context ~self ~value

let set_external_pid ~__context pid =
operate_on_db_task ~__context (fun self ->
Db_actions.DB_Action.Task.set_externalpid ~__context ~self
~value:(Int64.of_int pid)
)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
Db_actions.DB_Action.Task.set_externalpid ~__context ~self
~value:(Int64.of_int pid)

let clear_external_pid ~__context = set_external_pid ~__context (-1)

let set_result_on_task ~__context task_id result =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
match result with
| None ->
()
Expand All @@ -160,7 +172,9 @@ let set_result_on_task ~__context task_id result =

(** Only set the result without completing the task. Useful for vm import *)
let set_result ~__context result =
operate_on_db_task ~__context (fun t -> set_result_on_task ~__context t result)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
set_result_on_task ~__context self result

let status_to_string = function
| `pending ->
Expand All @@ -178,36 +192,36 @@ let status_is_completed task_status =
task_status = `success || task_status = `failure || task_status = `cancelled

let complete ~__context result =
Context.complete_tracing __context ;
operate_on_db_task ~__context (fun self ->
let status = Db_actions.DB_Action.Task.get_status ~__context ~self in
if status = `pending then (
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self
~value:[] ;
Db_actions.DB_Action.Task.set_finished ~__context ~self
~value:(Date.of_float (Unix.time ())) ;
Db_actions.DB_Action.Task.set_progress ~__context ~self ~value:1. ;
set_result_on_task ~__context self result ;
Db_actions.DB_Action.Task.set_status ~__context ~self ~value:`success
) else
debug "the status of %s is: %s; cannot set it to `success"
(Ref.really_pretty_and_small self)
(status_to_string status)
)
let@ () = finally_complete_tracing __context in
let@ self = operate_on_db_task ~__context in
let status = Db_actions.DB_Action.Task.get_status ~__context ~self in
match status with
| `pending ->
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self
~value:[] ;
Db_actions.DB_Action.Task.set_finished ~__context ~self
~value:(Date.now ()) ;
Db_actions.DB_Action.Task.set_progress ~__context ~self ~value:1. ;
set_result_on_task ~__context self result ;
Db_actions.DB_Action.Task.set_status ~__context ~self ~value:`success
| _ ->
debug "the status of %s is: %s; cannot set it to `success"
(Ref.really_pretty_and_small self)
(status_to_string status)

let set_cancellable ~__context =
operate_on_db_task ~__context (fun self ->
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self
~value:[`cancel]
)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self
~value:[`cancel]

let set_not_cancellable ~__context =
operate_on_db_task ~__context (fun self ->
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self
~value:[]
)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self ~value:[]

let is_cancelling ~__context =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
Context.task_in_database __context
&&
let l =
Expand All @@ -217,21 +231,22 @@ let is_cancelling ~__context =
List.exists (fun (_, x) -> x = `cancel) l

let raise_cancelled ~__context =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let task_id = Context.get_task_id __context in
raise Api_errors.(Server_error (task_cancelled, [Ref.string_of task_id]))

let exn_if_cancelling ~__context =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
if is_cancelling ~__context then
raise_cancelled ~__context

let cancel_this ~__context ~self =
Context.complete_tracing __context ;
let@ () = finally_complete_tracing __context in
assert_op_valid ~__context self ;
let status = Db_actions.DB_Action.Task.get_status ~__context ~self in
if status = `pending then (
Db_actions.DB_Action.Task.set_progress ~__context ~self ~value:1. ;
Db_actions.DB_Action.Task.set_finished ~__context ~self
~value:(Date.of_float (Unix.time ())) ;
Db_actions.DB_Action.Task.set_finished ~__context ~self ~value:(Date.now ()) ;
Db_actions.DB_Action.Task.set_status ~__context ~self ~value:`cancelled ;
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self ~value:[]
) else
Expand All @@ -240,35 +255,40 @@ let cancel_this ~__context ~self =
(status_to_string status)

let cancel ~__context =
operate_on_db_task ~__context (fun self -> cancel_this ~__context ~self)
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let@ self = operate_on_db_task ~__context in
cancel_this ~__context ~self

let failed ~__context exn =
let backtrace = Printexc.get_backtrace () in
Context.complete_tracing __context ~error:(exn, backtrace) ;
let@ () = finally_complete_tracing ~error:(exn, backtrace) __context in
let code, params = ExnHelper.error_of_exn exn in
operate_on_db_task ~__context (fun self ->
let status = Db_actions.DB_Action.Task.get_status ~__context ~self in
if status = `pending then (
Db_actions.DB_Action.Task.set_progress ~__context ~self ~value:1. ;
Db_actions.DB_Action.Task.set_error_info ~__context ~self
~value:(code :: params) ;
Db_actions.DB_Action.Task.set_backtrace ~__context ~self
~value:(Sexplib.Sexp.to_string Backtrace.(sexp_of_t (get exn))) ;
Db_actions.DB_Action.Task.set_finished ~__context ~self
~value:(Date.of_float (Unix.time ())) ;
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self
~value:[] ;
if code = Api_errors.task_cancelled then
Db_actions.DB_Action.Task.set_status ~__context ~self
~value:`cancelled
else
Db_actions.DB_Action.Task.set_status ~__context ~self ~value:`failure
) else
debug "the status of %s is %s; cannot set it to %s"
(Ref.really_pretty_and_small self)
(status_to_string status)
(if code = Api_errors.task_cancelled then "`cancelled" else "`failure")
)
let@ self = operate_on_db_task ~__context in
let status = Db_actions.DB_Action.Task.get_status ~__context ~self in
match status with
| `pending ->
Db_actions.DB_Action.Task.set_progress ~__context ~self ~value:1. ;
Db_actions.DB_Action.Task.set_error_info ~__context ~self
~value:(code :: params) ;
Db_actions.DB_Action.Task.set_backtrace ~__context ~self
~value:(Sexplib.Sexp.to_string Backtrace.(sexp_of_t (get exn))) ;
Db_actions.DB_Action.Task.set_finished ~__context ~self
~value:(Date.now ()) ;
Db_actions.DB_Action.Task.set_allowed_operations ~__context ~self
~value:[] ;
if code = Api_errors.task_cancelled then
Db_actions.DB_Action.Task.set_status ~__context ~self ~value:`cancelled
else
Db_actions.DB_Action.Task.set_status ~__context ~self ~value:`failure
| _ ->
debug "the status of %s is %s; cannot set it to %s"
(Ref.really_pretty_and_small self)
(status_to_string status)
( if code = Api_errors.task_cancelled then
"`cancelled"
else
"`failure"
)

type id = Sm of string | Xenops of string * string

Expand All @@ -287,6 +307,7 @@ let task_to_id_exn task =
with_lock task_tbl_m (fun () -> Hashtbl.find task_to_id_tbl task)

let register_task __context ?(cancellable = true) id =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
let task = Context.get_task_id __context in
with_lock task_tbl_m (fun () ->
Hashtbl.replace id_to_task_tbl id task ;
Expand All @@ -302,6 +323,7 @@ let register_task __context ?(cancellable = true) id =
()

let unregister_task __context id =
let@ __context = Context.with_tracing ~__context __FUNCTION__ in
(* The rest of the XenAPI Task won't be cancellable *)
set_not_cancellable ~__context ;
with_lock task_tbl_m (fun () ->
Expand Down
Loading
Loading