Skip to content

Store object tags as attributes along with a payload #1166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/handler/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

// There are no tags in separate objects. Try to get tags from the object headers.
if len(comprehensiveObjectInfo.TagSet) == 0 && objectWithPayloadReader.ObjectInfo != nil {
for k, v := range objectWithPayloadReader.ObjectInfo.Headers {
if strings.HasPrefix(k, s3headers.NeoFSSystemMetadataTagPrefix) {
comprehensiveObjectInfo.TagSet[strings.TrimPrefix(k, s3headers.NeoFSSystemMetadataTagPrefix)] = v
}
}
}

info := objectWithPayloadReader.ObjectInfo

if err = checkPreconditions(info, conditional); err != nil {
Expand Down
39 changes: 2 additions & 37 deletions api/handler/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
Header: metadata,
Encryption: encryptionParams,
CopiesNumber: copiesNumber,
Tags: tagSet,
}

params.Lock, err = formObjectLock(r.Context(), bktInfo, settings.LockConfiguration, r.Header)
Expand Down Expand Up @@ -311,27 +312,6 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
}
}

if tagSet != nil {
tagPrm := &layer.PutObjectTaggingParams{
ObjectVersion: &layer.ObjectVersion{
BktInfo: bktInfo,
ObjectName: objInfo.Name,
VersionID: objInfo.VersionID(),
},
TagSet: tagSet,
CopiesNumber: h.cfg.CopiesNumber,
}

if !settings.VersioningEnabled() {
tagPrm.ObjectVersion.VersionID = ""
}

if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
}

if newEaclTable != nil {
p := &layer.PutBucketACLParams{
BktInfo: bktInfo,
Expand Down Expand Up @@ -502,6 +482,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
Reader: contentReader,
Size: size,
Header: metadata,
Tags: tagSet,
}

extendedObjInfo, err := h.obj.PutObject(r.Context(), params)
Expand Down Expand Up @@ -538,22 +519,6 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
}
}

if tagSet != nil {
tagPrm := &layer.PutObjectTaggingParams{
ObjectVersion: &layer.ObjectVersion{
BktInfo: bktInfo,
ObjectName: objInfo.Name,
VersionID: objInfo.VersionID(),
},
CopiesNumber: h.cfg.CopiesNumber,
}

if err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
}

if newEaclTable != nil {
p := &layer.PutBucketACLParams{
BktInfo: bktInfo,
Expand Down
2 changes: 1 addition & 1 deletion api/handler/tagging.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
p.VersionID = ei.EncodeToString()
}

if err = h.obj.DeleteObjectTagging(r.Context(), p); err != nil {
if err = h.obj.DeleteObjectTagging(r.Context(), p, h.cfg.CopiesNumber); err != nil {
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
return
}
Expand Down
14 changes: 11 additions & 3 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type (
Lock *data.ObjectLock
Encryption encryption.Params
CopiesNumber uint32
Tags map[string]string
}

DeleteObjectParams struct {
Expand Down Expand Up @@ -251,7 +252,7 @@ type (

GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams) (string, map[string]string, error)
PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams) error
DeleteObjectTagging(ctx context.Context, p *ObjectVersion) error
DeleteObjectTagging(ctx context.Context, p *ObjectVersion, copiesNumber uint32) error

PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)

Expand Down Expand Up @@ -662,10 +663,11 @@ func (n *layer) ComprehensiveObjectInfo(ctx context.Context, p *HeadObjectParams
owner = n.Owner(ctx)
versions []allVersionsSearchResult

tagSet map[string]string
tagSet = make(map[string]string)
lockInfo *data.LockInfo
isEmptyVersion = len(p.VersionID) == 0
isNullVersion = p.VersionID == data.UnversionedObjectVersionID
header *object.Object
)

if isEmptyVersion || isNullVersion {
Expand Down Expand Up @@ -698,7 +700,7 @@ func (n *layer) ComprehensiveObjectInfo(ctx context.Context, p *HeadObjectParams
return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion)
}

if _, err = n.objectHead(ctx, p.BktInfo, id); err != nil {
if header, err = n.objectHead(ctx, p.BktInfo, id); err != nil {
var errNotFound *apistatus.ObjectNotFound

if errors.As(err, &errNotFound) {
Expand All @@ -708,6 +710,12 @@ func (n *layer) ComprehensiveObjectInfo(ctx context.Context, p *HeadObjectParams
return nil, fmt.Errorf("head version %s: %w", p.VersionID, err)
}

for _, attr := range header.Attributes() {
if strings.HasPrefix(attr.Key(), s3headers.NeoFSSystemMetadataTagPrefix) {
tagSet[strings.TrimPrefix(attr.Key(), s3headers.NeoFSSystemMetadataTagPrefix)] = attr.Value()
}
}

tagsObjectOID, lockInfo, err = n.searchTagsAndLocksInNeoFS(ctx, p.BktInfo, owner, p.Object, p.VersionID)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions api/layer/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
}
}

for k, v := range p.Tags {
p.Header[s3headers.NeoFSSystemMetadataTagPrefix+k] = v
}

prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Creator: owner,
Expand Down
93 changes: 83 additions & 10 deletions api/layer/tagging.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"slices"
"strconv"
"strings"

"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/s3errors"
Expand All @@ -35,6 +36,7 @@ type taggingSearchResult struct {
ID oid.ID
FilePath string
CreationTimestamp int64
IsOriginalObject bool
}

func (n *layer) PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams) error {
Expand All @@ -49,7 +51,7 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams)
CreationTime: TimeNow(ctx),
CopiesNumber: p.CopiesNumber,
Filepath: p.ObjectVersion.ObjectName,
Attributes: make(map[string]string, 2),
Attributes: make(map[string]string, 3+len(p.TagSet)),
Payload: bytes.NewBuffer(payload),
PayloadSize: uint64(len(payload)),
}
Expand All @@ -60,6 +62,9 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams)
}

prm.Attributes[s3headers.MetaType] = s3headers.TypeTags
for k, v := range p.TagSet {
prm.Attributes[s3headers.NeoFSSystemMetadataTagPrefix+k] = v
}

if _, _, err = n.objectPutAndHash(ctx, prm, p.ObjectVersion.BktInfo); err != nil {
return fmt.Errorf("create tagging object: %w", err)
Expand All @@ -85,6 +90,7 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams)
returningAttributes = []string{
object.AttributeFilePath,
object.AttributeTimestamp,
s3headers.MetaType,
}

opts client.SearchObjectsOptions
Expand All @@ -95,7 +101,6 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams)
}

filters.AddFilter(object.AttributeFilePath, p.ObjectVersion.ObjectName, object.MatchStringEqual)
filters.AddFilter(s3headers.MetaType, s3headers.TypeTags, object.MatchStringEqual)
filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular)
if p.ObjectVersion.VersionID != "" {
filters.AddFilter(s3headers.AttributeObjectVersion, p.ObjectVersion.VersionID, object.MatchStringEqual)
Expand All @@ -111,10 +116,30 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams)
}

if len(searchResultItems) == 0 {
// Objects inside versioned container don't have s3headers.AttributeObjectVersion attribute.
// This case means there are no separate attbute meta objects, let's try to get attributes from object itself.
if p.ObjectVersion.VersionID != "" {
var objID oid.ID
if err = objID.DecodeString(p.ObjectVersion.VersionID); err != nil {
return "", nil, fmt.Errorf("parse object version %s into oid: %w", p.ObjectVersion.VersionID, err)
}

tags, err := n.getTagsFromOriginalObject(ctx, p.ObjectVersion.BktInfo, objID)
if err != nil {
return "", nil, fmt.Errorf("get tags by oid: %w", err)
}

return p.ObjectVersion.VersionID, tags, nil
}

return "", nil, nil
}

var searchResults = make([]taggingSearchResult, 0, len(searchResultItems))
var (
tags map[string]string
metaObjectSearchResults = make([]taggingSearchResult, 0, len(searchResultItems))
originalObjectSearchResults = make([]taggingSearchResult, 0, 1)
)

for _, item := range searchResultItems {
if len(item.Attributes) != len(returningAttributes) {
Expand All @@ -126,14 +151,28 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams)
FilePath: item.Attributes[0],
}

switch item.Attributes[2] {
case "":
psr.IsOriginalObject = true
case s3headers.TypeTags:
// is a tag meta object.
default:
// skip other meta types.
continue
}

if item.Attributes[1] != "" {
psr.CreationTimestamp, err = strconv.ParseInt(item.Attributes[1], 10, 64)
if err != nil {
return "", nil, fmt.Errorf("invalid creation timestamp %s: %w", item.Attributes[1], err)
}
}

searchResults = append(searchResults, psr)
if psr.IsOriginalObject {
originalObjectSearchResults = append(originalObjectSearchResults, psr)
} else {
metaObjectSearchResults = append(metaObjectSearchResults, psr)
}
}

sortFunc := func(a, b taggingSearchResult) int {
Expand All @@ -145,16 +184,43 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams)
return bytes.Compare(b.ID[:], a.ID[:]) // reverse order.
}

slices.SortFunc(searchResults, sortFunc)
// There are not extra meta objects with tags.
if len(metaObjectSearchResults) == 0 {
slices.SortFunc(originalObjectSearchResults, sortFunc)

tags, err := n.getTagsByOID(ctx, p.ObjectVersion.BktInfo, searchResults[0].ID)
if err != nil {
return "", nil, err
tags, err = n.getTagsFromOriginalObject(ctx, p.ObjectVersion.BktInfo, originalObjectSearchResults[0].ID)
if err != nil {
return "", nil, err
}
} else {
slices.SortFunc(metaObjectSearchResults, sortFunc)

tags, err = n.getTagsByOID(ctx, p.ObjectVersion.BktInfo, metaObjectSearchResults[0].ID)
if err != nil {
return "", nil, err
}
}

return p.ObjectVersion.VersionID, tags, nil
}

func (n *layer) getTagsFromOriginalObject(ctx context.Context, bktInfo *data.BucketInfo, id oid.ID) (map[string]string, error) {
var tagSet = make(map[string]string)

header, err := n.objectHead(ctx, bktInfo, id)
if err != nil {
return nil, fmt.Errorf("get object head: %w", err)
}

for _, attr := range header.Attributes() {
if strings.HasPrefix(attr.Key(), s3headers.NeoFSSystemMetadataTagPrefix) {
tagSet[strings.TrimPrefix(attr.Key(), s3headers.NeoFSSystemMetadataTagPrefix)] = attr.Value()
}
}

return tagSet, nil
}

func (n *layer) getTagsByOID(ctx context.Context, bktInfo *data.BucketInfo, id oid.ID) (map[string]string, error) {
lastObj, err := n.objectGet(ctx, bktInfo, id)
if err != nil {
Expand All @@ -176,7 +242,7 @@ func (n *layer) getTagsByOID(ctx context.Context, bktInfo *data.BucketInfo, id o
return tags, nil
}

func (n *layer) DeleteObjectTagging(ctx context.Context, p *ObjectVersion) error {
func (n *layer) DeleteObjectTagging(ctx context.Context, p *ObjectVersion, copiesNumber uint32) error {
fs := make(object.SearchFilters, 0, 4)
fs.AddFilter(object.AttributeFilePath, p.ObjectName, object.MatchStringEqual)
fs.AddFilter(s3headers.MetaType, s3headers.TypeTags, object.MatchStringEqual)
Expand Down Expand Up @@ -211,7 +277,14 @@ func (n *layer) DeleteObjectTagging(ctx context.Context, p *ObjectVersion) error

n.cache.DeleteTagging(objectTaggingCacheKey(p))

return nil
// Put an empty tag list to override tags inside the object itself.
putObjectTaggingParams := PutObjectTaggingParams{
ObjectVersion: p,
TagSet: make(map[string]string),
CopiesNumber: copiesNumber,
}

return n.PutObjectTagging(ctx, &putObjectTaggingParams)
}

func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
Expand Down
3 changes: 3 additions & 0 deletions api/s3headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ const (
AttributeVersioningState = NeoFSSystemMetadataPrefix + "VersioningState"
AttributeDeleteMarker = NeoFSSystemMetadataPrefix + "DeleteMarker"
UploadCompletedParts = NeoFSSystemMetadataPrefix + "Completed-Parts"

// Result: S3-Meta-Tag-.
NeoFSSystemMetadataTagPrefix = NeoFSSystemMetadataPrefix + "Tag-"
)

const (
Expand Down
Loading