Skip to content

Commit 7bf5b9b

Browse files
committed
fix: s3 checksum
1 parent 24fcda7 commit 7bf5b9b

File tree

2 files changed

+73
-14
lines changed

2 files changed

+73
-14
lines changed

internal/sync/images.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"path/filepath"
78
"slices"
89
"strings"
910
"time"
1011

1112
"github.com/Altinity/docker-sync/config"
1213
"github.com/Altinity/docker-sync/internal/telemetry"
1314
"github.com/Altinity/docker-sync/structs"
15+
"github.com/aws/aws-sdk-go/aws"
16+
"github.com/aws/aws-sdk-go/service/s3"
1417
"github.com/cenkalti/backoff/v4"
1518
"github.com/google/go-containerregistry/pkg/name"
1619
"github.com/google/go-containerregistry/pkg/v1/remote"
@@ -162,7 +165,35 @@ func SyncImage(ctx context.Context, image *structs.Image) error {
162165

163166
for _, dst := range image.Targets {
164167
if strings.HasPrefix(dst, "r2:") || strings.HasPrefix(dst, "s3:") {
165-
// Comparison is performed during push
168+
var s3Session *s3.S3
169+
var bucket *string
170+
var err error
171+
172+
if strings.HasPrefix(dst, "r2:") {
173+
s3Session, bucket, err = getR2Session(dst)
174+
}
175+
if strings.HasPrefix(dst, "s3:") {
176+
s3Session, bucket, err = getS3Session(dst)
177+
}
178+
if err != nil {
179+
return err
180+
}
181+
182+
s3Lister, err := s3Session.ListObjectsV2(&s3.ListObjectsV2Input{
183+
Bucket: bucket,
184+
Prefix: aws.String(filepath.Join("v2", image.GetSourceRepository(), "manifests")),
185+
})
186+
if err != nil {
187+
return err
188+
}
189+
190+
for _, obj := range s3Lister.Contents {
191+
fname := filepath.Base(*obj.Key)
192+
if !strings.HasPrefix(fname, "sha256:") {
193+
dstTags = append(dstTags, fname)
194+
}
195+
}
196+
166197
continue
167198
}
168199

internal/sync/s3.go

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package sync
33
import (
44
"bytes"
55
"context"
6-
"crypto/sha256"
6+
"crypto/md5"
77
"fmt"
88
"io"
99
"net/http"
@@ -71,7 +71,7 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
7171
"v2",
7272
acl,
7373
aws.String("application/json"),
74-
bytes.NewReader([]byte{}), // No content is needed, we just need to return a 200.
74+
bytes.NewReader([]byte("{}")), // No content is needed, we just need to return a 200.
7575
); err != nil {
7676
return err
7777
}
@@ -168,19 +168,20 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
168168
ctx,
169169
s3Session,
170170
bucket,
171-
filepath.Join(baseDir, "manifests", tag),
171+
filepath.Join(baseDir, "manifests", desc.Digest.String()),
172172
acl,
173173
mediaType,
174174
bytes.NewReader(manifest),
175175
); err != nil {
176176
return err
177177
}
178178

179+
// Tag is added last so it can be used to check for duplication.
179180
if err := syncObject(
180181
ctx,
181182
s3Session,
182183
bucket,
183-
filepath.Join(baseDir, "manifests", desc.Digest.String()),
184+
manifestKey(image, tag),
184185
acl,
185186
mediaType,
186187
bytes.NewReader(manifest),
@@ -191,15 +192,19 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
191192
return nil
192193
}
193194

194-
func syncObject(ctx context.Context, s3Session *s3.S3, bucket *string, key string, acl *string, contentType *string, r io.ReadSeeker) error {
195-
// FIXME: we are reading the object every time to calculate the digest. This is inefficient.
196-
h := sha256.New()
197-
if _, err := io.Copy(h, r); err != nil {
198-
return err
195+
func s3ObjectExists(s3Session *s3.S3, bucket *string, key string) (bool, error) {
196+
_, err := s3Session.HeadObject(&s3.HeadObjectInput{
197+
Bucket: bucket,
198+
Key: &key,
199+
})
200+
if err != nil {
201+
return false, err
199202
}
200-
calculatedDigest := fmt.Sprintf("sha256:%x", h.Sum(nil))
201-
r.Seek(0, io.SeekStart)
202203

204+
return true, nil
205+
}
206+
207+
func syncObject(ctx context.Context, s3Session *s3.S3, bucket *string, key string, acl *string, contentType *string, r io.ReadSeeker) error {
203208
head, err := s3Session.HeadObject(&s3.HeadObjectInput{
204209
Bucket: bucket,
205210
Key: &key,
@@ -210,13 +215,32 @@ func syncObject(ctx context.Context, s3Session *s3.S3, bucket *string, key strin
210215
}
211216
}
212217

213-
headMetadataDigest, digestPresent := head.Metadata["X-Calculated-Digest"]
218+
// We store the digest as metadata so we can compare with the ETag without having to download the object.
219+
headMetadataDigestPtr, digestPresent := head.Metadata["X-Calculated-Digest"]
220+
var headMetadataDigest string
221+
if digestPresent {
222+
headMetadataDigest = *headMetadataDigestPtr
223+
}
224+
225+
var etag string
226+
if head != nil && head.ETag != nil {
227+
etag = strings.ReplaceAll(*head.ETag, `"`, "")
228+
}
214229

215230
if head == nil ||
216231
head.ContentType == nil ||
217232
*head.ContentType != *contentType ||
218233
!digestPresent ||
219-
*headMetadataDigest != calculatedDigest {
234+
headMetadataDigest != etag {
235+
236+
r.Seek(0, io.SeekStart)
237+
h := md5.New()
238+
if _, err := io.Copy(h, r); err != nil {
239+
return err
240+
}
241+
calculatedDigest := fmt.Sprintf("%x", h.Sum(nil))
242+
r.Seek(0, io.SeekStart)
243+
220244
log.Info().
221245
Str("bucket", *bucket).
222246
Str("key", key).
@@ -239,3 +263,7 @@ func syncObject(ctx context.Context, s3Session *s3.S3, bucket *string, key strin
239263

240264
return nil
241265
}
266+
267+
func manifestKey(image *structs.Image, tag string) string {
268+
return filepath.Join("v2", image.GetSourceRepository(), "manifests", tag)
269+
}

0 commit comments

Comments
 (0)