Skip to content

Search refactorings #1157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 12, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 56 additions & 109 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,29 +581,24 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
owner = n.Owner(ctx)
)

if len(p.VersionID) == 0 {
heads, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, owner, p.Object, false)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey)
versions, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, owner, p.Object, p.VersionID == data.UnversionedObjectVersionID)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
if len(p.VersionID) == 0 {
err = s3errors.GetAPIError(s3errors.ErrNoSuchKey)
} else {
err = s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
}

return nil, err
}

if heads[0].IsDeleteMarker {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey)
}
return nil, err
}

id = heads[0].ID
} else if p.VersionID == data.UnversionedObjectVersionID {
versions, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, owner, p.Object, true)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
if len(p.VersionID) == 0 || p.VersionID == data.UnversionedObjectVersionID {
if versions[0].IsDeleteMarker {
if len(p.VersionID) == 0 {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey)
}

return nil, err
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
}

id = versions[0].ID
Expand All @@ -613,14 +608,6 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
return nil, fmt.Errorf("get bucket settings: %w", err)
}

versions, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, owner, p.Object, false)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
}
return nil, err
}

var foundVersion *allVersionsSearchResult

if settings.VersioningEnabled() {
Expand Down Expand Up @@ -890,75 +877,35 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
}

func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
if settings.VersioningEnabled() {
if len(obj.VersionID) > 0 {
var deleteOID oid.ID

if obj.VersionID == data.UnversionedObjectVersionID {
versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, true)
if err != nil {
obj.Error = fmt.Errorf("search versions: %w", err)
if errors.Is(err, ErrNodeNotFound) {
obj.Error = nil
}

return obj
}

if len(versions) == 0 {
obj.Error = nil
return obj
}
// Specific version ID passed, drop it.
if len(obj.VersionID) > 0 && obj.VersionID != data.UnversionedObjectVersionID {
// Simple single ID deletion.
var deleteOID oid.ID

for _, version := range versions {
if obj.Error = n.objectDelete(ctx, bkt, version.ID); obj.Error != nil {
return obj
}
}
} else {
if err := deleteOID.DecodeString(obj.VersionID); err != nil {
obj.Error = fmt.Errorf("decode version: %w", err)
return obj
}
if err := deleteOID.DecodeString(obj.VersionID); err != nil {
obj.Error = fmt.Errorf("decode version: %w", err)
return obj
}

if obj.Error = n.objectDelete(ctx, bkt, deleteOID); obj.Error != nil {
return obj
}
}
} else {
var markerOID oid.ID
markerOID, obj.Error = n.putDeleteMarker(ctx, bkt, obj.Name)
obj.DeleteMarkVersion = markerOID.EncodeToString()
if obj.Error = n.objectDelete(ctx, bkt, deleteOID); obj.Error != nil {
return obj
}

n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
return obj
}

if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID

versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, true)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
obj.Error = nil
} else {
obj.Error = fmt.Errorf("search versions: %w", err)
}

return obj
}

for _, version := range versions {
if obj.Error = n.objectDelete(ctx, bkt, version.ID); obj.Error != nil {
return obj
}
}
if settings.VersioningEnabled() && len(obj.VersionID) == 0 {
// No version specified -> add deletion marker.
var markerOID oid.ID
markerOID, obj.Error = n.putDeleteMarker(ctx, bkt, obj.Name)
obj.DeleteMarkVersion = markerOID.EncodeToString()

n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
return obj
}

versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, false)
versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, settings.VersioningEnabled() || settings.VersioningSuspended())
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
obj.Error = nil
Expand All @@ -969,30 +916,20 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj
}

if obj.VersionID == "" {
for _, ver := range versions {
if obj.Error = n.objectDelete(ctx, bkt, ver.ID); obj.Error != nil {
n.log.Error("could not delete object", zap.Error(obj.Error), zap.Stringer("oid", ver.ID))
if isErrObjectAlreadyRemoved(obj.Error) {
obj.Error = nil
continue
}

return obj
}
}
} else {
for _, ver := range versions {
if ver.ID.EncodeToString() == obj.VersionID {
if obj.Error = n.objectDelete(ctx, bkt, ver.ID); obj.Error != nil {
return obj
}

return obj
for _, version := range versions {
if obj.Error = n.objectDelete(ctx, bkt, version.ID); obj.Error != nil {
n.log.Error("could not delete object", zap.Error(obj.Error), zap.Stringer("oid", version.ID))
if isErrObjectAlreadyRemoved(obj.Error) {
obj.Error = nil
continue
}
return obj
}
}

if settings.VersioningSuspended() {
// Return unversioned ID.
obj.VersionID = data.UnversionedObjectVersionID
}
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)

return obj
Expand Down Expand Up @@ -1051,11 +988,21 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
}

func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
objects, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, p.BktInfo.Owner, "", false)
var (
filters = make(object.SearchFilters, 0, 2)
opts client.SearchObjectsOptions
)

opts.SetCount(1) // Enough for the purpose.
if bt := bearerTokenFromContext(ctx, p.BktInfo.Owner); bt != nil {
opts.WithBearerToken(*bt)
}
filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular)
filters.AddFilter(s3headers.MetaType, "", object.MatchNotPresent)

objects, err := n.neoFS.SearchObjectsV2(ctx, p.BktInfo.CID, filters, []string{object.FilterType}, opts)
if err != nil {
if !errors.Is(err, ErrNodeNotFound) {
return err
}
return err
}

// there are only Regular objects in slice.
Expand Down
Loading