Skip to content

feat: add --bfs option to mc mirror for layer-by-layer traversal … #5187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,10 @@ func (c *S3Client) listVersionsRoutine(ctx context.Context, b, o string, opts Li
buckets = append(buckets, b)
}

if opts.Prefix != "" {
o = opts.Prefix
}

for _, b := range buckets {
var skipKey string
for objectVersion := range c.api.ListObjects(ctx, b, minio.ListObjectsOptions{
Expand Down Expand Up @@ -2104,6 +2108,10 @@ func (c *S3Client) listIncompleteInRoutine(ctx context.Context, contentCh chan *
func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
// get bucket and object from URL.
b, o := c.url2BucketAndObject()
if opts.Prefix != "" {
o = opts.Prefix
}

switch {
case b == "" && o == "":
buckets, err := c.api.ListBuckets(ctx)
Expand Down Expand Up @@ -2243,6 +2251,7 @@ func (c *S3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInf
}
url.Path = c.buildAbsPath(bucket, entry.Key)
content.URL = url
content.ObjectKey = entry.Key
content.BucketName = bucket
content.Size = entry.Size
content.ETag = entry.ETag
Expand Down Expand Up @@ -2321,6 +2330,10 @@ func (c *S3Client) bucketStat(ctx context.Context, opts BucketStatOptions) (*Cli
func (c *S3Client) listInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
// get bucket and object from URL.
b, o := c.url2BucketAndObject()
if opts.Prefix != "" {
o = opts.Prefix
}

if opts.ListZip && (b == "" || o == "") {
contentCh <- &ClientContent{
Err: probe.NewError(errors.New("listing zip files must provide bucket and object")),
Expand Down Expand Up @@ -2385,6 +2398,10 @@ func sortBucketsNameWithSlash(bucketsInfo []minio.BucketInfo) {
func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
// get bucket and object from URL.
b, o := c.url2BucketAndObject()
if opts.Prefix != "" {
o = opts.Prefix
}

switch {
case b == "" && o == "":
buckets, err := c.api.ListBuckets(ctx)
Expand Down
12 changes: 8 additions & 4 deletions cmd/client-url.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,14 @@ func (u ClientURL) String() string {
}

// urlJoinPath Join a path to existing URL.
func urlJoinPath(url1, url2 string) string {
u1 := newClientURL(url1)
u2 := newClientURL(url2)
return joinURLs(u1, u2).String()
// urlJoinPath joins a path element onto a base path string, ensuring
// exactly one "/" separator at the boundary between them.
//
// Empty inputs are handled explicitly: joining with an empty base or an
// empty element returns the other side unchanged, rather than producing
// a spurious leading or trailing "/".
//
// NOTE(review): this works on plain strings only — it does not parse
// base as a URL, so scheme/host portions (e.g. "https://host") are not
// normalized. Confirm callers never pass full URLs whose joining needs
// URL-aware semantics.
func urlJoinPath(base, element string) string {
	if base == "" {
		return element
	}
	if element == "" {
		return base
	}
	// Strip at most one separator from each side, then insert exactly one.
	return strings.TrimSuffix(base, "/") + "/" + strings.TrimPrefix(element, "/")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to test this with full URLs, @Laisky — assuming that base and element are plain paths is not going to work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Laisky can you let me know why this change is needed?


// url2Stat returns stat info for URL - supports bucket, object and a prefixe with or without a trailing slash
Expand Down
2 changes: 2 additions & 0 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type ListOptions struct {
TimeRef time.Time
ShowDir DirOpt
Count int
Prefix string // Add prefix support
}

// CopyOptions holds options for copying operation
Expand Down Expand Up @@ -213,6 +214,7 @@ type Client interface {
// ClientContent - Content container for content metadata
type ClientContent struct {
URL ClientURL
ObjectKey string
BucketName string // only valid and set for client-type objectStorage
Time time.Time
Size int64
Expand Down
172 changes: 171 additions & 1 deletion cmd/difference.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,171 @@ func getSourceModTimeKey(metadata map[string]string) string {
return ""
}

// layerDifference performs a breadth-first (layer-by-layer) comparison
// between source and target. Unlike the standard recursive listing
// approach, it traverses the object hierarchy one directory level at a
// time, which avoids very large recursive listing operations that can
// cause server-side timeouts or resource exhaustion on buckets with
// millions of objects.
//
// The BFS walk:
//  1. starts with the root prefix ("") on both source and target,
//  2. lists objects at the current prefix non-recursively,
//  3. streams every entry found into the difference engine,
//  4. queues any directory entries for the next level,
//  5. repeats until both hierarchies are fully explored.
//
// Enabled with the --bfs flag of `mc mirror`.
func layerDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage {
	diffCh := make(chan diffMessage, 10000)

	go func() {
		defer close(diffCh)

		// Channels feeding entries discovered by the BFS walks into the
		// difference engine below.
		srcClientCh := make(chan *ClientContent, 1000)
		tgtClientCh := make(chan *ClientContent, 1000)

		// Walk source and target concurrently; each walker closes its own
		// output channel when its traversal finishes.
		go bfsListToChannel(ctx, sourceClnt, srcClientCh, opts)
		go bfsListToChannel(ctx, targetClnt, tgtClientCh, opts)

		err := differenceInternal(
			sourceClnt.GetURL().String(),
			srcClientCh,
			targetClnt.GetURL().String(),
			tgtClientCh,
			opts,
			false, // returnSimilar is false
			diffCh,
		)

		if err != nil {
			// Suppress the error when the context was cancelled — the
			// caller initiated the shutdown and is not reading diffCh.
			select {
			case <-ctx.Done():
			default:
				diffCh <- diffMessage{Error: err}
			}
		}
	}()

	return diffCh
}

// bfsListToChannel walks clnt's object hierarchy breadth-first, sending
// every listed entry (files, directories, and listing errors) to out.
// It maintains a queue of relative object prefixes still to list, where
// "" is the root, and closes out when the traversal completes or ctx is
// cancelled.
func bfsListToChannel(ctx context.Context, clnt Client, out chan<- *ClientContent, opts mirrorOptions) {
	defer close(out)

	// Queue of relative prefixes to explore; "" represents the root.
	queue := []string{""}

	for len(queue) > 0 {
		// Dequeue the next relative prefix.
		prefix := queue[0]
		queue = queue[1:]

		// List only the current level using the relative prefix.
		listCtx, listCancel := context.WithCancel(ctx)
		contentsCh := clnt.List(listCtx, ListOptions{
			Recursive:    false,
			WithMetadata: opts.isMetadata,
			ShowDir:      DirLast, // ensure directories are listed
			Prefix:       prefix,
		})

		for content := range contentsCh {
			select {
			case <-ctx.Done():
				listCancel()
				return
			default:
				if content != nil && content.Err != nil {
					// Forward listing errors to the comparison engine and
					// stop this level's listing.
					out <- content
					listCancel()
					continue
				}
				if content == nil {
					continue
				}

				// Send the valid entry (file or directory) for comparison.
				out <- content

				// Directories become prefixes for the next BFS level.
				if content.Type.IsDir() {
					relativeKey := content.ObjectKey
					// Skip empty keys and the prefix we just listed to
					// avoid re-queueing the same level (e.g. the root
					// listing itself as "/"), which would loop forever.
					//
					// S3-style listings return directory common prefixes
					// already ending in '/'. NOTE(review): a backend that
					// omits the trailing separator would queue a prefix
					// that may not list as intended — confirm against the
					// supported backends.
					if relativeKey != "" && relativeKey != prefix {
						queue = append(queue, relativeKey)
					}
				}
			}
		}
		listCancel()
	}
}

// activeActiveModTimeUpdated tries to calculate if the object copy in the target
// is older than the one in the source by comparing the modtime of the data.
func activeActiveModTimeUpdated(src, dst *ClientContent) bool {
Expand Down Expand Up @@ -167,7 +332,12 @@ func bucketObjectDifference(ctx context.Context, sourceClnt, targetClnt Client)
})
}

func objectDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) (diffCh chan diffMessage) {
func objectDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage {
if opts.bfs {
// Use layer-by-layer difference for regular objects
return layerDifference(ctx, sourceClnt, targetClnt, opts)
}

sourceURL := sourceClnt.GetURL().String()
sourceCh := sourceClnt.List(ctx, ListOptions{Recursive: true, WithMetadata: opts.isMetadata, ShowDir: DirNone})

Expand Down
7 changes: 6 additions & 1 deletion cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ var (
Name: "skip-errors",
Usage: "skip any errors when mirroring",
},
cli.BoolFlag{
Name: "bfs",
Usage: "using BFS for layer-by-layer traversal of files, suitable for large number of files",
},
checksumFlag,
}
)
Expand Down Expand Up @@ -212,7 +216,7 @@ EXAMPLES:
{{.Prompt}} {{.HelpName}} --older-than 30d s3/test ~/test

13. Mirror server encrypted objects from Amazon S3 cloud storage to a bucket on Amazon S3 cloud storage
{{.Prompt}} {{.HelpName}} --enc-c "minio/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDA" --enc-c "s3/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5BBB" s3/archive/ minio/archive/
{{.Prompt}} {{.HelpName}} --enc-c "minio/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDA" --enc-c "s3/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5BBB" s3/archive/ minio/archive/

14. Update 'Cache-Control' header on all existing objects recursively.
{{.Prompt}} {{.HelpName}} --attr "Cache-Control=max-age=90000,min-fresh=9000" myminio/video-files myminio/video-files
Expand Down Expand Up @@ -1024,6 +1028,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc
userMetadata: userMetadata,
encKeyDB: encKeyDB,
activeActive: isWatch,
bfs: cli.Bool("bfs"),
}

// If we are not using active/active and we are not removing
Expand Down
1 change: 1 addition & 0 deletions cmd/mirror-url.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ type mirrorOptions struct {
userMetadata map[string]string
checksum minio.ChecksumType
sourceListingOnly bool
bfs bool
}

// Prepares urls that need to be copied or removed based on requested options.
Expand Down