diff --git a/api/layer/object.go b/api/layer/object.go index 7abf88db..8dd98c81 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -112,6 +112,18 @@ type ( } ) +func (a prefixSearchResult) isNewerThan(b prefixSearchResult) bool { + if a.CreationEpoch > b.CreationEpoch { + return true + } + + if a.CreationTimestamp > b.CreationTimestamp { + return true + } + + return false +} + // objectHead returns all object's headers. func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) { prm := PrmObjectRead{ @@ -689,9 +701,9 @@ func (n *layer) searchTagsAndLocksInNeoFS(ctx context.Context, bkt *data.BucketI // searchAllVersionsInNeoFS returns all version of object by its objectName. // // Returns ErrNodeNotFound if zero objects found. -func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int, onlyUnversioned bool) ([]prefixSearchResult, string, error) { +func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int) ([]prefixSearchResult, string, error) { var ( - filters = make(object.SearchFilters, 0, 4) + filters = make(object.SearchFilters, 0, 3) returningAttributes = []string{ object.AttributeFilePath, object.FilterCreationEpoch, @@ -719,10 +731,6 @@ func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data. filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular) filters.AddFilter(s3headers.MetaType, "", object.MatchNotPresent) - if onlyUnversioned { - filters.AddFilter(s3headers.AttributeVersioningState, "", object.MatchNotPresent) - } - searchResultItems, nextCursor, err := n.neoFS.SearchObjectsV2WithCursor(ctx, bkt.CID, filters, returningAttributes, cursor, opts) if err != nil { if errors.Is(err, apistatus.ErrObjectAccessDenied) { @@ -788,6 +796,10 @@ func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data. } sortFunc := func(a, b prefixSearchResult) int { + if c := cmp.Compare(a.FilePath, b.FilePath); c != 0 { // direct order. + return c + } + if c := cmp.Compare(b.CreationEpoch, a.CreationEpoch); c != 0 { // reverse order. return c } @@ -805,26 +817,109 @@ func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data. return searchResults, nextCursor, nil } -func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int, onlyUnversioned bool) ([]prefixSearchResult, string, error) { - searchResults, nextCursor, err := n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, cursor, maxKeys, onlyUnversioned) - if err != nil { - if errors.Is(err, apistatus.ErrObjectAccessDenied) { - return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied) +func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int) ([]prefixSearchResult, string, error) { + var ( + batch = make(map[string]prefixSearchResult, maxKeys) + searchedPage []prefixSearchResult + nextCursor = cursor + seachPageSize = 1000 + err error + generateCursor bool + + // isBatchDone == true means we collected `maxKeys` objects to the response. + isBatchDone bool + ) + + for len(batch) < maxKeys { + searchedPage, nextCursor, err = n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, nextCursor, seachPageSize) + if err != nil { + if errors.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied) + } + + return nil, "", fmt.Errorf("get all versions by prefix: %w", err) } - return nil, "", fmt.Errorf("get all versions by prefix: %w", err) + for _, pageItem := range searchedPage { + batchItem, ok := batch[pageItem.FilePath] + if ok { + // Store only the newest item. + if pageItem.isNewerThan(batchItem) { + batch[pageItem.FilePath] = pageItem + } + + continue + } + + // We collect unique items to response. + if !isBatchDone { + batch[pageItem.FilePath] = pageItem + isBatchDone = len(batch) == maxKeys + } else { + // We already collected `maxKeys` objects. Despite it, current page has items for the next request. + // We must shift cursor to the end of the current batch. + generateCursor = true + } + } + + if len(nextCursor) == 0 { + break + } } - var uniq = make(map[string]prefixSearchResult, len(searchResults)) + // There are more items to search. We should look into the next page. It may contain objects from current batch. + if len(nextCursor) != 0 { + // It is possible we have to take more than the one extra pages. But as minimum one must be checked. + var oneMorePage = true + for oneMorePage { + oneMorePage = false + + searchedPage, nextCursor, err = n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, nextCursor, seachPageSize) + if err != nil { + if errors.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied) + } - for _, result := range searchResults { - // take only first object, because it is the freshest one. - if _, ok := uniq[result.FilePath]; !ok { - uniq[result.FilePath] = result + return nil, "", fmt.Errorf("get all versions by prefix: %w", err) + } + + // searchedPage sorted by object.AttributeFilePath. If the last element from the extra page is presented in + // current bath we should check another page in advice to find all versions of objects. + if l := len(searchedPage); l > 0 { + _, oneMorePage = batch[searchedPage[l-1].FilePath] + } + + for _, result := range searchedPage { + existing, ok := batch[result.FilePath] + + // We don't need new items which out of current batch. + if !ok { + // But it means there are not covered items and they must be retrieved in the next request. + generateCursor = true + continue + } + + // Try to get the latest version of the object from this page. + if result.isNewerThan(existing) { + batch[result.FilePath] = result + } + } } } - return slices.Collect(maps.Values(uniq)), nextCursor, nil + items := slices.Collect(maps.Values(batch)) + sortFunc := func(a, b prefixSearchResult) int { + return cmp.Compare(a.FilePath, b.FilePath) + } + + slices.SortFunc(items, sortFunc) + + // We did some manipulations with pages, we must to send to the client actual point to start from the next request. + if len(nextCursor) != 0 || generateCursor { + nextCursor = generateContinuationToken(items[len(items)-1].FilePath) + } + + return items, nextCursor, nil } // objectDelete puts tombstone object into neofs. @@ -995,7 +1090,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) var latestVersions []prefixSearchResult if nodeVersions == nil { - latestVersions, next, err = n.searchLatestVersionsByPrefix(ctx, p.Bucket, p.Bucket.Owner, p.Prefix, p.ContinuationToken, p.MaxKeys, false) + latestVersions, next, err = n.searchLatestVersionsByPrefix(ctx, p.Bucket, p.Bucket.Owner, p.Prefix, p.ContinuationToken, p.MaxKeys) if err != nil { if errors.Is(err, ErrNodeNotFound) { return nil, "", nil