diff --git a/doc/content/xapi/storage/sxm.md b/doc/content/xapi/storage/sxm.md index 6c44e432d22..8b7971bed79 100644 --- a/doc/content/xapi/storage/sxm.md +++ b/doc/content/xapi/storage/sxm.md @@ -2,9 +2,220 @@ Title: Storage migration --- +- [Overview](#overview)
+- [SXM Multiplexing](#sxm-multiplexing)
+  - [Motivation](#motivation)
+  - [But we have storage\_mux.ml](#but-we-have-storage_muxml)
+  - [Thought experiments on an alternative design](#thought-experiments-on-an-alternative-design)
+  - [Design](#design)
+- [SMAPIv1 migration](#smapiv1-migration)
+- [SMAPIv3 migration](#smapiv3-migration)
+- [Error Handling](#error-handling)
+  - [Preparation (SMAPIv1 and SMAPIv3)](#preparation-smapiv1-and-smapiv3)
+  - [Snapshot and mirror failure (SMAPIv1)](#snapshot-and-mirror-failure-smapiv1)
+  - [Mirror failure (SMAPIv3)](#mirror-failure-smapiv3)
+  - [Copy failure (SMAPIv1)](#copy-failure-smapiv1)
+- [SMAPIv1 Migration implementation detail](#smapiv1-migration-implementation-detail)
+  - [Receiving SXM](#receiving-sxm)
+  - [Xapi code](#xapi-code)
+  - [Storage code](#storage-code)
+  - [Copying a VDI](#copying-a-vdi)
+  - [Mirroring a VDI](#mirroring-a-vdi)
+  - [Code walkthrough](#code-walkthrough)
+    - [DATA.copy](#datacopy)
+    - [DATA.copy\_into](#datacopy_into)
+    - [DATA.MIRROR.start](#datamirrorstart)
+
+
 ## Overview
 
-{{}}
+The core idea of storage migration is surprisingly simple: we have VDIs attached to a VM,
+and we wish to migrate these VDIs from one SR to another. This necessarily requires
+us to copy the data stored in these VDIs over to the new SR, which can be a long-running
+process if there are gigabytes or even terabytes of data. We wish to minimise the
+downtime of this process to allow the VM to keep running as much as possible.
+
+At a very high level, the SXM process generally consists of just two stages: preparation
+and mirroring. 
The preparation is about getting the receiving host ready for the
+mirroring operation, while the mirroring itself can be further divided into two
+more operations: 1. sending new writes to both sides; 2. copying existing data from
+source to destination. The exact details of how to set up a mirror differ significantly
+between SMAPIv1 and SMAPIv3, but both of them have to perform the above two
+operations.
+Once the mirroring is established, it is a matter of checking the status of the
+mirroring and carrying on with the following VM migration.
+
+The reality is more complex than we might hope. For example, in SMAPIv1,
+the mirror establishment is quite an involved process and is itself divided into
+several stages, which will be discussed in more detail later on.
+
+
+## SXM Multiplexing
+
+This section is about the design idea behind the additional layer of multiplexing specifically
+for Storage Xen Motion (SXM) from SRs using SMAPIv3. It is recommended that you read the
+[introduction doc](_index.md) for the storage layer first to understand how storage
+multiplexing is done between SMAPIv2 and SMAPI{v1, v3} before reading this.
+
+
+### Motivation
+
+The existing SXM code was designed to work only with SMAPIv1 SRs, and therefore
+does not take into account the dramatic difference in the ways SXM is done between
+SMAPIv1 and SMAPIv3. The exact difference will be covered later on in this doc; for this section
+it is sufficient to assume that the two have different ways of doing migration. Therefore,
+we need different code paths for migration from SMAPIv1 and SMAPIv3.
+
+#### But we have storage_mux.ml
+
+Indeed, storage_mux.ml is responsible for multiplexing and forwarding requests to
+the correct storage backend, based on the SR type that the caller specifies. And
+in fact, for inbound SXM to SMAPIv3 (i.e. migrating into a SMAPIv3 SR, GFS2 for example),
+storage_mux does the heavy lifting of multiplexing between different storage
+backends. 
Every time a `Remote.` call is invoked, it goes through the SMAPIv2
+layer to the remote host and gets multiplexed on the destination host, based on
+whether we are migrating into a SMAPIv1 or SMAPIv3 SR (see the diagram below).
+Inbound SXM is implemented
+by providing the existing SMAPIv2 -> SMAPIv3 calls (see `import_activate` for example)
+which may not have been implemented before.
+
+![mux for inbound](sxm_mux_inbound.svg)
+
+While this works fine for inbound SXM, it does not work for outbound SXM. A typical SXM
+involves two SR types, the source SR type (v1/v3) and the destination SR
+type (v1/v3), and any of the four combinations is possible. We have already covered the
+destination multiplexing (v1/v3) by utilising storage_mux, and at this point we
+have run out of multiplexers for the source. In other words, we
+can only multiplex once for each SMAPIv2 call, we can use that chance for
+either the source or the destination, and we have already used it for the latter.
+
+
+#### Thought experiments on an alternative design
+
+To make it even more concrete, let us consider an example: the mirroring logic in
+SXM is different based on the source SR type of the SXM call. You might imagine
+defining a function like `MIRROR.start v3_sr v1_sr` that will be multiplexed
+by the storage_mux based on the source SR type, and forwarded to storage_smapiv3_migrate,
+or even just xapi-storage-script, which is indeed quite possible.
+Now at this point we have already done the multiplexing, but we still wish to
+multiplex operations on destination SRs, for example, we might want to attach a
+VDI belonging to a SMAPIv1 SR on the remote host. 
But as we have already done the
+multiplexing and are now inside xapi-storage-script, we have lost any chance of doing
+any further multiplexing :(
+
+### Design
+
+The idea of this new design is to introduce an additional multiplexing layer that
+is specifically for multiplexing calls based on the source SR type. For example, in
+the diagram below the `send_start src_sr dest_sr` call takes both the source SR and the
+destination SR as parameters, and since the mirroring logic is different for different
+types of source SRs (i.e. SMAPIv1 or SMAPIv3), the storage migration code must
+choose the right code path based on the source SR type. This is
+exactly what is done in this additional multiplexing layer. The respective logic
+for doing {v1,v3}-specific mirroring, for example, stays in storage_smapi{v1,v3}_migrate.ml.
+
+![mux for outbound](sxm_mux_outbound.svg)
+
+Note that later on storage_smapi{v1,v3}_migrate.ml will still have the flexibility
+to call remote SMAPIv2 functions, such as `Remote.VDI.attach dest_sr vdi`, and
+these will be handled just as before.
+
+## SMAPIv1 migration
+
+At a high level, mirror establishment for SMAPIv1 works as follows:
+
+1. Take a snapshot of a VDI that is attached to VM1. This gives us an immutable
+copy of the current state of the VDI, with all the data up to the point we took
+the snapshot. This is illustrated in the diagram as a VDI and its snapshot connecting
+to a shared parent, which stores the shared content for the snapshot and the writable
+VDI from which we took the snapshot (snapshot)
+2. Mirror the writable VDI to the remote host: this means that all writes that go to the
+client VDI will also be written to the mirrored VDI on the remote host (mirror)
+3. Copy the immutable snapshot from our local host to the remote (copy)
+4. Compose the mirror and the snapshot to form a single VDI
+5. Destroy the snapshot on the local host (cleanup)
+
+
+More detail to come...
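+
+The five steps above can be sketched in OCaml-flavoured pseudocode (the helpers
+`start_mirror` and `copy_snapshot` and the simplified signatures here are
+illustrative assumptions, not the real SMAPIv2 API):
+
+```ocaml
+let establish_mirror ~local_sr ~vdi ~remote_sr =
+  (* 1. snapshot: capture an immutable copy of the VDI's current state *)
+  let snapshot = Local.VDI.snapshot local_sr vdi in
+  (* 2. mirror: from now on every write to the local VDI is also sent to
+     the mirrored VDI on the remote host *)
+  let mirror = start_mirror ~local_sr ~vdi ~remote_sr in
+  (* 3. copy: transfer the contents of the immutable snapshot *)
+  let copied = copy_snapshot ~local_sr ~snapshot ~remote_sr in
+  (* 4. compose: merge the copied snapshot and the mirror into a single VDI *)
+  Remote.VDI.compose remote_sr copied mirror ;
+  (* 5. cleanup: the local snapshot is no longer needed *)
+  Local.VDI.destroy local_sr snapshot
+```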
+
+## SMAPIv3 migration
+
+More detail to come...
+
+## Error Handling
+
+Storage migration is a long-running process, and is prone to failures at each
+step. Hence it is important to specify what errors could be raised at each step
+and their significance. This is beneficial both for the user and for triaging.
+
+There are two general cleanup functions in SXM: `MIRROR.receive_cancel` and
+`MIRROR.stop`. The former is for cleaning up whatever has been created by `MIRROR.receive_start`
+on the destination host (such as VDIs for receiving mirrored data). The latter is
+a more comprehensive function that attempts to "undo" all the side effects that
+were done during the SXM, and also calls `receive_cancel` as part of its operations.
+
+Currently, error handling is done by building up a list of cleanup functions in
+the `on_fail` list ref as the function executes. For example, if `receive_start`
+has completed successfully, `receive_cancel` is added to the list of cleanup functions.
+Whenever an exception is encountered, whatever has been added
+to the `on_fail` list ref is executed. This is convenient, but entangles all the error
+handling logic with the core SXM logic itself, making the code rather hard
+to understand and maintain.
+
+The idea to fix this is to introduce explicit "stages" during the SXM and define
+explicitly what error handling should be done if it fails at a certain stage. This
+helps separate the error handling logic into the `with` part of a `try with` block,
+which is where it is supposed to be. Since we need to accommodate the existing
+SMAPIv1 migration (which has more stages than SMAPIv3), the following stages are
+introduced: preparation (v1, v3), snapshot (v1), mirror (v1, v3), copy (v1). Note that
+each stage also roughly corresponds to a helper function that is called within `MIRROR.start`,
+which is the wrapper function that initiates storage migration. 
Each helper
+function also has error handling logic within itself as
+needed (e.g. see `Storage_smapiv1_migrate.receive_start`) to deal with exceptions
+that happen within that helper function.
+
+### Preparation (SMAPIv1 and SMAPIv3)
+
+The preparation stage generally corresponds to what is done in `receive_start`, and
+this function itself will handle exceptions when there are partial failures within
+the function itself, such as an exception after the receiving VDI is created.
+It will use the old-style `on_fail` function but only with a limited scope.
+
+There is nothing to be done at a higher level (i.e. within `MIRROR.start`, which
+calls `receive_start`) if preparation has failed.
+
+### Snapshot and mirror failure (SMAPIv1)
+
+For SMAPIv1, the mirror is done in a somewhat cumbersome way. The end goal is to establish
+connections between two tapdisk processes on the source and destination hosts.
+To achieve this goal, xapi will do two main jobs: 1. create a connection between the two
+hosts and pass the connection to tapdisk; 2. create a snapshot as a starting point
+of the mirroring process.
+
+Therefore the handling of failures at these two stages is similar: clean up what was
+done in the preparation stage by calling `receive_cancel`, and that is almost it.
+Again, we leave whatever is needed for partial failure handling within those
+functions themselves and only clean up at a stage level in `storage_migrate.ml`.
+
+Note that `receive_cancel` is a multiplexed function for SMAPIv1 and SMAPIv3, which
+means different cleanup logic will be executed depending on what type of SR we
+are migrating from.
+
+### Mirror failure (SMAPIv3)
+
+To be filled...
+
+### Copy failure (SMAPIv1)
+
+The final step of storage migration for SMAPIv1 is to copy the snapshot from the
+source to the destination. 
At this stage, most of the side-effecting work has been
+done, so we do need to call `MIRROR.stop` to clean things up if we experience a
+failure during copying.
+
+
+## SMAPIv1 Migration implementation detail
+
+```mermaid sequenceDiagram participant local_tapdisk as local tapdisk participant local_smapiv2 as local SMAPIv2 @@ -129,7 +340,7 @@ opt post_detach_hook end Note over xapi: memory image migration by xenopsd Note over xapi: destroy the VM record -{{< /mermaid >}} +``` ### Receiving SXM @@ -162,7 +373,7 @@ the receiving end of storage motion: This is how xapi coordinates storage migration. We'll do it as a code walkthrough through the two layers: xapi and storage-in-xapi (SMAPIv2). -## Xapi code +### Xapi code The entry point is in [xapi_vm_migration.ml](https://github.com/xapi-project/xen-api/blob/f75d51e7a3eff89d952330ec1a739df85a2895e2/ocaml/xapi/xapi_vm_migrate.ml#L786) @@ -1056,7 +1267,7 @@ We also try to remove the VM record from the destination if we managed to send i Finally we check for mirror failure in the task - this is set by the events thread watching for events from the storage layer, in [storage_access.ml](https://github.com/xapi-project/xen-api/blob/f75d51e7a3eff89d952330ec1a739df85a2895e2/ocaml/xapi/storage_access.ml#L1169-L1207) -## Storage code +### Storage code The part of the code that is conceptually in the storage layer, but physically in xapi, is located in [storage_migrate.ml](https://github.com/xapi-project/xen-api/blob/f75d51e7a3eff89d952330ec1a739df85a2895e2/ocaml/xapi/storage_migrate.ml). There are logically a few separate parts to this file: @@ -1069,7 +1280,7 @@ The part of the code that is conceptually in the storage layer, but physically i Let's start by considering the way the storage APIs are intended to be used. 
-### Copying a VDI +#### Copying a VDI `DATA.copy` takes several parameters: @@ -1119,7 +1330,7 @@ The implementation uses the `url` parameter to make SMAPIv2 calls to the destina The implementation tries to minimize the amount of data copied by looking for related VDIs on the destination SR. See below for more details. -### Mirroring a VDI +#### Mirroring a VDI `DATA.MIRROR.start` takes a similar set of parameters to that of copy: @@ -1156,11 +1367,11 @@ Note that state is a list since the initial phase of the operation requires both Additionally the mirror can be cancelled using the `MIRROR.stop` API call. -### Code walkthrough +#### Code walkthrough let's go through the implementation of `copy`: -#### DATA.copy +##### DATA.copy ```ocaml let copy ~task ~dbg ~sr ~vdi ~dp ~url ~dest = @@ -1296,7 +1507,7 @@ Finally we snapshot the remote VDI to ensure we've got a VDI of type 'snapshot' The exception handler does nothing - so we leak remote VDIs if the exception happens after we've done our cloning :-( -#### DATA.copy_into +##### DATA.copy_into Let's now look at the data-copying part. This is common code shared between `VDI.copy`, `VDI.copy_into` and `MIRROR.start` and hence has some duplication of the calls made above. @@ -1467,7 +1678,7 @@ The last thing we do is to set the local and remote content_id. The local set_co Here we perform the list of cleanup operations. Theoretically. It seems we don't ever actually set this to anything, so this is dead code. -#### DATA.MIRROR.start +##### DATA.MIRROR.start ```ocaml let start' ~task ~dbg ~sr ~vdi ~dp ~url ~dest = @@ -1765,3 +1976,4 @@ let pre_deactivate_hook ~dbg ~dp ~sr ~vdi = s.failed <- true ) ``` + diff --git a/doc/content/xapi/storage/sxm_mux_inbound.svg b/doc/content/xapi/storage/sxm_mux_inbound.svg new file mode 100644 index 00000000000..c38bc36ae5f --- /dev/null +++ b/doc/content/xapi/storage/sxm_mux_inbound.svg @@ -0,0 +1,4 @@ + + + +
Storage_migrate.start
Host A
Host B
v1
v3
storage_mux server
Remote.VDI.attach
storage_mux server
rpc
\ No newline at end of file diff --git a/doc/content/xapi/storage/sxm_mux_outbound.svg b/doc/content/xapi/storage/sxm_mux_outbound.svg new file mode 100644 index 00000000000..915cc7550e3 --- /dev/null +++ b/doc/content/xapi/storage/sxm_mux_outbound.svg @@ -0,0 +1,4 @@ + + + +
Storage_migrate.start
Host A
Host B
storage_mux server
Remote.VDI.attach
vdi dst_sr
MIRROR.send_start
src_sr dst_sr
storage_smapiv1_migrate.
send_start
storage_smapiv3_migrate.
send_start
RPC to host B
....
new multiplexing layer
mux based on src_sr
\ No newline at end of file diff --git a/ocaml/xapi-idl/storage/storage_interface.ml b/ocaml/xapi-idl/storage/storage_interface.ml index 311c9f2dfdf..b98047bd610 100644 --- a/ocaml/xapi-idl/storage/storage_interface.ml +++ b/ocaml/xapi-idl/storage/storage_interface.ml @@ -362,6 +362,19 @@ module Errors = struct | No_storage_plugin_for_sr of string | Content_ids_do_not_match of (string * string) | Missing_configuration_parameter of string + (* raised when preparing the environment for SXM, for example, when the dest + host creates VDIs for data mirroring (SMAPIv1 and v3) *) + | Migration_preparation_failure of string + (* happens when passing fds to tapdisks for mirroring (SMAPIv1 only) *) + | Migration_mirror_fd_failure of string + (* raised when taking a snapshot as the base image before copying it over to + the destination (SMAPIv1 only) *) + | Migration_mirror_snapshot_failure of string + (* mirror_copy_failure: raised when copying of the base image fails (SMAPIv1 only) *) + | Migration_mirror_copy_failure of string + (* mirror_failure: raised when there is any issues that causes the mirror to crash + during SXM (SMAPIv3 only, v1 uses more specific errors as above) *) + | Migration_mirror_failure of string | Internal_error of string | Unknown_error [@@default Unknown_error] [@@deriving rpcty] diff --git a/ocaml/xapi/storage_migrate.ml b/ocaml/xapi/storage_migrate.ml index 706f73891a0..ae3344d788b 100644 --- a/ocaml/xapi/storage_migrate.ml +++ b/ocaml/xapi/storage_migrate.ml @@ -18,389 +18,178 @@ open D module Listext = Xapi_stdext_std.Listext open Xapi_stdext_pervasives.Pervasiveext module Unixext = Xapi_stdext_unix.Unixext -open Xmlrpc_client open Storage_interface open Storage_task open Storage_migrate_helper -let tapdisk_of_attach_info (backend : Storage_interface.backend) = - let _, blockdevices, _, nbds = - Storage_interface.implementations_of_backend backend - in - match (blockdevices, nbds) with - | blockdevice :: _, _ -> ( - let path = 
blockdevice.Storage_interface.path in - try - match Tapctl.of_device (Tapctl.create ()) path with - | tapdev, _, _ -> - Some tapdev - with - | Tapctl.Not_blktap -> - debug "Device %s is not controlled by blktap" path ; - None - | Tapctl.Not_a_device -> - debug "%s is not a device" path ; - None - | _ -> - debug "Device %s has an unknown driver" path ; - None - ) - | _, nbd :: _ -> ( - try - let path, _ = Storage_interface.parse_nbd_uri nbd in - let filename = Unix.realpath path |> Filename.basename in - Scanf.sscanf filename "nbd%d.%d" (fun pid minor -> - Some (Tapctl.tapdev_of ~pid ~minor) - ) - with _ -> - debug "No tapdisk found for NBD backend: %s" nbd.Storage_interface.uri ; - None - ) - | _ -> - debug "No tapdisk found for backend: %s" - (Storage_interface.(rpc_of backend) backend |> Rpc.to_string) ; - None - -let with_activated_disk ~dbg ~sr ~vdi ~dp ~vm f = - let attached_vdi = - Option.map - (fun vdi -> - let backend = Local.VDI.attach3 dbg dp sr vdi vm false in - (vdi, backend) - ) - vdi - in - finally - (fun () -> - let path_and_nbd = - Option.map - (fun (vdi, backend) -> - let _xendisks, blockdevs, files, nbds = - Storage_interface.implementations_of_backend backend - in - match (files, blockdevs, nbds) with - | {path} :: _, _, _ | _, {path} :: _, _ -> - Local.VDI.activate3 dbg dp sr vdi vm ; - (path, false) - | _, _, nbd :: _ -> - Local.VDI.activate3 dbg dp sr vdi vm ; - let unix_socket_path, export_name = - Storage_interface.parse_nbd_uri nbd - in - ( Attach_helpers.NbdClient.start_nbd_client ~unix_socket_path - ~export_name - , true - ) - | [], [], [] -> - raise - (Storage_interface.Storage_error - (Backend_error - ( Api_errors.internal_error - , [ - "No File, BlockDevice or Nbd implementation in \ - Datapath.attach response: " - ^ (Storage_interface.(rpc_of backend) backend - |> Jsonrpc.to_string - ) - ] - ) - ) - ) - ) - attached_vdi - in - finally - (fun () -> f (Option.map (function path, _ -> path) path_and_nbd)) - (fun () -> - Option.iter - 
(function - | path, true -> - Attach_helpers.NbdClient.stop_nbd_client ~nbd_device:path - | _ -> - () - ) - path_and_nbd ; - Option.iter (fun vdi -> Local.VDI.deactivate dbg dp sr vdi vm) vdi - ) - ) - (fun () -> - Option.iter - (fun (vdi, _) -> Local.VDI.detach dbg dp sr vdi vm) - attached_vdi - ) +module type SMAPIv2_MIRROR = Storage_interface.MIRROR -let perform_cleanup_actions = - List.iter (fun f -> - try f () - with e -> - error "Caught %s while performing cleanup actions" (Printexc.to_string e) - ) +let s_of_sr = Storage_interface.Sr.string_of -let progress_callback start len t y = - let new_progress = start +. (y *. len) in - Storage_task.set_state t (Task.Pending new_progress) ; - signal (Storage_task.id_of_handle t) +let choose_backend dbg sr = + debug "%s dbg: %s choosing backend for sr :%s" __FUNCTION__ dbg (s_of_sr sr) ; + match Storage_mux_reg.smapi_version_of_sr sr with + | SMAPIv1 -> + (module Storage_smapiv1_migrate.MIRROR : SMAPIv2_MIRROR) + | SMAPIv3 -> + (module Storage_smapiv3_migrate.MIRROR : SMAPIv2_MIRROR) + | SMAPIv2 -> + (* this should never happen *) + failwith "unsupported SMAPI version smapiv2" -(** This module [MigrateLocal] consists of the concrete implementations of the -migration part of SMAPI. Functions inside this module are sender driven, which means -they tend to be executed on the sender side. although there is not a hard rule -on what is executed on the sender side, this provides some heuristics. 
*) -module MigrateLocal = struct - (** [copy_into_vdi] is similar to [copy_into_sr] but requires a [dest_vdi] parameter *) - let copy_into_vdi ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~dest_vdi ~verify_dest = - let (module Remote) = get_remote_backend url verify_dest in - debug "copy local=%s/%s url=%s remote=%s/%s verify_dest=%B" - (Storage_interface.Sr.string_of sr) - (Storage_interface.Vdi.string_of vdi) - url - (Storage_interface.Sr.string_of dest) - (Storage_interface.Vdi.string_of dest_vdi) - verify_dest ; - (* Check the remote SR exists *) - let srs = Remote.SR.list dbg in - if not (List.mem dest srs) then - failwith - (Printf.sprintf "Remote SR %s not found" - (Storage_interface.Sr.string_of dest) - ) ; - let vdis = Remote.SR.scan dbg dest in - let remote_vdi = - try List.find (fun x -> x.vdi = dest_vdi) vdis - with Not_found -> - failwith - (Printf.sprintf "Remote VDI %s not found" - (Storage_interface.Vdi.string_of dest_vdi) - ) - in - let dest_content_id = remote_vdi.content_id in - (* Find the local VDI *) - let vdis = Local.SR.scan dbg sr in - let local_vdi = - try List.find (fun x -> x.vdi = vdi) vdis - with Not_found -> - failwith - (Printf.sprintf "Local VDI %s not found" - (Storage_interface.Vdi.string_of vdi) - ) - in - debug "copy local content_id=%s" local_vdi.content_id ; - debug "copy remote content_id=%s" dest_content_id ; - if local_vdi.virtual_size > remote_vdi.virtual_size then ( - (* This should never happen provided the higher-level logic is working properly *) - error "copy local virtual_size=%Ld > remote virtual_size = %Ld" - local_vdi.virtual_size remote_vdi.virtual_size ; - failwith "local VDI is larger than the remote VDI" - ) ; +(** module [MigrateRemote] is similar to [MigrateLocal], but most of these functions +tend to be executed on the receiver side. 
*) +module MigrateRemote = struct + let receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm = let on_fail : (unit -> unit) list ref = ref [] in - let base_vdi = - try - let x = - (List.find (fun x -> x.content_id = dest_content_id) vdis).vdi - in - debug - "local VDI has content_id = %s; we will perform an incremental copy" - dest_content_id ; - Some x - with _ -> - debug "no local VDI has content_id = %s; we will perform a full copy" - dest_content_id ; - None - in + let vdis = Local.SR.scan dbg sr in + (* We drop cbt_metadata VDIs that do not have any actual data *) + let vdis = List.filter (fun vdi -> vdi.ty <> "cbt_metadata") vdis in + let leaf_dp = Local.DP.create dbg Uuidx.(to_string (make ())) in try - let remote_dp = Uuidx.(to_string (make ())) in - let base_dp = Uuidx.(to_string (make ())) in - let leaf_dp = Uuidx.(to_string (make ())) in - let dest_vdi_url = - let url' = Http.Url.of_string url in - Http.Url.set_uri url' - (Printf.sprintf "%s/nbdproxy/%s/%s/%s/%s" (Http.Url.get_uri url') - (Storage_interface.Vm.string_of vm) - (Storage_interface.Sr.string_of dest) - (Storage_interface.Vdi.string_of dest_vdi) - remote_dp + let vdi_info = {vdi_info with sm_config= [("base_mirror", id)]} in + let leaf = Local.VDI.create dbg sr vdi_info in + info "Created leaf VDI for mirror receive: %s" (string_of_vdi_info leaf) ; + on_fail := (fun () -> Local.VDI.destroy dbg sr leaf.vdi) :: !on_fail ; + (* dummy VDI is created so that the leaf VDI becomes a differencing disk, + useful for calling VDI.compose later on *) + let dummy = Local.VDI.snapshot dbg sr leaf in + on_fail := (fun () -> Local.VDI.destroy dbg sr dummy.vdi) :: !on_fail ; + debug "%s Created dummy snapshot for mirror receive: %s" __FUNCTION__ + (string_of_vdi_info dummy) ; + let _ : backend = Local.VDI.attach3 dbg leaf_dp sr leaf.vdi vm true in + Local.VDI.activate3 dbg leaf_dp sr leaf.vdi vm ; + let nearest = + List.fold_left + (fun acc content_id -> + match acc with + | Some _ -> + acc + | None -> ( 
+ try + Some + (List.find + (fun vdi -> + vdi.content_id = content_id + && vdi.virtual_size <= vdi_info.virtual_size + ) + vdis + ) + with Not_found -> None + ) ) - |> Http.Url.to_string + None similar + in + debug "Nearest VDI: content_id=%s vdi=%s" + (Option.fold ~none:"None" ~some:(fun x -> x.content_id) nearest) + (Option.fold ~none:"None" + ~some:(fun x -> Storage_interface.Vdi.string_of x.vdi) + nearest + ) ; + let parent = + match nearest with + | Some vdi -> + debug "Cloning VDI" ; + let vdi = add_to_sm_config vdi "base_mirror" id in + let vdi_clone = Local.VDI.clone dbg sr vdi in + debug "Clone: %s" (Storage_interface.Vdi.string_of vdi_clone.vdi) ; + ( if vdi_clone.virtual_size <> vdi_info.virtual_size then + let new_size = + Local.VDI.resize dbg sr vdi_clone.vdi vdi_info.virtual_size + in + debug "Resize local clone VDI to %Ld: result %Ld" + vdi_info.virtual_size new_size + ) ; + vdi_clone + | None -> + debug "Creating a blank remote VDI" ; + Local.VDI.create dbg sr vdi_info in - debug "%s copy remote NBD URL = %s" __FUNCTION__ dest_vdi_url ; - let id = State.copy_id_of (sr, vdi) in - debug "Persisting state for copy (id=%s)" id ; + debug "Parent disk content_id=%s" parent.content_id ; State.add id State.( - Copy_op - Copy_state. + Recv_op + Receive_state. 
{ - base_dp + sr + ; dummy_vdi= dummy.vdi + ; leaf_vdi= leaf.vdi ; leaf_dp - ; remote_dp - ; dest_sr= dest - ; copy_vdi= remote_vdi.vdi - ; remote_url= url - ; verify_dest + ; parent_vdi= parent.vdi + ; remote_vdi= vdi_info.vdi + ; mirror_vm= vm } ) ; - SXM.info "%s: copy initiated local_vdi:%s dest_vdi:%s" __FUNCTION__ - (Storage_interface.Vdi.string_of vdi) - (Storage_interface.Vdi.string_of dest_vdi) ; - finally - (fun () -> - debug "activating RW datapath %s on remote" remote_dp ; - let backend = - Remote.VDI.attach3 dbg remote_dp dest dest_vdi vm true - in - let _, _, _, nbds = - Storage_interface.implementations_of_backend backend - in - let proto = - match nbds with - | [] -> - None - | uri :: _ -> - let _socket, export = Storage_interface.parse_nbd_uri uri in - Some (`NBD export) - in - Remote.VDI.activate3 dbg remote_dp dest dest_vdi vm ; - with_activated_disk ~dbg ~sr ~vdi:base_vdi ~dp:base_dp ~vm - (fun base_path -> - with_activated_disk ~dbg ~sr ~vdi:(Some vdi) ~dp:leaf_dp ~vm - (fun src -> - let verify_cert = - if verify_dest then Stunnel_client.pool () else None - in - let dd = - Sparse_dd_wrapper.start - ~progress_cb:(progress_callback 0.05 0.9 task) - ~verify_cert ~proto ?base:base_path true (Option.get src) - dest_vdi_url remote_vdi.virtual_size - in - Storage_task.with_cancel task - (fun () -> Sparse_dd_wrapper.cancel dd) - (fun () -> - try Sparse_dd_wrapper.wait dd - with Sparse_dd_wrapper.Cancelled -> - Storage_task.raise_cancelled task - ) - ) - ) - ) - (fun () -> - Remote.DP.destroy dbg remote_dp false ; - State.remove_copy id - ) ; - SXM.info "%s: copy complete for local_vdi:%s dest_vdi:%s" __FUNCTION__ - (Storage_interface.Vdi.string_of vdi) - (Storage_interface.Vdi.string_of dest_vdi) ; - debug "setting remote content_id <- %s" local_vdi.content_id ; - Remote.VDI.set_content_id dbg dest dest_vdi local_vdi.content_id ; - (* PR-1255: XXX: this is useful because we don't have content_ids by default *) - debug "setting local content_id <- %s" 
local_vdi.content_id ; - Local.VDI.set_content_id dbg sr local_vdi.vdi local_vdi.content_id ; - Some (Vdi_info remote_vdi) + let nearest_content_id = Option.map (fun x -> x.content_id) nearest in + Mirror.Vhd_mirror + { + Mirror.mirror_vdi= leaf + ; mirror_datapath= leaf_dp + ; copy_diffs_from= nearest_content_id + ; copy_diffs_to= parent.vdi + ; dummy_vdi= dummy.vdi + } with e -> - error "Caught %s: performing cleanup actions" (Printexc.to_string e) ; - perform_cleanup_actions !on_fail ; + List.iter + (fun op -> + try op () + with e -> + debug "Caught exception in on_fail: %s" (Printexc.to_string e) + ) + !on_fail ; raise e - (** [copy_into_sr] does not requires a dest vdi to be provided, instead, it will - find the nearest vdi on the [dest] sr, and if there is no such vdi, it will - create one. *) - let copy_into_sr ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest = - debug "copy sr:%s vdi:%s url:%s dest:%s verify_dest:%B" - (Storage_interface.Sr.string_of sr) - (Storage_interface.Vdi.string_of vdi) - url - (Storage_interface.Sr.string_of dest) - verify_dest ; - let (module Remote) = get_remote_backend url verify_dest in - (* Find the local VDI *) - try - let vdis = Local.SR.scan dbg sr in - let local_vdi = - try List.find (fun x -> x.vdi = vdi) vdis - with Not_found -> failwith (Printf.sprintf "Local VDI not found") - in - try - let similar_vdis = Local.VDI.similar_content dbg sr vdi in - let similars = List.map (fun vdi -> vdi.content_id) similar_vdis in - debug "Similar VDIs = [ %s ]" - (String.concat "; " - (List.map - (fun x -> - Printf.sprintf "(vdi=%s,content_id=%s)" - (Storage_interface.Vdi.string_of x.vdi) - x.content_id - ) - similar_vdis - ) - ) ; - let remote_vdis = Remote.SR.scan dbg dest in - (* We drop cbt_metadata VDIs that do not have any actual data *) - let remote_vdis = - List.filter (fun vdi -> vdi.ty <> "cbt_metadata") remote_vdis - in - let nearest = - List.fold_left - (fun acc content_id -> - match acc with - | Some _ -> - acc - | None -> ( 
- try - Some - (List.find - (fun vdi -> - vdi.content_id = content_id - && vdi.virtual_size <= local_vdi.virtual_size - ) - remote_vdis - ) - with Not_found -> None - ) - ) - None similars - in - debug "Nearest VDI: content_id=%s vdi=%s" - (Option.fold ~none:"None" ~some:(fun x -> x.content_id) nearest) - (Option.fold ~none:"None" - ~some:(fun x -> Storage_interface.Vdi.string_of x.vdi) - nearest - ) ; - let remote_base = - match nearest with - | Some vdi -> - debug "Cloning VDI" ; - let vdi_clone = Remote.VDI.clone dbg dest vdi in - debug "Clone: %s" (Storage_interface.Vdi.string_of vdi_clone.vdi) ; - ( if vdi_clone.virtual_size <> local_vdi.virtual_size then - let new_size = - Remote.VDI.resize dbg dest vdi_clone.vdi - local_vdi.virtual_size - in - debug "Resize remote clone VDI to %Ld: result %Ld" - local_vdi.virtual_size new_size - ) ; - vdi_clone - | None -> - debug "Creating a blank remote VDI" ; - Remote.VDI.create dbg dest {local_vdi with sm_config= []} - in - let remote_copy = - copy_into_vdi ~task ~dbg ~sr ~vdi ~vm ~url ~dest - ~dest_vdi:remote_base.vdi ~verify_dest - |> vdi_info - in - let snapshot = Remote.VDI.snapshot dbg dest remote_copy in - Remote.VDI.destroy dbg dest remote_copy.vdi ; - Some (Vdi_info snapshot) - with e -> - error "Caught %s: copying snapshots vdi" (Printexc.to_string e) ; - raise (Storage_error (Internal_error (Printexc.to_string e))) - with - | Storage_error (Backend_error (code, params)) - | Api_errors.Server_error (code, params) -> - raise (Storage_error (Backend_error (code, params))) - | e -> - raise (Storage_error (Internal_error (Printexc.to_string e))) + let receive_start ~dbg ~sr ~vdi_info ~id ~similar = + receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm:(Vm.of_string "0") + + let receive_start2 ~dbg ~sr ~vdi_info ~id ~similar ~vm = + receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm + + let receive_finalize ~dbg ~id = + let recv_state = State.find_active_receive_mirror id in + let open State.Receive_state 
in
+    Option.iter (fun r -> Local.DP.destroy dbg r.leaf_dp false) recv_state ;
+    State.remove_receive_mirror id
+
+  let receive_finalize2 ~dbg ~id =
+    let recv_state = State.find_active_receive_mirror id in
+    let open State.Receive_state in
+    Option.iter
+      (fun r ->
+        SXM.info
+          "%s Mirror done. Compose on the dest sr %s parent %s and leaf %s"
+          __FUNCTION__ (Sr.string_of r.sr)
+          (Vdi.string_of r.parent_vdi)
+          (Vdi.string_of r.leaf_vdi) ;
+        Local.DP.destroy2 dbg r.leaf_dp r.sr r.leaf_vdi r.mirror_vm false ;
+        Local.VDI.compose dbg r.sr r.parent_vdi r.leaf_vdi ;
+        (* On SMAPIv3, compose would have removed the now invalid dummy vdi, so
+           there is no need to destroy it anymore, while this is necessary on SMAPIv1 SRs. *)
+        log_and_ignore_exn (fun () -> Local.VDI.destroy dbg r.sr r.dummy_vdi) ;
+        Local.VDI.remove_from_sm_config dbg r.sr r.leaf_vdi "base_mirror"
+      )
+      recv_state ;
+    State.remove_receive_mirror id
+
+  let receive_cancel ~dbg ~id =
+    let receive_state = State.find_active_receive_mirror id in
+    let open State.Receive_state in
+    Option.iter
+      (fun r ->
+        log_and_ignore_exn (fun () -> Local.DP.destroy dbg r.leaf_dp false) ;
+        List.iter
+          (fun v -> log_and_ignore_exn (fun () -> Local.VDI.destroy dbg r.sr v))
+          [r.dummy_vdi; r.leaf_vdi; r.parent_vdi]
+      )
+      receive_state ;
+    State.remove_receive_mirror id
+end
+(** This module [MigrateLocal] consists of the concrete implementations of the
+migration part of SMAPI. Functions inside this module are sender-driven, which
+means they tend to be executed on the sender side. Although there is no hard
+rule about what is executed on the sender side, this serves as a heuristic.
*) +module MigrateLocal = struct let stop_internal ~dbg ~id = (* Find the local VDI *) let alm = State.find_active_local_mirror id in @@ -458,8 +247,22 @@ module MigrateLocal = struct | e -> raise e - let start ~task ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest ~verify_dest - = + let prepare ~dbg ~sr ~vdi ~dest ~local_vdi ~mirror_id ~mirror_vm ~url + ~verify_dest = + try + let (module Remote) = get_remote_backend url verify_dest in + let similars = similar_vdis ~dbg ~sr ~vdi in + + Remote.DATA.MIRROR.receive_start2 dbg dest local_vdi mirror_id similars + mirror_vm + with e -> + error "%s Caught error %s while preparing for SXM" __FUNCTION__ + (Printexc.to_string e) ; + raise + (Storage_error (Migration_preparation_failure (Printexc.to_string e))) + + let start ~task_id ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest + ~verify_dest = SXM.info "%s sr:%s vdi:%s dp: %s mirror_vm: %s copy_vm: %s url:%s dest:%s \ verify_dest:%B" @@ -473,14 +276,9 @@ module MigrateLocal = struct (Storage_interface.Sr.string_of dest) verify_dest ; - let remote_url = Http.Url.of_string url in let (module Remote) = get_remote_backend url verify_dest in (* Find the local VDI *) - let vdis = Local.SR.scan dbg sr in - let local_vdi = - try List.find (fun x -> x.vdi = vdi) vdis - with Not_found -> failwith "Local VDI not found" - in + let local_vdi = find_local_vdi ~dbg ~sr ~vdi in let mirror_id = State.mirror_id_of (sr, local_vdi.vdi) in debug "%s: Adding to active local mirrors before sending: id=%s" __FUNCTION__ mirror_id ; @@ -500,176 +298,34 @@ module MigrateLocal = struct State.add mirror_id (State.Send_op alm) ; debug "%s Added mirror %s to active local mirrors" __FUNCTION__ mirror_id ; (* A list of cleanup actions to perform if the operation should fail. 
*) - let on_fail : (unit -> unit) list ref = ref [] in + let (module Migrate_Backend) = choose_backend dbg sr in try - let similar_vdis = Local.VDI.similar_content dbg sr vdi in - let similars = - List.filter - (fun x -> x <> "") - (List.map (fun vdi -> vdi.content_id) similar_vdis) - in - debug "Similar VDIs to = [ %s ]" - (String.concat "; " - (List.map - (fun x -> - Printf.sprintf "(vdi=%s,content_id=%s)" - (Storage_interface.Vdi.string_of x.vdi) - x.content_id - ) - similar_vdis - ) - ) ; - let (Mirror.Vhd_mirror result) = - Remote.DATA.MIRROR.receive_start2 dbg dest local_vdi mirror_id similars - mirror_vm - in - (* Enable mirroring on the local machine *) - let mirror_dp = result.Mirror.mirror_datapath in - let uri = - Printf.sprintf "/services/SM/nbd/%s/%s/%s/%s" - (Storage_interface.Vm.string_of mirror_vm) - (Storage_interface.Sr.string_of dest) - (Storage_interface.Vdi.string_of result.Mirror.mirror_vdi.vdi) - mirror_dp - in - debug "%s: uri of http request for mirroring is %s" __FUNCTION__ uri ; - let dest_url = Http.Url.set_uri remote_url uri in - let request = - Http.Request.make - ~query:(Http.Url.get_query_params dest_url) - ~version:"1.0" ~user_agent:"smapiv2" Http.Put uri - in - let verify_cert = if verify_dest then Stunnel_client.pool () else None in - let transport = Xmlrpc_client.transport_of_url ~verify_cert dest_url in - debug "Searching for data path: %s" dp ; - let attach_info = Local.DP.attach_info dbg sr vdi dp mirror_vm in - on_fail := - (fun () -> Remote.DATA.MIRROR.receive_cancel dbg mirror_id) :: !on_fail ; - let tapdev = - match tapdisk_of_attach_info attach_info with - | Some tapdev -> - let pid = Tapctl.get_tapdisk_pid tapdev in - let path = - Printf.sprintf "/var/run/blktap-control/nbdclient%d" pid - in - with_transport ~stunnel_wait_disconnect:false transport - (with_http request (fun (_response, s) -> - let control_fd = - Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 - in - finally - (fun () -> - Unix.connect control_fd 
(Unix.ADDR_UNIX path) ; - let msg = dp in - let len = String.length msg in - let written = - Unixext.send_fd_substring control_fd msg 0 len [] s - in - if written <> len then ( - error "Failed to transfer fd to %s" path ; - failwith "Internal error transferring fd to tapdisk" - ) - ) - (fun () -> Unix.close control_fd) - ) - ) ; - tapdev - | None -> - failwith "Not attached" - in - debug "%s Updating active local mirrors: id=%s" __FUNCTION__ mirror_id ; - let alm = - State.Send_state. - { - url - ; dest_sr= dest - ; remote_info= - Some - { - dp= mirror_dp - ; vdi= result.Mirror.mirror_vdi.vdi - ; url - ; verify_dest - } - ; local_dp= dp - ; tapdev= Some tapdev - ; failed= false - ; watchdog= None - } - in - - State.add mirror_id (State.Send_op alm) ; - debug "%s Updated mirror_id %s in the active local mirror" __FUNCTION__ - mirror_id ; - - SXM.info "%s About to snapshot VDI = %s" __FUNCTION__ - (string_of_vdi_info local_vdi) ; - let local_vdi = add_to_sm_config local_vdi "mirror" ("nbd:" ^ dp) in - let local_vdi = add_to_sm_config local_vdi "base_mirror" mirror_id in - let snapshot = - try Local.VDI.snapshot dbg sr local_vdi with - | Storage_interface.Storage_error (Backend_error (code, _)) - when code = "SR_BACKEND_FAILURE_44" -> - raise - (Api_errors.Server_error - ( Api_errors.sr_source_space_insufficient - , [Storage_interface.Sr.string_of sr] - ) - ) - | e -> - raise e - in - SXM.info "%s: snapshot created, mirror initiated vdi:%s snapshot_of:%s" - __FUNCTION__ - (Storage_interface.Vdi.string_of snapshot.vdi) - (Storage_interface.Vdi.string_of local_vdi.vdi) ; - on_fail := (fun () -> Local.VDI.destroy dbg sr snapshot.vdi) :: !on_fail ; - (let rec inner () = - let alm_opt = State.find_active_local_mirror mirror_id in - match alm_opt with - | Some alm -> - let stats = Tapctl.stats (Tapctl.create ()) tapdev in - if stats.Tapctl.Stats.nbd_mirror_failed = 1 then ( - error "Tapdisk mirroring has failed" ; - Updates.add (Dynamic.Mirror mirror_id) updates - ) ; - 
alm.State.Send_state.watchdog <- - Some - (Scheduler.one_shot scheduler (Scheduler.Delta 5) - "tapdisk_watchdog" inner - ) - | None -> - () - in - inner () - ) ; - on_fail := (fun () -> stop ~dbg ~id:mirror_id) :: !on_fail ; - (* Copy the snapshot to the remote *) - let new_parent = - Storage_task.with_subtask task "copy" (fun () -> - copy_into_vdi ~task ~dbg ~sr ~vdi:snapshot.vdi ~vm:copy_vm ~url - ~dest ~dest_vdi:result.Mirror.copy_diffs_to ~verify_dest - ) - |> vdi_info + let remote_mirror = + prepare ~dbg ~sr ~vdi ~dest ~local_vdi ~mirror_id ~mirror_vm ~url + ~verify_dest in - debug "Local VDI %s = remote VDI %s" - (Storage_interface.Vdi.string_of snapshot.vdi) - (Storage_interface.Vdi.string_of new_parent.vdi) ; - debug "Local VDI %s now mirrored to remote VDI: %s" - (Storage_interface.Vdi.string_of local_vdi.vdi) - (Storage_interface.Vdi.string_of result.Mirror.mirror_vdi.vdi) ; - debug "Destroying snapshot on src" ; - Local.VDI.destroy dbg sr snapshot.vdi ; + Migrate_Backend.send_start () ~dbg ~task_id ~dp ~sr ~vdi ~mirror_vm + ~mirror_id ~local_vdi ~copy_vm ~live_vm:(Vm.of_string "0") ~url + ~remote_mirror ~dest_sr:dest ~verify_dest ; Some (Mirror_id mirror_id) with | Storage_error (Sr_not_attached sr_uuid) -> error " Caught exception %s:%s. Performing cleanup." 
Api_errors.sr_not_attached sr_uuid ; - perform_cleanup_actions !on_fail ; raise (Api_errors.Server_error (Api_errors.sr_not_attached, [sr_uuid])) + | ( Storage_error (Migration_mirror_fd_failure reason) + | Storage_error (Migration_mirror_snapshot_failure reason) ) as e -> + error "%s: Caught %s: during storage migration preparation" __FUNCTION__ + reason ; + MigrateRemote.receive_cancel ~dbg ~id:mirror_id ; + raise e + | Storage_error (Migration_mirror_copy_failure reason) as e -> + error "%s: Caught %s: during storage migration copy" __FUNCTION__ reason ; + stop ~dbg ~id:mirror_id ; + raise e | e -> - error "Caught %s: performing cleanup actions" (Api_errors.to_string e) ; - perform_cleanup_actions !on_fail ; + error "Caught %s during SXM: " (Api_errors.to_string e) ; + stop ~dbg ~id:mirror_id ; raise e let stat ~dbg:_ ~id = @@ -785,154 +441,6 @@ module MigrateLocal = struct State.clear () end -(** module [MigrateRemote] is similar to [MigrateLocal], but most of these functions -tend to be executed on the receiver side. 
*) -module MigrateRemote = struct - let receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm = - let on_fail : (unit -> unit) list ref = ref [] in - let vdis = Local.SR.scan dbg sr in - (* We drop cbt_metadata VDIs that do not have any actual data *) - let vdis = List.filter (fun vdi -> vdi.ty <> "cbt_metadata") vdis in - let leaf_dp = Local.DP.create dbg Uuidx.(to_string (make ())) in - try - let vdi_info = {vdi_info with sm_config= [("base_mirror", id)]} in - let leaf = Local.VDI.create dbg sr vdi_info in - info "Created leaf VDI for mirror receive: %s" (string_of_vdi_info leaf) ; - on_fail := (fun () -> Local.VDI.destroy dbg sr leaf.vdi) :: !on_fail ; - (* dummy VDI is created so that the leaf VDI becomes a differencing disk, - useful for calling VDI.compose later on *) - let dummy = Local.VDI.snapshot dbg sr leaf in - on_fail := (fun () -> Local.VDI.destroy dbg sr dummy.vdi) :: !on_fail ; - debug "%s Created dummy snapshot for mirror receive: %s" __FUNCTION__ - (string_of_vdi_info dummy) ; - let _ : backend = Local.VDI.attach3 dbg leaf_dp sr leaf.vdi vm true in - Local.VDI.activate3 dbg leaf_dp sr leaf.vdi vm ; - let nearest = - List.fold_left - (fun acc content_id -> - match acc with - | Some _ -> - acc - | None -> ( - try - Some - (List.find - (fun vdi -> - vdi.content_id = content_id - && vdi.virtual_size <= vdi_info.virtual_size - ) - vdis - ) - with Not_found -> None - ) - ) - None similar - in - debug "Nearest VDI: content_id=%s vdi=%s" - (Option.fold ~none:"None" ~some:(fun x -> x.content_id) nearest) - (Option.fold ~none:"None" - ~some:(fun x -> Storage_interface.Vdi.string_of x.vdi) - nearest - ) ; - let parent = - match nearest with - | Some vdi -> - debug "Cloning VDI" ; - let vdi = add_to_sm_config vdi "base_mirror" id in - let vdi_clone = Local.VDI.clone dbg sr vdi in - debug "Clone: %s" (Storage_interface.Vdi.string_of vdi_clone.vdi) ; - ( if vdi_clone.virtual_size <> vdi_info.virtual_size then - let new_size = - Local.VDI.resize dbg sr 
vdi_clone.vdi vdi_info.virtual_size - in - debug "Resize local clone VDI to %Ld: result %Ld" - vdi_info.virtual_size new_size - ) ; - vdi_clone - | None -> - debug "Creating a blank remote VDI" ; - Local.VDI.create dbg sr vdi_info - in - debug "Parent disk content_id=%s" parent.content_id ; - State.add id - State.( - Recv_op - Receive_state. - { - sr - ; dummy_vdi= dummy.vdi - ; leaf_vdi= leaf.vdi - ; leaf_dp - ; parent_vdi= parent.vdi - ; remote_vdi= vdi_info.vdi - ; mirror_vm= vm - } - ) ; - let nearest_content_id = Option.map (fun x -> x.content_id) nearest in - Mirror.Vhd_mirror - { - Mirror.mirror_vdi= leaf - ; mirror_datapath= leaf_dp - ; copy_diffs_from= nearest_content_id - ; copy_diffs_to= parent.vdi - ; dummy_vdi= dummy.vdi - } - with e -> - List.iter - (fun op -> - try op () - with e -> - debug "Caught exception in on_fail: %s" (Printexc.to_string e) - ) - !on_fail ; - raise e - - let receive_start ~dbg ~sr ~vdi_info ~id ~similar = - receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm:(Vm.of_string "0") - - let receive_start2 ~dbg ~sr ~vdi_info ~id ~similar ~vm = - receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm - - let receive_finalize ~dbg ~id = - let recv_state = State.find_active_receive_mirror id in - let open State.Receive_state in - Option.iter (fun r -> Local.DP.destroy dbg r.leaf_dp false) recv_state ; - State.remove_receive_mirror id - - let receive_finalize2 ~dbg ~id = - let recv_state = State.find_active_receive_mirror id in - let open State.Receive_state in - Option.iter - (fun r -> - SXM.info - "%s Mirror done. 
Compose on the dest sr %s parent %s and leaf %s" - __FUNCTION__ (Sr.string_of r.sr) - (Vdi.string_of r.parent_vdi) - (Vdi.string_of r.leaf_vdi) ; - Local.DP.destroy2 dbg r.leaf_dp r.sr r.leaf_vdi r.mirror_vm false ; - Local.VDI.compose dbg r.sr r.parent_vdi r.leaf_vdi ; - (* On SMAPIv3, compose would have removed the now invalid dummy vdi, so - there is no need to destroy it anymore, while this is necessary on SMAPIv1 SRs. *) - log_and_ignore_exn (fun () -> Local.VDI.destroy dbg r.sr r.dummy_vdi) ; - Local.VDI.remove_from_sm_config dbg r.sr r.leaf_vdi "base_mirror" - ) - recv_state ; - State.remove_receive_mirror id - - let receive_cancel ~dbg ~id = - let receive_state = State.find_active_receive_mirror id in - let open State.Receive_state in - Option.iter - (fun r -> - log_and_ignore_exn (fun () -> Local.DP.destroy dbg r.leaf_dp false) ; - List.iter - (fun v -> log_and_ignore_exn (fun () -> Local.VDI.destroy dbg r.sr v)) - [r.dummy_vdi; r.leaf_vdi; r.parent_vdi] - ) - receive_state ; - State.remove_receive_mirror id -end - exception Timeout of Mtime.Span.t let reqs_outstanding_timeout = Mtime.Span.(150 * s) @@ -1096,25 +604,21 @@ let with_task_and_thread ~dbg f = let copy ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest = with_task_and_thread ~dbg (fun task -> - MigrateLocal.copy_into_sr ~task ~dbg:dbg.Debug_info.log ~sr ~vdi ~vm ~url - ~dest ~verify_dest + Storage_smapiv1_migrate.Copy.copy_into_sr ~task ~dbg:dbg.Debug_info.log + ~sr ~vdi ~vm ~url ~dest ~verify_dest ) let start ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest ~verify_dest = with_dbg ~name:__FUNCTION__ ~dbg @@ fun dbg -> with_task_and_thread ~dbg (fun task -> - MigrateLocal.start ~task ~dbg:dbg.Debug_info.log ~sr ~vdi ~dp ~mirror_vm - ~copy_vm ~url ~dest ~verify_dest + MigrateLocal.start + ~task_id:(Storage_task.id_of_handle task) + ~dbg:dbg.Debug_info.log ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest + ~verify_dest ) (* XXX: PR-1255: copy the xenopsd 'raise Exception' pattern *) -let stop ~dbg ~id = - try 
MigrateLocal.stop ~dbg ~id with - | Storage_error (Backend_error (code, params)) - | Api_errors.Server_error (code, params) -> - raise (Storage_error (Backend_error (code, params))) - | e -> - raise e +let stop = MigrateLocal.stop let list = MigrateLocal.list diff --git a/ocaml/xapi/storage_migrate_helper.ml b/ocaml/xapi/storage_migrate_helper.ml index e924c208d8f..66c23d9a04e 100644 --- a/ocaml/xapi/storage_migrate_helper.ml +++ b/ocaml/xapi/storage_migrate_helper.ml @@ -345,3 +345,40 @@ let get_remote_backend url verify_dest = Storage_utils.rpc ~srcstr:"smapiv2" ~dststr:"dst_smapiv2" remote_url end)) in (module Remote : SMAPIv2) + +let find_local_vdi ~dbg ~sr ~vdi = + (* Find the local VDI *) + let vdis, _ = Local.SR.scan2 dbg sr in + match List.find_opt (fun x -> x.vdi = vdi) vdis with + | None -> + failwith "Local VDI not found" + | Some v -> + v + +(** [similar_vdis dbg sr vdi] returns a list of content_ids of vdis + which are similar to the input [vdi] in [sr] *) +let similar_vdis ~dbg ~sr ~vdi = + let similar_vdis = Local.VDI.similar_content dbg sr vdi in + let similars = + List.filter_map + (function + | {content_id; _} when content_id = "" -> + None + | {content_id; _} -> + Some content_id + ) + similar_vdis + in + + D.debug "%s Similar VDIs to = [ %s ]" __FUNCTION__ + (String.concat "; " + (List.map + (fun x -> + Printf.sprintf "(vdi=%s,content_id=%s)" + (Storage_interface.Vdi.string_of x.vdi) + x.content_id + ) + similar_vdis + ) + ) ; + similars diff --git a/ocaml/xapi/storage_migrate_helper.mli b/ocaml/xapi/storage_migrate_helper.mli index 8ac0da552e2..972faf57ce6 100644 --- a/ocaml/xapi/storage_migrate_helper.mli +++ b/ocaml/xapi/storage_migrate_helper.mli @@ -14,6 +14,8 @@ module SXM : Debug.DEBUG +open Storage_interface + val failwith_fmt : ('a, unit, string, 'b) format4 -> 'a module State : sig @@ -258,3 +260,7 @@ end module Local : SMAPIv2 val get_remote_backend : string -> bool -> (module SMAPIv2) + +val find_local_vdi : dbg:string -> sr:sr -> 
vdi:vdi -> vdi_info + +val similar_vdis : dbg:string -> sr:sr -> vdi:vdi -> uuid list diff --git a/ocaml/xapi/storage_smapiv1_migrate.ml b/ocaml/xapi/storage_smapiv1_migrate.ml index 83dd41d4972..b38231dad5b 100644 --- a/ocaml/xapi/storage_smapiv1_migrate.ml +++ b/ocaml/xapi/storage_smapiv1_migrate.ml @@ -15,19 +15,582 @@ module D = Debug.Make (struct let name = "storage_smapiv1_migrate" end) module Unixext = Xapi_stdext_unix.Unixext +open Xapi_stdext_pervasives.Pervasiveext open Storage_interface +open Xmlrpc_client open Storage_migrate_helper +open Storage_task module State = Storage_migrate_helper.State module SXM = Storage_migrate_helper.SXM module type SMAPIv2_MIRROR = Storage_interface.MIRROR +let with_activated_disk ~dbg ~sr ~vdi ~dp ~vm f = + let attached_vdi = + Option.map + (fun vdi -> + let backend = Local.VDI.attach3 dbg dp sr vdi vm false in + (vdi, backend) + ) + vdi + in + finally + (fun () -> + let path_and_nbd = + Option.map + (fun (vdi, backend) -> + let _xendisks, blockdevs, files, nbds = + Storage_interface.implementations_of_backend backend + in + match (files, blockdevs, nbds) with + | {path} :: _, _, _ | _, {path} :: _, _ -> + Local.VDI.activate3 dbg dp sr vdi vm ; + (path, false) + | _, _, nbd :: _ -> + Local.VDI.activate3 dbg dp sr vdi vm ; + let unix_socket_path, export_name = + Storage_interface.parse_nbd_uri nbd + in + ( Attach_helpers.NbdClient.start_nbd_client ~unix_socket_path + ~export_name + , true + ) + | [], [], [] -> + raise + (Storage_interface.Storage_error + (Backend_error + ( Api_errors.internal_error + , [ + "No File, BlockDevice or Nbd implementation in \ + Datapath.attach response: " + ^ (Storage_interface.(rpc_of backend) backend + |> Jsonrpc.to_string + ) + ] + ) + ) + ) + ) + attached_vdi + in + finally + (fun () -> f (Option.map (function path, _ -> path) path_and_nbd)) + (fun () -> + Option.iter + (function + | path, true -> + Attach_helpers.NbdClient.stop_nbd_client ~nbd_device:path + | _ -> + () + ) + path_and_nbd ; 
+ Option.iter (fun vdi -> Local.VDI.deactivate dbg dp sr vdi vm) vdi + ) + ) + (fun () -> + Option.iter + (fun (vdi, _) -> Local.VDI.detach dbg dp sr vdi vm) + attached_vdi + ) + +let tapdisk_of_attach_info (backend : Storage_interface.backend) = + let _, blockdevices, _, nbds = + Storage_interface.implementations_of_backend backend + in + match (blockdevices, nbds) with + | blockdevice :: _, _ -> ( + let path = blockdevice.Storage_interface.path in + try + match Tapctl.of_device (Tapctl.create ()) path with + | tapdev, _, _ -> + Some tapdev + with + | Tapctl.Not_blktap -> + D.debug "Device %s is not controlled by blktap" path ; + None + | Tapctl.Not_a_device -> + D.debug "%s is not a device" path ; + None + | _ -> + D.debug "Device %s has an unknown driver" path ; + None + ) + | _, nbd :: _ -> ( + try + let path, _ = Storage_interface.parse_nbd_uri nbd in + let filename = Unix.realpath path |> Filename.basename in + Scanf.sscanf filename "nbd%d.%d" (fun pid minor -> + Some (Tapctl.tapdev_of ~pid ~minor) + ) + with _ -> + D.debug "No tapdisk found for NBD backend: %s" nbd.Storage_interface.uri ; + None + ) + | _ -> + D.debug "No tapdisk found for backend: %s" + (Storage_interface.(rpc_of backend) backend |> Rpc.to_string) ; + None + +let progress_callback start len t y = + let new_progress = start +. (y *. 
len) in + Storage_task.set_state t (Task.Pending new_progress) ; + signal (Storage_task.id_of_handle t) + +let perform_cleanup_actions = + List.iter (fun f -> + try f () + with e -> + D.error "Caught %s while performing cleanup actions" + (Printexc.to_string e) + ) + +module Copy = struct + (** [copy_into_vdi] is similar to [copy_into_sr] but requires a [dest_vdi] parameter *) + let copy_into_vdi ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~dest_vdi ~verify_dest = + let (module Remote) = get_remote_backend url verify_dest in + D.debug "copy local=%s/%s url=%s remote=%s/%s verify_dest=%B" + (Storage_interface.Sr.string_of sr) + (Storage_interface.Vdi.string_of vdi) + url + (Storage_interface.Sr.string_of dest) + (Storage_interface.Vdi.string_of dest_vdi) + verify_dest ; + (* Check the remote SR exists *) + let srs = Remote.SR.list dbg in + if not (List.mem dest srs) then + failwith + (Printf.sprintf "Remote SR %s not found" + (Storage_interface.Sr.string_of dest) + ) ; + let vdis = Remote.SR.scan dbg dest in + let remote_vdi = + try List.find (fun x -> x.vdi = dest_vdi) vdis + with Not_found -> + failwith + (Printf.sprintf "Remote VDI %s not found" + (Storage_interface.Vdi.string_of dest_vdi) + ) + in + let dest_content_id = remote_vdi.content_id in + (* Find the local VDI *) + let vdis = Local.SR.scan dbg sr in + let local_vdi = + try List.find (fun x -> x.vdi = vdi) vdis + with Not_found -> + failwith + (Printf.sprintf "Local VDI %s not found" + (Storage_interface.Vdi.string_of vdi) + ) + in + D.debug "copy local content_id=%s" local_vdi.content_id ; + D.debug "copy remote content_id=%s" dest_content_id ; + if local_vdi.virtual_size > remote_vdi.virtual_size then ( + (* This should never happen provided the higher-level logic is working properly *) + D.error "copy local virtual_size=%Ld > remote virtual_size = %Ld" + local_vdi.virtual_size remote_vdi.virtual_size ; + failwith "local VDI is larger than the remote VDI" + ) ; + let on_fail : (unit -> unit) list ref = ref [] 
in + let base_vdi = + try + let x = + (List.find (fun x -> x.content_id = dest_content_id) vdis).vdi + in + D.debug + "local VDI has content_id = %s; we will perform an incremental copy" + dest_content_id ; + Some x + with _ -> + D.debug "no local VDI has content_id = %s; we will perform a full copy" + dest_content_id ; + None + in + try + let remote_dp = Uuidx.(to_string (make ())) in + let base_dp = Uuidx.(to_string (make ())) in + let leaf_dp = Uuidx.(to_string (make ())) in + let dest_vdi_url = + let url' = Http.Url.of_string url in + Http.Url.set_uri url' + (Printf.sprintf "%s/nbdproxy/%s/%s/%s/%s" (Http.Url.get_uri url') + (Storage_interface.Vm.string_of vm) + (Storage_interface.Sr.string_of dest) + (Storage_interface.Vdi.string_of dest_vdi) + remote_dp + ) + |> Http.Url.to_string + in + D.debug "%s copy remote NBD URL = %s" __FUNCTION__ dest_vdi_url ; + let id = State.copy_id_of (sr, vdi) in + D.debug "Persisting state for copy (id=%s)" id ; + State.add id + State.( + Copy_op + Copy_state. 
+ { + base_dp + ; leaf_dp + ; remote_dp + ; dest_sr= dest + ; copy_vdi= remote_vdi.vdi + ; remote_url= url + ; verify_dest + } + ) ; + SXM.info "%s: copy initiated local_vdi:%s dest_vdi:%s" __FUNCTION__ + (Storage_interface.Vdi.string_of vdi) + (Storage_interface.Vdi.string_of dest_vdi) ; + finally + (fun () -> + D.debug "activating RW datapath %s on remote" remote_dp ; + let backend = + Remote.VDI.attach3 dbg remote_dp dest dest_vdi vm true + in + let _, _, _, nbds = + Storage_interface.implementations_of_backend backend + in + let proto = + match nbds with + | [] -> + None + | uri :: _ -> + let _socket, export = Storage_interface.parse_nbd_uri uri in + Some (`NBD export) + in + Remote.VDI.activate3 dbg remote_dp dest dest_vdi vm ; + with_activated_disk ~dbg ~sr ~vdi:base_vdi ~dp:base_dp ~vm + (fun base_path -> + with_activated_disk ~dbg ~sr ~vdi:(Some vdi) ~dp:leaf_dp ~vm + (fun src -> + let verify_cert = + if verify_dest then Stunnel_client.pool () else None + in + let dd = + Sparse_dd_wrapper.start + ~progress_cb:(progress_callback 0.05 0.9 task) + ~verify_cert ~proto ?base:base_path true (Option.get src) + dest_vdi_url remote_vdi.virtual_size + in + Storage_task.with_cancel task + (fun () -> Sparse_dd_wrapper.cancel dd) + (fun () -> + try Sparse_dd_wrapper.wait dd + with Sparse_dd_wrapper.Cancelled -> + Storage_task.raise_cancelled task + ) + ) + ) + ) + (fun () -> + Remote.DP.destroy dbg remote_dp false ; + State.remove_copy id + ) ; + SXM.info "%s: copy complete for local_vdi:%s dest_vdi:%s" __FUNCTION__ + (Storage_interface.Vdi.string_of vdi) + (Storage_interface.Vdi.string_of dest_vdi) ; + D.debug "setting remote content_id <- %s" local_vdi.content_id ; + Remote.VDI.set_content_id dbg dest dest_vdi local_vdi.content_id ; + (* PR-1255: XXX: this is useful because we don't have content_ids by default *) + D.debug "setting local content_id <- %s" local_vdi.content_id ; + Local.VDI.set_content_id dbg sr local_vdi.vdi local_vdi.content_id ; + Some (Vdi_info 
remote_vdi) + with e -> + D.error "Caught %s: performing cleanup actions" (Printexc.to_string e) ; + perform_cleanup_actions !on_fail ; + raise e + + (** [copy_into_sr] does not requires a dest vdi to be provided, instead, it will + find the nearest vdi on the [dest] sr, and if there is no such vdi, it will + create one. *) + let copy_into_sr ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest = + D.debug "copy sr:%s vdi:%s url:%s dest:%s verify_dest:%B" + (Storage_interface.Sr.string_of sr) + (Storage_interface.Vdi.string_of vdi) + url + (Storage_interface.Sr.string_of dest) + verify_dest ; + let (module Remote) = get_remote_backend url verify_dest in + (* Find the local VDI *) + try + let vdis = Local.SR.scan dbg sr in + let local_vdi = + try List.find (fun x -> x.vdi = vdi) vdis + with Not_found -> failwith (Printf.sprintf "Local VDI not found") + in + try + let similar_vdis = Local.VDI.similar_content dbg sr vdi in + let similars = List.map (fun vdi -> vdi.content_id) similar_vdis in + D.debug "Similar VDIs = [ %s ]" + (String.concat "; " + (List.map + (fun x -> + Printf.sprintf "(vdi=%s,content_id=%s)" + (Storage_interface.Vdi.string_of x.vdi) + x.content_id + ) + similar_vdis + ) + ) ; + let remote_vdis = Remote.SR.scan dbg dest in + (* We drop cbt_metadata VDIs that do not have any actual data *) + let remote_vdis = + List.filter (fun vdi -> vdi.ty <> "cbt_metadata") remote_vdis + in + let nearest = + List.fold_left + (fun acc content_id -> + match acc with + | Some _ -> + acc + | None -> ( + try + Some + (List.find + (fun vdi -> + vdi.content_id = content_id + && vdi.virtual_size <= local_vdi.virtual_size + ) + remote_vdis + ) + with Not_found -> None + ) + ) + None similars + in + D.debug "Nearest VDI: content_id=%s vdi=%s" + (Option.fold ~none:"None" ~some:(fun x -> x.content_id) nearest) + (Option.fold ~none:"None" + ~some:(fun x -> Storage_interface.Vdi.string_of x.vdi) + nearest + ) ; + let remote_base = + match nearest with + | Some vdi -> + D.debug 
"Cloning VDI" ; + let vdi_clone = Remote.VDI.clone dbg dest vdi in + D.debug "Clone: %s" (Storage_interface.Vdi.string_of vdi_clone.vdi) ; + ( if vdi_clone.virtual_size <> local_vdi.virtual_size then + let new_size = + Remote.VDI.resize dbg dest vdi_clone.vdi + local_vdi.virtual_size + in + D.debug "Resize remote clone VDI to %Ld: result %Ld" + local_vdi.virtual_size new_size + ) ; + vdi_clone + | None -> + D.debug "Creating a blank remote VDI" ; + Remote.VDI.create dbg dest {local_vdi with sm_config= []} + in + let remote_copy = + copy_into_vdi ~task ~dbg ~sr ~vdi ~vm ~url ~dest + ~dest_vdi:remote_base.vdi ~verify_dest + |> vdi_info + in + let snapshot = Remote.VDI.snapshot dbg dest remote_copy in + Remote.VDI.destroy dbg dest remote_copy.vdi ; + Some (Vdi_info snapshot) + with e -> + D.error "Caught %s: copying snapshots vdi" (Printexc.to_string e) ; + raise (Storage_error (Internal_error (Printexc.to_string e))) + with + | Storage_error (Backend_error (code, params)) + | Api_errors.Server_error (code, params) -> + raise (Storage_error (Backend_error (code, params))) + | e -> + raise (Storage_error (Internal_error (Printexc.to_string e))) +end + +let mirror_pass_fds ~dbg ~dp ~sr ~vdi ~mirror_vm ~mirror_id ~url ~dest_sr + ~verify_dest ~(remote_mirror : Mirror.mirror_receive_result_vhd_t) = + let remote_vdi = remote_mirror.mirror_vdi.vdi in + let mirror_dp = remote_mirror.mirror_datapath in + + let uri = + Printf.sprintf "/services/SM/nbd/%s/%s/%s/%s" + (Storage_interface.Vm.string_of mirror_vm) + (Storage_interface.Sr.string_of dest_sr) + (Storage_interface.Vdi.string_of remote_vdi) + mirror_dp + in + D.debug "%s: uri of http request for mirroring is %s" __FUNCTION__ uri ; + let dest_url = Http.Url.set_uri (Http.Url.of_string url) uri in + D.debug "%s url of http request for mirroring is %s" __FUNCTION__ + (Http.Url.to_string dest_url) ; + let request = + Http.Request.make + ~query:(Http.Url.get_query_params dest_url) + ~version:"1.0" ~user_agent:"smapiv2" 
Http.Put uri + in + let verify_cert = if verify_dest then Stunnel_client.pool () else None in + let transport = Xmlrpc_client.transport_of_url ~verify_cert dest_url in + D.debug "Searching for data path: %s" dp ; + let attach_info = Local.DP.attach_info dbg sr vdi dp mirror_vm in + + let tapdev = + match tapdisk_of_attach_info attach_info with + | Some tapdev -> + let pid = Tapctl.get_tapdisk_pid tapdev in + let path = Printf.sprintf "/var/run/blktap-control/nbdclient%d" pid in + with_transport ~stunnel_wait_disconnect:false transport + (with_http request (fun (_response, s) -> + (* Enable mirroring on the local machine *) + let control_fd = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in + finally + (fun () -> + Unix.connect control_fd (Unix.ADDR_UNIX path) ; + let msg = dp in + let len = String.length msg in + let written = + Unixext.send_fd_substring control_fd msg 0 len [] s + in + if written <> len then ( + D.error "Failed to transfer fd to %s" path ; + failwith "Internal error transferring fd to tapdisk" + ) + ) + (fun () -> Unix.close control_fd) + ) + ) ; + tapdev + | None -> + D.error "%s: vdi %s not attached" __FUNCTION__ (Vdi.string_of vdi) ; + raise + (Storage_interface.Storage_error + (Migration_mirror_fd_failure "VDI Not Attached") + ) + | exception e -> + D.error "%s Caught exception %s:. Performing cleanup." __FUNCTION__ + (Printexc.to_string e) ; + raise + (Storage_interface.Storage_error + (Migration_mirror_fd_failure (Printexc.to_string e)) + ) + in + D.debug "%s Updating active local mirrors: id=%s" __FUNCTION__ mirror_id ; + let alm = + State.Send_state. 
+      {
+        url
+      ; dest_sr
+      ; remote_info=
+          Some
+            {
+              dp= remote_mirror.mirror_datapath
+            ; vdi= remote_mirror.mirror_vdi.vdi
+            ; url
+            ; verify_dest
+            }
+      ; local_dp= dp
+      ; tapdev= Some tapdev
+      ; failed= false
+      ; watchdog= None
+      }
+    in
+    State.add mirror_id (State.Send_op alm) ;
+    D.debug "%s Updated mirror_id %s in the active local mirror" __FUNCTION__
+      mirror_id ;
+    tapdev
+
+let mirror_snapshot ~dbg ~sr ~dp ~mirror_id ~local_vdi =
+  SXM.info "%s About to snapshot VDI = %s" __FUNCTION__
+    (string_of_vdi_info local_vdi) ;
+  let local_vdi = add_to_sm_config local_vdi "mirror" ("nbd:" ^ dp) in
+  let local_vdi = add_to_sm_config local_vdi "base_mirror" mirror_id in
+  let snapshot =
+    try Local.VDI.snapshot dbg sr local_vdi with
+    | Storage_interface.Storage_error (Backend_error (code, _))
+      when code = "SR_BACKEND_FAILURE_44" ->
+        raise
+          (Storage_interface.Storage_error
+             (Migration_mirror_snapshot_failure
+                (Printf.sprintf "%s:%s" Api_errors.sr_source_space_insufficient
+                   (Storage_interface.Sr.string_of sr)
+                )
+             )
+          )
+    | e ->
+        raise
+          (Storage_interface.Storage_error
+             (Migration_mirror_snapshot_failure (Printexc.to_string e))
+          )
+  in
+
+  SXM.info "%s: snapshot created, mirror initiated vdi:%s snapshot_of:%s"
+    __FUNCTION__
+    (Storage_interface.Vdi.string_of snapshot.vdi)
+    (Storage_interface.Vdi.string_of local_vdi.vdi) ;
+
+  snapshot
+
+let mirror_checker mirror_id tapdev =
+  let rec inner () =
+    let alm_opt = State.find_active_local_mirror mirror_id in
+    match alm_opt with
+    | Some alm ->
+        let stats = Tapctl.stats (Tapctl.create ()) tapdev in
+        if stats.Tapctl.Stats.nbd_mirror_failed = 1 then (
+          D.error "Tapdisk mirroring has failed" ;
+          Updates.add (Dynamic.Mirror mirror_id) updates
+        ) ;
+        alm.State.Send_state.watchdog <-
+          Some
+            (Scheduler.one_shot scheduler (Scheduler.Delta 5) "tapdisk_watchdog"
+               inner
+            )
+    | None ->
+        ()
+  in
+  inner ()
+
+let mirror_copy ~task ~dbg ~sr ~snapshot ~copy_vm ~url ~dest_sr ~remote_mirror
+    ~verify_dest =
+  (* Copy the snapshot to the remote *)
+  try
+    Storage_task.with_subtask task "copy" (fun () ->
+        Copy.copy_into_vdi ~task ~dbg ~sr ~vdi:snapshot.vdi ~vm:copy_vm ~url
+          ~dest:dest_sr ~dest_vdi:remote_mirror.Mirror.copy_diffs_to
+          ~verify_dest
+    )
+    |> vdi_info
+  with e ->
+    raise (Storage_error (Migration_mirror_copy_failure (Printexc.to_string e)))
+
+let mirror_cleanup ~dbg ~sr ~snapshot =
+  D.debug "Destroying snapshot on src" ;
+  Local.VDI.destroy dbg sr snapshot.vdi
+
 module MIRROR : SMAPIv2_MIRROR = struct
   type context = unit
 
-  let u x = raise Storage_interface.(Storage_error (Errors.Unimplemented x))
+  let send_start _ctx ~dbg ~task_id ~dp ~sr ~vdi ~mirror_vm ~mirror_id
+      ~local_vdi ~copy_vm ~live_vm:_ ~url ~remote_mirror ~dest_sr ~verify_dest =
+    let (module Remote) =
+      Storage_migrate_helper.get_remote_backend url verify_dest
+    in
+    match remote_mirror with
+    | Mirror.Vhd_mirror mirror_res ->
+        let tapdev =
+          mirror_pass_fds ~dbg ~dp ~sr ~vdi ~mirror_vm ~mirror_id ~url ~dest_sr
+            ~verify_dest ~remote_mirror:mirror_res
+        in
+
+        let snapshot = mirror_snapshot ~dbg ~sr ~dp ~mirror_id ~local_vdi in
+
+        mirror_checker mirror_id tapdev ;
+        let task = Storage_task.(handle_of_id tasks) task_id in
+        let new_parent =
+          mirror_copy ~task ~dbg ~sr ~snapshot ~copy_vm ~url ~dest_sr
+            ~remote_mirror:mirror_res ~verify_dest
+        in
-  let send_start _ctx = u __FUNCTION__
+        D.debug "Local VDI %s = remote VDI %s"
+          (Storage_interface.Vdi.string_of snapshot.vdi)
+          (Storage_interface.Vdi.string_of new_parent.vdi) ;
+        D.debug "Local VDI %s now mirrored to remote VDI: %s"
+          (Storage_interface.Vdi.string_of local_vdi.vdi)
+          (Storage_interface.Vdi.string_of mirror_res.Mirror.mirror_vdi.vdi) ;
+        mirror_cleanup ~dbg ~sr ~snapshot
 
   let receive_start_common ~dbg ~sr ~vdi_info ~id ~similar ~vm =
     let on_fail : (unit -> unit) list ref = ref [] in
diff --git a/ocaml/xapi/storage_smapiv1_migrate.mli b/ocaml/xapi/storage_smapiv1_migrate.mli
index d47b82cd86c..4c40e2ab999 100644
--- a/ocaml/xapi/storage_smapiv1_migrate.mli
+++ b/ocaml/xapi/storage_smapiv1_migrate.mli
@@ -14,4 +14,75 @@
 module type SMAPIv2_MIRROR = Storage_interface.MIRROR
 
+val with_activated_disk :
+     dbg:string
+  -> sr:Storage_interface.sr
+  -> vdi:Storage_interface.vdi option
+  -> dp:string
+  -> vm:Storage_interface.vm
+  -> (string option -> 'a)
+  -> 'a
+
+val tapdisk_of_attach_info : Storage_interface.backend -> Tapctl.tapdev option
+
+module Copy : sig
+  val copy_into_vdi :
+       task:Storage_task.Storage_task.task_handle
+    -> dbg:string
+    -> sr:Storage_interface.sr
+    -> vdi:Storage_interface.vdi
+    -> vm:Storage_interface.vm
+    -> url:string
+    -> dest:Storage_interface.sr
+    -> dest_vdi:Storage_interface.vdi
+    -> verify_dest:bool
+    -> Storage_interface.async_result_t option
+
+  val copy_into_sr :
+       task:Storage_task.Storage_task.task_handle
+    -> dbg:string
+    -> sr:Storage_interface.sr
+    -> vdi:Storage_interface.vdi
+    -> vm:Storage_interface.vm
+    -> url:string
+    -> dest:Storage_interface.sr
+    -> verify_dest:bool
+    -> Storage_interface.async_result_t option
+end
+
+val mirror_pass_fds :
+     dbg:string
+  -> dp:string
+  -> sr:Storage_interface.sr
+  -> vdi:Storage_interface.vdi
+  -> mirror_vm:Storage_interface.vm
+  -> mirror_id:string
+  -> url:string
+  -> dest_sr:Storage_interface.sr
+  -> verify_dest:bool
+  -> remote_mirror:Storage_interface.Mirror.mirror_receive_result_vhd_t
+  -> Tapctl.tapdev
+
+val mirror_snapshot :
+     dbg:string
+  -> sr:Storage_interface.sr
+  -> dp:string
+  -> mirror_id:string
+  -> local_vdi:Storage_interface.vdi_info
+  -> Storage_interface.vdi_info
+
+val mirror_checker : string -> Tapctl.tapdev -> unit
+
+val mirror_copy :
+     task:Storage_task.Storage_task.task_handle
+  -> dbg:string
+  -> sr:Storage_interface.sr
+  -> snapshot:Storage_interface.vdi_info
+  -> copy_vm:Storage_interface.vm
+  -> url:string
+  -> dest_sr:Storage_interface.sr
+  -> remote_mirror:Storage_interface.Mirror.mirror_receive_result_vhd_t
+  -> verify_dest:bool
+  -> Storage_interface.vdi_info
+
 module MIRROR : SMAPIv2_MIRROR

diff --git a/ocaml/xapi/storage_smapiv1_wrapper.ml b/ocaml/xapi/storage_smapiv1_wrapper.ml
index 8a9b053f509..569f4f33bb0 100644
--- a/ocaml/xapi/storage_smapiv1_wrapper.ml
+++ b/ocaml/xapi/storage_smapiv1_wrapper.ml
@@ -1152,7 +1152,7 @@ functor
       info "%s DATA.get_nbd_server dbg:%s dp:%s sr:%s vdi:%s vm:%s" __FUNCTION__
         dbg dp (s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm) ;
       let attach_info = DP.attach_info context ~dbg:"nbd" ~sr ~vdi ~dp ~vm in
-      match Storage_migrate.tapdisk_of_attach_info attach_info with
+      match Storage_smapiv1_migrate.tapdisk_of_attach_info attach_info with
       | Some tapdev ->
           let minor = Tapctl.get_minor tapdev in
           let pid = Tapctl.get_tapdisk_pid tapdev in