Skip to content

Commit de4a832

Browse files
committed
First revision of raw block volume
1 parent cd3c0b6 commit de4a832

File tree

6 files changed

+336
-28
lines changed

6 files changed

+336
-28
lines changed

pkg/csi-util/utils.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"net"
2121
"os"
2222
"os/exec"
23+
"path/filepath"
2324
"strconv"
2425
"strings"
2526
"sync"
@@ -70,6 +71,9 @@ const (
7071
FINDMNT_COMMAND = "findmnt"
7172
CAT_COMMAND = "cat"
7273
RPM_COMMAND = "rpm-host"
74+
75+
// For Raw Block Volumes, the name of the bind-mounted file inside StagingTargetPath
76+
RawBlockStagingFile = "mountfile"
7377
)
7478

7579
// Util interface
@@ -450,3 +454,36 @@ func ValidateFssId(id string) *FSSVolumeHandler {
450454
}
451455
return volumeHandler
452456
}
457+
458+
func GetStagingTargetPathForFile(stagingTargetPath string) string {
459+
stagingTargetPathFile := filepath.Join(stagingTargetPath, RawBlockStagingFile)
460+
return stagingTargetPathFile
461+
}
462+
463+
// Creates a file on the specified path after creating the containing directory
464+
func CreateFilePath(logger *zap.SugaredLogger, path string) error {
465+
pathDir := filepath.Dir(path)
466+
467+
logger.Infof("trying to create surrounding directory %s", pathDir)
468+
err := os.MkdirAll(pathDir, 0750)
469+
if err != nil {
470+
logger.Infof("failed to create surrounding directory %s", pathDir)
471+
return err
472+
}
473+
474+
logger.Infof("created surrounding directory, trying to create the target file %s", path)
475+
file, fileErr := os.OpenFile(path, os.O_CREATE, 0640)
476+
if fileErr != nil && !os.IsExist(fileErr) {
477+
logger.Infof("failed to create/open the target file at %s", path)
478+
return fileErr
479+
}
480+
481+
fileErr = file.Close()
482+
if fileErr != nil {
483+
logger.Infof("failed to close %s", path)
484+
return fileErr
485+
}
486+
487+
logger.Infof("ensured file at %s", path)
488+
return nil
489+
}

pkg/csi/driver/bv_node.go

Lines changed: 191 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ func (d BlockVolumeNodeDriver) NodeStageVolume(ctx context.Context, req *csi.Nod
6464

6565
logger := d.logger.With("volumeID", req.VolumeId, "stagingPath", req.StagingTargetPath)
6666

67+
stagingTargetFilePath := csi_util.GetStagingTargetPathForFile(req.StagingTargetPath)
68+
69+
isRawBlockVolume := false
70+
71+
if _, ok := req.VolumeCapability.GetAccessType().(*csi.VolumeCapability_Block); ok {
72+
isRawBlockVolume = true
73+
}
74+
6775
attachment, ok := req.PublishContext[attachmentType]
6876

6977
if !ok {
@@ -128,6 +136,13 @@ func (d BlockVolumeNodeDriver) NodeStageVolume(ctx context.Context, req *csi.Nod
128136

129137
defer d.volumeLocks.Release(req.VolumeId)
130138

139+
/*
140+
Code block to check if the block device is ready for consumption.
141+
- devicePath is the source and is extracted based on attachmentType.
142+
- DeviceOpened also has different implentation based on attachmentType.
143+
TBD: Do we need to do something different for Raw Block Volume Case - dont think so
144+
*/
145+
131146
isMounted, oErr := mountHandler.DeviceOpened(devicePath)
132147
if oErr != nil {
133148
logger.With(zap.Error(oErr)).Error("getting error to get the details about volume is already mounted or not.")
@@ -191,6 +206,15 @@ func (d BlockVolumeNodeDriver) NodeStageVolume(ctx context.Context, req *csi.Nod
191206
return nil, status.Error(codes.DeadlineExceeded, "Failed to wait for device to exist.")
192207
}
193208

209+
if isRawBlockVolume {
210+
err = createFileAndBindMountDevice(logger, devicePath, stagingTargetFilePath, mountHandler, false)
211+
if err != nil {
212+
logger.Infof("failed to bind mount raw block volume to stagingTargetFile %s", stagingTargetFilePath)
213+
return nil, status.Error(codes.Internal, err.Error())
214+
}
215+
return &csi.NodeStageVolumeResponse{}, nil
216+
}
217+
194218
mnt := req.VolumeCapability.GetMount()
195219
options := mnt.MountFlags
196220

@@ -263,6 +287,9 @@ func (d BlockVolumeNodeDriver) NodeUnstageVolume(ctx context.Context, req *csi.N
263287

264288
logger := d.logger.With("volumeID", req.VolumeId, "stagingPath", req.StagingTargetPath)
265289

290+
stagingTargetPathFile := csi_util.GetStagingTargetPathForFile(req.StagingTargetPath)
291+
isRawBlockVolume := false
292+
266293
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
267294
logger.Error("Could not acquire lock for NodeUnstageVolume.")
268295
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, req.VolumeId)
@@ -273,13 +300,32 @@ func (d BlockVolumeNodeDriver) NodeUnstageVolume(ctx context.Context, req *csi.N
273300
diskPath, err := disk.GetDiskPathFromMountPath(d.logger, req.GetStagingTargetPath())
274301

275302
if err != nil {
276-
// do a clean exit in case of mount point not found
277303
if err == disk.ErrMountPointNotFound {
278-
logger.With(zap.Error(err)).With("mountPath", req.GetStagingTargetPath()).Warn("unable to fetch mount point")
279-
return &csi.NodeUnstageVolumeResponse{}, nil
304+
// Check if it is a raw block volume if StagingTargetPath isn't mounted.
305+
isDevice, pathErr := disk.PathIsDevice(logger, stagingTargetPathFile)
306+
if pathErr != nil {
307+
logger.Errorf("unable to check if %s represents a device file.", stagingTargetPathFile)
308+
return nil, status.Error(codes.Internal, pathErr.Error())
309+
}
310+
311+
// Update StagingTargetPath and find diskPaths if it is a raw block volume
312+
if isDevice {
313+
isRawBlockVolume = true
314+
diskPath, err = disk.GetDiskPathFromBindDeviceFilePath(logger, stagingTargetPathFile)
315+
if err != nil {
316+
logger.With(zap.Error(err)).Warn("unable to fetch the disk paths")
317+
return &csi.NodeUnstageVolumeResponse{}, nil
318+
}
319+
logger.Info("the volume is detected to be a raw block volume, fetched disk paths")
320+
} else {
321+
// Else, treat as usual
322+
logger.With(zap.Error(err)).With("mountPath", req.GetStagingTargetPath()).Warn("unable to fetch mount point")
323+
return &csi.NodeUnstageVolumeResponse{}, nil
324+
}
325+
} else {
326+
logger.With(zap.Error(err)).With("mountPath", req.GetStagingTargetPath()).Error("unable to get diskPath from mount path")
327+
return nil, status.Error(codes.Internal, err.Error())
280328
}
281-
logger.With(zap.Error(err)).With("mountPath", req.GetStagingTargetPath()).Error("unable to get diskPath from mount path")
282-
return nil, status.Error(codes.Internal, err.Error())
283329
}
284330

285331
attachmentType, devicePath, err := getDevicePathAndAttachmentType(diskPath)
@@ -318,16 +364,24 @@ func (d BlockVolumeNodeDriver) NodeUnstageVolume(ctx context.Context, req *csi.N
318364
logger.Error("unknown attachment type. supported attachment types are iscsi and paravirtualized")
319365
return nil, status.Error(codes.InvalidArgument, "unknown attachment type. supported attachment types are iscsi and paravirtualized")
320366
}
321-
isMounted, oErr := mountHandler.DeviceOpened(devicePath)
322-
if oErr != nil {
323-
logger.With(zap.Error(oErr)).Error("getting error to get the details about volume is already mounted or not.")
324-
return nil, status.Error(codes.Internal, oErr.Error())
325-
} else if !isMounted {
326-
logger.Info("volume is already mounted on the staging path.")
327-
return &csi.NodeUnstageVolumeResponse{}, nil
367+
368+
if !isRawBlockVolume {
369+
isMounted, oErr := mountHandler.DeviceOpened(devicePath)
370+
if oErr != nil {
371+
logger.With(zap.Error(oErr)).Error("getting error to get the details about volume is already mounted or not.")
372+
return nil, status.Error(codes.Internal, oErr.Error())
373+
} else if !isMounted {
374+
logger.Info("volume is already mounted on the staging path.")
375+
return &csi.NodeUnstageVolumeResponse{}, nil
376+
}
377+
}
378+
379+
if isRawBlockVolume {
380+
err = mountHandler.UnmountDeviceBindAndDelete(stagingTargetPathFile)
381+
} else {
382+
err = mountHandler.UnmountPath(req.StagingTargetPath)
328383
}
329384

330-
err = mountHandler.UnmountPath(req.StagingTargetPath)
331385
if err != nil {
332386
logger.With(zap.Error(err)).Error("failed to unmount the staging path")
333387
return nil, status.Error(codes.Internal, err.Error())
@@ -374,6 +428,14 @@ func (d BlockVolumeNodeDriver) NodePublishVolume(ctx context.Context, req *csi.N
374428

375429
logger := d.logger.With("volumeID", req.VolumeId, "targetPath", req.TargetPath)
376430

431+
stagingTargetFilePath := csi_util.GetStagingTargetPathForFile(req.StagingTargetPath)
432+
433+
isRawBlockVolume := false
434+
435+
if _, ok := req.VolumeCapability.GetAccessType().(*csi.VolumeCapability_Block); ok {
436+
isRawBlockVolume = true
437+
}
438+
377439
attachment, ok := req.PublishContext[attachmentType]
378440
if !ok {
379441
logger.Error("Unable to get the attachmentType from the attribute list, assuming iscsi")
@@ -387,13 +449,15 @@ func (d BlockVolumeNodeDriver) NodePublishVolume(ctx context.Context, req *csi.N
387449

388450
defer d.volumeLocks.Release(req.VolumeId)
389451

390-
// k8s v1.20+ will not create the TargetPath directory
391-
// https://github.com/kubernetes/kubernetes/pull/88759
392-
// if the path exists already (<v1.20) this is a no op
393-
// https://golang.org/pkg/os/#MkdirAll
394-
if err := os.MkdirAll(req.TargetPath, 0750); err != nil {
395-
logger.With(zap.Error(err)).Error("Failed to create TargetPath directory")
396-
return nil, status.Error(codes.Internal, "Failed to create TargetPath directory")
452+
if !isRawBlockVolume {
453+
// k8s v1.20+ will not create the TargetPath directory
454+
// https://github.com/kubernetes/kubernetes/pull/88759
455+
// if the path exists already (<v1.20) this is a no op
456+
// https://golang.org/pkg/os/#MkdirAll
457+
if err := os.MkdirAll(req.TargetPath, 0750); err != nil {
458+
logger.With(zap.Error(err)).Error("Failed to create TargetPath directory")
459+
return nil, status.Error(codes.Internal, "Failed to create TargetPath directory")
460+
}
397461
}
398462

399463
multipathEnabledVolume := false
@@ -431,6 +495,15 @@ func (d BlockVolumeNodeDriver) NodePublishVolume(ctx context.Context, req *csi.N
431495
return nil, status.Error(codes.InvalidArgument, "unknown attachment type. supported attachment types are iscsi and paravirtualized")
432496
}
433497

498+
if isRawBlockVolume {
499+
err := createFileAndBindMountDevice(logger, stagingTargetFilePath, req.TargetPath, mountHandler, req.Readonly)
500+
if err != nil {
501+
logger.Infof("failed to bind mount raw block volume to TargetPath %s", req.TargetPath)
502+
return nil, status.Error(codes.Internal, err.Error())
503+
}
504+
return &csi.NodePublishVolumeResponse{}, nil
505+
}
506+
434507
mnt := req.VolumeCapability.GetMount()
435508
options := mnt.MountFlags
436509

@@ -458,6 +531,7 @@ func (d BlockVolumeNodeDriver) NodePublishVolume(ctx context.Context, req *csi.N
458531

459532
logger.With("attachmentType", attachment).Info("Publish volume to the Node is Completed.")
460533

534+
// TBD: Might need to determine if the following code block has to be changed
461535
if req.PublishContext[needResize] != "" {
462536
needsResize, err := strconv.ParseBool(req.PublishContext[needResize])
463537
if err != nil {
@@ -548,22 +622,40 @@ func (d BlockVolumeNodeDriver) NodeUnpublishVolume(ctx context.Context, req *csi
548622

549623
logger := d.logger.With("volumeID", req.VolumeId, "targetPath", req.TargetPath)
550624

625+
//Check if TargetPath is a device file to see for Raw Block.
626+
isRawBlockVolume, checkErr := disk.PathIsDevice(logger, req.TargetPath)
627+
if checkErr != nil {
628+
logger.Errorf("failed to check if %s is a device file", req.TargetPath)
629+
return nil, status.Errorf(codes.Internal, checkErr.Error())
630+
}
631+
551632
if acquired := d.volumeLocks.TryAcquire(req.VolumeId); !acquired {
552633
logger.Error("Could not acquire lock for NodeUnpublishVolume.")
553634
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, req.VolumeId)
554635
}
555636

556637
defer d.volumeLocks.Release(req.VolumeId)
557638

558-
diskPath, err := disk.GetDiskPathFromMountPath(d.logger, req.TargetPath)
559-
if err != nil {
560-
// do a clean exit in case of mount point not found
561-
if err == disk.ErrMountPointNotFound {
562-
logger.With(zap.Error(err)).With("mountPath", req.TargetPath).Warn("unable to fetch mount point")
639+
var diskPath []string
640+
var err error
641+
642+
if isRawBlockVolume {
643+
diskPath, err = disk.GetDiskPathFromBindDeviceFilePath(logger, req.TargetPath)
644+
if err != nil {
645+
logger.With(zap.Error(err)).Warn("unable to fetch disk paths, exiting.")
563646
return &csi.NodeUnpublishVolumeResponse{}, nil
564647
}
565-
logger.With(zap.Error(err)).With("mountPath", req.TargetPath).Error("unable to get diskPath from mount path")
566-
return nil, status.Error(codes.Internal, err.Error())
648+
} else {
649+
diskPath, err = disk.GetDiskPathFromMountPath(d.logger, req.TargetPath)
650+
if err != nil {
651+
// do a clean exit in case of mount point not found
652+
if err == disk.ErrMountPointNotFound {
653+
logger.With(zap.Error(err)).With("mountPath", req.TargetPath).Warn("unable to fetch mount point")
654+
return &csi.NodeUnpublishVolumeResponse{}, nil
655+
}
656+
logger.With(zap.Error(err)).With("mountPath", req.TargetPath).Error("unable to get diskPath from mount path")
657+
return nil, status.Error(codes.Internal, err.Error())
658+
}
567659
}
568660

569661
attachmentType, devicePath, err := getDevicePathAndAttachmentType(diskPath)
@@ -591,7 +683,15 @@ func (d BlockVolumeNodeDriver) NodeUnpublishVolume(ctx context.Context, req *csi
591683
return nil, status.Error(codes.InvalidArgument, "unknown attachment type. supported attachment types are iscsi and paravirtualized")
592684
}
593685

594-
if err := mountHandler.UnmountPath(req.TargetPath); err != nil {
686+
var umountErr error
687+
688+
if isRawBlockVolume {
689+
umountErr = mountHandler.UnmountDeviceBindAndDelete(req.TargetPath)
690+
} else {
691+
umountErr = mountHandler.UnmountPath(req.TargetPath)
692+
}
693+
694+
if umountErr != nil {
595695
logger.With(zap.Error(err)).Error("failed to unmount the target path, error")
596696
return nil, status.Error(codes.Internal, err.Error())
597697
}
@@ -679,6 +779,44 @@ func (d BlockVolumeNodeDriver) NodeGetVolumeStats(ctx context.Context, req *csi.
679779
return nil, status.Error(codes.InvalidArgument, "volume path must be provided")
680780
}
681781

782+
isRawBlockVolume := false
783+
isDevice, err := disk.PathIsDevice(logger, volumePath)
784+
if err != nil {
785+
logger.With(zap.Error(err)).Errorf("failed to check if the volumePath is a Device %s", volumePath)
786+
return nil, status.Error(codes.Internal, err.Error())
787+
}
788+
if isDevice {
789+
isRawBlockVolume = true
790+
} else {
791+
isDevice, err = disk.PathIsDevice(logger, csi_util.GetStagingTargetPathForFile(volumePath))
792+
if err != nil {
793+
logger.With(zap.Error(err)).Errorf("failed to check if the volumePathFile is a Device %s", volumePath)
794+
return nil, status.Error(codes.Internal, err.Error())
795+
}
796+
if isDevice {
797+
isRawBlockVolume = true
798+
volumePath = csi_util.GetStagingTargetPathForFile(volumePath)
799+
}
800+
}
801+
802+
if isRawBlockVolume {
803+
metricsProvider := volume.NewMetricsBlock(volumePath)
804+
metrics, err := metricsProvider.GetMetrics()
805+
if err != nil {
806+
logger.With(zap.Error(err)).Errorf("failed to get metrics for device at %s", volumePath)
807+
return nil, status.Error(codes.Internal, err.Error())
808+
}
809+
810+
return &csi.NodeGetVolumeStatsResponse{
811+
Usage: []*csi.VolumeUsage{
812+
{
813+
Unit: csi.VolumeUsage_BYTES,
814+
Total: metrics.Capacity.AsDec().UnscaledBig().Int64(),
815+
},
816+
},
817+
}, nil
818+
}
819+
682820
hostUtil := hostutil.NewHostUtil()
683821
exists, err := hostUtil.PathExists(volumePath)
684822
if err != nil {
@@ -716,6 +854,7 @@ func (d BlockVolumeNodeDriver) NodeGetVolumeStats(ctx context.Context, req *csi.
716854
}
717855

718856
// NodeExpandVolume returns the expand of the volume
857+
// TBD - Need to handle this case as well
719858
func (d BlockVolumeNodeDriver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
720859
volumeID := req.GetVolumeId()
721860
if len(volumeID) == 0 {
@@ -841,3 +980,27 @@ func getMultipathDevicesFromReq(req *csi.NodeStageVolumeRequest) ([]core.Multipa
841980

842981
return multipathDevicesList, nil
843982
}
983+
984+
// Creates a file at target and bind mounts a device placed on source to it
985+
func createFileAndBindMountDevice(logger *zap.SugaredLogger, source string, target string, mountHandler disk.Interface, readOnly bool) error {
986+
err := csi_util.CreateFilePath(logger, target)
987+
if err != nil {
988+
logger.Infof("failed to create the target file at %s", target)
989+
return err
990+
}
991+
992+
options := []string{"bind"}
993+
if readOnly {
994+
options = append(options, "ro")
995+
}
996+
997+
logger.Infof("trying to bind mount %s on %s", source, target)
998+
err = mountHandler.MountWithoutFormat(source, target, "", options)
999+
if err != nil {
1000+
logger.Infof("failed to mount %s on %s", source, target)
1001+
return err
1002+
}
1003+
1004+
logger.Infof("successfully bind mounted to %s", target)
1005+
return nil
1006+
}

0 commit comments

Comments
 (0)