Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cmd/moverDependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,18 @@ func (cooked *CookedCopyCmdArgs) ToString() string {
return fmt.Sprintf("CookedCopyCmdArgs{%s}", strings.Join(parts, ", "))
}

// OpenScanningLogger creates the front-end scanning logger for the current job,
// stores it in azcopyScanningLogger, and opens it.
//
// The logger is built by common.NewJobLogger from Client.CurrentJobID, LogLevel
// and common.LogPathFolder, with "-scanning" passed as the final argument
// (presumably the log file name suffix; confirm against NewJobLogger).
func OpenScanningLogger() {
	scanLogger := common.NewJobLogger(Client.CurrentJobID, LogLevel, common.LogPathFolder, "-scanning")
	azcopyScanningLogger = scanLogger
	azcopyScanningLogger.OpenLog()
}

// CloseScanningLogger closes the scanning logger held in azcopyScanningLogger.
// It is a no-op when no scanning logger has been set.
func CloseScanningLogger() {
	if azcopyScanningLogger == nil {
		// Nothing was opened, so there is nothing to close.
		return
	}
	azcopyScanningLogger.CloseLog()
}

// ============================================================================
// End Utility Functions - Miscellaneous
// ============================================================================
23 changes: 18 additions & 5 deletions cmd/syncComparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ func timeEqual(t1, t2 time.Time, useMicroSecPrecision bool) bool {
return t1.Equal(t2)
}

// timeAfter reports whether t1 is strictly later than t2.
//
// When useMicroSecPrecision is true, both timestamps are first truncated to
// microsecond precision, so differences below one microsecond (for example,
// between filesystems that store timestamps at different resolutions) are
// ignored. Otherwise the timestamps are compared at full precision.
func timeAfter(t1, t2 time.Time, useMicroSecPrecision bool) bool {
	if !useMicroSecPrecision {
		return t1.After(t2)
	}

	lhs := t1.Truncate(time.Microsecond)
	rhs := t2.Truncate(time.Microsecond)
	return lhs.After(rhs)
}

// with the help of an objectIndexer containing the source objects
// find out the destination objects that should be transferred
// in other words, this should be used when destination is being enumerated secondly
Expand Down Expand Up @@ -227,13 +238,15 @@ func (f *syncDestinationComparator) processIfNecessaryWithOrchestrator(
sourceObjectInMap StoredObject,
destinationObject StoredObject) (bool, error) {

if sourceObjectInMap.entityType == common.EEntityType.Hardlink() ||
sourceObjectInMap.entityType == common.EEntityType.Other() {
// As of now, for hardlinks and special files at source, fallback to the default behavior
if sourceObjectInMap.entityType == common.EEntityType.Other() {
// As of now, for special files at source, fallback to the default behavior
return false, nil
}

if sourceObjectInMap.entityType != destinationObject.entityType {
// Don't compare the entity type for hardlinks as destination will consider them as files for followed hardlinks
// This will cause unnecessary transfers
if sourceObjectInMap.entityType != common.EEntityType.Hardlink() &&
sourceObjectInMap.entityType != destinationObject.entityType {
if destinationObject.entityType == common.EEntityType.Folder() {
// This entity type comparison is necessary for SyncOrchestrator as we have the visibility
// of a deleted object in the source only once during the directory non-recursive enumeration.
Expand Down Expand Up @@ -359,7 +372,7 @@ func (f *syncDestinationComparator) compareSourceAndDestinationObject(
return false, true
} else {
// else check if source changed after job start time
return false, sourceObject.changeTime.After(f.orchestratorOptions.lastSuccessfulSyncJobStartTime)
return false, timeAfter(sourceObject.changeTime, f.orchestratorOptions.lastSuccessfulSyncJobStartTime, true)
}
} else {
// If last successful job start time can't be used, we assume its changed
Expand Down
31 changes: 22 additions & 9 deletions cmd/syncEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,24 @@ func (cca *cookedSyncCmdArgs) InitEnumerator(ctx context.Context, enumeratorOpti

Credential: &srcCredInfo,
IncrementEnumeration: func(entityType common.EntityType) {
if entityType == common.EEntityType.File() {
switch entityType {
case common.EEntityType.File(), common.EEntityType.Hardlink():
atomic.AddUint64(&cca.atomicSourceFilesScanned, 1)
} else if entityType == common.EEntityType.Folder() {
case common.EEntityType.Folder():
atomic.AddUint64(&cca.atomicSourceFoldersScanned, 1)
}
if common.IsNFSCopy() {
if entityType == common.EEntityType.Other() {
atomic.AddUint32(&cca.atomicSkippedSpecialFileCount, 1)
} else if entityType == common.EEntityType.Symlink() {
case common.EEntityType.Symlink():
if common.IsNFSCopy() {
atomic.AddUint32(&cca.atomicSkippedSymlinkCount, 1)
}
if cca.symlinkHandling == common.ESymlinkHandlingType.Skip() {
atomic.AddUint64(&cca.atomicSourceFilesScanned, 1)
}
case common.EEntityType.Other():
if common.IsNFSCopy() {
atomic.AddUint32(&cca.atomicSkippedSpecialFileCount, 1)
atomic.AddUint64(&cca.atomicSourceFilesScanned, 1)
}
default:
}
},

Expand All @@ -133,12 +140,17 @@ func (cca *cookedSyncCmdArgs) InitEnumerator(ctx context.Context, enumeratorOpti
PreserveBlobTags: cca.s2sPreserveBlobTags,
HardlinkHandling: cca.hardlinks,
IncrementNotTransferred: func(entityType common.EntityType) {
if entityType == common.EEntityType.File() {

switch entityType {
case common.EEntityType.File(), common.EEntityType.Hardlink(), common.EEntityType.Symlink():
atomic.AddUint64(&cca.atomicSourceFilesTransferNotRequired, 1)
} else if entityType == common.EEntityType.Folder() {
case common.EEntityType.Folder():
atomic.AddUint64(&cca.atomicSourceFoldersTransferNotRequired, 1)
case common.EEntityType.Other():
default:
}
},
ErrorChannel: enumeratorOptions.ErrorChannel,
}
srcTraverserTemplate := ResourceTraverserTemplate{
location: cca.fromTo.From(),
Expand Down Expand Up @@ -181,6 +193,7 @@ func (cca *cookedSyncCmdArgs) InitEnumerator(ctx context.Context, enumeratorOpti
IncludeDirectoryStubs: includeDirStubs,
PreserveBlobTags: cca.s2sPreserveBlobTags,
HardlinkHandling: common.EHardlinkHandlingType.Follow(),
ErrorChannel: enumeratorOptions.ErrorChannel,
}
dstTraverserTemplate := ResourceTraverserTemplate{
location: cca.fromTo.To(),
Expand Down
22 changes: 14 additions & 8 deletions cmd/syncOrchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,13 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct

srcDirEnumerating.Add(1) // Increment active directory count

// func pathEncodeRules(path string, fromTo common.FromTo, disableAutoDecoding bool, source bool) string
// srcRelativePath = pathEncodeRules(dir.(minimalStoredObject).relativePath, cca.fromTo, false, true)
dstRelativePath := pathEncodeRules(dir.(minimalStoredObject).relativePath, cca.fromTo, false, false)

// Build source and destination paths for current directory
sync_src := []string{cca.source.Value, dir.(minimalStoredObject).relativePath}
sync_dst := []string{cca.destination.Value, dir.(minimalStoredObject).relativePath}
sync_dst := []string{cca.destination.Value, dstRelativePath}

pt_src := cca.source
st_src := cca.destination
Expand All @@ -653,14 +657,15 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct
mainCtx,
ptt.options)
if err != nil {
errMsg = fmt.Sprintf("Creating source traverser failed for dir %s: %s", pt_src.Value, err)
errMsg = fmt.Sprintf("SyncOrchestrator: Creating source traverser failed for dir %s: %s", pt_src.Value, err)
syncOrchestratorLog(common.LogError, errMsg)
writeSyncErrToChannel(ptt.options.ErrorChannel, SyncOrchErrorInfo{
DirPath: pt_src.Value,
DirName: dir.(minimalStoredObject).relativePath,
ErrorMsg: errors.New(errMsg),
TraverserLocation: cca.fromTo.From(),
})
cca.IncrementSourceFolderEnumerationFailed()
return err
}

Expand All @@ -671,14 +676,15 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct
mainCtx,
stt.options)
if err != nil {
errMsg = fmt.Sprintf("Creating target traverser failed for dir %s: %s\n", st_src.Value, err)
errMsg = fmt.Sprintf("SyncOrchestrator: Creating target traverser failed for dir %s: %s\n", st_src.Value, err)
syncOrchestratorLog(common.LogError, errMsg)
writeSyncErrToChannel(stt.options.ErrorChannel, SyncOrchErrorInfo{
DirPath: st_src.Value,
DirName: dir.(minimalStoredObject).relativePath,
ErrorMsg: errors.New(errMsg),
TraverserLocation: cca.fromTo.To(),
})
cca.IncrementDestinationFolderEnumerationFailed()
return err
}

Expand All @@ -694,7 +700,7 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct
}

if err != nil {
errMsg = fmt.Sprintf("primary traversal failed for dir %s : %s\n", pt_src.Value, err)
errMsg = fmt.Sprintf("SyncOrchestrator: primary traversal failed for dir %s : %s\n", pt_src.Value, err)
syncOrchestratorLog(common.LogError, errMsg)
writeSyncErrToChannel(ptt.options.ErrorChannel, SyncOrchErrorInfo{
DirPath: pt_src.Value,
Expand Down Expand Up @@ -728,7 +734,7 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct
if changed, fileCount := stra.hasAnyChildChangedSinceLastSync(); !changed {
err = stra.finalize(false) // false indicates we do not want to schedule transfers yet
if err != nil {
errMsg = fmt.Sprintf("Sync finalize to skip target enumeration failed for source dir %s.\n", pt_src.Value)
errMsg = fmt.Sprintf("SyncOrchestrator: Sync finalize to skip target enumeration failed for source dir %s.\n", pt_src.Value)
syncOrchestratorLog(common.LogError, errMsg)
writeSyncErrToChannel(ptt.options.ErrorChannel, SyncOrchErrorInfo{
DirPath: pt_src.Value,
Expand Down Expand Up @@ -782,7 +788,7 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct
}

if err != nil {
errMsg = fmt.Sprintf("Secondary traversal failed for dir %s = %s\n", st_src.Value, err)
errMsg = fmt.Sprintf("SyncOrchestrator: Secondary traversal failed for dir %s = %s\n", st_src.Value, err)
syncOrchestratorLog(common.LogError, errMsg)
// Only report unexpected errors (404s are normal for new files)
if IsDestinationNotFoundDuringSync(err) {
Expand All @@ -799,7 +805,7 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct

err = stra.finalize(false) // false indicates we do not want to schedule transfers yet
if err != nil {
errMsg = fmt.Sprintf("Failed to cleanup indexer object due to target traversal failure - %s. There may be unintended transfers.\n", pt_src.Value)
errMsg = fmt.Sprintf("SyncOrchestrator: Failed to cleanup indexer object due to target traversal failure - %s. There may be unintended transfers.\n", pt_src.Value)
syncOrchestratorLog(common.LogError, errMsg)
writeSyncErrToChannel(ptt.options.ErrorChannel, SyncOrchErrorInfo{
DirPath: pt_src.Value,
Expand All @@ -823,7 +829,7 @@ func (cca *cookedSyncCmdArgs) runSyncOrchestrator(enumerator *syncEnumerator, ct
err = stra.finalize(true) // true indicates we want to schedule transfers

if err != nil {
errMsg = fmt.Sprintf("Sync finalize failed for source dir %s.\n", pt_src.Value)
errMsg = fmt.Sprintf("SyncOrchestrator: Sync finalize failed for source dir %s.\n", pt_src.Value)
syncOrchestratorLog(common.LogError, errMsg)
writeSyncErrToChannel(ptt.options.ErrorChannel, SyncOrchErrorInfo{
DirPath: pt_src.Value,
Expand Down
22 changes: 22 additions & 0 deletions cmd/traverser_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
package cmd

import (
"fmt"
"time"

"github.com/Azure/azure-storage-azcopy/v10/common"
)

// Prefixes placed at the start of the error messages built for items that are
// not transferred.
const (
// SkippedItemErrorPrefix starts every message produced by GetSkippedFileErrorMessage.
SkippedItemErrorPrefix = "SkippedItemError"
// UnsupportedItemErrorPrefix starts every message produced by GetUnsupportedFileErrorMessage.
UnsupportedItemErrorPrefix = "UnsupportedItemError"
)

// TraverserErrorItemInfo provides an interface for error information related to files and folders that failed enumeration.
// It includes methods to retrieve the file path, file size, last modified time,
// whether the file is a directory, the error message, and whether the file is a source.
Expand All @@ -47,3 +53,19 @@ type TraverserErrorItemInfo interface {
ErrorMessage() error
Location() common.Location
}

// GetSkippedFileErrorMessage builds the error reported for an item that was
// skipped.
//
// The message has the form "<SkippedItemErrorPrefix> <entity type>. " and,
// when err is non-nil, is followed by "error: <err>.". A non-nil err is wrapped
// with %w, so callers can still reach the underlying cause through errors.Is,
// errors.As or errors.Unwrap; the rendered text is unchanged.
func GetSkippedFileErrorMessage(entityType common.EntityType, err error) error {
	if err == nil {
		// The trailing space is kept so the text stays identical to what
		// existing consumers of this message already receive.
		return fmt.Errorf("%s %s. ", SkippedItemErrorPrefix, entityType.String())
	}
	return fmt.Errorf("%s %s. error: %w.", SkippedItemErrorPrefix, entityType.String(), err)
}

// GetUnsupportedFileErrorMessage builds the error reported for an item of an
// unsupported kind.
//
// The message has the form "<UnsupportedItemErrorPrefix> <entity type>. " and,
// when err is non-nil, is followed by "error: <err>.". A non-nil err is wrapped
// with %w, so callers can still reach the underlying cause through errors.Is,
// errors.As or errors.Unwrap; the rendered text is unchanged.
func GetUnsupportedFileErrorMessage(entityType common.EntityType, err error) error {
	if err == nil {
		// The trailing space is kept so the text stays identical to what
		// existing consumers of this message already receive.
		return fmt.Errorf("%s %s. ", UnsupportedItemErrorPrefix, entityType.String())
	}
	return fmt.Errorf("%s %s. error: %w.", UnsupportedItemErrorPrefix, entityType.String(), err)
}
38 changes: 38 additions & 0 deletions cmd/zc_traverser_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ func writeToErrorChannel(errorChannel chan<- TraverserErrorItemInfo, err ErrorFi
// Channel might be full, log the error instead
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to send error to channel: %v", err.ErrorMessage()))
}
} else {
WarnStdoutAndScanningLog(fmt.Sprintf("Error channel is nil, cannot send error: %v", err.ErrorMessage()))
}
}

Expand Down Expand Up @@ -367,6 +369,16 @@ func WalkWithSymlinks(
if options.IncrementEnumerationCounter != nil {
options.IncrementEnumerationCounter(common.EEntityType.Symlink())
}

// Using errorChannel to log skipped files
writeToErrorChannel(options.ErrorChannel,
ErrorFileInfo{
FilePath: filePath,
FileInfo: fileInfo,
ErrorMsg: GetSkippedFileErrorMessage(common.EEntityType.Symlink(), nil),
})
WarnStdoutAndScanningLog(fmt.Sprintf("Skipping symlink - '%s' for NFS", filePath))

logNFSLinkWarning(fileInfo.Name(), "", true)
}
return nil // skip it
Expand Down Expand Up @@ -495,6 +507,15 @@ func WalkWithSymlinks(
if options.IncrementEnumerationCounter != nil {
options.IncrementEnumerationCounter(common.EEntityType.Other())
}

// Using errorChannel to log skipped files
writeToErrorChannel(options.ErrorChannel,
ErrorFileInfo{
FilePath: filePath,
FileInfo: fileInfo,
ErrorMsg: GetUnsupportedFileErrorMessage(common.EEntityType.Other(), nil),
})
WarnStdoutAndScanningLog(fmt.Sprintf("Skipping special file - '%s' for NFS", filePath))
logSpecialFileWarning(fileInfo.Name())
return nil
}
Expand Down Expand Up @@ -1019,6 +1040,14 @@ func (t *localTraverser) Traverse(preprocessor objectMorpher, processor objectPr
// If we are not following symlinks, we skip them.
if common.IsNFSCopy() && t.incrementEnumerationCounter != nil {
t.incrementEnumerationCounter(common.EEntityType.Symlink())
// Using errorChannel to log skipped files
writeToErrorChannel(t.errorChannel,
ErrorFileInfo{
FilePath: path,
FileInfo: fileInfo,
ErrorMsg: GetSkippedFileErrorMessage(common.EEntityType.Symlink(), nil),
})
WarnStdoutAndScanningLog(fmt.Sprintf("Skipping symlink - '%s' for NFS", path))
}
continue

Expand Down Expand Up @@ -1099,6 +1128,15 @@ func (t *localTraverser) Traverse(preprocessor objectMorpher, processor objectPr
if t.incrementEnumerationCounter != nil {
t.incrementEnumerationCounter(entityType)
}

// Using errorChannel to log skipped files
writeToErrorChannel(t.errorChannel,
ErrorFileInfo{
FilePath: path,
FileInfo: fileInfo,
ErrorMsg: GetUnsupportedFileErrorMessage(common.EEntityType.Other(), nil),
})
WarnStdoutAndScanningLog(fmt.Sprintf("Skipping special file - '%s' for NFS", path))
continue
}
}
Expand Down