Skip to content

Refactor Xapi_event (redux) #6370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 117 additions & 105 deletions ocaml/xapi/xapi_event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,59 @@ let rec next ~__context =
else
rpc_of_events relevant

type time = Xapi_database.Db_cache_types.Time.t

type entry = {table: string; obj: string; time: time}

type acc = {
creates: entry list
; mods: entry list
; deletes: entry list
; last: time
}

let collect_events (subs, tables, last_generation) acc table =
let open Xapi_database in
let open Db_cache_types in
let table_value = TableSet.find table tables in
let prepend_recent obj stat _ ({creates; mods; last; _} as entries) =
let Stat.{created; modified; deleted} = stat in
if Subscription.object_matches subs table obj then
let last = max last (max modified deleted) in
let creates =
if created > last_generation then
{table; obj; time= created} :: creates
else
creates
in
let mods =
if modified > last_generation && not (created > last_generation) then
{table; obj; time= modified} :: mods
else
mods
in
{entries with creates; mods; last}
else
entries
in
let prepend_deleted obj stat ({deletes; last; _} as entries) =
let Stat.{created; modified; deleted} = stat in
if Subscription.object_matches subs table obj then
let last = max last (max modified deleted) in
let deletes =
if created <= last_generation then
{table; obj; time= deleted} :: deletes
else
deletes
in
{entries with deletes; last}
else
entries
in
acc
|> Table.fold_over_recent last_generation prepend_recent table_value
|> Table.fold_over_deleted last_generation prepend_deleted table_value

let from_inner __context session subs from from_t timer batching =
let open Xapi_database in
let open From in
Expand All @@ -541,159 +594,118 @@ let from_inner __context session subs from from_t timer batching =
in
List.filter (fun table -> Subscription.table_matches subs table) all
in
let last_generation = ref from in
let last_msg_gen = ref from_t in
let grab_range t =
let grab_range ~since t =
let tableset = Db_cache_types.Database.tableset (Db_ref.get_database t) in
let msg_gen, messages =
if Subscription.table_matches subs "message" then
!Message.get_since_for_events ~__context !last_msg_gen
else
(0L, [])
in
( msg_gen
, messages
, tableset
, List.fold_left
(fun acc table ->
(* Fold over the live objects *)
let acc =
Db_cache_types.Table.fold_over_recent !last_generation
(fun objref {Db_cache_types.Stat.created; modified; deleted} _
(creates, mods, deletes, last) ->
if Subscription.object_matches subs table objref then
let last = max last (max modified deleted) in
(* mtime guaranteed to always be larger than ctime *)
( ( if created > !last_generation then
(table, objref, created) :: creates
else
creates
)
, ( if
modified > !last_generation
&& not (created > !last_generation)
then
(table, objref, modified) :: mods
else
mods
)
, (* Only have a mod event if we don't have a created event *)
deletes
, last
)
else
(creates, mods, deletes, last)
)
(Db_cache_types.TableSet.find table tableset)
acc
in
(* Fold over the deleted objects *)
Db_cache_types.Table.fold_over_deleted !last_generation
(fun objref {Db_cache_types.Stat.created; modified; deleted}
(creates, mods, deletes, last) ->
if Subscription.object_matches subs table objref then
let last = max last (max modified deleted) in
(* mtime guaranteed to always be larger than ctime *)
if created > !last_generation then
(creates, mods, deletes, last)
(* It was created and destroyed since the last update *)
else
(creates, mods, (table, objref, deleted) :: deletes, last)
(* It might have been modified, but we can't tell now *)
else
(creates, mods, deletes, last)
)
(Db_cache_types.TableSet.find table tableset)
acc
)
([], [], [], !last_generation)
tables
)
let events =
let initial = {creates= []; mods= []; deletes= []; last= since} in
let folder = collect_events (subs, tableset, since) in
List.fold_left folder initial tables
in
(msg_gen, messages, tableset, events)
in
(* Each event.from should have an independent subscription record *)
let msg_gen, messages, tableset, (creates, mods, deletes, last) =
let msg_gen, messages, tableset, events =
with_call session subs (fun sub ->
let grab_nonempty_range =
Throttle.Batching.with_recursive_loop batching @@ fun self arg ->
let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
as result
) =
Db_lock.with_lock (fun () -> grab_range (Db_backend.make ()))
Throttle.Batching.with_recursive_loop batching @@ fun self since ->
let result =
Db_lock.with_lock (fun () -> grab_range ~since (Db_backend.make ()))
in
let msg_gen, messages, _tables, events = result in
let {creates; mods; deletes; last} = events in
if
creates = []
&& mods = []
&& deletes = []
&& messages = []
&& not (Clock.Timer.has_expired timer)
then (
last_generation := last ;
(* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
(* cur_id was bumped, but nothing relevent fell out of the database.
Therefore the last ID the client got is equivalent to the current one. *)
sub.cur_id <- last ;
(* last id the client got is equivalent to the current one *)
last_msg_gen := msg_gen ;
wait2 sub last timer ;
(self [@tailcall]) arg
(* The next iteration will fold over events starting after
the last database event that matched a subscription. *)
let next = last in
(self [@tailcall]) next
) else
result
in
grab_nonempty_range ()
grab_nonempty_range from
)
in
last_generation := last ;
let event_of op ?snapshot (table, objref, time) =
let {creates; mods; deletes; last} = events in
let event_of op ?snapshot {table; obj; time} =
{
id= Int64.to_string time
; ts= "0.0"
; ty= String.lowercase_ascii table
; op
; reference= objref
; reference= obj
; snapshot
}
in
let events =
List.fold_left
(fun acc x ->
let ev = event_of `del x in
if Subscription.event_matches subs ev then ev :: acc else acc
)
[] deletes
in
let events =
List.fold_left
(fun acc (table, objref, mtime) ->
let serialiser = Eventgen.find_get_record table in
try
let xml = serialiser ~__context ~self:objref () in
let ev = event_of `_mod ?snapshot:xml (table, objref, mtime) in
if Subscription.event_matches subs ev then ev :: acc else acc
with _ -> acc
)
events mods
let events_of ~kind ?(with_snapshot = true) entries acc =
let rec go events ({table; obj; time= _} as entry) =
try
let snapshot =
let serialiser = Eventgen.find_get_record table in
if with_snapshot then
serialiser ~__context ~self:obj ()
else
None
in
let event = event_of kind ?snapshot entry in
if Subscription.event_matches subs event then
event :: events
else
events
with _ ->
(* CA-91931: An exception may be raised here if an object's
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be safe to match specific exceptions to avoid swallowing unexpected exceptions?

lifetime is too short.

The problem is that "collect_events" and "events_of" work
on different versions of the database, so some `add and
`mod events can be lost if the corresponding object is
deleted before a snapshot is taken.

In practice, this has only been seen with the "task"
object - which can be rapidly created and destroyed using
helper functions.

These exceptions have been suppressed since [bc0cc5a9]. *)
events
in
List.fold_left go acc entries
in
let events =
List.fold_left
(fun acc (table, objref, ctime) ->
let serialiser = Eventgen.find_get_record table in
try
let xml = serialiser ~__context ~self:objref () in
let ev = event_of `add ?snapshot:xml (table, objref, ctime) in
if Subscription.event_matches subs ev then ev :: acc else acc
with _ -> acc
)
events creates
[] (* Accumulate the events for objects stored in the database. *)
|> events_of ~kind:`del ~with_snapshot:false deletes
|> events_of ~kind:`_mod mods
|> events_of ~kind:`add creates
in
let events =
(* Messages require a special casing as their contents are not
stored in the database. *)
List.fold_left
(fun acc mev ->
let event =
let table = "message" in
match mev with
| Message.Create (_ref, message) ->
event_of `add
?snapshot:(Some (API.rpc_of_message_t message))
("message", Ref.string_of _ref, 0L)
{table; obj= Ref.string_of _ref; time= 0L}
| Message.Del _ref ->
event_of `del ("message", Ref.string_of _ref, 0L)
event_of `del {table; obj= Ref.string_of _ref; time= 0L}
in
event :: acc
)
Expand Down
Loading