Skip to content

s3 pagination failures #1177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/handler/object_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func parseListObjectVersionsRequest(reqInfo *api.ReqInfo) (*layer.ListObjectVers
}

res.Prefix = queryValues.Get("prefix")
res.KeyMarker = queryValues.Get("marker")
res.KeyMarker = queryValues.Get("key-marker")
res.Delimiter = queryValues.Get("delimiter")
res.Encode = queryValues.Get("encoding-type")
res.VersionIDMarker = queryValues.Get("version-id-marker")
Expand Down
58 changes: 43 additions & 15 deletions api/layer/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,14 @@ func (n *layer) prepareMultipartHeadObject(ctx context.Context, p *PutObjectPara
//
// Returns ErrNodeNotFound if no objects are found.
func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketInfo, owner user.ID, objectName string, onlyUnversioned bool) ([]allVersionsSearchResult, error) {
	// Delegate to the cursor-aware variant with a nil cursor; the returned
	// next-page cursor is intentionally discarded because this entry point
	// exposes only the results and the error.
	results, _, err := n.searchAllVersionsInNeoFSWithCursor(ctx, bkt, owner, objectName, onlyUnversioned, nil)
	return results, err
}

func (n *layer) searchAllVersionsInNeoFSWithCursor(ctx context.Context, bkt *data.BucketInfo, owner user.ID, objectName string, onlyUnversioned bool, cursor *string) ([]allVersionsSearchResult, string, error) {
var (
filters = make(object.SearchFilters, 0, 6)
filters = make(object.SearchFilters, 0, 4)
returningAttributes = []string{
object.AttributeFilePath,
object.AttributeTimestamp,
Expand All @@ -418,7 +424,10 @@ func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketIn
s3headers.AttributeDeleteMarker,
}

opts client.SearchObjectsOptions
opts client.SearchObjectsOptions
searchResultItems []client.SearchResultItem
nextCursor string
err error
)

if bt := bearerTokenFromContext(ctx, owner); bt != nil {
Expand All @@ -438,17 +447,28 @@ func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketIn
filters.AddFilter(s3headers.AttributeVersioningState, "", object.MatchNotPresent)
}

searchResultItems, err := n.neoFS.SearchObjectsV2(ctx, bkt.CID, filters, returningAttributes, opts)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAccessDenied) {
return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied)
if cursor != nil {
searchResultItems, nextCursor, err = n.neoFS.SearchObjectsV2WithCursor(ctx, bkt.CID, filters, returningAttributes, *cursor, opts)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAccessDenied) {
return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied)
}

return nil, "", fmt.Errorf("search object version: %w", err)
}
} else {
searchResultItems, err = n.neoFS.SearchObjectsV2(ctx, bkt.CID, filters, returningAttributes, opts)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAccessDenied) {
return nil, "", s3errors.GetAPIError(s3errors.ErrAccessDenied)
}

return nil, fmt.Errorf("search object version: %w", err)
return nil, "", fmt.Errorf("search object version: %w", err)
}
}

if len(searchResultItems) == 0 {
return nil, ErrNodeNotFound
return nil, "", ErrNodeNotFound
}

var searchResults = make([]allVersionsSearchResult, 0, len(searchResultItems))
Expand All @@ -462,7 +482,7 @@ func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketIn
if item.Attributes[1] != "" {
psr.CreationTimestamp, err = strconv.ParseInt(item.Attributes[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid creation timestamp %s: %w", item.Attributes[1], err)
return nil, "", fmt.Errorf("invalid creation timestamp %s: %w", item.Attributes[1], err)
}
}

Expand All @@ -471,7 +491,7 @@ func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketIn
if item.Attributes[3] != "" {
psr.PayloadSize, err = strconv.ParseInt(item.Attributes[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid payload size %s: %w", item.Attributes[3], err)
return nil, "", fmt.Errorf("invalid payload size %s: %w", item.Attributes[3], err)
}
}

Expand All @@ -481,6 +501,10 @@ func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketIn
}

sortFunc := func(a, b allVersionsSearchResult) int {
if c := cmp.Compare(a.FilePath, b.FilePath); c != 0 { // direct order.
return c
}

if c := cmp.Compare(b.CreationTimestamp, a.CreationTimestamp); c != 0 { // reverse order.
return c
}
Expand All @@ -491,7 +515,7 @@ func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketIn

slices.SortFunc(searchResults, sortFunc)

return searchResults, nil
return searchResults, nextCursor, nil
}

func (n *layer) comprehensiveSearchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketInfo, owner user.ID, objectName string, onlyUnversioned bool) ([]allVersionsSearchResult, oid.ID, *data.LockInfo, error) {
Expand Down Expand Up @@ -1033,6 +1057,10 @@ func generateContinuationToken(filePath string) string {
id[i] = 255
}

return generateAdjustedContinuationToken(filePath, id)
}

func generateAdjustedContinuationToken(filePath string, id oid.ID) string {
cursorBuf := bytes.NewBuffer(nil)
cursorBuf.Write([]byte(object.AttributeFilePath))
cursorBuf.WriteByte(0x00)
Expand Down Expand Up @@ -1153,10 +1181,10 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
return
}

func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {
searchResults, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, prefix, false)
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, cursor, delimiter string) (map[string][]*data.ExtendedObjectInfo, string, error) {
searchResults, nextCursor, err := n.searchAllVersionsInNeoFSWithCursor(ctx, bkt, bkt.Owner, prefix, false, &cursor)
if err != nil {
return nil, err
return nil, "", err
}

versions := make(map[string][]*data.ExtendedObjectInfo, len(searchResults))
Expand Down Expand Up @@ -1205,7 +1233,7 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo,
versions[oi.Name] = objVersions
}

return versions, nil
return versions, nextCursor, nil
}

func IsSystemHeader(key string) bool {
Expand Down
120 changes: 86 additions & 34 deletions api/layer/versioning.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,109 @@
package layer

import (
"cmp"
"bytes"
"context"
"errors"
"fmt"
"maps"
"slices"
"strconv"
"strings"

"github.com/nspcc-dev/neofs-s3-gw/api/data"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var (
allObjects = make([]*data.ExtendedObjectInfo, 0, p.MaxKeys)
res = &ListObjectVersionsInfo{}
allObjects = make([]*data.ExtendedObjectInfo, 0, p.MaxKeys)
res = &ListObjectVersionsInfo{}
cursor string
id oid.ID
keyMarkerLastTs uint64
err error
)

versions, err := n.getAllObjectsVersions(ctx, p.BktInfo, p.Prefix, p.Delimiter)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return res, nil
// We should start with specific key.
if p.KeyMarker != "" {
// We should start with specific key version.
if p.VersionIDMarker != "" {
parts := strings.Split(p.VersionIDMarker, ":")
if err = id.DecodeString(parts[0]); err != nil {
return nil, err
}

if len(parts) == 2 {
f, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, err
}

keyMarkerLastTs = uint64(f)
}
}
return nil, err
}

sortedNames := make([]string, 0, len(versions))
for k := range versions {
sortedNames = append(sortedNames, k)
cursor = generateAdjustedContinuationToken(p.KeyMarker, oid.ID{})
}
slices.Sort(sortedNames)

for _, name := range sortedNames {
sortedVersions := versions[name]
slices.SortFunc(sortedVersions, func(a, b *data.ExtendedObjectInfo) int {
return cmp.Compare(b.NodeVersion.Timestamp, a.NodeVersion.Timestamp) // sort in reverse order
})
for {
versions, nextCursor, err := n.getAllObjectsVersions(ctx, p.BktInfo, p.Prefix, cursor, p.Delimiter)
cursor = nextCursor

// The object with "null" version should be only one. We get only last (actual) one.
var isNullVersionCounted bool

for i, version := range sortedVersions {
version.IsLatest = i == 0
if version.NodeVersion.IsUnversioned && isNullVersionCounted {
continue
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return res, nil
}
return nil, err
}

if version.NodeVersion.IsUnversioned {
isNullVersionCounted = true
sortedNames := slices.Collect(maps.Keys(versions))
slices.Sort(sortedNames)

for _, name := range sortedNames {
sortedVersions := versions[name]
// There must be at most one "null" version per object; only the latest (actual) one is kept.
var isNullVersionCounted bool
var isLatestShouldBeSet = true

for i, version := range sortedVersions {
version.IsLatest = i == 0
if version.NodeVersion.IsUnversioned && isNullVersionCounted {
continue
}

if version.NodeVersion.IsUnversioned {
isNullVersionCounted = true
}

// On the next pages, we should filter out objects we already showed on the previous page.
if p.KeyMarker == version.NodeVersion.FilePath && keyMarkerLastTs > 0 {
// Versions are sorted in reverse order, so the most recently stored ones come first.
// Skip versions newer than the marker timestamp: they were already shown on a previous page.
if keyMarkerLastTs < version.NodeVersion.Timestamp {
continue
}

// Objects with the same name may be created within the same second.
// In that case, already-processed objects are filtered out by comparing OIDs.
if keyMarkerLastTs == version.NodeVersion.Timestamp && !id.IsZero() {
if bytes.Compare(id[:], version.NodeVersion.OID[:]) < 0 {
continue
}
}
}

if !version.IsLatest && isLatestShouldBeSet {
version.IsLatest = true
}

isLatestShouldBeSet = false

allObjects = append(allObjects, version)
}
allObjects = append(allObjects, version)
}
}

for i, obj := range allObjects {
if obj.ObjectInfo.Name >= p.KeyMarker && obj.ObjectInfo.VersionID() >= p.VersionIDMarker {
allObjects = allObjects[i:]
if nextCursor == "" || len(allObjects) >= p.MaxKeys {
break
}
}
Expand All @@ -62,8 +112,10 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar

if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys].ObjectInfo.Name
res.NextVersionIDMarker = allObjects[p.MaxKeys].ObjectInfo.VersionID()
oi := allObjects[p.MaxKeys].ObjectInfo

res.NextKeyMarker = oi.Name
res.NextVersionIDMarker = fmt.Sprintf("%s:%d", oi.VersionID(), oi.Created.Unix())

allObjects = allObjects[:p.MaxKeys]
res.KeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
Expand Down
2 changes: 2 additions & 0 deletions docs/aws_s3_compat.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ See also `GetObject` and other method parameters.
| 🟢 | ListObjectVersions | ListBucketObjectVersions |
| 🔵 | RestoreObject | Related to lifecycles and tiering. |

In some cases, `ListObjectVersions` may return fewer elements than the requested `max-keys` value.

## Bucket

| | Method | Comments |
Expand Down
Loading