diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index a7412790019..cd38814d7e2 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -525,6 +525,59 @@ let rec next ~__context = else rpc_of_events relevant +type time = Xapi_database.Db_cache_types.Time.t + +type entry = {table: string; obj: string; time: time} + +type acc = { + creates: entry list + ; mods: entry list + ; deletes: entry list + ; last: time +} + +let collect_events (subs, tables, last_generation) acc table = + let open Xapi_database in + let open Db_cache_types in + let table_value = TableSet.find table tables in + let prepend_recent obj stat _ ({creates; mods; last; _} as entries) = + let Stat.{created; modified; deleted} = stat in + if Subscription.object_matches subs table obj then + let last = max last (max modified deleted) in + let creates = + if created > last_generation then + {table; obj; time= created} :: creates + else + creates + in + let mods = + if modified > last_generation && not (created > last_generation) then + {table; obj; time= modified} :: mods + else + mods + in + {entries with creates; mods; last} + else + entries + in + let prepend_deleted obj stat ({deletes; last; _} as entries) = + let Stat.{created; modified; deleted} = stat in + if Subscription.object_matches subs table obj then + let last = max last (max modified deleted) in + let deletes = + if created <= last_generation then + {table; obj; time= deleted} :: deletes + else + deletes + in + {entries with deletes; last} + else + entries + in + acc + |> Table.fold_over_recent last_generation prepend_recent table_value + |> Table.fold_over_deleted last_generation prepend_deleted table_value + let from_inner __context session subs from from_t timer batching = let open Xapi_database in let open From in @@ -541,9 +594,8 @@ let from_inner __context session subs from from_t timer batching = in List.filter (fun table -> Subscription.table_matches subs table) all in - let last_generation = ref from in let last_msg_gen = ref from_t in - let grab_range t = + let grab_range ~since t = let tableset = Db_cache_types.Database.tableset (Db_ref.get_database t) in let msg_gen, messages = if Subscription.table_matches subs "message" then @@ -551,75 +603,23 @@ let from_inner __context session subs from from_t timer batching = else (0L, []) in - ( msg_gen - , messages - , tableset - , List.fold_left - (fun acc table -> - (* Fold over the live objects *) - let acc = - Db_cache_types.Table.fold_over_recent !last_generation - (fun objref {Db_cache_types.Stat.created; modified; deleted} _ - (creates, mods, deletes, last) -> - if Subscription.object_matches subs table objref then - let last = max last (max modified deleted) in - (* mtime guaranteed to always be larger than ctime *) - ( ( if created > !last_generation then - (table, objref, created) :: creates - else - creates - ) - , ( if - modified > !last_generation - && not (created > !last_generation) - then - (table, objref, modified) :: mods - else - mods - ) - , (* Only have a mod event if we don't have a created event *) - deletes - , last - ) - else - (creates, mods, deletes, last) - ) - (Db_cache_types.TableSet.find table tableset) - acc - in - (* Fold over the deleted objects *) - Db_cache_types.Table.fold_over_deleted !last_generation - (fun objref {Db_cache_types.Stat.created; modified; deleted} - (creates, mods, deletes, last) -> - if Subscription.object_matches subs table objref then - let last = max last (max modified deleted) in - (* mtime guaranteed to always be larger than ctime *) - if created > !last_generation then - (creates, mods, deletes, last) - (* It was created and destroyed since the last update *) - else - (creates, mods, (table, objref, deleted) :: deletes, last) - (* It might have been modified, but we can't tell now *) - else - (creates, mods, deletes, last) - ) - (Db_cache_types.TableSet.find table tableset) - acc - ) - ([], [], [], !last_generation) - tables - ) + let events = + let initial = {creates= []; mods= []; deletes= []; last= since} in + let folder = collect_events (subs, tableset, since) in + List.fold_left folder initial tables + in + (msg_gen, messages, tableset, events) in (* Each event.from should have an independent subscription record *) - let msg_gen, messages, tableset, (creates, mods, deletes, last) = + let msg_gen, messages, tableset, events = with_call session subs (fun sub -> let grab_nonempty_range = - Throttle.Batching.with_recursive_loop batching @@ fun self arg -> - let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last)) - as result - ) = - Db_lock.with_lock (fun () -> grab_range (Db_backend.make ())) + Throttle.Batching.with_recursive_loop batching @@ fun self since -> + let result = + Db_lock.with_lock (fun () -> grab_range ~since (Db_backend.make ())) in + let msg_gen, messages, _tables, events = result in + let {creates; mods; deletes; last} = events in if creates = [] && mods = [] @@ -627,73 +627,85 @@ let from_inner __context session subs from from_t timer batching = && messages = [] && not (Clock.Timer.has_expired timer) then ( - last_generation := last ; - (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *) + (* cur_id was bumped, but nothing relevent fell out of the database. + Therefore the last ID the client got is equivalent to the current one. *) sub.cur_id <- last ; - (* last id the client got is equivalent to the current one *) last_msg_gen := msg_gen ; wait2 sub last timer ; - (self [@tailcall]) arg + (* The next iteration will fold over events starting after + the last database event that matched a subscription. *) + let next = last in + (self [@tailcall]) next ) else result in - grab_nonempty_range () + grab_nonempty_range from ) in - last_generation := last ; - let event_of op ?snapshot (table, objref, time) = + let {creates; mods; deletes; last} = events in + let event_of op ?snapshot {table; obj; time} = { id= Int64.to_string time ; ts= "0.0" ; ty= String.lowercase_ascii table ; op - ; reference= objref + ; reference= obj ; snapshot } in - let events = - List.fold_left - (fun acc x -> - let ev = event_of `del x in - if Subscription.event_matches subs ev then ev :: acc else acc - ) - [] deletes - in - let events = - List.fold_left - (fun acc (table, objref, mtime) -> - let serialiser = Eventgen.find_get_record table in - try - let xml = serialiser ~__context ~self:objref () in - let ev = event_of `_mod ?snapshot:xml (table, objref, mtime) in - if Subscription.event_matches subs ev then ev :: acc else acc - with _ -> acc - ) - events mods + let events_of ~kind ?(with_snapshot = true) entries acc = + let rec go events ({table; obj; time= _} as entry) = + try + let snapshot = + let serialiser = Eventgen.find_get_record table in + if with_snapshot then + serialiser ~__context ~self:obj () + else + None + in + let event = event_of kind ?snapshot entry in + if Subscription.event_matches subs event then + event :: events + else + events + with _ -> + (* CA-91931: An exception may be raised here if an object's + lifetime is too short. + + The problem is that "collect_events" and "events_of" work + on different versions of the database, so some `add and + `mod events can be lost if the corresponding object is + deleted before a snapshot is taken. + + In practice, this has only been seen with the "task" + object - which can be rapidly created and destroyed using + helper functions. + + These exceptions have been suppressed since [bc0cc5a9]. *) + events + in + List.fold_left go acc entries in let events = - List.fold_left - (fun acc (table, objref, ctime) -> - let serialiser = Eventgen.find_get_record table in - try - let xml = serialiser ~__context ~self:objref () in - let ev = event_of `add ?snapshot:xml (table, objref, ctime) in - if Subscription.event_matches subs ev then ev :: acc else acc - with _ -> acc - ) - events creates + [] (* Accumulate the events for objects stored in the database. *) + |> events_of ~kind:`del ~with_snapshot:false deletes + |> events_of ~kind:`_mod mods + |> events_of ~kind:`add creates in let events = + (* Messages require a special casing as their contents are not + stored in the database. *) List.fold_left (fun acc mev -> let event = + let table = "message" in match mev with | Message.Create (_ref, message) -> event_of `add ?snapshot:(Some (API.rpc_of_message_t message)) - ("message", Ref.string_of _ref, 0L) + {table; obj= Ref.string_of _ref; time= 0L} | Message.Del _ref -> - event_of `del ("message", Ref.string_of _ref, 0L) + event_of `del {table; obj= Ref.string_of _ref; time= 0L} in event :: acc )