diff --git a/doc/content/xapi/storage/sxm.md b/doc/content/xapi/storage/sxm/index.md
similarity index 87%
rename from doc/content/xapi/storage/sxm.md
rename to doc/content/xapi/storage/sxm/index.md
index 8b7971bed7..4a8a68ced5 100644
--- a/doc/content/xapi/storage/sxm.md
+++ b/doc/content/xapi/storage/sxm/index.md
@@ -9,7 +9,17 @@ Title: Storage migration
- [Thought experiments on an alternative design](#thought-experiments-on-an-alternative-design)
- [Design](#design)
- [SMAPIv1 migration](#smapiv1-migration)
+ - [Preparation](#preparation)
+ - [Establishing mirror](#establishing-mirror)
+ - [Mirror](#mirror)
+ - [Snapshot](#snapshot)
+ - [Copy and compose](#copy-and-compose)
+ - [Finish](#finish)
- [SMAPIv3 migration](#smapiv3-migration)
+ - [Preparation](#preparation-1)
+ - [Establishing mirror](#establishing-mirror-1)
+ - [Limitations](#limitations)
+ - [Finish](#finish-1)
- [Error Handling](#error-handling)
- [Preparation (SMAPIv1 and SMAPIv3)](#preparation-smapiv1-and-smapiv3)
- [Snapshot and mirror failure (SMAPIv1)](#snapshot-and-mirror-failure-smapiv1)
@@ -122,10 +132,44 @@ it will be handled just as before.
## SMAPIv1 migration
+This section describes migration from SMAPIv1 SRs to SMAPIv1 or SMAPIv3 SRs.
+Because the migration is driven by the source host, it is usually the source
+host that determines most of the logic during a storage migration.
+
+First we take a look at an overview diagram of what happens during SMAPIv1 SXM:
+the diagram is labelled with S1, S2 ..., which indicate the different stages of
+the migration. We will talk about each stage in more detail below.
+
+![SXM overview for SMAPIv1](sxm-overview-v1.svg)
+
+### Preparation
+
+Before we can start the migration process, a number of preparation steps are
+needed to set up the following mirror. For SMAPIv1 this involves:
+
+1. Create a new VDI (called leaf) that will be used as the receiving VDI for all the new writes
+2. Create a dummy snapshot of the VDI above to make sure it is a differencing disk and can be composed later on
+3. Create a VDI (called parent) that will be used to receive the existing content of the disk (the snapshot)
+
+Note that the leaf VDI needs to be attached and activated on the destination host (to a non-existing `mirror_vm`)
+since it will later accept writes to mirror what is written on the source host.
+
+The parent VDI may be created in two different ways: 1. if there is a "similar VDI",
+clone it on the destination host and use it as the parent VDI; 2. if there is no
+such VDI, create a new blank VDI. The similarity here is defined by the distances
+between different VDIs in the VHD tree. This exploits the internal representation
+of the storage layer, so we will not go into too much detail about it here.
+
+Once these preparations are done, a `mirror_receive_result` data structure is
+passed back to the source host containing all the necessary information about
+these new VDIs, etc.
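+
+The authoritative type lives in the storage interface; the following is only a
+rough, hypothetical sketch of its shape, using the field names that
+`State.Receive_state` consumes in this PR:
+
+```ocaml
+(* Illustrative only; the real record is defined in storage_interface.ml. *)
+type mirror_receive_result = {
+    leaf_vdi: Vdi.t (* receives all the new writes *)
+  ; leaf_dp: string (* the datapath the leaf was attached and activated with *)
+  ; dummy_vdi: Vdi.t (* snapshot that makes the leaf a differencing disk *)
+  ; parent_vdi: Vdi.t (* receives the existing content (the snapshot) *)
+}
+```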
+
+### Establishing mirror
+
At a high level, mirror establishment for SMAPIv1 works as follows:
1. Take a snapshot of a VDI that is attached to VM1. This gives us an immutable
-copy of the current state of the VDI, with all the data until the point we took
+copy of the current state of the VDI, with all the data up until the point we took
the snapshot. This is illustrated in the diagram as a VDI and its snapshot connecting
to a shared parent, which stores the shared content for the snapshot and the writable
VDI from which we took the snapshot (snapshot)
@@ -135,12 +179,174 @@ client VDI will also be written to the mirrored VDI on the remote host (mirror)
4. Compose the mirror and the snapshot to form a single VDI
5. Destroy the snapshot on the local host (cleanup)
+#### Mirror
+
+The mirroring process for SMAPIv1 is rather unconventional, so it is worth
+documenting how it works. Instead of a conventional client-server architecture,
+where the source client connects to the destination server directly through the
+NBD protocol in tapdisk, the connection is established in xapi and then passed
+on to tapdisk. It was done in this rather unusual way mainly due to authentication
+issues. Because it is xapi that creates the connection, tapdisk does not need
+to be concerned with authenticating the connection, which simplifies the storage
+component. This is reasonable, as the storage component should focus on handling
+storage requests rather than worrying about network security.
+
+The diagram below illustrates this process. First, xapi on the source host
+initiates an https request to the remote xapi. This request contains the necessary
+information about the VDI to be mirrored, the SR that contains it, etc. This
+information is then passed on to the https handler on the destination host (called
+`nbd_handler`), which processes it. Now the unusual step is that
+both the source and the destination xapi pass this connection on to tapdisk,
+by sending the fd representing the socket connection to the tapdisk process. On
+the source this is the nbd client process of tapdisk, and on the destination
+it is the nbd server process of tapdisk. After this step, we can consider a
+client-server connection established between the two tapdisks, as if the
+tapdisk on the source host had made a request to the tapdisk on the
+destination host and initiated the connection. On the diagram, this is indicated
+by the dashed lines between the tapdisk processes. Logically, we can view this as
+xapi creating the connection and then passing it down into tapdisk.
+
+![SMAPIv1 mirror establishment](sxm-mirror-v1.svg)
+
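+For illustration, here is a minimal sketch of the fd-passing step, assuming the
+`fd-send-recv` library used elsewhere in xapi (the function name and signature
+are approximate, not the exact call this code makes):
+
+```ocaml
+(* Sketch: hand an already-established connection over to tapdisk.
+   SCM_RIGHTS requires at least one byte of ordinary payload to accompany
+   the ancillary fd data. *)
+let pass_connection_to_tapdisk ~(tapdisk_ctl : Unix.file_descr)
+    ~(connection : Unix.file_descr) =
+  ignore (Fd_send_recv.send_fd_substring tapdisk_ctl "M" 0 1 [] connection)
+```
+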
+#### Snapshot
+
+The next step is to create a snapshot of the VDI. This is easily done as a
+`VDI.snapshot` operation. If the VDI is in VHD format, internally this creates
+two children for it: one for the snapshot, which only contains the metadata
+information and tends to be small, and the other for the writable VDI where all the
+new writes will go. The shared base copy contains the shared blocks.
+
+![Snapshotting during SMAPIv1 SXM](sxm-snapshot-v1.svg)
+
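+In SMAPIv2 terms this is a single call; a sketch, with the `base_mirror`
+sm-config tag that this PR also uses to correlate a snapshot with its mirror:
+
+```ocaml
+(* Sketch: take the migration snapshot of the local VDI. *)
+let take_mirror_snapshot ~dbg ~sr ~mirror_id ~(local_vdi : vdi_info) =
+  Local.VDI.snapshot dbg sr
+    {local_vdi with sm_config= [("base_mirror", mirror_id)]}
+```
+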
+#### Copy and compose
+
+Once the snapshot is created, we can then copy the snapshot from the source
+to the destination. This step is done by `sparse_dd` using the nbd protocol. This
+is also the step that takes the most time to complete.
+
+`sparse_dd` is a process forked by xapi that does the copying of the disk blocks.
+`sparse_dd` supports a number of protocols, including nbd. In this case, `sparse_dd`
+will initiate an https put request to the destination host, with a url of the form
+`/services/SM/nbdproxy/import/<vm>/<sr>/<vdi>/<dp>`. This https request then
+gets handled by the https handler on the destination host B, which will then spawn
+a handler thread. This handler will find the
+"generic" nbd server[^2] of either tapdisk or qemu-dp, depending on the destination
+SR type, and then start proxying data between the https connection socket and the
+socket connected to the nbd server.
+
+[^2]: The server is generic because it does not accept fd passing; the nbd
+servers that do accept fds are referred to as "special" nbd servers/fd receivers.
+
+![Copying during SMAPIv1 SXM](sxm-new-copy-v1.svg)
+
+Once copying is done, the snapshot and the mirrored VDI can then be composed into a
+single VDI.
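+
+A sketch of that final step, assuming SMAPIv2's `VDI.compose`, which layers the
+writes captured in the leaf on top of the copied parent:
+
+```ocaml
+(* Sketch: after sparse_dd has filled [parent] with the snapshot's content,
+   compose it with the leaf that has been receiving the new writes. *)
+let compose_after_copy ~dbg ~sr ~(parent : Vdi.t) ~(leaf : Vdi.t) =
+  Remote.VDI.compose dbg sr parent leaf
+```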
+
+#### Finish
+
+At this point the VDI is synchronised to the new host! The mirror keeps running,
+however, because it will not be destroyed until the VM itself has been migrated
+as well. Some cleanup is done at this point, such as deleting the snapshot
+that was taken on the source, destroying the mirror datapath, etc.
+
+The end result looks like the following. Note that VM2 is drawn with a dashed line as
+it has not been created yet. The next step would be to migrate VM1 itself to the
+destination as well, but this is part of the VM migration process and will not
+be covered here.
+
+![State after SMAPIv1 SXM](sxm-final-v1.svg)
-more detail to come...
## SMAPIv3 migration
-More detail to come...
+This section covers the mechanism of migrations *from* SRs using SMAPIv3 (to
+SMAPIv1 or SMAPIv3). Although the core ideas are the same, SMAPIv3 has a rather
+different mechanism for mirroring: 1. it no longer requires xapi to take a snapshot
+of the VDI, since the mirror itself will take care of replicating the
+existing data to the destination; 2. there is no fd passing for connection
+establishment anymore; instead, proxies are used to set up the connection.
+
+### Preparation
+
+The preparation work for SMAPIv3 is greatly simplified by the fact that the mirror
+at the storage layer will copy the existing data in the VDI to the destination.
+This means that a snapshot of the source VDI is no longer required. So we are left
+with only one thing:
+
+1. Create a VDI used for mirroring the data of the source VDI
+
+For this reason, the implementation logic for SMAPIv3 preparation is also shorter,
+as the complexity is now handled by the storage layer, which is where it is supposed
+to be handled.
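+
+Condensed from `receive_start3` in `storage_smapiv3_migrate.ml`, the
+destination-side preparation is essentially:
+
+```ocaml
+(* Create one mirror VDI, then attach and activate it for the receiving VM. *)
+let prepare ~dbg ~sr ~vm ~(vdi_info : vdi_info) =
+  let leaf_dp = Remote.DP.create dbg Uuidx.(to_string (make ())) in
+  let leaf = Remote.VDI.create dbg sr vdi_info in
+  let backend = Remote.VDI.attach3 dbg leaf_dp sr leaf.vdi vm true in
+  Remote.VDI.activate3 dbg leaf_dp sr leaf.vdi vm ;
+  (leaf, leaf_dp, backend)
+```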
+
+### Establishing mirror
+
+The other significant difference is that the storage backend for SMAPIv3 `qemu-dp`
+SRs no longer accepts fds, so xapi needs to proxy the data between the nbd client
+and the nbd server.
+
+SMAPIv3 provides `Data.mirror uri domain remote`, which takes three parameters:
+`uri` for accessing the local disk, `domain` for the domain slice on which mirroring
+should happen, and, most importantly for this design, a `remote` url which represents
+the remote nbd server to which the blocks of data can be sent.
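+
+For reference, this is how xapi-storage-script forwards the call to the plugin
+in this PR (cf. `main.ml`); `rpc` here is the connection to the SMAPIv3 plugin:
+
+```ocaml
+(* The result is an [operation] handle that is later passed to Data.stat. *)
+let start_mirror ~rpc ~dbg ~uri ~domain ~remote =
+  return_data_rpc (fun () -> Data_client.mirror (rpc ~dbg) dbg uri domain remote)
+```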
+
+This function, when called by xapi and forwarded to the storage layer's qemu-dp
+nbd client, will initiate an nbd connection to the nbd server pointed to by `remote`.
+This works fine when the storage migration happens entirely within a local host,
+where qemu-dp's nbd client and nbd server can communicate over unix domain sockets.
+However, it does not work for inter-host migrations, as qemu-dp's nbd server is not
+exposed publicly over the network (just like tapdisk's nbd server). Therefore a proxying
+service on the source host is needed to forward the nbd connection from the
+source host to the destination host, and it is the responsibility of
+xapi to manage this proxy service.
+
+The following diagram illustrates the mirroring process of a single VDI:
+
+![SMAPIv3 mirror establishment](sxm-mirror-v3.svg)
+
+The first step for xapi is then to set up an nbd proxy thread that will be listening
+on a local unix domain socket with path `/var/run/nbdproxy/export/<domain>`, where
+domain is the `domain` parameter mentioned above in `Data.mirror`. The nbd proxy
+thread will accept nbd connections (or rather any connections, it does not
+speak or care about the nbd protocol at all) and send an https put request
+to the remote xapi. The proxy itself will then forward the data exactly as it is
+to the remote side through the https connection.
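+
+Condensed from `export_nbd_proxy` in `storage_smapiv3_migrate.ml`, the core of
+the proxy is:
+
+```ocaml
+(* Accept one connection on the local socket and splice it into an https PUT
+   to the remote xapi; the proxy never interprets the nbd stream itself. *)
+let proxy_once ~path ~request ~transport =
+  let proxy_srv = Fecomms.open_unix_domain_sock_server path in
+  let nbd_client, _addr = Unix.accept proxy_srv in
+  with_transport transport
+    (with_http request (fun (_response, s) ->
+         Unixext.proxy (Unix.dup s) (Unix.dup nbd_client)
+     )
+    )
+```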
+
+Once the proxy is set up, xapi will call `Data.mirror`, which
+will be forwarded to xapi-storage-script and further forwarded to qemu-dp.
+This call contains, among other parameters, the destination NBD server url (`remote`)
+to be connected. In this case the destination nbd server is exactly the unix
+domain socket on which the proxy thread is listening. Therefore the `remote` parameter
+will be of the form `nbd+unix:///<export>?socket=<socket>`, where the export is provided
+by the destination nbd server that represents the VDI prepared on the destination
+host, and the socket is the path of the unix domain socket on which the proxy
+thread (which we just created) is listening.
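+
+This is exactly how `send_start` constructs the `remote` parameter in this PR:
+
+```ocaml
+let remote_of ~nbd_export ~nbd_proxy_path =
+  Uri.make ~scheme:"nbd+unix" ~host:"" ~path:nbd_export
+    ~query:[("socket", [nbd_proxy_path])]
+    ()
+  |> Uri.to_string
+```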
+
+When this connection is set up, the proxy process will talk to the remote xapi via
+https requests, and on the remote side, an https handler will proxy this request to
+the appropriate nbd server of either tapdisk or qemu-dp, using exactly the same
+[import proxy](#copy-and-compose) as mentioned before.
+
+Note that this proxying service is tightly integrated with the outbound SXM of
+SMAPIv3 SRs. This is to keep it simple and focused on the migration itself.
+
+Although there is no need to explicitly copy the VDI anymore, we still need to
+transfer the data and wait for it to finish. For this we use the `Data.stat` call
+provided by the storage backend to query the status of the mirror, and wait for it
+to finish as needed.
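+
+A sketch of that wait loop (cf. `mirror_wait` in `storage_smapiv3_migrate.ml`):
+
+```ocaml
+let rec wait_for_mirror ~dbg ~sr ~vdi ~vm key =
+  let {failed; complete; _} : Mirror.status =
+    Local.DATA.stat dbg sr vdi vm key
+  in
+  if failed then
+    raise
+      (Storage_interface.Storage_error
+         (Migration_mirror_failure "Mirror failed during syncing")
+      )
+  else if not complete then
+    wait_for_mirror ~dbg ~sr ~vdi ~vm key
+```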
+
+#### Limitations
+
+This way of establishing the connection simplifies the implementation of the migration
+for SMAPIv3, but it also has limitations:
+
+One proxy per live VDI migration is needed, which can potentially consume lots
+of resources in dom0, and we should measure the impact of this before we switch
+to more resource-efficient alternatives such as WireGuard, which allows
+establishing a single connection between multiple hosts.
+
+
+### Finish
+
+As there is no need to copy a VDI, there is also no need to compose or delete the
+snapshot. The cleanup procedure therefore just involves destroying the datapath
+that was used for receiving writes for the mirrored VDI.
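+
+Condensed from `receive_finalize3` in `storage_smapiv3_migrate.ml`, the cleanup
+amounts to:
+
+```ocaml
+(* Destroy the receiving datapath, drop the mirror tag and forget the state. *)
+let finish ~dbg ~mirror_id =
+  State.find_active_receive_mirror mirror_id
+  |> Option.iter (fun r ->
+         State.Receive_state.(
+           Remote.DP.destroy2 dbg r.leaf_dp r.sr r.leaf_vdi r.mirror_vm false ;
+           Remote.VDI.remove_from_sm_config dbg r.sr r.leaf_vdi "base_mirror"
+         )
+     ) ;
+  State.remove_receive_mirror mirror_id
+```
+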
## Error Handling
@@ -168,10 +374,10 @@ helps separate the error handling logic into the `with` part of a `try with` blo
which is where they are supposed to be. Since we need to accommodate the existing
SMAPIv1 migration (which has more stages than SMAPIv3), the following stages are
introduced: preparation (v1,v3), snapshot(v1), mirror(v1, v3), copy(v1). Note that
-each stage also roughly corresponds to a helper function that is called within `MIRROR.start`,
+each stage also roughly corresponds to a helper function that is called within `Storage_migrate.start`,
which is the wrapper function that initiates storage migration. And each helper
functions themselves would also have error handling logic within themselves as
-needed (e.g. see `Storage_smapiv1_migrate.receive_start) to deal with exceptions
+needed (e.g. see `Storage_smapiv1_migrate.receive_start`) to deal with exceptions
that happen within each helper functions.
### Preparation (SMAPIv1 and SMAPIv3)
@@ -203,7 +409,16 @@ are migrating from.
### Mirror failure (SMAPIv3)
-To be filled...
+The `Data.stat` call in SMAPIv3 returns a data structure that includes the current
+progress of the mirror job, whether it has completed syncing the existing data, and
+whether the mirror has failed. Similar to how it is done in SMAPIv1, once we issue
+the `Data.mirror` call we wait for the sync to complete by repeatedly polling
+the status of the mirror using the `Data.stat` call. During this process, the status
+of the mirror is also checked, and if a failure is detected, a `Migration_mirror_failure`
+will be raised and then handled by the code in `storage_migrate.ml` by calling
+`Storage_smapiv3_migrate.receive_cancel2`, which will clean up the mirror datapath
+and destroy the mirror VDI, similar to what is done in SMAPIv1.
+
### Copy failure (SMAPIv1)
@@ -215,6 +430,14 @@ failure during copying.
## SMAPIv1 Migration implementation detail
+{{% notice info %}}
+The following doc refers to a [version](https://github.com/xapi-project/xen-api/blob/v24.37.0/ocaml/xapi/storage_migrate.ml)
+of xapi before 24.37, after which this code structure has undergone
+many changes as part of adding support for SMAPIv3 SXM. Therefore the following
+tutorial might be less relevant in terms of the implementation detail, although
+the general principle should remain the same.
+{{% /notice %}}
+
```mermaid
sequenceDiagram
participant local_tapdisk as local tapdisk
diff --git a/doc/content/xapi/storage/sxm/sxm-final-v1.svg b/doc/content/xapi/storage/sxm/sxm-final-v1.svg
new file mode 100644
index 0000000000..7cdb2d540a
--- /dev/null
+++ b/doc/content/xapi/storage/sxm/sxm-final-v1.svg
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/doc/content/xapi/storage/sxm/sxm-mirror-v1.svg b/doc/content/xapi/storage/sxm/sxm-mirror-v1.svg
new file mode 100644
index 0000000000..4b6f61131c
--- /dev/null
+++ b/doc/content/xapi/storage/sxm/sxm-mirror-v1.svg
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/doc/content/xapi/storage/sxm/sxm-mirror-v3.svg b/doc/content/xapi/storage/sxm/sxm-mirror-v3.svg
new file mode 100644
index 0000000000..8ed03406ac
--- /dev/null
+++ b/doc/content/xapi/storage/sxm/sxm-mirror-v3.svg
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/doc/content/xapi/storage/sxm/sxm-new-copy-v1.svg b/doc/content/xapi/storage/sxm/sxm-new-copy-v1.svg
new file mode 100644
index 0000000000..891913850d
--- /dev/null
+++ b/doc/content/xapi/storage/sxm/sxm-new-copy-v1.svg
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/doc/content/xapi/storage/sxm/sxm-overview-v1.svg b/doc/content/xapi/storage/sxm/sxm-overview-v1.svg
new file mode 100644
index 0000000000..b6002382db
--- /dev/null
+++ b/doc/content/xapi/storage/sxm/sxm-overview-v1.svg
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/doc/content/xapi/storage/sxm/sxm-snapshot-v1.svg b/doc/content/xapi/storage/sxm/sxm-snapshot-v1.svg
new file mode 100644
index 0000000000..5fe0f398c1
--- /dev/null
+++ b/doc/content/xapi/storage/sxm/sxm-snapshot-v1.svg
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/doc/content/xapi/storage/sxm_mux_inbound.svg b/doc/content/xapi/storage/sxm/sxm_mux_inbound.svg
similarity index 100%
rename from doc/content/xapi/storage/sxm_mux_inbound.svg
rename to doc/content/xapi/storage/sxm/sxm_mux_inbound.svg
diff --git a/doc/content/xapi/storage/sxm_mux_outbound.svg b/doc/content/xapi/storage/sxm/sxm_mux_outbound.svg
similarity index 100%
rename from doc/content/xapi/storage/sxm_mux_outbound.svg
rename to doc/content/xapi/storage/sxm/sxm_mux_outbound.svg
diff --git a/ocaml/idl/datamodel_errors.ml b/ocaml/idl/datamodel_errors.ml
index 30c56b2120..9a6cddc910 100644
--- a/ocaml/idl/datamodel_errors.ml
+++ b/ocaml/idl/datamodel_errors.ml
@@ -1921,6 +1921,11 @@ let _ =
() ;
error Api_errors.invalid_base_url ["url"]
~doc:"The base url in the repository is invalid." () ;
+ error Api_errors.blocked_repo_url ["url"]
+ ~doc:
+ "Cannot create the repository as the url is blocked, please check your \
+ settings."
+ () ;
error Api_errors.invalid_gpgkey_path ["gpgkey_path"]
~doc:"The GPG public key file name in the repository is invalid." () ;
error Api_errors.repository_already_exists ["ref"]
diff --git a/ocaml/tests/test_repository_helpers.ml b/ocaml/tests/test_repository_helpers.ml
index c05e7c8a63..d6c8421afd 100644
--- a/ocaml/tests/test_repository_helpers.ml
+++ b/ocaml/tests/test_repository_helpers.ml
@@ -253,6 +253,46 @@ module AssertUrlIsValid = Generic.MakeStateless (struct
]
end)
+module AssertUrlIsNotBlocked = Generic.MakeStateless (struct
+ module Io = struct
+ type input_t = string * string list
+
+ type output_t = (unit, exn) result
+
+ let string_of_input_t = Fmt.(str "%a" Dump.(pair string (list string)))
+
+ let string_of_output_t =
+ Fmt.(str "%a" Dump.(result ~ok:(any "()") ~error:exn))
+ end
+
+ let transform (url, url_blocklist) =
+ Xapi_globs.repository_url_blocklist := url_blocklist ;
+ try Ok (assert_url_is_not_blocked ~url) with e -> Error e
+
+ let tests =
+ `QuickAndAutoDocumented
+ [
+ (* no blocklist *)
+ (("https://test.com", []), Ok ())
+ ; (* Not match in blocklist *)
+ ( ("https://test.com", ["http://blocked.com"; "http://also/blocked.com"])
+ , Ok ()
+ )
+ ; (* match in blocklist *)
+ ( ( "http://blocked.com"
+ , ["http://blocked.com"; "http://also/blocked.com"]
+ )
+ , Error
+ Api_errors.(Server_error (blocked_repo_url, ["http://blocked.com"]))
+ )
+ ; (* match keyword in blocklist *)
+ ( ("http://blocked.com", ["private"; "blocked"])
+ , Error
+ Api_errors.(Server_error (blocked_repo_url, ["http://blocked.com"]))
+ )
+ ]
+end)
+
module WriteYumConfig = Generic.MakeStateless (struct
module Io = struct
(* ( (source_url, binary_url), (need_gpg_check, gpgkey_path) ) *)
@@ -4780,6 +4820,7 @@ let tests =
[
("update_of_json", UpdateOfJsonTest.tests)
; ("assert_url_is_valid", AssertUrlIsValid.tests)
+ ; ("assert_url_is_not_blocked", AssertUrlIsNotBlocked.tests)
; ("write_yum_config", WriteYumConfig.tests)
; ("eval_guidance_for_one_update", EvalGuidanceForOneUpdate.tests)
; ("get_update_in_json", GetUpdateInJson.tests)
diff --git a/ocaml/xapi-consts/api_errors.ml b/ocaml/xapi-consts/api_errors.ml
index 6a25fbe48c..3884abeed6 100644
--- a/ocaml/xapi-consts/api_errors.ml
+++ b/ocaml/xapi-consts/api_errors.ml
@@ -1323,6 +1323,8 @@ let configure_repositories_in_progress =
let invalid_base_url = add_error "INVALID_BASE_URL"
+let blocked_repo_url = add_error "BLOCKED_REPO_URL"
+
let invalid_gpgkey_path = add_error "INVALID_GPGKEY_PATH"
let repository_already_exists = add_error "REPOSITORY_ALREADY_EXISTS"
diff --git a/ocaml/xapi-idl/storage/storage_interface.ml b/ocaml/xapi-idl/storage/storage_interface.ml
index a3da3d906d..14ca03e6cb 100644
--- a/ocaml/xapi-idl/storage/storage_interface.ml
+++ b/ocaml/xapi-idl/storage/storage_interface.ml
@@ -175,6 +175,9 @@ let parse_nbd_uri nbd =
| _ ->
fail ()
+let parse_nbd_uri_opt nbd =
+ try Some (parse_nbd_uri nbd) with Failure _e -> None
+
(** Separates the implementations of the given backend returned from the
VDI.attach2 SMAPIv2 call based on their type *)
let implementations_of_backend backend =
@@ -192,6 +195,16 @@ let implementations_of_backend backend =
)
([], [], [], []) backend.implementations
+let nbd_export_of_attach_info (backend : backend) =
+ let _, _, _, nbds = implementations_of_backend backend in
+ match nbds with
+ | [] ->
+ debug "%s no nbd uri found" __FUNCTION__ ;
+ None
+ | uri :: _ ->
+ debug "%s found nbd uri %s" __FUNCTION__ uri.uri ;
+ parse_nbd_uri_opt uri |> Option.map snd
+
(** Uniquely identifies the contents of a VDI *)
type content_id = string [@@deriving rpcty]
@@ -1043,6 +1056,29 @@ module StorageAPI (R : RPC) = struct
@-> returning result_p err
)
+ let operation_p = Param.mk ~name:"operation" Mirror.operation
+
+ let mirror =
+ declare "DATA.mirror" []
+ (dbg_p
+ @-> sr_p
+ @-> vdi_p
+ @-> vm_p
+ @-> url_p
+ @-> returning operation_p err
+ )
+
+ let stat =
+ let status_p = Param.mk ~name:"status" Mirror.status in
+ declare "DATA.stat" []
+ (dbg_p
+ @-> sr_p
+ @-> vdi_p
+ @-> vm_p
+ @-> operation_p
+ @-> returning status_p err
+ )
+
(** [import_activate dbg dp sr vdi vm] returns a server socket address to
which a fd can be passed via SCM_RIGHTS for mirroring purposes.*)
let import_activate =
@@ -1170,7 +1206,7 @@ module StorageAPI (R : RPC) = struct
(** Called on the receiving end
@deprecated This function is deprecated, and is only here to keep backward
- compatibility with old xapis that call Remote.DATA.MIRROR.receive_finalize
+ compatibility with old xapis that call Remote.DATA.MIRROR.receive_finalize2
during SXM. Use the receive_finalize3 function instead.
*)
let receive_finalize2 =
@@ -1613,6 +1649,24 @@ module type Server_impl = sig
-> verify_dest:bool
-> Task.id
+ val mirror :
+ context
+ -> dbg:debug_info
+ -> sr:sr
+ -> vdi:vdi
+ -> vm:vm
+ -> dest:string
+ -> operation
+
+ val stat :
+ context
+ -> dbg:debug_info
+ -> sr:sr
+ -> vdi:vdi
+ -> vm:vm
+ -> key:operation
+ -> status
+
val import_activate :
context
-> dbg:debug_info
@@ -1787,6 +1841,12 @@ module Server (Impl : Server_impl) () = struct
S.DATA.copy (fun dbg sr vdi vm url dest verify_dest ->
Impl.DATA.copy () ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest
) ;
+ S.DATA.mirror (fun dbg sr vdi vm dest ->
+ Impl.DATA.mirror () ~dbg ~sr ~vdi ~vm ~dest
+ ) ;
+ S.DATA.stat (fun dbg sr vdi vm key ->
+ Impl.DATA.stat () ~dbg ~sr ~vdi ~vm ~key
+ ) ;
S.DATA.MIRROR.send_start
(fun
dbg
diff --git a/ocaml/xapi-idl/storage/storage_skeleton.ml b/ocaml/xapi-idl/storage/storage_skeleton.ml
index edaf4bc981..290c09d623 100644
--- a/ocaml/xapi-idl/storage/storage_skeleton.ml
+++ b/ocaml/xapi-idl/storage/storage_skeleton.ml
@@ -154,6 +154,10 @@ let get_by_name ctx ~dbg ~name = u "get_by_name"
module DATA = struct
let copy ctx ~dbg ~sr ~vdi ~vm ~url ~dest = u "DATA.copy"
+ let mirror ctx ~dbg ~sr ~vdi ~vm ~dest = u "DATA.mirror"
+
+ let stat ctx ~dbg ~sr ~vdi ~vm ~key = u "DATA.stat"
+
let import_activate ctx ~dbg ~dp ~sr ~vdi ~vm =
u "DATA.MIRROR.import_activate"
diff --git a/ocaml/xapi-storage-script/main.ml b/ocaml/xapi-storage-script/main.ml
index 0d76c09601..1b15a17f46 100644
--- a/ocaml/xapi-storage-script/main.ml
+++ b/ocaml/xapi-storage-script/main.ml
@@ -16,6 +16,7 @@ module Plugin_client = Xapi_storage.Plugin.Plugin (Rpc_lwt.GenClient ())
module Volume_client = Xapi_storage.Control.Volume (Rpc_lwt.GenClient ())
module Sr_client = Xapi_storage.Control.Sr (Rpc_lwt.GenClient ())
module Datapath_client = Xapi_storage.Data.Datapath (Rpc_lwt.GenClient ())
+module Data_client = Xapi_storage.Data.Data (Rpc_lwt.GenClient ())
open Private.Lib
let ( >>= ) = Lwt.bind
@@ -1456,6 +1457,9 @@ module VDIImpl (M : META) = struct
set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key
~key:_snapshot_of_key ~value:vdi
>>>= fun () ->
+ set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key
+ ~key:_vdi_content_id_key ~value:vdi_info.content_id
+ >>>= fun () ->
let response =
{
(vdi_of_volume response) with
@@ -1753,6 +1757,8 @@ module VDIImpl (M : META) = struct
let vdi = Storage_interface.Vdi.string_of vdi in
let* () = unset ~dbg ~sr ~vdi ~key:(_sm_config_prefix_key ^ key) in
return ()
+
+ let similar_content_impl _dbg _sr _vdi = wrap @@ return []
end
module DPImpl (M : META) = struct
@@ -1789,6 +1795,62 @@ end
module DATAImpl (M : META) = struct
module VDI = VDIImpl (M)
+ let stat dbg sr vdi' _vm key =
+ let open Storage_interface in
+ let convert_key = function
+ | Mirror.CopyV1 k ->
+ Data_client.CopyV1 k
+ | Mirror.MirrorV1 k ->
+ Data_client.MirrorV1 k
+ in
+
+ let vdi = Vdi.string_of vdi' in
+ Attached_SRs.find sr >>>= fun sr ->
+ VDI.stat ~dbg ~sr ~vdi >>>= fun response ->
+ ( match
+ List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys
+ with
+ | None ->
+ return response
+ | Some temporary ->
+ VDI.stat ~dbg ~sr ~vdi:temporary
+ )
+ >>>= fun response ->
+ choose_datapath response >>>= fun (rpc, _datapath, _uri) ->
+ let key = convert_key key in
+ return_data_rpc (fun () -> Data_client.stat (rpc ~dbg) dbg key)
+ >>>= function
+ | {failed; complete; progress} ->
+ return Mirror.{failed; complete; progress}
+
+ let stat_impl dbg sr vdi vm key = wrap @@ stat dbg sr vdi vm key
+
+ let mirror dbg sr vdi' vm' remote =
+ let vdi = Storage_interface.Vdi.string_of vdi' in
+ let domain = Storage_interface.Vm.string_of vm' in
+ Attached_SRs.find sr >>>= fun sr ->
+ VDI.stat ~dbg ~sr ~vdi >>>= fun response ->
+ ( match
+ List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys
+ with
+ | None ->
+ return response
+ | Some temporary ->
+ VDI.stat ~dbg ~sr ~vdi:temporary
+ )
+ >>>= fun response ->
+ choose_datapath response >>>= fun (rpc, _datapath, uri) ->
+ return_data_rpc (fun () ->
+ Data_client.mirror (rpc ~dbg) dbg uri domain remote
+ )
+ >>>= function
+ | CopyV1 v ->
+ return (Storage_interface.Mirror.CopyV1 v)
+ | MirrorV1 v ->
+ return (Storage_interface.Mirror.MirrorV1 v)
+
+ let mirror_impl dbg sr vdi vm remote = wrap @@ mirror dbg sr vdi vm remote
+
let data_import_activate_impl dbg _dp sr vdi' vm' =
wrap
@@
@@ -1855,6 +1917,7 @@ let bind ~volume_script_dir =
(* this version field will be updated once query is called *)
let version = ref None
end in
+ let u name _ = failwith ("Unimplemented: " ^ name) in
let module Query = QueryImpl (RuntimeMeta) in
S.Query.query Query.query_impl ;
S.Query.diagnostics Query.query_diagnostics_impl ;
@@ -1899,16 +1962,19 @@ let bind ~volume_script_dir =
S.VDI.set_content_id VDI.vdi_set_content_id_impl ;
S.VDI.add_to_sm_config VDI.vdi_add_to_sm_config_impl ;
S.VDI.remove_from_sm_config VDI.vdi_remove_from_sm_config_impl ;
+ S.VDI.similar_content VDI.similar_content_impl ;
let module DP = DPImpl (RuntimeMeta) in
S.DP.destroy2 DP.dp_destroy2 ;
S.DP.attach_info DP.dp_attach_info_impl ;
let module DATA = DATAImpl (RuntimeMeta) in
+ S.DATA.copy (u "DATA.copy") ;
+ S.DATA.mirror DATA.mirror_impl ;
+ S.DATA.stat DATA.stat_impl ;
S.DATA.get_nbd_server DATA.get_nbd_server_impl ;
S.DATA.import_activate DATA.data_import_activate_impl ;
- let u name _ = failwith ("Unimplemented: " ^ name) in
S.get_by_name (u "get_by_name") ;
S.VDI.get_by_name (u "VDI.get_by_name") ;
S.UPDATES.get (u "UPDATES.get") ;
@@ -1917,8 +1983,6 @@ let bind ~volume_script_dir =
S.DP.diagnostics (u "DP.diagnostics") ;
S.TASK.destroy (u "TASK.destroy") ;
S.DP.destroy (u "DP.destroy") ;
- S.VDI.similar_content (u "VDI.similar_content") ;
- S.DATA.copy (u "DATA.copy") ;
S.DP.stat_vdi (u "DP.stat_vdi") ;
S.DATA.MIRROR.send_start (u "DATA.MIRROR.send_start") ;
S.DATA.MIRROR.receive_start (u "DATA.MIRROR.receive_start") ;
diff --git a/ocaml/xapi/export.ml b/ocaml/xapi/export.ml
index 81dcb22bc4..3c00b544f7 100644
--- a/ocaml/xapi/export.ml
+++ b/ocaml/xapi/export.ml
@@ -713,11 +713,15 @@ open Http
open Client
let lock_vm ~__context ~vm ~task_id op =
- (* Note slight race here because we haven't got the master lock *)
- Xapi_vm_lifecycle.assert_operation_valid ~__context ~self:vm ~op ~strict:true ;
- (* ... small race lives here ... *)
- Db.VM.add_to_current_operations ~__context ~self:vm ~key:task_id ~value:op ;
- Xapi_vm_lifecycle.update_allowed_operations ~__context ~self:vm
+ Helpers.retry ~__context ~doc:task_id ~policy:Helpers.Policy.fail_quickly
+ (fun () ->
+ (* Note slight race here because we haven't got the master lock *)
+ Xapi_vm_lifecycle.assert_operation_valid ~__context ~self:vm ~op
+ ~strict:true ;
+ (* ... small race lives here ... *)
+ Db.VM.add_to_current_operations ~__context ~self:vm ~key:task_id ~value:op ;
+ Xapi_vm_lifecycle.update_allowed_operations ~__context ~self:vm
+ )
let unlock_vm ~__context ~vm ~task_id =
Db.VM.remove_from_current_operations ~__context ~self:vm ~key:task_id ;
diff --git a/ocaml/xapi/repository.ml b/ocaml/xapi/repository.ml
index 1ec1486a3e..ea87a715e1 100644
--- a/ocaml/xapi/repository.ml
+++ b/ocaml/xapi/repository.ml
@@ -33,6 +33,8 @@ let updates_in_cache : (API.ref_host, Yojson.Basic.t) Hashtbl.t =
let introduce ~__context ~name_label ~name_description ~binary_url ~source_url
~update ~gpgkey_path =
+ assert_url_is_not_blocked ~url:binary_url ;
+ assert_url_is_not_blocked ~url:source_url ;
assert_url_is_valid ~url:binary_url ;
assert_url_is_valid ~url:source_url ;
assert_gpgkey_path_is_valid gpgkey_path ;
diff --git a/ocaml/xapi/repository_helpers.ml b/ocaml/xapi/repository_helpers.ml
index 62df609c53..91a3c1b467 100644
--- a/ocaml/xapi/repository_helpers.ml
+++ b/ocaml/xapi/repository_helpers.ml
@@ -209,6 +209,23 @@ let assert_url_is_valid ~url =
error "Invalid url %s: %s" url (ExnHelper.string_of_exn e) ;
raise Api_errors.(Server_error (invalid_base_url, [url]))
+let url_matches ~url (patterns : string list) : bool =
+ List.exists
+ (fun pattern ->
+ try
+ let re = Re.Perl.re pattern |> Re.compile in
+ Re.execp re url
+ with exn ->
+ error "Exception in %s: %s" __FUNCTION__ (Printexc.to_string exn) ;
+ false
+ )
+ patterns
+
+let assert_url_is_not_blocked ~url =
+ let blocklist = !Xapi_globs.repository_url_blocklist in
+ if url_matches ~url blocklist then
+ raise Api_errors.(Server_error (blocked_repo_url, [url]))
+
let is_gpgkey_path_valid = function
| 'A' .. 'Z' | 'a' .. 'z' | '0' .. '9' | '_' | '-' ->
true
diff --git a/ocaml/xapi/storage_migrate.ml b/ocaml/xapi/storage_migrate.ml
index 85f36e31fa..1ff03c3d7e 100644
--- a/ocaml/xapi/storage_migrate.ml
+++ b/ocaml/xapi/storage_migrate.ml
@@ -46,23 +46,9 @@ module MigrateRemote = struct
let (module Migrate_Backend) = choose_backend dbg sr in
Migrate_Backend.receive_finalize3 () ~dbg ~mirror_id ~sr ~url ~verify_dest
- let receive_cancel2 ~dbg ~mirror_id ~url ~verify_dest =
- let (module Remote) =
- Storage_migrate_helper.get_remote_backend url verify_dest
- in
- let receive_state = State.find_active_receive_mirror mirror_id in
- let open State.Receive_state in
- Option.iter
- (fun r ->
- D.log_and_ignore_exn (fun () -> Remote.DP.destroy dbg r.leaf_dp false) ;
- List.iter
- (fun v ->
- D.log_and_ignore_exn (fun () -> Remote.VDI.destroy dbg r.sr v)
- )
- [r.dummy_vdi; r.leaf_vdi; r.parent_vdi]
- )
- receive_state ;
- State.remove_receive_mirror mirror_id
+ let receive_cancel2 ~dbg ~mirror_id ~sr ~url ~verify_dest =
+ let (module Migrate_Backend) = choose_backend dbg sr in
+ Migrate_Backend.receive_cancel2 () ~dbg ~mirror_id ~url ~verify_dest
end
(** This module [MigrateLocal] consists of the concrete implementations of the
@@ -107,7 +93,7 @@ module MigrateLocal = struct
debug "Snapshot VDI already cleaned up"
) ;
try
- MigrateRemote.receive_cancel2 ~dbg ~mirror_id:id
+ MigrateRemote.receive_cancel2 ~dbg ~mirror_id:id ~sr
~url:remote_info.url ~verify_dest:remote_info.verify_dest
with _ -> ()
)
@@ -186,8 +172,8 @@ module MigrateLocal = struct
~verify_dest
in
Migrate_Backend.send_start () ~dbg ~task_id ~dp ~sr ~vdi ~mirror_vm
- ~mirror_id ~local_vdi ~copy_vm ~live_vm:(Vm.of_string "0") ~url
- ~remote_mirror ~dest_sr:dest ~verify_dest ;
+ ~mirror_id ~local_vdi ~copy_vm ~live_vm ~url ~remote_mirror
+ ~dest_sr:dest ~verify_dest ;
Some (Mirror_id mirror_id)
with
| Storage_error (Sr_not_attached sr_uuid) ->
@@ -196,9 +182,14 @@ module MigrateLocal = struct
raise (Api_errors.Server_error (Api_errors.sr_not_attached, [sr_uuid]))
| ( Storage_error (Migration_mirror_fd_failure reason)
| Storage_error (Migration_mirror_snapshot_failure reason) ) as e ->
- error "%s: Caught %s: during storage migration preparation" __FUNCTION__
- reason ;
- MigrateRemote.receive_cancel2 ~dbg ~mirror_id ~url ~verify_dest ;
+ error "%s: Caught %s: during SMAPIv1 storage migration mirror "
+ __FUNCTION__ reason ;
+ MigrateRemote.receive_cancel2 ~dbg ~mirror_id ~sr ~url ~verify_dest ;
+ raise e
+ | Storage_error (Migration_mirror_failure reason) as e ->
+ error "%s: Caught :%s: during SMAPIv3 storage migration mirror"
+ __FUNCTION__ reason ;
+ MigrateRemote.receive_cancel2 ~dbg ~mirror_id ~sr ~url ~verify_dest ;
raise e
| Storage_error (Migration_mirror_copy_failure reason) as e ->
error "%s: Caught %s: during storage migration copy" __FUNCTION__ reason ;
@@ -307,10 +298,11 @@ module MigrateLocal = struct
copy_ops ;
List.iter
(fun (mirror_id, (recv_state : State.Receive_state.t)) ->
+ let sr, _vdi = State.of_mirror_id mirror_id in
debug "Receive in progress: %s" mirror_id ;
log_and_ignore_exn (fun () ->
- MigrateRemote.receive_cancel2 ~dbg ~mirror_id ~url:recv_state.url
- ~verify_dest:recv_state.verify_dest
+ MigrateRemote.receive_cancel2 ~dbg ~mirror_id ~sr
+ ~url:recv_state.url ~verify_dest:recv_state.verify_dest
)
)
recv_ops ;
@@ -339,8 +331,7 @@ let post_deactivate_hook ~sr ~vdi ~dp:_ =
) ;
debug "Finished calling receive_finalize3" ;
State.remove_local_mirror id ;
- debug "Removed active local mirror: %s" id ;
- Option.iter (fun id -> Scheduler.cancel scheduler id) r.watchdog
+ debug "Removed active local mirror: %s" id
)
let nbd_handler req s ?(vm = "0") sr vdi dp =
@@ -373,7 +364,7 @@ let nbd_handler req s ?(vm = "0") sr vdi dp =
(** nbd_proxy is a http handler but will turn the http connection into an nbd connection.
It proxies the connection between the sender and the generic nbd server, as returned
by [get_nbd_server dp sr vdi vm]. *)
-let nbd_proxy req s vm sr vdi dp =
+let import_nbd_proxy req s vm sr vdi dp =
debug "%s: vm=%s sr=%s vdi=%s dp=%s" __FUNCTION__ vm sr vdi dp ;
let sr, vdi = Storage_interface.(Sr.of_string sr, Vdi.of_string vdi) in
req.Http.Request.close <- true ;
@@ -451,7 +442,9 @@ let stop = MigrateLocal.stop
let list = MigrateLocal.list
-let killall = MigrateLocal.killall
+let killall ~dbg =
+ with_dbg ~name:__FUNCTION__ ~dbg @@ fun di ->
+ MigrateLocal.killall ~dbg:(Debug_info.to_string di)
let stat = MigrateLocal.stat
diff --git a/ocaml/xapi/storage_mux.ml b/ocaml/xapi/storage_mux.ml
index a523000c7b..1ea91e9407 100644
--- a/ocaml/xapi/storage_mux.ml
+++ b/ocaml/xapi/storage_mux.ml
@@ -27,6 +27,8 @@ let s_of_vdi = Storage_interface.Vdi.string_of
let s_of_vm = Storage_interface.Vm.string_of
+let s_of_operation = Storage_interface.Mirror.show_operation
+
let with_dbg ~name ~dbg f =
Debug_info.with_dbg ~with_thread:true ~module_name:"SMAPIv2" ~name ~dbg f
@@ -645,7 +647,13 @@ module Mux = struct
let module C = StorageAPI (Idl.Exn.GenClient (struct
let rpc = of_sr sr
end)) in
- C.VDI.deactivate (Debug_info.to_string di) dp sr vdi vm
+ C.VDI.deactivate (Debug_info.to_string di) dp sr vdi vm ;
+ (*XX The hook should not be called here, nor should storage_mux care about
+ the SMAPI version of the SR, but as xapi-storage-script cannot call code
in xapi, and smapiv1_wrapper has state tracking logic, the hook has to be placed
+ here for now. *)
+ if smapi_version_of_sr sr = SMAPIv3 then
+ Storage_migrate.post_deactivate_hook ~sr ~vdi ~dp
let detach () ~dbg ~dp ~sr ~vdi ~vm =
with_dbg ~name:"VDI.detach" ~dbg @@ fun di ->
@@ -797,6 +805,24 @@ module Mux = struct
let copy () ~dbg =
with_dbg ~name:"DATA.copy" ~dbg @@ fun dbg -> Storage_migrate.copy ~dbg
+ let mirror () ~dbg ~sr ~vdi ~vm ~dest =
+ with_dbg ~name:"DATA.mirror" ~dbg @@ fun di ->
+ info "%s dbg:%s sr: %s vdi: %s vm:%s remote:%s" __FUNCTION__ dbg
+ (s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm) dest ;
+ let module C = StorageAPI (Idl.Exn.GenClient (struct
+ let rpc = of_sr sr
+ end)) in
+ C.DATA.mirror (Debug_info.to_string di) sr vdi vm dest
+
+ let stat () ~dbg ~sr ~vdi ~vm ~key =
+ with_dbg ~name:"DATA.stat" ~dbg @@ fun di ->
+    info "%s dbg:%s sr: %s vdi: %s vm: %s operation_key: %s" __FUNCTION__ dbg
+ (s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm) (s_of_operation key) ;
+ let module C = StorageAPI (Idl.Exn.GenClient (struct
+ let rpc = of_sr sr
+ end)) in
+ C.DATA.stat (Debug_info.to_string di) sr vdi vm key
+
let import_activate () ~dbg ~dp ~sr ~vdi ~vm =
with_dbg ~name:"DATA.import_activate" ~dbg @@ fun di ->
info "%s dbg:%s dp:%s sr:%s vdi:%s vm:%s" __FUNCTION__ dbg dp (s_of_sr sr)
diff --git a/ocaml/xapi/storage_smapiv1.ml b/ocaml/xapi/storage_smapiv1.ml
index 7eef88b46e..0995edc35c 100644
--- a/ocaml/xapi/storage_smapiv1.ml
+++ b/ocaml/xapi/storage_smapiv1.ml
@@ -478,6 +478,7 @@ module SMAPIv1 : Server_impl = struct
; backend_type= "vbd3"
}
; BlockDevice {path= params}
+ ; Nbd {uri= attach_info_v1.Smint.params_nbd}
]
)
}
@@ -1128,6 +1129,10 @@ module SMAPIv1 : Server_impl = struct
let copy _context ~dbg:_ ~sr:_ ~vdi:_ ~vm:_ ~url:_ ~dest:_ ~verify_dest:_ =
assert false
+ let mirror _context ~dbg:_ ~sr:_ ~vdi:_ ~vm:_ ~dest:_ = assert false
+
+ let stat _context ~dbg:_ ~sr:_ ~vdi:_ ~vm:_ ~key:_ = assert false
+
let import_activate _context ~dbg:_ ~dp:_ ~sr:_ ~vdi:_ ~vm:_ = assert false
let get_nbd_server _context ~dbg:_ ~dp:_ ~sr:_ ~vdi:_ ~vm:_ = assert false
diff --git a/ocaml/xapi/storage_smapiv1_migrate.ml b/ocaml/xapi/storage_smapiv1_migrate.ml
index 8a605fd59b..fe291d44d6 100644
--- a/ocaml/xapi/storage_smapiv1_migrate.ml
+++ b/ocaml/xapi/storage_smapiv1_migrate.ml
@@ -203,7 +203,8 @@ module Copy = struct
let dest_vdi_url =
let url' = Http.Url.of_string url in
Http.Url.set_uri url'
- (Printf.sprintf "%s/nbdproxy/%s/%s/%s/%s" (Http.Url.get_uri url')
+ (Printf.sprintf "%s/nbdproxy/import/%s/%s/%s/%s"
+ (Http.Url.get_uri url')
(Storage_interface.Vm.string_of vm)
(Storage_interface.Sr.string_of dest)
(Storage_interface.Vdi.string_of dest_vdi)
@@ -578,6 +579,12 @@ module MIRROR : SMAPIv2_MIRROR = struct
let (module Remote) =
Storage_migrate_helper.get_remote_backend url verify_dest
in
+
+ let read_write = true in
+ (* DP set up is only essential for MIRROR.start/stop due to their open ended pattern.
+ It's not necessary for copy which will take care of that itself. *)
+ ignore (Local.VDI.attach3 dbg dp sr vdi (Vm.of_string "0") read_write) ;
+ Local.VDI.activate3 dbg dp sr vdi (Vm.of_string "0") ;
match remote_mirror with
| Mirror.SMAPIv3_mirror _ ->
(* this should never happen *)
@@ -797,10 +804,6 @@ module MIRROR : SMAPIv2_MIRROR = struct
receive_state ;
State.remove_receive_mirror id
- let receive_cancel2 _ctx ~dbg:_ ~mirror_id:_ ~url:_ ~verify_dest:_ =
- (* see Storage_migrate.receive_cancel2 *)
- u __FUNCTION__
-
exception Timeout of Mtime.Span.t
let reqs_outstanding_timeout = Mtime.Span.(150 * s)
@@ -843,7 +846,10 @@ module MIRROR : SMAPIv2_MIRROR = struct
if st.Stats.nbd_mirror_failed = 1 then (
D.error "tapdisk reports mirroring failed" ;
s.failed <- true
- )
+ ) ;
+ Option.iter
+ (fun id -> Scheduler.cancel scheduler id)
+ s.watchdog
with
| Timeout elapsed ->
D.error
@@ -875,4 +881,22 @@ module MIRROR : SMAPIv2_MIRROR = struct
let list _ctx = u __FUNCTION__
let stat _ctx = u __FUNCTION__
+
+ let receive_cancel2 _ctx ~dbg ~mirror_id ~url ~verify_dest =
+ let (module Remote) =
+ Storage_migrate_helper.get_remote_backend url verify_dest
+ in
+ let receive_state = State.find_active_receive_mirror mirror_id in
+ let open State.Receive_state in
+ Option.iter
+ (fun r ->
+ D.log_and_ignore_exn (fun () -> Remote.DP.destroy dbg r.leaf_dp false) ;
+ List.iter
+ (fun v ->
+ D.log_and_ignore_exn (fun () -> Remote.VDI.destroy dbg r.sr v)
+ )
+ [r.dummy_vdi; r.leaf_vdi; r.parent_vdi]
+ )
+ receive_state ;
+ State.remove_receive_mirror mirror_id
end
diff --git a/ocaml/xapi/storage_smapiv1_wrapper.ml b/ocaml/xapi/storage_smapiv1_wrapper.ml
index 397cee17d6..7066a649ce 100644
--- a/ocaml/xapi/storage_smapiv1_wrapper.ml
+++ b/ocaml/xapi/storage_smapiv1_wrapper.ml
@@ -1137,11 +1137,17 @@ functor
end
module DATA = struct
+ let u x = raise Storage_interface.(Storage_error (Errors.Unimplemented x))
+
let copy context ~dbg ~sr ~vdi ~vm ~url ~dest =
info "DATA.copy dbg:%s sr:%s vdi:%s url:%s dest:%s" dbg (s_of_sr sr)
(s_of_vdi vdi) url (s_of_sr dest) ;
Impl.DATA.copy context ~dbg ~sr ~vdi ~vm ~url ~dest
+ let mirror _context ~dbg:_ ~sr:_ ~vdi:_ ~vm:_ ~dest:_ = u "DATA.mirror"
+
+ let stat _context ~dbg:_ ~sr:_ ~vdi:_ ~vm:_ ~key:_ = u "DATA.stat"
+
(* tapdisk supports three kind of nbd servers, the old style nbdserver,
the new style nbd server and a real nbd server. The old and new style nbd servers
are "special" nbd servers that accept fds passed via SCM_RIGHTS and handle
@@ -1186,9 +1192,6 @@ functor
module MIRROR = struct
type context = unit
- let u x =
- raise Storage_interface.(Storage_error (Errors.Unimplemented x))
-
let send_start _ctx ~dbg:_ ~task_id:_ ~dp:_ ~sr:_ ~vdi:_ ~mirror_vm:_
~mirror_id:_ ~local_vdi:_ ~copy_vm:_ ~live_vm:_ ~url:_
~remote_mirror:_ ~dest_sr:_ ~verify_dest:_ =
diff --git a/ocaml/xapi/storage_smapiv3_migrate.ml b/ocaml/xapi/storage_smapiv3_migrate.ml
index 5ef3eeaac6..d9d34ffbe0 100644
--- a/ocaml/xapi/storage_smapiv3_migrate.ml
+++ b/ocaml/xapi/storage_smapiv3_migrate.ml
@@ -12,42 +12,325 @@
* GNU Lesser General Public License for more details.
*)
-module D = Debug.Make (struct let name = "storage_smapiv1_migrate" end)
+module D = Debug.Make (struct let name = __MODULE__ end)
module Unixext = Xapi_stdext_unix.Unixext
module State = Storage_migrate_helper.State
module SXM = Storage_migrate_helper.SXM
+open Storage_interface
+open Storage_task
+open Xmlrpc_client
+open Storage_migrate_helper
module type SMAPIv2_MIRROR = Storage_interface.MIRROR
+let s_of_sr = Storage_interface.Sr.string_of
+
+let s_of_vdi = Storage_interface.Vdi.string_of
+
+let s_of_vm = Storage_interface.Vm.string_of
+
+let export_nbd_proxy ~remote_url ~mirror_vm ~sr ~vdi ~dp ~verify_dest =
+ D.debug "%s spawning exporting nbd proxy" __FUNCTION__ ;
+ let path =
+ Printf.sprintf "/var/run/nbdproxy/export/%s" (Vm.string_of mirror_vm)
+ in
+ let proxy_srv = Fecomms.open_unix_domain_sock_server path in
+ try
+ let uri =
+ Printf.sprintf "/services/SM/nbdproxy/import/%s/%s/%s/%s"
+ (Vm.string_of mirror_vm) (Sr.string_of sr) (Vdi.string_of vdi) dp
+ in
+
+ let dest_url = Http.Url.set_uri (Http.Url.of_string remote_url) uri in
+ D.debug "%s now waiting for connection at %s" __FUNCTION__ path ;
+ let nbd_client, _addr = Unix.accept proxy_srv in
+ D.debug "%s connection accepted" __FUNCTION__ ;
+ let request =
+ Http.Request.make
+ ~query:(Http.Url.get_query_params dest_url)
+ ~version:"1.0" ~user_agent:"export_nbd_proxy" Http.Put uri
+ in
+ D.debug "%s making request to dest %s" __FUNCTION__
+ (Http.Url.to_string dest_url) ;
+ let verify_cert = if verify_dest then Stunnel_client.pool () else None in
+ let transport = Xmlrpc_client.transport_of_url ~verify_cert dest_url in
+ with_transport ~stunnel_wait_disconnect:false transport
+ (with_http request (fun (_response, s) ->
+ D.debug "%s starting proxy" __FUNCTION__ ;
+ Unixext.proxy (Unix.dup s) (Unix.dup nbd_client)
+ )
+ ) ;
+ Unix.close proxy_srv
+ with e ->
+ D.debug "%s did not get connection due to %s, closing" __FUNCTION__
+ (Printexc.to_string e) ;
+ Unix.close proxy_srv ;
+ raise e
+
+let mirror_wait ~dbg ~sr ~vdi ~vm ~mirror_id mirror_key =
+ let rec mirror_wait_rec key =
+ let {failed; complete; progress} : Mirror.status =
+ Local.DATA.stat dbg sr vdi vm key
+ in
+ if complete then (
+ Option.fold ~none:()
+ ~some:(fun p -> D.info "%s progress is %f" __FUNCTION__ p)
+ progress ;
+    D.info "%s qemu mirror %s completed" __FUNCTION__ mirror_id
+ ) else if failed then (
+    State.find_active_local_mirror mirror_id
+    |> Option.iter (fun (s : State.Send_state.t) -> s.failed <- true) ;
+    D.info "%s qemu mirror %s failed" __FUNCTION__ mirror_id ;
+    Updates.add (Dynamic.Mirror mirror_id) updates ;
+ raise
+ (Storage_interface.Storage_error
+ (Migration_mirror_failure "Mirror failed during syncing")
+ )
+ ) else (
+ Option.fold ~none:()
+ ~some:(fun p -> D.info "%s progress is %f" __FUNCTION__ p)
+ progress ;
+ mirror_wait_rec key
+ )
+ in
+
+ match mirror_key with
+ | Storage_interface.Mirror.CopyV1 _ ->
+ ()
+ | Storage_interface.Mirror.MirrorV1 _ ->
+ D.debug "%s waiting for mirroring to be done" __FUNCTION__ ;
+ mirror_wait_rec mirror_key
+
module MIRROR : SMAPIv2_MIRROR = struct
type context = unit
let u x = raise Storage_interface.(Storage_error (Errors.Unimplemented x))
- let send_start _ctx = u __FUNCTION__
-
- let receive_start _ctx = u __FUNCTION__
+ let send_start _ctx ~dbg ~task_id:_ ~dp ~sr ~vdi ~mirror_vm ~mirror_id
+ ~local_vdi:_ ~copy_vm:_ ~live_vm ~url ~remote_mirror ~dest_sr ~verify_dest
+ =
+ D.debug
+ "%s dbg: %s dp: %s sr: %s vdi:%s mirror_vm:%s mirror_id: %s live_vm: %s \
+ url:%s dest_sr:%s verify_dest:%B"
+ __FUNCTION__ dbg dp (s_of_sr sr) (s_of_vdi vdi) (s_of_vm mirror_vm)
+ mirror_id (s_of_vm live_vm) url (s_of_sr dest_sr) verify_dest ;
+ ignore (Local.VDI.attach3 dbg dp sr vdi (Vm.of_string "0") true) ;
+ (* TODO we are not activating the VDI here because SMAPIv3 does not support
+ activating the VDI again on dom 0 when it is already activated on the live_vm.
+     This means that if the VM shuts down while SXM is in progress, the
+     mirroring for SMAPIv3 will fail. *)
+ let nbd_proxy_path =
+ Printf.sprintf "/var/run/nbdproxy/export/%s" (Vm.string_of mirror_vm)
+ in
+ match remote_mirror with
+ | Mirror.Vhd_mirror _ ->
+ raise
+ (Storage_error
+ (Migration_preparation_failure
+ "Incorrect remote mirror format for SMAPIv3"
+ )
+ )
+ | Mirror.SMAPIv3_mirror {nbd_export; mirror_datapath; mirror_vdi} -> (
+ try
+ let nbd_uri =
+ Uri.make ~scheme:"nbd+unix" ~host:"" ~path:nbd_export
+ ~query:[("socket", [nbd_proxy_path])]
+ ()
+ |> Uri.to_string
+ in
+ let _ : Thread.t =
+ Thread.create
+ (fun () ->
+ export_nbd_proxy ~remote_url:url ~mirror_vm ~sr:dest_sr
+ ~vdi:mirror_vdi.vdi ~dp:mirror_datapath ~verify_dest
+ )
+ ()
+ in
- let receive_start2 _ctx = u __FUNCTION__
+ D.info "%s nbd_proxy_path: %s nbd_url %s" __FUNCTION__ nbd_proxy_path
+ nbd_uri ;
+ let mk = Local.DATA.mirror dbg sr vdi live_vm nbd_uri in
- let receive_start3 _ctx = u __FUNCTION__
+ D.debug "%s Updating active local mirrors: id=%s" __FUNCTION__ mirror_id ;
+ let alm =
+ State.Send_state.
+ {
+ url
+ ; dest_sr
+ ; remote_info=
+ Some
+ {dp= mirror_datapath; vdi= mirror_vdi.vdi; url; verify_dest}
+ ; local_dp= dp
+ ; tapdev= None
+ ; failed= false
+ ; watchdog= None
+ ; vdi
+ ; live_vm
+ ; mirror_key= Some mk
+ }
+ in
+ State.add mirror_id (State.Send_op alm) ;
+ D.debug "%s Updated mirror_id %s in the active local mirror"
+ __FUNCTION__ mirror_id ;
+ mirror_wait ~dbg ~sr ~vdi ~vm:live_vm ~mirror_id mk
+ with e ->
+ D.error "%s caught exception during mirror: %s" __FUNCTION__
+ (Printexc.to_string e) ;
+ raise
+ (Storage_interface.Storage_error
+ (Migration_mirror_failure (Printexc.to_string e))
+ )
+ )
- let receive_finalize _ctx = u __FUNCTION__
+ let receive_start _ctx ~dbg:_ ~sr:_ ~vdi_info:_ ~id:_ ~similar:_ =
+ u "DATA.MIRROR.receive_start"
- let receive_finalize2 _ctx = u __FUNCTION__
+ let receive_start2 _ctx ~dbg:_ ~sr:_ ~vdi_info:_ ~id:_ ~similar:_ ~vm:_ =
+ u "DATA.MIRROR.receive_start2"
- let receive_finalize3 _ctx = u __FUNCTION__
+ let receive_start3 _ctx ~dbg ~sr ~vdi_info ~mirror_id ~similar:_ ~vm ~url
+ ~verify_dest =
+ D.debug "%s dbg: %s sr: %s vdi: %s id: %s vm: %s url: %s verify_dest: %B"
+ __FUNCTION__ dbg (s_of_sr sr)
+ (string_of_vdi_info vdi_info)
+ mirror_id (s_of_vm vm) url verify_dest ;
+ let module Remote = StorageAPI (Idl.Exn.GenClient (struct
+ let rpc =
+ Storage_utils.rpc ~srcstr:"smapiv2" ~dststr:"dst_smapiv2"
+ (Storage_utils.connection_args_of_uri ~verify_dest url)
+ end)) in
+ let on_fail : (unit -> unit) list ref = ref [] in
+ try
+ (* We drop cbt_metadata VDIs that do not have any actual data *)
+ let (vdi_info : vdi_info) =
+ {vdi_info with sm_config= [("base_mirror", mirror_id)]}
+ in
+ let leaf_dp = Remote.DP.create dbg Uuidx.(to_string (make ())) in
+ let leaf = Remote.VDI.create dbg sr vdi_info in
+ D.info "Created leaf VDI for mirror receive: %s" (string_of_vdi_info leaf) ;
+ on_fail := (fun () -> Remote.VDI.destroy dbg sr leaf.vdi) :: !on_fail ;
+ let backend = Remote.VDI.attach3 dbg leaf_dp sr leaf.vdi vm true in
+ let nbd_export =
+ match nbd_export_of_attach_info backend with
+ | None ->
+ raise
+ (Storage_error
+ (Migration_preparation_failure "Cannot parse nbd uri")
+ )
+ | Some export ->
+ export
+ in
+ D.debug "%s activating dp %s sr: %s vdi: %s vm: %s" __FUNCTION__ leaf_dp
+ (s_of_sr sr) (s_of_vdi leaf.vdi) (s_of_vm vm) ;
+ Remote.VDI.activate3 dbg leaf_dp sr leaf.vdi vm ;
+ let qcow2_res =
+ {Mirror.mirror_vdi= leaf; mirror_datapath= leaf_dp; nbd_export}
+ in
+ let remote_mirror = Mirror.SMAPIv3_mirror qcow2_res in
+ D.debug
+        "%s updating receiving state locally to id: %s vm: %s vdi_info: %s"
+ __FUNCTION__ mirror_id (s_of_vm vm)
+ (string_of_vdi_info vdi_info) ;
+ State.add mirror_id
+ State.(
+ Recv_op
+ Receive_state.
+ {
+ sr
+ ; leaf_vdi= qcow2_res.mirror_vdi.vdi
+ ; leaf_dp= qcow2_res.mirror_datapath
+ ; remote_vdi= vdi_info.vdi
+ ; mirror_vm= vm
+ ; dummy_vdi=
+ Vdi.of_string "dummy"
+ (* No dummy_vdi is needed when migrating from SMAPIv3 SRs, having a
+ "dummy" VDI here is fine as cleanup code for SMAPIv3 will not
+ access dummy_vdi, and all the clean up functions will ignore
+ exceptions when trying to clean up the dummy VDIs even if they
+ do access dummy_vdi. The same applies to parent_vdi *)
+ ; parent_vdi= Vdi.of_string "dummy"
+ ; url
+ ; verify_dest
+ }
+ ) ;
+ remote_mirror
+ with e ->
+ List.iter
+ (fun op ->
+ try op ()
+ with e ->
+          D.warn "Caught exception in on_fail while performing cleanup: %s"
+ (Printexc.to_string e)
+ )
+ !on_fail ;
+ raise e
- let receive_cancel _ctx = u __FUNCTION__
+ let receive_finalize _ctx ~dbg:_ ~id:_ = u "DATA.MIRROR.receive_finalize"
- let receive_cancel2 _ctx = u __FUNCTION__
+ let receive_finalize2 _ctx ~dbg:_ ~id:_ = u "DATA.MIRROR.receive_finalize2"
- let has_mirror_failed _ctx = u __FUNCTION__
+ let receive_finalize3 _ctx ~dbg ~mirror_id ~sr ~url ~verify_dest =
+ D.debug "%s dbg:%s id: %s sr: %s url: %s verify_dest: %B" __FUNCTION__ dbg
+ mirror_id (s_of_sr sr) url verify_dest ;
+ let (module Remote) =
+ Storage_migrate_helper.get_remote_backend url verify_dest
+ in
+ let open State.Receive_state in
+ let recv_state = State.find_active_receive_mirror mirror_id in
+ Option.iter
+ (fun r ->
+ Remote.DP.destroy2 dbg r.leaf_dp r.sr r.leaf_vdi r.mirror_vm false ;
+ Remote.VDI.remove_from_sm_config dbg r.sr r.leaf_vdi "base_mirror"
+ )
+ recv_state ;
+ State.remove_receive_mirror mirror_id
- let pre_deactivate_hook _ctx = u __FUNCTION__
+ let receive_cancel _ctx ~dbg:_ ~id:_ = u __FUNCTION__
let list _ctx = u __FUNCTION__
let stat _ctx = u __FUNCTION__
+
+ let receive_cancel2 _ctx ~dbg ~mirror_id ~url ~verify_dest =
+ D.debug "%s dbg:%s mirror_id:%s url:%s verify_dest:%B" __FUNCTION__ dbg
+ mirror_id url verify_dest ;
+ let (module Remote) =
+ Storage_migrate_helper.get_remote_backend url verify_dest
+ in
+ let receive_state = State.find_active_receive_mirror mirror_id in
+ let open State.Receive_state in
+ Option.iter
+ (fun r ->
+ D.log_and_ignore_exn (fun () -> Remote.DP.destroy dbg r.leaf_dp false) ;
+ D.log_and_ignore_exn (fun () -> Remote.VDI.destroy dbg r.sr r.leaf_vdi)
+ )
+ receive_state ;
+ State.remove_receive_mirror mirror_id
+
+ let has_mirror_failed _ctx ~dbg ~mirror_id ~sr =
+ match State.find_active_local_mirror mirror_id with
+ | Some ({mirror_key= Some mk; vdi; live_vm; _} : State.Send_state.t) ->
+ let {failed; _} : Mirror.status =
+ Local.DATA.stat dbg sr vdi live_vm mk
+ in
+ failed
+ | _ ->
+ false
+
+ (* TODO currently we make the pre_deactivate_hook for SMAPIv3 a noop while for
+ SMAPIv1 it will do a final check of the state of the mirror and report error
+ if there is a mirror failure. We leave this for SMAPIv3 because the Data.stat
+ call, which checks for the state of the mirror stops working once the domain
+ has been paused, which happens before VDI.deactivate, hence we cannot do this check in
+ pre_deactivate_hook. Instead we work around this by doing mirror check in mirror_wait
+ as we repeatedly poll the state of the mirror job. In the future we might
+ want to invent a different hook that can be called to do a final check just
+ before the VM is paused. *)
+ let pre_deactivate_hook _ctx ~dbg ~dp ~sr ~vdi =
+ D.debug "%s dbg: %s dp: %s sr: %s vdi: %s" __FUNCTION__ dbg dp (s_of_sr sr)
+ (s_of_vdi vdi)
end
diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml
index f7ac9b546d..a12e3ec0c8 100644
--- a/ocaml/xapi/xapi.ml
+++ b/ocaml/xapi/xapi.ml
@@ -327,6 +327,31 @@ let server_run_in_emergency_mode () =
in
wait_to_die () ; exit 0
+let remove_blocked_repositories ~__context () =
+ try
+ let blocklist = !Xapi_globs.repository_url_blocklist in
+ let repos = Db.Repository.get_all ~__context in
+ let pool = Helpers.get_pool ~__context in
+ let is_repo_blocked repo =
+ let binary_url = Db.Repository.get_binary_url ~__context ~self:repo in
+ let source_url = Db.Repository.get_source_url ~__context ~self:repo in
+ Repository_helpers.url_matches ~url:binary_url blocklist
+ || Repository_helpers.url_matches ~url:source_url blocklist
+ in
+ let remove_repo repo =
+      debug "%s Removing repository %s because it is blocked" __FUNCTION__
+ (Ref.string_of repo) ;
+ try
+ Xapi_pool.remove_repository ~__context ~self:pool ~value:repo ;
+ Db.Repository.destroy ~__context ~self:repo
+ with e ->
+ debug "%s Failed to remove repository for %s: %s" __FUNCTION__
+ (Ref.string_of repo) (Printexc.to_string e)
+ in
+ List.filter (fun x -> is_repo_blocked x) repos
+ |> List.iter (fun x -> remove_repo x)
+ with e -> error "Exception in %s: %s" __FUNCTION__ (Printexc.to_string e)
+
let bring_up_management_if ~__context () =
try
let management_if =
@@ -1115,6 +1140,10 @@ let server_init () =
, [Startup.OnlyMaster]
, Xapi_db_upgrade.hi_level_db_upgrade_rules ~__context
)
+ ; ( "removing blocked repositories"
+ , [Startup.OnlyMaster]
+ , remove_blocked_repositories ~__context
+ )
; ( "bringing up management interface"
, []
, bring_up_management_if ~__context
diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml
index 8bdeac10d0..b183d477ee 100644
--- a/ocaml/xapi/xapi_globs.ml
+++ b/ocaml/xapi/xapi_globs.ml
@@ -932,6 +932,13 @@ let gen_pool_secret_script = ref "/usr/bin/pool_secret_wrapper"
let repository_domain_name_allowlist = ref []
+(*
+ This blocklist aims to prevent the creation of any repository whose URL matches an entry in the blocklist.
+ Additionally, if an existing repository contains a URL that matches an entry in the blocklist,
+ it should be removed automatically after xapi is restarted.
+*)
+let repository_url_blocklist = ref []
+
let yum_cmd = ref "/usr/bin/yum"
let dnf_cmd = ref "/usr/bin/dnf"
@@ -1599,6 +1606,11 @@ let other_options =
(fun s -> s)
(fun s -> s)
repository_domain_name_allowlist
+ ; gen_list_option "repository-url-blocklist"
+      "space-separated list of blocked URL patterns for repository base URLs."
+ (fun s -> s)
+ (fun s -> s)
+ repository_url_blocklist
; ( "repository-gpgcheck"
, Arg.Set repository_gpgcheck
, (fun () -> string_of_bool !repository_gpgcheck)
diff --git a/ocaml/xapi/xapi_services.ml b/ocaml/xapi/xapi_services.ml
index 1612c5050f..ca9e3d729c 100644
--- a/ocaml/xapi/xapi_services.ml
+++ b/ocaml/xapi/xapi_services.ml
@@ -207,8 +207,9 @@ let put_handler (req : Http.Request.t) s _ =
->
Storage_migrate.nbd_handler req s ~vm sr vdi dp
| [""; services; "SM"; "nbdproxy"; vm; sr; vdi; dp]
+ | [""; services; "SM"; "nbdproxy"; "import"; vm; sr; vdi; dp]
when services = _services ->
- Storage_migrate.nbd_proxy req s vm sr vdi dp
+ Storage_migrate.import_nbd_proxy req s vm sr vdi dp
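+ (* i.e. both /services/SM/nbdproxy/<vm>/<sr>/<vdi>/<dp> and the new
+    /services/SM/nbdproxy/import/<vm>/<sr>/<vdi>/<dp> now reach
+    import_nbd_proxy; the leading empty list element corresponds to the
+    slash at the start of the path. *)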
| _ ->
Http_svr.headers s (Http.http_404_missing ~version:"1.0" ()) ;
req.Http.Request.close <- true
diff --git a/ocaml/xapi/xapi_vm_migrate.ml b/ocaml/xapi/xapi_vm_migrate.ml
index e28242cadf..e5eca21283 100644
--- a/ocaml/xapi/xapi_vm_migrate.ml
+++ b/ocaml/xapi/xapi_vm_migrate.ml
@@ -1020,14 +1020,6 @@ let vdi_copy_fun __context dbg vdi_map remote is_intra_pool remote_vdis so_far
(* Though we have no intention of "write", here we use the same mode as the
associated VBD on a mirrored VDIs (i.e. always RW). This avoids problem
when we need to start/stop the VM along the migration. *)
- let read_write = true in
- (* DP set up is only essential for MIRROR.start/stop due to their open ended pattern.
- It's not necessary for copy which will take care of that itself. *)
- ignore
- (SMAPI.VDI.attach3 dbg new_dp vconf.sr vconf.location vconf.mirror_vm
- read_write
- ) ;
- SMAPI.VDI.activate3 dbg new_dp vconf.sr vconf.location vconf.mirror_vm ;
let id =
Storage_migrate_helper.State.mirror_id_of (vconf.sr, vconf.location)
in
diff --git a/ocaml/xcp-rrdd/bin/rrdd/dune b/ocaml/xcp-rrdd/bin/rrdd/dune
index b8419b12fb..d84e06e46f 100644
--- a/ocaml/xcp-rrdd/bin/rrdd/dune
+++ b/ocaml/xcp-rrdd/bin/rrdd/dune
@@ -10,8 +10,8 @@
http_lib
httpsvr
inotify
- mtime
- mtime.clock.os
+ clock
+ mtime.clock
rpclib.core
rrd-transport
rrd-transport.lib
@@ -46,6 +46,7 @@
http_lib
httpsvr
inotify
+ clock
rpclib.core
rpclib.json
rpclib.xml
diff --git a/ocaml/xcp-rrdd/bin/rrdd/rrdd_server.ml b/ocaml/xcp-rrdd/bin/rrdd/rrdd_server.ml
index 6e11a2da31..6a1212f178 100644
--- a/ocaml/xcp-rrdd/bin/rrdd/rrdd_server.ml
+++ b/ocaml/xcp-rrdd/bin/rrdd/rrdd_server.ml
@@ -716,8 +716,12 @@ module Plugin = struct
let next_reading (uid : P.uid) : float =
let open Rrdd_shared in
if with_lock registered_m (fun _ -> Hashtbl.mem registered uid) then
- with_lock last_loop_end_time_m (fun _ ->
- !last_loop_end_time +. !timeslice -. Unix.gettimeofday ()
+ with_lock next_iteration_start_m (fun _ ->
+ match Clock.Timer.remaining !next_iteration_start with
+ | Remaining diff ->
+ Clock.Timer.span_to_s diff
+ | Expired diff ->
+ Clock.Timer.span_to_s diff *. -1.
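+ (* next_reading keeps its original contract: a positive number of seconds
+    until the next iteration is due, negative if it is already overdue. *)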
)
else
-1.
diff --git a/ocaml/xcp-rrdd/bin/rrdd/rrdd_shared.ml b/ocaml/xcp-rrdd/bin/rrdd/rrdd_shared.ml
index 8800ed5683..816860e581 100644
--- a/ocaml/xcp-rrdd/bin/rrdd/rrdd_shared.ml
+++ b/ocaml/xcp-rrdd/bin/rrdd/rrdd_shared.ml
@@ -20,14 +20,15 @@ module StringSet = Set.Make (String)
(* Whether to enable all non-default datasources *)
let enable_all_dss = ref false
-(* The time between each monitoring loop. *)
-let timeslice : float ref = ref 5.
+(* The expected time span between two monitoring loop iterations. *)
+let timeslice : Mtime.span ref = ref Mtime.Span.(5 * s)
-(* Timestamp of the last monitoring loop end. *)
-let last_loop_end_time : float ref = ref neg_infinity
+(* A timer that expires at the start of the next iteration. *)
+let next_iteration_start : Clock.Timer.t ref =
+ ref (Clock.Timer.start ~duration:!timeslice)
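+
+(* A minimal sketch of the Clock.Timer contract relied upon in this patch:
+   a timer started with ~duration expires once that span has elapsed;
+   Clock.Timer.remaining returns Remaining d while time is left and Expired d
+   once the deadline has passed; Clock.Timer.extend_by pushes the deadline
+   further into the future. *)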
-(* The mutex that protects the last_loop_end_time against data corruption. *)
-let last_loop_end_time_m : Mutex.t = Mutex.create ()
+(* The mutex that protects next_iteration_start against data corruption. *)
+let next_iteration_start_m : Mutex.t = Mutex.create ()
(** Cache memory/target values *)
let memory_targets : (int, int64) Hashtbl.t = Hashtbl.create 20
diff --git a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
index 448dc98f9c..7510846590 100644
--- a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
+++ b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml
@@ -538,18 +538,36 @@ let monitor_write_loop writers =
while true do
try
do_monitor_write xc writers ;
- with_lock Rrdd_shared.last_loop_end_time_m (fun _ ->
- Rrdd_shared.last_loop_end_time := Unix.gettimeofday ()
+ with_lock Rrdd_shared.next_iteration_start_m (fun _ ->
+ Rrdd_shared.next_iteration_start :=
+ Clock.Timer.extend_by !Rrdd_shared.timeslice
+ !Rrdd_shared.next_iteration_start
) ;
- Thread.delay !Rrdd_shared.timeslice
+ match Clock.Timer.remaining !Rrdd_shared.next_iteration_start with
+ | Remaining remaining ->
+ Thread.delay (Clock.Timer.span_to_s remaining)
+ | Expired missed_by ->
+ warn
+ "%s: Monitor write iteration missed cycle by %a, skipping \
+ the delay"
+ __FUNCTION__ Debug.Pp.mtime_span missed_by ;
+ (* To avoid to use up 100% CPU when the timer is already
+ expired, still delay 1s *)
+ Thread.delay 1.
with e ->
Backtrace.is_important e ;
warn
- "Monitor/write thread caught an exception. Pausing for 10s, \
- then restarting: %s"
- (Printexc.to_string e) ;
+ "%s: Monitor/write thread caught an exception. Pausing for \
+ 10s, then restarting: %s"
+ __FUNCTION__ (Printexc.to_string e) ;
log_backtrace e ;
- Thread.delay 10.
+ Thread.delay 10. ;
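+ (* Push the next deadline out by the pause we just took, so the following
+    iteration does not immediately see Expired and skip its delay. *)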
+ with_lock Rrdd_shared.next_iteration_start_m (fun _ ->
+ Rrdd_shared.next_iteration_start :=
+ Clock.Timer.extend_by
+ Mtime.Span.(10 * s)
+ !Rrdd_shared.next_iteration_start
+ )
done
)
)
diff --git a/ocaml/xenopsd/lib/xenops_server.ml b/ocaml/xenopsd/lib/xenops_server.ml
index ae93a2476c..0b7dc1130a 100644
--- a/ocaml/xenopsd/lib/xenops_server.ml
+++ b/ocaml/xenopsd/lib/xenops_server.ml
@@ -928,6 +928,12 @@ module Redirector = struct
let nested_parallel_queues =
{queues= Queues.create (); mutex= Mutex.create ()}
+ (* We create another queue, used only for VM_receive_memory operations, for the
+ same reason. A migration spawns two operations, send and receive, so when little
+ worker space is available a deadlock can happen during migration between hosts
+ (or localhost migration), as the receiver has no free workers to receive memory. *)
+ let receive_memory_queues = {queues= Queues.create (); mutex= Mutex.create ()}
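+
+ (* Worked example: with a single worker, a VM_migrate (send) operation
+    occupies the only worker and blocks waiting for its matching
+    VM_receive_memory operation, which can never be scheduled; the dedicated
+    queue guarantees receivers always make progress. *)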
+
(* we do not want to use = when comparing queues: queues can contain
(uncomparable) functions, and we are only interested in comparing the
equality of their static references *)
@@ -1062,6 +1068,7 @@ module Redirector = struct
(default.queues
:: parallel_queues.queues
:: nested_parallel_queues.queues
+ :: receive_memory_queues.queues
:: List.map snd (StringMap.bindings !overrides)
)
)
@@ -1297,7 +1304,8 @@ module WorkerPool = struct
for _i = 1 to size do
incr Redirector.default ;
incr Redirector.parallel_queues ;
- incr Redirector.nested_parallel_queues
+ incr Redirector.nested_parallel_queues ;
+ incr Redirector.receive_memory_queues
done
let set_size size =
@@ -1313,7 +1321,8 @@ module WorkerPool = struct
in
inner Redirector.default ;
inner Redirector.parallel_queues ;
- inner Redirector.nested_parallel_queues
+ inner Redirector.nested_parallel_queues ;
+ inner Redirector.receive_memory_queues
end
(* Keep track of which VMs we're rebooting so we avoid transient glitches where
@@ -3360,7 +3369,8 @@ let uses_mxgpu id =
)
(VGPU_DB.ids id)
-let queue_operation_int ?traceparent dbg id op =
+let queue_operation_int ?traceparent ?(redirector = Redirector.default) dbg id
+ op =
let task =
Xenops_task.add ?traceparent tasks dbg
(let r = ref None in
@@ -3368,11 +3378,11 @@ let queue_operation_int ?traceparent dbg id op =
)
in
let tag = if uses_mxgpu id then "mxgpu" else id in
- Redirector.push Redirector.default tag (op, task) ;
+ Redirector.push redirector tag (op, task) ;
task
-let queue_operation ?traceparent dbg id op =
- let task = queue_operation_int ?traceparent dbg id op in
+let queue_operation ?traceparent ?redirector dbg id op =
+ let task = queue_operation_int ?traceparent ?redirector dbg id op in
Xenops_task.id_of_handle task
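+
+(* Callers that need a dedicated queue select it explicitly, e.g. the
+   receive-memory path below passes
+     ~redirector:Redirector.receive_memory_queues
+   while every other caller keeps Redirector.default. *)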
let queue_operation_and_wait dbg id op =
@@ -3556,9 +3566,9 @@ module VIF = struct
()
end
-let default_numa_affinity_policy = ref Xenops_interface.Host.Any
+let default_numa_affinity_policy = ref Xenops_interface.Host.Best_effort
-let numa_placement = ref Xenops_interface.Host.Any
+let numa_placement = ref !default_numa_affinity_policy
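+
+(* numa_placement is seeded from the default at module initialisation; init ()
+   in the backend may overwrite it later (see xenops_server_xen.ml), so the
+   two refs can diverge at runtime. *)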
let string_of_numa_affinity_policy =
Xenops_interface.Host.(function Any -> "any" | Best_effort -> "best-effort")
@@ -3821,7 +3831,12 @@ module VM = struct
; vmr_compressed= compressed_memory
}
in
- let task = Some (queue_operation ?traceparent dbg id op) in
+ let task =
+ Some
+ (queue_operation ?traceparent
+ ~redirector:Redirector.receive_memory_queues dbg id op
+ )
+ in
Option.iter
(fun t -> t |> Xenops_client.wait_for_task dbg |> ignore)
task
diff --git a/ocaml/xenopsd/lib/xenopsd.ml b/ocaml/xenopsd/lib/xenopsd.ml
index 276192792d..9c5e83e04c 100644
--- a/ocaml/xenopsd/lib/xenopsd.ml
+++ b/ocaml/xenopsd/lib/xenopsd.ml
@@ -59,8 +59,6 @@ let feature_flags_path = ref "/etc/xenserver/features.d"
let pvinpvh_xen_cmdline = ref "pv-shim console=xen"
-let numa_placement_compat = ref false
-
(* O(N^2) operations, until we get a xenstore cache, so use a small number here *)
let vm_guest_agent_xenstore_quota = ref 128
@@ -242,8 +240,11 @@ let options =
, "Command line for the inner-xen for PV-in-PVH guests"
)
; ( "numa-placement"
- , Arg.Bool (fun x -> numa_placement_compat := x)
- , (fun () -> string_of_bool !numa_placement_compat)
+ , Arg.Bool (fun _ -> ())
+ , (fun () ->
+ string_of_bool
+ (!Xenops_server.default_numa_affinity_policy = Best_effort)
+ )
, "NUMA-aware placement of VMs (deprecated, use XAPI setting)"
)
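+ (* The setter is intentionally a no-op: the value is ignored and the policy
+    now comes from the XAPI host setting. The option is retained so existing
+    configuration files that still set it continue to parse; the getter
+    reports whether the current default amounts to best-effort placement. *)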
; ( "pci-quarantine"
diff --git a/ocaml/xenopsd/xc/xenops_server_xen.ml b/ocaml/xenopsd/xc/xenops_server_xen.ml
index 9eae9cb76b..3d6b5cf721 100644
--- a/ocaml/xenopsd/xc/xenops_server_xen.ml
+++ b/ocaml/xenopsd/xc/xenops_server_xen.ml
@@ -5294,8 +5294,6 @@ let init () =
{Xs_protocol.ACL.owner= 0; other= Xs_protocol.ACL.READ; acl= []}
) ;
Device.Backend.init () ;
- Xenops_server.default_numa_affinity_policy :=
- if !Xenopsd.numa_placement_compat then Best_effort else Any ;
info "Default NUMA affinity policy is '%s'"
Xenops_server.(string_of_numa_affinity_policy !default_numa_affinity_policy) ;
Xenops_server.numa_placement := !Xenops_server.default_numa_affinity_policy ;