Skip to content

Duplicates in object listing #1133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 114 additions & 19 deletions api/layer/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ type (
}
)

// isNewerThan reports whether a is a more recent version than b.
//
// Versions are ordered by CreationEpoch first. CreationTimestamp is consulted
// only to break a tie between versions created in the same epoch, so a version
// from an older epoch is never treated as newer just because its timestamp is
// larger. This keeps the relation antisymmetric: a.isNewerThan(b) and
// b.isNewerThan(a) can never both be true, which makes the "keep the newest
// version" selection independent of the order items are visited in.
//
// Two versions with equal epoch and equal timestamp are not newer than each
// other; the method returns false for both directions.
func (a prefixSearchResult) isNewerThan(b prefixSearchResult) bool {
	// A differing epoch decides the comparison on its own.
	if a.CreationEpoch != b.CreationEpoch {
		return a.CreationEpoch > b.CreationEpoch
	}

	// Same epoch: fall back to the creation timestamp.
	return a.CreationTimestamp > b.CreationTimestamp
}

// objectHead returns all object's headers.
func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) {
prm := PrmObjectRead{
Expand Down Expand Up @@ -689,9 +701,9 @@ func (n *layer) searchTagsAndLocksInNeoFS(ctx context.Context, bkt *data.BucketI
// searchAllVersionsInNeoFSByPrefix returns all versions of objects by their name prefix.
//
// Returns ErrNodeNotFound if zero objects found.
func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int, onlyUnversioned bool) ([]prefixSearchResult, string, error) {
func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int) ([]prefixSearchResult, string, error) {
var (
filters = make(object.SearchFilters, 0, 4)
filters = make(object.SearchFilters, 0, 3)
returningAttributes = []string{
object.AttributeFilePath,
object.FilterCreationEpoch,
Expand Down Expand Up @@ -719,10 +731,6 @@ func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.
filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular)
filters.AddFilter(s3headers.MetaType, "", object.MatchNotPresent)

if onlyUnversioned {
filters.AddFilter(s3headers.AttributeVersioningState, "", object.MatchNotPresent)
}

searchResultItems, nextCursor, err := n.neoFS.SearchObjectsV2WithCursor(ctx, bkt.CID, filters, returningAttributes, cursor, opts)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAccessDenied) {
Expand Down Expand Up @@ -788,6 +796,10 @@ func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.
}

sortFunc := func(a, b prefixSearchResult) int {
if c := cmp.Compare(a.FilePath, b.FilePath); c != 0 { // direct order.
return c
}

if c := cmp.Compare(b.CreationEpoch, a.CreationEpoch); c != 0 { // reverse order.
return c
}
Expand All @@ -805,26 +817,109 @@ func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.
return searchResults, nextCursor, nil
}

// searchLatestVersionsByPrefix collects up to maxKeys entries with distinct FilePath values found by prefix,
// keeping for every path the version preferred by isNewerThan. The result is sorted by FilePath and returned
// together with the cursor the client should use for the next request.
//
// Access-denied search errors are converted to the S3 ErrAccessDenied API error; any other search error is
// wrapped and returned.
//
// NOTE(review): this span comes from a diff view, so lines of the removed and of the added implementation are
// interleaved below. The comments describe the added (batch-based) implementation.
func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int, onlyUnversioned bool) ([]prefixSearchResult, string, error) {
searchResults, nextCursor, err := n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, cursor, maxKeys, onlyUnversioned)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAccessDenied) {
return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied)
func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int) ([]prefixSearchResult, string, error) {
var (
// batch maps FilePath to the newest version seen so far; it holds the items of the current response.
batch = make(map[string]prefixSearchResult, maxKeys)
searchedPage []prefixSearchResult
nextCursor = cursor
// Page size passed to every underlying search call; it does not depend on maxKeys.
seachPageSize = 1000
err error
// generateCursor is set when a fetched page contains paths that did not fit into the current batch.
generateCursor bool

// isBatchDone == true means we have collected `maxKeys` objects for the response.
isBatchDone bool
)

// Fetch pages until the batch holds maxKeys distinct paths or the search has no more pages.
for len(batch) < maxKeys {
searchedPage, nextCursor, err = n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, nextCursor, seachPageSize)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAccessDenied) {
return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied)
}

return nil, "", fmt.Errorf("get all versions by prefix: %w", err)
}

return nil, "", fmt.Errorf("get all versions by prefix: %w", err)
for _, pageItem := range searchedPage {
batchItem, ok := batch[pageItem.FilePath]
if ok {
// The path is already in the batch: keep only the newest version.
if pageItem.isNewerThan(batchItem) {
batch[pageItem.FilePath] = pageItem
}

continue
}

// A path we have not seen yet: add it to the response while there is room.
if !isBatchDone {
batch[pageItem.FilePath] = pageItem
isBatchDone = len(batch) == maxKeys
} else {
// We have already collected `maxKeys` objects, but the current page still has items that belong to the
// next request. The cursor must be moved to the end of the current batch.
generateCursor = true
}
}

// An empty cursor means the search has no more pages.
if len(nextCursor) == 0 {
break
}
}

var uniq = make(map[string]prefixSearchResult, len(searchResults))
// There are more items to search. We should look into the next page: it may contain versions of objects that
// are already in the current batch.
if len(nextCursor) != 0 {
// We may have to fetch more than one extra page, but at least one must be checked.
var oneMorePage = true
for oneMorePage {
oneMorePage = false

searchedPage, nextCursor, err = n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, nextCursor, seachPageSize)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAccessDenied) {
return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied)
}

for _, result := range searchResults {
// Take only the first object, because it is the freshest one.
if _, ok := uniq[result.FilePath]; !ok {
uniq[result.FilePath] = result
return nil, "", fmt.Errorf("get all versions by prefix: %w", err)
}

// searchedPage is sorted by object.AttributeFilePath. If the last element of the extra page is present in
// the current batch, we should check one more page in advance to find all versions of the batch objects.
if l := len(searchedPage); l > 0 {
_, oneMorePage = batch[searchedPage[l-1].FilePath]
}

for _, result := range searchedPage {
existing, ok := batch[result.FilePath]

// We don't need new items that are outside of the current batch.
if !ok {
// But it means there are items not covered yet; they must be retrieved by the next request.
generateCursor = true
continue
}

// Try to get the latest version of the object from this page.
if result.isNewerThan(existing) {
batch[result.FilePath] = result
}
}
}
}

return slices.Collect(maps.Values(uniq)), nextCursor, nil
// Map iteration order is random, so the collected values are sorted by FilePath before being returned.
items := slices.Collect(maps.Values(batch))
sortFunc := func(a, b prefixSearchResult) int {
return cmp.Compare(a.FilePath, b.FilePath)
}

slices.SortFunc(items, sortFunc)

// We manipulated the pages above, so we must send the client the actual point to start the next request from:
// a token built from the last FilePath of the returned batch.
// NOTE(review): items can be empty here (e.g. maxKeys == 0 with a non-empty incoming cursor leaves batch empty
// while nextCursor/generateCursor are set); items[len(items)-1] would then panic — confirm callers always pass
// maxKeys > 0 or guard this access.
if len(nextCursor) != 0 || generateCursor {
nextCursor = generateContinuationToken(items[len(items)-1].FilePath)
}

return items, nextCursor, nil
}

// objectDelete puts tombstone object into neofs.
Expand Down Expand Up @@ -995,7 +1090,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
var latestVersions []prefixSearchResult

if nodeVersions == nil {
latestVersions, next, err = n.searchLatestVersionsByPrefix(ctx, p.Bucket, p.Bucket.Owner, p.Prefix, p.ContinuationToken, p.MaxKeys, false)
latestVersions, next, err = n.searchLatestVersionsByPrefix(ctx, p.Bucket, p.Bucket.Owner, p.Prefix, p.ContinuationToken, p.MaxKeys)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, "", nil
Expand Down
Loading