@@ -3,9 +3,11 @@ package sync
 import (
 	"bytes"
 	"context"
+	"crypto/sha256"
 	"fmt"
 	"io"
 	"net/http"
+	"os"
 	"path/filepath"
 	"strings"
 	"time"
@@ -69,12 +71,12 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
 		"v2",
 		acl,
 		aws.String("application/json"),
-		[]byte{}, // No content is needed, we just need to return a 200.
+		bytes.NewReader([]byte{}), // No content is needed, we just need to return a 200.
 	); err != nil {
 		return err
 	}
 
-	baseDir := filepath.Join("v2", image.GetName())
+	baseDir := filepath.Join("v2", image.GetSourceRepository())
 
 	i, err := desc.Image()
 	if err != nil {
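
Note: switching baseDir from image.GetName() to image.GetSourceRepository() keys objects by the repository component Docker clients actually request, since the Registry HTTP API v2 serves pulls from /v2/<name>/manifests/<reference> and /v2/<name>/blobs/<digest>. A small sketch of the resulting keys, using the made-up repository name "library/alpine" to stand in for whatever GetSourceRepository returns:

    package main

    import (
        "fmt"
        "path/filepath"
    )

    func main() {
        // "library/alpine" is a hypothetical value for image.GetSourceRepository().
        baseDir := filepath.Join("v2", "library/alpine")
        fmt.Println(filepath.Join(baseDir, "manifests", "latest"))      // v2/library/alpine/manifests/latest
        fmt.Println(filepath.Join(baseDir, "blobs", "sha256:<digest>")) // v2/library/alpine/blobs/sha256:<digest>
    }
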
@@ -93,7 +95,7 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
 		filepath.Join(baseDir, "blobs", cnfHash.String()),
 		acl,
 		aws.String("application/vnd.docker.container.image.v1+json"),
-		cnf,
+		bytes.NewReader(cnf),
 	); err != nil {
 		return err
 	}
@@ -105,46 +107,56 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
 		return err
 	}
 
+	// Blobs can be huge and we need an io.ReadSeeker, so we can't read them all into memory.
+	tmpDir, err := os.MkdirTemp(os.TempDir(), "docker-sync")
+	if err != nil {
+		return err
+	}
+
 	// Layers are synced first to avoid making a tag available before all its blobs are available.
 	for _, layer := range l {
-		digest, err := layer.Digest()
-		if err != nil {
-			return err
-		}
-
-		mediaType, err := layer.MediaType()
-		if err != nil {
-			return err
-		}
+		if err := func() error {
+			digest, err := layer.Digest()
+			if err != nil {
+				return err
+			}
 
-		var r io.ReadCloser
+			mediaType, err := layer.MediaType()
+			if err != nil {
+				return err
+			}
 
-		if strings.HasSuffix(string(mediaType), ".gzip") {
-			r, err = layer.Compressed()
+			r, err := layer.Compressed()
 			if err != nil {
 				return err
 			}
-		} else {
-			r, err = layer.Uncompressed()
+
+			tmpFile, err := os.Create(filepath.Join(tmpDir, "blob"))
 			if err != nil {
 				return err
 			}
-		}
+			defer os.Remove(tmpFile.Name())
+			defer tmpFile.Close()
 
-		b, err := io.ReadAll(r)
-		if err != nil {
-			return err
-		}
+			if _, err := io.Copy(tmpFile, r); err != nil {
+				return err
+			}
+			tmpFile.Seek(0, io.SeekStart)
 
-		if err := syncObject(
-			ctx,
-			s3Session,
-			bucket,
-			filepath.Join(baseDir, "blobs", digest.String()),
-			acl,
-			aws.String(string(mediaType)),
-			b,
-		); err != nil {
+			if err := syncObject(
+				ctx,
+				s3Session,
+				bucket,
+				filepath.Join(baseDir, "blobs", digest.String()),
+				acl,
+				aws.String(string(mediaType)),
+				tmpFile,
+			); err != nil {
+				return err
+			}
+
+			return nil
+		}(); err != nil {
 			return err
 		}
 	}
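
Note: the loop body is wrapped in an immediately invoked func() error { ... }() so that each iteration's defer os.Remove(...) and defer tmpFile.Close() run when that iteration finishes; without the closure, the deferred calls would pile up and every layer's temp file would stay open (and on disk) until pushS3WithSession returned. Reusing the single tmpDir/"blob" path is fine because os.Create truncates an existing file, though nothing in this diff removes tmpDir itself. A minimal standalone sketch of the per-iteration defer pattern:

    package main

    import (
        "fmt"
        "os"
    )

    func main() {
        for i := 0; i < 3; i++ {
            if err := func() error {
                f, err := os.CreateTemp("", "blob")
                if err != nil {
                    return err
                }
                // These run when the closure returns, i.e. at the end of
                // this iteration, not when main returns.
                defer os.Remove(f.Name())
                defer f.Close()

                fmt.Println("using", f.Name())
                return nil
            }(); err != nil {
                fmt.Println("error:", err)
                return
            }
        }
    }
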
@@ -160,7 +172,7 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
 		filepath.Join(baseDir, "manifests", tag),
 		acl,
 		mediaType,
-		manifest,
+		bytes.NewReader(manifest),
 	); err != nil {
 		return err
 	}
@@ -172,15 +184,22 @@ func pushS3WithSession(ctx context.Context, s3Session *s3.S3, bucket *string, im
 		filepath.Join(baseDir, "manifests", desc.Digest.String()),
 		acl,
 		mediaType,
-		manifest,
+		bytes.NewReader(manifest),
 	); err != nil {
 		return err
 	}
 
 	return nil
 }
 
-func syncObject(ctx context.Context, s3Session *s3.S3, bucket *string, key string, acl *string, contentType *string, b []byte) error {
+func syncObject(ctx context.Context, s3Session *s3.S3, bucket *string, key string, acl *string, contentType *string, r io.ReadSeeker) error {
+	h := sha256.New()
+	if _, err := io.Copy(h, r); err != nil {
+		return err
+	}
+	calculatedDigest := fmt.Sprintf("sha256:%x", h.Sum(nil))
+	r.Seek(0, io.SeekStart)
+
 	head, err := s3Session.HeadObject(&s3.HeadObjectInput{
 		Bucket: bucket,
 		Key:    &key,
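
Note: syncObject's last parameter changes from []byte to io.ReadSeeker so the function can consume the stream once to compute a digest, rewind it, and hand the same reader to PutObject. Both *os.File (the layer temp files) and *bytes.Reader (the empty-body, config, and manifest payloads) satisfy the interface, which is why only layer blobs need to spill to disk. A self-contained sketch of the hash-then-rewind idiom, with strings.NewReader standing in for any seekable source:

    package main

    import (
        "crypto/sha256"
        "fmt"
        "io"
        "strings"
    )

    func digestAndRewind(r io.ReadSeeker) (string, error) {
        h := sha256.New()
        if _, err := io.Copy(h, r); err != nil {
            return "", err
        }
        // Rewind so the next consumer (e.g. the upload body) sees byte 0 again.
        if _, err := r.Seek(0, io.SeekStart); err != nil {
            return "", err
        }
        return fmt.Sprintf("sha256:%x", h.Sum(nil)), nil
    }

    func main() {
        r := strings.NewReader("hello")
        d, _ := digestAndRewind(r)
        rest, _ := io.ReadAll(r)
        fmt.Println(d, string(rest)) // prints the digest, then "hello"
    }
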
@@ -191,24 +210,28 @@ func syncObject(ctx context.Context, s3Session *s3.S3, bucket *string, key strin
 		}
 	}
 
+	headMetadataDigest, digestPresent := head.Metadata["calculatedDigest"]
+
 	if head == nil ||
-		head.ContentLength == nil ||
-		*head.ContentLength != int64(len(b)) ||
 		head.ContentType == nil ||
-		*head.ContentType != *contentType {
+		*head.ContentType != *contentType ||
+		!digestPresent ||
+		*headMetadataDigest != calculatedDigest {
 		log.Info().
 			Str("bucket", *bucket).
 			Str("key", key).
 			Str("contentType", *contentType).
-			Int64("contentLength", int64(len(b))).
 			Msg("Syncing object")
 
 		if _, err := s3Session.PutObject(&s3.PutObjectInput{
 			Bucket:      bucket,
 			Key:         &key,
-			Body:        bytes.NewReader(b),
+			Body:        r,
 			ACL:         acl,
 			ContentType: contentType,
+			Metadata: map[string]*string{
+				"calculatedDigest": aws.String(calculatedDigest),
+			},
 		}); err != nil {
 			return err
 		}
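
Note: comparing a sha256 digest stored in user metadata instead of Content-Length means an object is re-uploaded only when its bytes actually change, while the check still costs a single HeadObject. Two caveats worth verifying: the new head.Metadata lookup now runs before the existing head == nil guard, and user metadata travels as x-amz-meta-* headers whose keys S3 documents as stored in lowercase, so the "calculatedDigest" casing may not survive the round-trip unchanged in every SDK. If the lookup ever misses for that reason, a case-insensitive scan like this hypothetical helper (not part of the diff) is a cheap safeguard:

    package main

    import (
        "fmt"
        "strings"
    )

    // lookupMetadata is a hypothetical helper: it finds a user-metadata key
    // regardless of the casing the SDK returned it with.
    func lookupMetadata(md map[string]*string, key string) (*string, bool) {
        for k, v := range md {
            if strings.EqualFold(k, key) {
                return v, true
            }
        }
        return nil, false
    }

    func main() {
        digest := "sha256:feedface"
        md := map[string]*string{"Calculateddigest": &digest}
        if v, ok := lookupMetadata(md, "calculatedDigest"); ok {
            fmt.Println(*v) // sha256:feedface
        }
    }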