diff --git a/.github/workflows/s3-tests.yml b/.github/workflows/s3-tests.yml index 411c0b75..ba0a64f8 100644 --- a/.github/workflows/s3-tests.yml +++ b/.github/workflows/s3-tests.yml @@ -53,7 +53,7 @@ jobs: uses: dsaltares/fetch-gh-release-asset@1.1.1 with: repo: 'nspcc-dev/neofs-node' - version: 'tags/v0.41.1' + version: 'tags/v0.42.0' file: 'neofs-cli-linux-amd64' target: 's3-tests/neofs-cli' @@ -61,7 +61,7 @@ jobs: uses: dsaltares/fetch-gh-release-asset@1.1.1 with: repo: 'nspcc-dev/neofs-node' - version: 'tags/v0.41.1' + version: 'tags/v0.42.0' file: 'neofs-adm-linux-amd64' target: 's3-tests/neofs-adm' @@ -69,7 +69,7 @@ jobs: uses: dsaltares/fetch-gh-release-asset@1.1.1 with: repo: 'nspcc-dev/neofs-node' - version: 'tags/v0.41.1' + version: 'tags/v0.42.0' file: 'neofs-ir-linux-amd64' target: 's3-tests/neofs-ir' @@ -77,7 +77,7 @@ jobs: uses: dsaltares/fetch-gh-release-asset@1.1.1 with: repo: 'nspcc-dev/neofs-node' - version: 'tags/v0.41.1' + version: 'tags/v0.42.0' file: 'neofs-lens-linux-amd64' target: 's3-tests/neofs-lens' @@ -85,7 +85,7 @@ jobs: uses: dsaltares/fetch-gh-release-asset@1.1.1 with: repo: 'nspcc-dev/neofs-node' - version: 'tags/v0.41.1' + version: 'tags/v0.42.0' file: 'neofs-node-linux-amd64' target: 's3-tests/neofs-node' @@ -93,7 +93,7 @@ jobs: uses: dsaltares/fetch-gh-release-asset@1.1.1 with: repo: 'nspcc-dev/neo-go' - version: 'tags/v0.105.1' + version: 'tags/v0.106.0' file: 'neo-go-linux-amd64' target: 's3-tests/neo-go' diff --git a/api/data/info.go b/api/data/info.go index b5549423..4d1f86e9 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -40,14 +40,15 @@ type ( IsDir bool IsDeleteMarker bool - Bucket string - Name string - Size int64 - ContentType string - Created time.Time - HashSum string - Owner user.ID - Headers map[string]string + Bucket string + Name string + Size int64 + ContentType string + Created time.Time + HashSum string + Owner user.ID + OwnerPublicKey keys.PublicKey + Headers map[string]string } // NotificationInfo store info to send s3 notification. diff --git a/api/data/tree.go b/api/data/tree.go index 7796a69b..b324b6d9 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -1,9 +1,12 @@ package data import ( + "fmt" "strconv" + "strings" "time" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" @@ -72,10 +75,42 @@ type MultipartInfo struct { Key string UploadID string Owner user.ID + OwnerPubKey keys.PublicKey Created time.Time Meta map[string]string CopiesNumber uint32 - SplitID string +} + +// LinkObjectPayload contains part info of the complex object. +// This data will be used for linking object construction. +type LinkObjectPayload struct { + OID oid.ID + Size uint32 +} + +// Marshal converts LinkObjectPayload to string. +func (e *LinkObjectPayload) Marshal() string { + return fmt.Sprintf("%s:%d", e.OID.String(), e.Size) +} + +// Unmarshal converts string to LinkObjectPayload. +func (e *LinkObjectPayload) Unmarshal(value string) error { + parts := strings.Split(value, ":") + if len(parts) != 2 { + return fmt.Errorf("invalid format: %s", value) + } + + if err := e.OID.DecodeString(parts[0]); err != nil { + return fmt.Errorf("invalid id: %w", err) + } + + size, err := strconv.ParseUint(parts[1], 10, 32) + if err != nil { + return fmt.Errorf("invalid size: %w", err) + } + + e.Size = uint32(size) + return nil } // PartInfo is upload information about part. @@ -95,8 +130,8 @@ type PartInfo struct { MultipartHash []byte // HomoHash contains internal state of the [hash.Hash] to calculate whole object homomorphic payload hash. HomoHash []byte - // Elements contain [oid.ID] object list for the current part. - Elements []oid.ID + // Elements contain [oid.ID] and size for each element for the current part. + Elements []LinkObjectPayload } // ToHeaderString form short part representation to use in S3-Completed-Parts header. diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 2e99a537..41969d12 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -7,7 +7,6 @@ import ( "strconv" "time" - "github.com/google/uuid" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/layer" @@ -101,17 +100,14 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re return } - uploadID := uuid.New() additional := []zap.Field{ - zap.String("uploadID", uploadID.String()), zap.String("Key", reqInfo.ObjectName), } p := &layer.CreateMultipartParams{ Info: &layer.UploadInfoParams{ - UploadID: uploadID.String(), - Bkt: bktInfo, - Key: reqInfo.ObjectName, + Bkt: bktInfo, + Key: reqInfo.ObjectName, }, Data: &layer.UploadData{}, } @@ -154,7 +150,8 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re return } - if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil { + uploadID, err := h.obj.CreateMultipartUpload(r.Context(), p) + if err != nil { h.logAndSendError(w, "could create multipart upload", reqInfo, err, additional...) return } @@ -166,9 +163,10 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re resp := InitiateMultipartUploadResponse{ Bucket: reqInfo.BucketName, Key: reqInfo.ObjectName, - UploadID: uploadID.String(), + UploadID: uploadID, } + additional = append(additional, zap.String("uploadID", uploadID)) if err = api.EncodeToResponse(w, resp); err != nil { h.logAndSendError(w, "could not encode InitiateMultipartUploadResponse to response", reqInfo, err, additional...) return @@ -648,12 +646,12 @@ func encodeListMultipartUploadsToResponse(info *layer.ListMultipartUploadsInfo, m := MultipartUpload{ Initiated: u.Created.UTC().Format(time.RFC3339), Initiator: Initiator{ - ID: u.Owner.String(), + ID: u.OwnerPubKey.StringCompressed(), DisplayName: u.Owner.String(), }, Key: u.Key, Owner: Owner{ - ID: u.Owner.String(), + ID: u.OwnerPubKey.StringCompressed(), DisplayName: u.Owner.String(), }, UploadID: u.UploadID, @@ -671,7 +669,7 @@ func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPart XMLName: xml.Name{}, Bucket: params.Info.Bkt.Name, Initiator: Initiator{ - ID: info.Owner.String(), + ID: info.OwnerPubKey.StringCompressed(), DisplayName: info.Owner.String(), }, IsTruncated: info.IsTruncated, @@ -679,7 +677,7 @@ func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPart MaxParts: params.MaxParts, NextPartNumberMarker: info.NextPartNumberMarker, Owner: Owner{ - ID: info.Owner.String(), + ID: info.OwnerPubKey.StringCompressed(), DisplayName: info.Owner.String(), }, PartNumberMarker: params.PartNumberMarker, diff --git a/api/handler/object_list.go b/api/handler/object_list.go index f437eda9..60b526f8 100644 --- a/api/handler/object_list.go +++ b/api/handler/object_list.go @@ -1,6 +1,7 @@ package handler import ( + "fmt" "net/http" "net/url" "strconv" @@ -33,12 +34,18 @@ func (h *handler) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) { return } - if err = api.EncodeToResponse(w, encodeV1(params, list)); err != nil { + encoded, err := encodeV1(params, list) + if err != nil { + h.logAndSendError(w, "encode V1", reqInfo, err) + return + } + + if err = api.EncodeToResponse(w, encoded); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err) } } -func encodeV1(p *layer.ListObjectsParamsV1, list *layer.ListObjectsInfoV1) *ListObjectsV1Response { +func encodeV1(p *layer.ListObjectsParamsV1, list *layer.ListObjectsInfoV1) (*ListObjectsV1Response, error) { res := &ListObjectsV1Response{ Name: p.BktInfo.Name, EncodingType: p.Encode, @@ -52,9 +59,14 @@ func encodeV1(p *layer.ListObjectsParamsV1, list *layer.ListObjectsInfoV1) *List res.CommonPrefixes = fillPrefixes(list.Prefixes, p.Encode) - res.Contents = fillContentsWithOwner(list.Objects, p.Encode) + content, err := fillContentsWithOwner(list.Objects, p.Encode) + if err != nil { + return nil, fmt.Errorf("fill contents with owner: %w", err) + } + + res.Contents = content - return res + return res, nil } // ListObjectsV2Handler handles objects listing requests for API version 2. @@ -77,12 +89,18 @@ func (h *handler) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) { return } - if err = api.EncodeToResponse(w, encodeV2(params, list)); err != nil { + encoded, err := encodeV2(params, list) + if err != nil { + h.logAndSendError(w, "encode V2", reqInfo, err) + return + } + + if err = api.EncodeToResponse(w, encoded); err != nil { h.logAndSendError(w, "something went wrong", reqInfo, err) } } -func encodeV2(p *layer.ListObjectsParamsV2, list *layer.ListObjectsInfoV2) *ListObjectsV2Response { +func encodeV2(p *layer.ListObjectsParamsV2, list *layer.ListObjectsInfoV2) (*ListObjectsV2Response, error) { res := &ListObjectsV2Response{ Name: p.BktInfo.Name, EncodingType: p.Encode, @@ -98,9 +116,14 @@ func encodeV2(p *layer.ListObjectsParamsV2, list *layer.ListObjectsInfoV2) *List res.CommonPrefixes = fillPrefixes(list.Prefixes, p.Encode) - res.Contents = fillContents(list.Objects, p.Encode, p.FetchOwner) + content, err := fillContents(list.Objects, p.Encode, p.FetchOwner) + if err != nil { + return nil, fmt.Errorf("fill content: %w", err) + } - return res + res.Contents = content + + return res, nil } func parseListObjectsArgsV1(reqInfo *api.ReqInfo) (*layer.ListObjectsParamsV1, error) { @@ -184,11 +207,11 @@ func fillPrefixes(src []string, encode string) []CommonPrefix { return dst } -func fillContentsWithOwner(src []*data.ObjectInfo, encode string) []Object { +func fillContentsWithOwner(src []*data.ObjectInfo, encode string) ([]Object, error) { return fillContents(src, encode, true) } -func fillContents(src []*data.ObjectInfo, encode string, fetchOwner bool) []Object { +func fillContents(src []*data.ObjectInfo, encode string, fetchOwner bool) ([]Object, error) { var dst []Object for _, obj := range src { res := Object{ @@ -198,6 +221,15 @@ func fillContents(src []*data.ObjectInfo, encode string, fetchOwner bool) []Obje ETag: obj.HashSum, } + if size, ok := obj.Headers[layer.AttributeDecryptedSize]; ok { + sz, err := strconv.ParseInt(size, 10, 64) + if err != nil { + return nil, fmt.Errorf("parse decrypted size %s: %w", size, err) + } + + res.Size = sz + } + if fetchOwner { res.Owner = &Owner{ ID: obj.Owner.String(), @@ -207,7 +239,7 @@ func fillContents(src []*data.ObjectInfo, encode string, fetchOwner bool) []Obje dst = append(dst, res) } - return dst + return dst, nil } func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http.Request) { @@ -277,7 +309,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck Key: ver.ObjectInfo.Name, LastModified: ver.ObjectInfo.Created.UTC().Format(time.RFC3339), Owner: Owner{ - ID: ver.ObjectInfo.Owner.String(), + ID: ver.ObjectInfo.OwnerPublicKey.StringCompressed(), DisplayName: ver.ObjectInfo.Owner.String(), }, Size: ver.ObjectInfo.Size, diff --git a/api/layer/layer.go b/api/layer/layer.go index 67da7c61..172eebde 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -226,7 +226,7 @@ type ( DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject - CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error + CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) (string, error) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 141caf0b..8b8a80f1 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -16,6 +16,7 @@ import ( "time" "github.com/minio/sio" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/layer/encryption" @@ -118,6 +119,7 @@ type ( ListPartsInfo struct { Parts []*Part Owner user.ID + OwnerPubKey keys.PublicKey NextPartNumberMarker int IsTruncated bool } @@ -130,29 +132,34 @@ type ( NextUploadIDMarker string } UploadInfo struct { - IsDir bool - Key string - UploadID string - Owner user.ID - Created time.Time + IsDir bool + Key string + UploadID string + Owner user.ID + OwnerPubKey keys.PublicKey + Created time.Time } ) -func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error { +func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) (string, error) { metaSize := len(p.Header) if p.Data != nil { metaSize += len(p.Data.ACLHeaders) metaSize += len(p.Data.TagSet) } + ownerPubKey, err := n.OwnerPublicKey(ctx) + if err != nil { + return "", fmt.Errorf("owner pub key: %w", err) + } + info := &data.MultipartInfo{ Key: p.Info.Key, - UploadID: p.Info.UploadID, Owner: n.Owner(ctx), + OwnerPubKey: *ownerPubKey, Created: TimeNow(ctx), Meta: make(map[string]string, metaSize), CopiesNumber: p.CopiesNumber, - SplitID: object.NewSplitID().String(), } for key, val := range p.Header { @@ -171,11 +178,27 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar if p.Info.Encryption.Enabled() { if err := addEncryptionHeaders(info.Meta, p.Info.Encryption); err != nil { - return fmt.Errorf("add encryption header: %w", err) + return "", fmt.Errorf("add encryption header: %w", err) } } - return n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info) + zeroPartInfo, err := n.uploadZeroPart(ctx, info, p.Info) + if err != nil { + return "", fmt.Errorf("upload zero part: %w", err) + } + + info.UploadID = zeroPartInfo.UploadID + + nodeID, err := n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info) + if err != nil { + return "", fmt.Errorf("create multipart upload: %w", err) + } + + if err = n.finalizeZeroPart(ctx, p.Info.Bkt, nodeID, zeroPartInfo); err != nil { + return "", fmt.Errorf("finalize zero part: %w", err) + } + + return zeroPartInfo.UploadID, nil } func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) { @@ -229,6 +252,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf var ( splitPreviousID oid.ID + splitFirstID oid.ID isSetSplitPreviousID bool multipartHash = sha256.New() tzHash hash.Hash @@ -238,34 +262,35 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf tzHash = tz.New() } - lastPart, err := n.treeService.GetLastPart(ctx, bktInfo, multipartInfo.ID) + lastPart, err := n.treeService.GetPartByNumber(ctx, bktInfo, multipartInfo.ID, p.PartNumber-1) if err != nil { - // if ErrPartListIsEmpty, there is the first part of multipart. - if !errors.Is(err, ErrPartListIsEmpty) { - return nil, fmt.Errorf("getLastPart: %w", err) - } - } else { - // try to restore hash state from the last part. - // the required interface is guaranteed according to the docs, so just cast without checks. - binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler) - if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { - return nil, fmt.Errorf("unmarshal previous part hash: %w", err) - } - - if tzHash != nil { - binaryUnmarshaler = tzHash.(encoding.BinaryUnmarshaler) - if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { - return nil, fmt.Errorf("unmarshal previous part homo hash: %w", err) - } + return nil, fmt.Errorf("getLastPart: %w", err) + } + + // try to restore hash state from the last part. + // the required interface is guaranteed according to the docs, so just cast without checks. + binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { + return nil, fmt.Errorf("unmarshal previous part hash: %w", err) + } + + if tzHash != nil { + binaryUnmarshaler = tzHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { + return nil, fmt.Errorf("unmarshal previous part homo hash: %w", err) } + } + + isSetSplitPreviousID = true + splitPreviousID = lastPart.OID - isSetSplitPreviousID = true - splitPreviousID = lastPart.OID + if err = splitFirstID.DecodeString(multipartInfo.UploadID); err != nil { + return nil, fmt.Errorf("failed to decode multipart upload ID: %w", err) } var ( id oid.ID - elements []oid.ID + elements []data.LinkObjectPayload creationTime = TimeNow(ctx) // User may upload part large maxObjectSize in NeoFS. From users point of view it is a single object. // We have to calculate the hash from this object separately. @@ -284,20 +309,25 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf CreationTime: creationTime, CopiesNumber: multipartInfo.CopiesNumber, Multipart: &Multipart{ - SplitID: multipartInfo.SplitID, MultipartHashes: objHashes, }, } chunk := n.buffers.Get().(*[]byte) - + var totalBytes int // slice part manually. Simultaneously considering the part is a single object for user. for { if isSetSplitPreviousID { prm.Multipart.SplitPreviousID = &splitPreviousID } + if !splitFirstID.Equals(oid.ID{}) { + prm.Multipart.SplitFirstID = &splitFirstID + } + nBts, readErr := io.ReadAtLeast(payloadReader, *chunk, len(*chunk)) + totalBytes += nBts + if nBts > 0 { prm.Payload = bytes.NewReader((*chunk)[:nBts]) prm.PayloadSize = uint64(nBts) @@ -309,7 +339,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf isSetSplitPreviousID = true splitPreviousID = id - elements = append(elements, id) + elements = append(elements, data.LinkObjectPayload{OID: id, Size: uint32(nBts)}) } if readErr == nil { @@ -379,16 +409,125 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf ID: id, CID: bktInfo.CID, - Owner: bktInfo.Owner, - Bucket: bktInfo.Name, - Size: partInfo.Size, - Created: partInfo.Created, - HashSum: partInfo.ETag, + Owner: bktInfo.Owner, + OwnerPublicKey: bktInfo.OwnerPublicKey, + Bucket: bktInfo.Name, + Size: partInfo.Size, + Created: partInfo.Created, + HashSum: partInfo.ETag, } return objInfo, nil } +func (n *layer) uploadZeroPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadInfoParams) (*data.PartInfo, error) { + encInfo := FormEncryptionInfo(multipartInfo.Meta) + if err := p.Encryption.MatchObjectEncryption(encInfo); err != nil { + n.log.Warn("mismatched obj encryptionInfo", zap.Error(err)) + return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters) + } + + var ( + bktInfo = p.Bkt + attributes [][2]string + multipartHash = sha256.New() + tzHash hash.Hash + id oid.ID + elements []data.LinkObjectPayload + creationTime = TimeNow(ctx) + currentPartHash = sha256.New() + ) + + if p.Encryption.Enabled() { + attributes = append(attributes, [2]string{AttributeDecryptedSize, "0"}) + } + + if n.neoFS.IsHomomorphicHashingEnabled() { + tzHash = tz.New() + } + + var objHashes []hash.Hash + if tzHash != nil { + objHashes = append(objHashes, tzHash) + } + + prm := PrmObjectCreate{ + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Attributes: attributes, + CreationTime: creationTime, + CopiesNumber: multipartInfo.CopiesNumber, + Multipart: &Multipart{ + MultipartHashes: objHashes, + }, + Payload: bytes.NewBuffer(nil), + } + + id, _, err := n.objectPutAndHash(ctx, prm, bktInfo) + if err != nil { + return nil, err + } + + elements = append(elements, data.LinkObjectPayload{OID: id, Size: 0}) + + reqInfo := api.GetReqInfo(ctx) + n.log.Debug("upload zero part", + zap.String("reqId", reqInfo.RequestID), + zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID), + zap.String("multipart upload", id.String()), + zap.Int("part number", 0), zap.String("object", p.Key), zap.Stringer("oid", id)) + + partInfo := &data.PartInfo{ + Key: p.Key, + // UploadID equals zero part ID intentionally. + UploadID: id.String(), + Number: 0, + OID: id, + Size: 0, + ETag: hex.EncodeToString(currentPartHash.Sum(nil)), + Created: prm.CreationTime, + Elements: elements, + } + + // encoding hash.Hash state to save it in tree service. + // the required interface is guaranteed according to the docs, so just cast without checks. + binaryMarshaler := multipartHash.(encoding.BinaryMarshaler) + partInfo.MultipartHash, err = binaryMarshaler.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } + + if tzHash != nil { + binaryMarshaler = tzHash.(encoding.BinaryMarshaler) + partInfo.HomoHash, err = binaryMarshaler.MarshalBinary() + + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } + } + + return partInfo, nil +} + +func (n *layer) finalizeZeroPart(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, partInfo *data.PartInfo) error { + oldPartID, err := n.treeService.AddPart(ctx, bktInfo, nodeID, partInfo) + oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) + if err != nil && !oldPartIDNotFound { + return err + } + + if !oldPartIDNotFound { + if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { + n.log.Error("couldn't delete old part object", zap.Error(err), + zap.String("cnrID", bktInfo.CID.EncodeToString()), + zap.String("bucket name", bktInfo.Name), + zap.String("objID", oldPartID.EncodeToString())) + } + } + + return nil +} + func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadPartParams, partID int, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { parts, err := n.treeService.GetPartsAfter(ctx, bktInfo, multipartInfo.ID, partID) if err != nil { @@ -506,8 +645,22 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var multipartObjetSize int64 var encMultipartObjectSize uint64 var lastPartID int - var children []oid.ID var completedPartsHeader strings.Builder + var splitFirstID oid.ID + + if err = splitFirstID.DecodeString(multipartInfo.UploadID); err != nil { + return nil, nil, fmt.Errorf("decode splitFirstID from UploadID :%w", err) + } + + // +1 is the zero part, it equals to the uploadID. + // +1 is the last part, it will be created later in the code. + var measuredObjects = make([]object.MeasuredObject, 0, len(p.Parts)+2) + + // user know nothing about zero part, we have to add this part manually. + var zeroObject object.MeasuredObject + zeroObject.SetObjectID(splitFirstID) + measuredObjects = append(measuredObjects, zeroObject) + for i, part := range p.Parts { partInfo := partsInfo[part.PartNumber] if partInfo == nil || part.ETag != partInfo.ETag { @@ -539,7 +692,13 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar lastPartID = part.PartNumber } - children = append(children, partInfo.Elements...) + for _, element := range partInfo.Elements { + // Collecting payload for the link object. + var mObj object.MeasuredObject + mObj.SetObjectID(element.OID) + mObj.SetObjectSize(element.Size) + measuredObjects = append(measuredObjects, mObj) + } } multipartHash := sha256.New() @@ -623,7 +782,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar CreationTime: TimeNow(ctx), CopiesNumber: multipartInfo.CopiesNumber, Multipart: &Multipart{ - SplitID: multipartInfo.SplitID, + SplitFirstID: &splitFirstID, SplitPreviousID: &splitPreviousID, HeaderObject: header, }, @@ -635,7 +794,13 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar return nil, nil, err } - children = append(children, lastPartObjID) + var mObj object.MeasuredObject + // last part has the zero length. + mObj.SetObjectID(lastPartObjID) + measuredObjects = append(measuredObjects, mObj) + + var linkObj = object.Link{} + linkObj.SetObjects(measuredObjects) // linking object prm = PrmObjectCreate{ @@ -644,11 +809,10 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar CreationTime: TimeNow(ctx), CopiesNumber: multipartInfo.CopiesNumber, Multipart: &Multipart{ - SplitID: multipartInfo.SplitID, HeaderObject: header, - Children: children, + SplitFirstID: &splitFirstID, + Link: &linkObj, }, - Payload: bytes.NewBuffer(nil), } _, _, err = n.objectPutAndHash(ctx, prm, p.Info.Bkt) @@ -682,16 +846,17 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar n.cache.CleanListCacheEntriesContainingObject(p.Info.Key, p.Info.Bkt.CID) objInfo := &data.ObjectInfo{ - ID: headerObjectID, - CID: p.Info.Bkt.CID, - Owner: p.Info.Bkt.Owner, - Bucket: p.Info.Bkt.Name, - Name: p.Info.Key, - Size: multipartObjetSize, - Created: prm.CreationTime, - Headers: initMetadata, - ContentType: initMetadata[api.ContentType], - HashSum: newVersion.ETag, + ID: headerObjectID, + CID: p.Info.Bkt.CID, + Owner: p.Info.Bkt.Owner, + OwnerPublicKey: p.Info.Bkt.OwnerPublicKey, + Bucket: p.Info.Bkt.Name, + Name: p.Info.Key, + Size: multipartObjetSize, + Created: prm.CreationTime, + Headers: initMetadata, + ContentType: initMetadata[api.ContentType], + HashSum: newVersion.ETag, } extObjInfo := &data.ExtendedObjectInfo{ @@ -794,10 +959,16 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn } res.Owner = multipartInfo.Owner + res.OwnerPubKey = multipartInfo.OwnerPubKey parts := make([]*Part, 0, len(partsInfo)) for _, partInfo := range partsInfo { + // We need to skip this part, it is an artificial and not a client uploaded. + if partInfo.Number == 0 { + continue + } + parts = append(parts, &Part{ ETag: partInfo.ETag, LastModified: partInfo.Created.UTC().Format(time.RFC3339), @@ -931,10 +1102,11 @@ func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimit } return &UploadInfo{ - IsDir: isDir, - Key: key, - UploadID: uploadInfo.UploadID, - Owner: uploadInfo.Owner, - Created: uploadInfo.Created, + IsDir: isDir, + Key: key, + UploadID: uploadInfo.UploadID, + Owner: uploadInfo.Owner, + OwnerPubKey: uploadInfo.OwnerPubKey, + Created: uploadInfo.Created, } } diff --git a/api/layer/neofs.go b/api/layer/neofs.go index dd1fee0c..700548a3 100644 --- a/api/layer/neofs.go +++ b/api/layer/neofs.go @@ -123,15 +123,15 @@ type PrmObjectCreate struct { type Multipart struct { // MultipartHashes contains hashes for the multipart object payload calculation (optional). MultipartHashes []hash.Hash - // SplitID contains splitID for multipart object (optional). - SplitID string // SplitPreviousID contains [oid.ID] of previous object in chain (optional). SplitPreviousID *oid.ID - // Children contains all objects in multipart chain, for linking object (optional). - Children []oid.ID + // SplitFirstID contains [oid.ID] of the first object in chain (The first object has nil here). + SplitFirstID *oid.ID // HeaderObject is a virtual representation of complete multipart object (optional). It is used to set Parent in // linking object. HeaderObject *object.Object + // Link contains info for linking object. + Link *object.Link } // PrmObjectDelete groups parameters of NeoFS.DeleteObject operation. diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index 6ac92f23..f52b0d0e 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -270,19 +270,13 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID obj.SetOwnerID(&prm.Creator) t.currentEpoch++ - if prm.Multipart != nil && prm.Multipart.SplitID != "" { - var split object.SplitID - if err := split.Parse(prm.Multipart.SplitID); err != nil { - return oid.ID{}, fmt.Errorf("split parse: %w", err) - } - obj.SetSplitID(&split) - + if prm.Multipart != nil { if prm.Multipart.SplitPreviousID != nil { obj.SetPreviousID(*prm.Multipart.SplitPreviousID) } - if len(prm.Multipart.Children) > 0 { - obj.SetChildren(prm.Multipart.Children...) + if prm.Multipart.SplitFirstID != nil { + obj.SetFirstID(*prm.Multipart.SplitFirstID) } if prm.Multipart.HeaderObject != nil { @@ -294,6 +288,45 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID obj.SetParentID(id) obj.SetParent(prm.Multipart.HeaderObject) } + + if prm.Multipart.Link != nil { + obj.WriteLink(*prm.Multipart.Link) + prm.Payload = bytes.NewReader(obj.Payload()) + obj.SetPayloadSize(uint64(len(obj.Payload()))) + + var ( + addr oid.Address + payload []byte + ) + + for _, e := range prm.Multipart.Link.Objects() { + addr = newAddress(prm.Container, e.ObjectID()) + if partialObject, ok := t.objects[addr.EncodeToString()]; ok { + payload = append(payload, partialObject.Payload()...) + } + } + + pid, isSet := prm.Multipart.HeaderObject.ID() + if !isSet { + return oid.ID{}, errors.New("HeaderObject id is not set") + } + + realHeaderObj := object.New() + realHeaderObj.SetContainerID(prm.Container) + realHeaderObj.SetID(pid) + realHeaderObj.SetPayloadSize(uint64(len(payload))) + realHeaderObj.SetAttributes(attrs...) + realHeaderObj.SetCreationEpoch(t.currentEpoch) + realHeaderObj.SetOwnerID(&prm.Creator) + realHeaderObj.SetPayload(payload) + + var h checksum.Checksum + checksum.Calculate(&h, checksum.SHA256, payload) + realHeaderObj.SetPayloadChecksum(h) + + addr = newAddress(prm.Container, pid) + t.objects[addr.EncodeToString()] = realHeaderObj + } } if len(prm.Locks) > 0 { diff --git a/api/layer/object.go b/api/layer/object.go index 7ec1ecf2..abc1ef8c 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -292,14 +292,15 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend ID: id, CID: p.BktInfo.CID, - Owner: owner, - Bucket: p.BktInfo.Name, - Name: p.Object, - Size: p.Size, - Created: prm.CreationTime, - Headers: p.Header, - ContentType: p.Header[api.ContentType], - HashSum: newVersion.ETag, + Owner: owner, + OwnerPublicKey: p.BktInfo.OwnerPublicKey, + Bucket: p.BktInfo.Name, + Name: p.Object, + Size: p.Size, + Created: prm.CreationTime, + Headers: p.Header, + ContentType: p.Header[api.ContentType], + HashSum: newVersion.ETag, } extendedObjInfo := &data.ExtendedObjectInfo{ diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 59ab5fdd..2b5adbbd 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -276,13 +276,13 @@ func (t *TreeServiceMock) GetAllVersionsByPrefix(_ context.Context, bktInfo *dat return result, nil } -func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error { +func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) (uint64, error) { cnrMultipartsMap, ok := t.multiparts[bktInfo.CID.EncodeToString()] if !ok { t.multiparts[bktInfo.CID.EncodeToString()] = map[string][]*data.MultipartInfo{ info.Key: {info}, } - return nil + return 0, nil } multiparts := cnrMultipartsMap[info.Key] @@ -291,7 +291,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data } cnrMultipartsMap[info.Key] = append(multiparts, info) - return nil + return info.ID, nil } func (t *TreeServiceMock) GetMultipartUploadsByPrefix(_ context.Context, _ *data.BucketInfo, _ string) ([]*data.MultipartInfo, error) { @@ -363,7 +363,7 @@ LOOP: return result, nil } -func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { +func (t *TreeServiceMock) GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error) { parts, err := t.GetParts(ctx, bktInfo, multipartNodeID) if err != nil { return nil, fmt.Errorf("get parts: %w", err) @@ -390,7 +390,18 @@ func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketI return 1 }) - return parts[len(parts)-1], nil + var pi *data.PartInfo + for _, part := range parts { + if part.Number != number { + continue + } + + if pi == nil || pi.ServerCreated.Before(part.ServerCreated) { + pi = part + } + } + + return pi, nil } func (t *TreeServiceMock) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) { diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 4494cf3b..601ebcc3 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -63,7 +63,7 @@ type TreeService interface { PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) - CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error + CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) (uint64, error) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) @@ -74,11 +74,10 @@ type TreeService interface { // If object id to remove is not found returns ErrNoNodeToRemove error. AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) - // GetLastPart returns the latest uploaded part. - // + // GetPartByNumber returns the part by number. If part was uploaded few times the newest one will be returned. // Return errors: // - [ErrPartListIsEmpty] if there is no parts in the upload id. - GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) + GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) // Compound methods for optimizations diff --git a/api/layer/util.go b/api/layer/util.go index ead9d695..1209d6b9 100644 --- a/api/layer/util.go +++ b/api/layer/util.go @@ -95,14 +95,15 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI CID: bkt.CID, IsDir: false, - Bucket: bkt.Name, - Name: filepathFromObject(meta), - Created: creation, - ContentType: mimeType, - Headers: customHeaders, - Owner: *meta.OwnerID(), - Size: int64(meta.PayloadSize()), - HashSum: hex.EncodeToString(payloadChecksum.Value()), + Bucket: bkt.Name, + Name: filepathFromObject(meta), + Created: creation, + ContentType: mimeType, + Headers: customHeaders, + Owner: *meta.OwnerID(), + OwnerPublicKey: bkt.OwnerPublicKey, + Size: int64(meta.PayloadSize()), + HashSum: hex.EncodeToString(payloadChecksum.Value()), } } diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 40993933..1244af41 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -274,19 +274,13 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi obj.SetAttributes(attrs...) obj.SetPayloadSize(prm.PayloadSize) - if prm.Multipart != nil && prm.Multipart.SplitID != "" { - var split object.SplitID - if err := split.Parse(prm.Multipart.SplitID); err != nil { - return oid.ID{}, fmt.Errorf("parse split ID: %w", err) - } - obj.SetSplitID(&split) - + if prm.Multipart != nil { if prm.Multipart.SplitPreviousID != nil { obj.SetPreviousID(*prm.Multipart.SplitPreviousID) } - if len(prm.Multipart.Children) > 0 { - obj.SetChildren(prm.Multipart.Children...) + if prm.Multipart.SplitFirstID != nil { + obj.SetFirstID(*prm.Multipart.SplitFirstID) } if prm.Multipart.HeaderObject != nil { @@ -298,6 +292,15 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi obj.SetParentID(id) obj.SetParent(prm.Multipart.HeaderObject) } + + if prm.Multipart.Link != nil { + obj.WriteLink(*prm.Multipart.Link) + prm.Payload = bytes.NewReader(obj.Payload()) + obj.SetPayloadSize(uint64(len(obj.Payload()))) + + // Link object should never have a previous one. + obj.ResetPreviousID() + } } if len(prm.Locks) > 0 { @@ -355,7 +358,7 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi if ok { return oid.ID{}, fmt.Errorf("%w: %s", layer.ErrAccessDenied, reason) } - return oid.ID{}, fmt.Errorf("save object via connection pool: %w", err) + return oid.ID{}, fmt.Errorf("put init: %w", err) } data := x.buffers.Get() @@ -365,7 +368,7 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi x.buffers.Put(chunk) if err != nil { - return oid.ID{}, fmt.Errorf("read payload chunk: %w", err) + return oid.ID{}, fmt.Errorf("copy payload with buffer: %w", err) } if err = writer.Close(); err != nil { diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 76fa7aba..b3520465 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -2,6 +2,7 @@ package neofs import ( "context" + "encoding/hex" "errors" "fmt" "io" @@ -56,7 +57,6 @@ const ( isUnversionedKV = "IsUnversioned" isTagKV = "IsTag" uploadIDKV = "UploadId" - splitIDKV = "SplitId" partNumberKV = "Number" sizeKV = "Size" etagKV = "ETag" @@ -74,6 +74,7 @@ const ( // keys for delete marker nodes. isDeleteMarkerKV = "IsDeleteMarker" ownerKV = "Owner" + ownerPubKeyKV = "OwnerPubKey" createdKV = "Created" serverCreatedKV = "SrvCreated" @@ -226,8 +227,13 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) { } case ownerKV: _ = multipartInfo.Owner.DecodeString(string(kv.GetValue())) - case splitIDKV: - multipartInfo.SplitID = string(kv.GetValue()) + case ownerPubKeyKV: + pk, err := keys.NewPublicKeyFromString(string(kv.GetValue())) + if err != nil { + return nil, fmt.Errorf("decode pub key: %w", err) + } + + multipartInfo.OwnerPubKey = *pk default: multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) } @@ -279,19 +285,19 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { partInfo.HomoHash = []byte(value) case elementsKV: elements := strings.Split(value, ",") - partInfo.Elements = make([]oid.ID, len(elements)) + partInfo.Elements = make([]data.LinkObjectPayload, len(elements)) for i, e := range elements { - var id oid.ID - if err = id.DecodeString(e); err != nil { - return nil, fmt.Errorf("invalid oid: %w", err) + var element data.LinkObjectPayload + if err = element.Unmarshal(e); err != nil { + return nil, fmt.Errorf("invalid element: %w", err) } - partInfo.Elements[i] = id + partInfo.Elements[i] = element } } } - if partInfo.Number <= 0 { + if partInfo.Number < 0 { return nil, fmt.Errorf("it's not a part node") } @@ -854,12 +860,10 @@ func (c *TreeClient) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo return c.removeNode(ctx, bktInfo, versionTree, id) } -func (c *TreeClient) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error { +func (c *TreeClient) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) (uint64, error) { path := pathFromName(info.Key) meta := metaFromMultipart(info, path[len(path)-1]) - _, err := c.addNodeByPath(ctx, bktInfo, systemTree, path[:len(path)-1], meta) - - return err + return c.addNodeByPath(ctx, bktInfo, systemTree, path[:len(path)-1], meta) } func (c *TreeClient) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) { @@ -933,7 +937,7 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult elements := make([]string, len(info.Elements)) for i, e := range info.Elements { - elements[i] = e.String() + elements[i] = e.Marshal() } meta := map[string]string{ @@ -995,7 +999,7 @@ func (c *TreeClient) GetParts(ctx context.Context, bktInfo *data.BucketInfo, mul return result, nil } -func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { +func (c *TreeClient) GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error) { parts, err := c.GetParts(ctx, bktInfo, multipartNodeID) if err != nil { return nil, fmt.Errorf("get parts: %w", err) @@ -1022,7 +1026,18 @@ func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, return 1 }) - return parts[len(parts)-1], nil + var pi *data.PartInfo + for _, part := range parts { + if part.Number != number { + continue + } + + if pi == nil || pi.ServerCreated.Before(part.ServerCreated) { + pi = part + } + } + + return pi, nil } // GetPartsAfter returns parts uploaded after partID. These parts are sorted and filtered by creation time. @@ -1318,9 +1333,9 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string { func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]string { info.Meta[fileNameKV] = fileName info.Meta[uploadIDKV] = info.UploadID + info.Meta[ownerPubKeyKV] = hex.EncodeToString(info.OwnerPubKey.Bytes()) info.Meta[ownerKV] = info.Owner.EncodeToString() info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10) - info.Meta[splitIDKV] = info.SplitID return info.Meta }