@@ -2,68 +2,233 @@ package sync
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
6
+ "slices"
5
7
"strings"
6
8
"time"
7
9
10
+ "github.com/Altinity/docker-sync/config"
8
11
"github.com/Altinity/docker-sync/internal/telemetry"
9
12
"github.com/Altinity/docker-sync/structs"
10
13
"github.com/cenkalti/backoff/v4"
14
+ "github.com/google/go-containerregistry/pkg/logs"
15
+ "github.com/google/go-containerregistry/pkg/name"
16
+ "github.com/google/go-containerregistry/pkg/v1/remote"
11
17
"github.com/rs/zerolog/log"
12
18
"go.opentelemetry.io/otel/attribute"
13
19
"go.opentelemetry.io/otel/metric"
14
- "go.uber. org/multierr "
20
+ "golang. org/x/sync/errgroup "
15
21
)
16
22
23
+ func checkRateLimit (err error ) error {
24
+ if strings .Contains (err .Error (), "HAP429" ) || strings .Contains (err .Error (), "TOOMANYREQUESTS" ) {
25
+ log .Warn ().
26
+ Msg ("Rate limited by registry, backing off" )
27
+ return err
28
+ }
29
+
30
+ return backoff .Permanent (err )
31
+ }
32
+
33
+ func push (ctx context.Context , image * structs.Image , desc * remote.Descriptor , dst string , tag string ) error {
34
+ return backoff .Retry (func () error {
35
+ pushAuth , _ := getAuth (image .GetRegistry (dst ), image .GetRepository (dst ))
36
+
37
+ pusher , err := remote .NewPusher (pushAuth )
38
+ if err != nil {
39
+ return err
40
+ }
41
+
42
+ dstTag , err := name .ParseReference (fmt .Sprintf ("%s:%s" , dst , tag ))
43
+ if err != nil {
44
+ return fmt .Errorf ("failed to parse tag: %w" , err )
45
+ }
46
+
47
+ logs .Progress .Printf ("Pushing %s" , dstTag )
48
+
49
+ if err := pusher .Push (ctx , dstTag , desc ); err != nil {
50
+ return checkRateLimit (err )
51
+ }
52
+
53
+ return nil
54
+ }, backoff .NewExponentialBackOff (
55
+ backoff .WithInitialInterval (1 * time .Minute ),
56
+ ))
57
+ }
58
+
59
+ func pull (ctx context.Context , puller * remote.Puller , image * structs.Image , tag string ) (* remote.Descriptor , error ) {
60
+ srcTag , err := name .ParseReference (fmt .Sprintf ("%s:%s" , image .Source , tag ))
61
+ if err != nil {
62
+ return nil , fmt .Errorf ("failed to parse tag: %w" , err )
63
+ }
64
+
65
+ var desc * remote.Descriptor
66
+
67
+ logs .Progress .Printf ("Fetching %s" , srcTag )
68
+
69
+ if err := backoff .Retry (func () error {
70
+ desc , err = puller .Get (ctx , srcTag )
71
+ if err != nil {
72
+ return checkRateLimit (err )
73
+ }
74
+ return nil
75
+ }, backoff .NewExponentialBackOff (
76
+ backoff .WithInitialInterval (1 * time .Minute ),
77
+ )); err != nil {
78
+ return nil , err
79
+ }
80
+
81
+ return desc , nil
82
+ }
83
+
17
84
func SyncImage (ctx context.Context , image * structs.Image ) error {
18
- var merr error
85
+ log .Info ().
86
+ Str ("image" , image .Source ).
87
+ Strs ("targets" , image .Targets ).
88
+ Msg ("Syncing image" )
19
89
20
- pullAuth , pullAuthName := getAuth (image .GetSourceRegistry (), image .GetSourceRepository ())
90
+ pullAuth , _ := getAuth (image .GetSourceRegistry (), image .GetSourceRepository ())
21
91
22
- tags , err := image . GetTags (pullAuth )
92
+ puller , err := remote . NewPuller (pullAuth )
23
93
if err != nil {
24
94
return err
25
95
}
26
96
27
- for _ , tag := range tags {
28
- if err := backoff .Retry (func () error {
29
- if err := SyncTag (image , tag , pullAuthName , pullAuth ); err != nil {
30
- if strings .Contains (err .Error (), "HAP429" ) || strings .Contains (err .Error (), "TOOMANYREQUESTS" ) {
31
- log .Warn ().
32
- Str ("source" , image .Source ).
33
- Msg ("Rate limited by registry, backing off" )
34
- return err
35
- }
97
+ srcRepo , err := name .NewRepository (image .Source )
98
+ if err != nil {
99
+ return err
100
+ }
36
101
37
- return backoff .Permanent (err )
102
+ srcLister , err := puller .Lister (ctx , srcRepo )
103
+ if err != nil {
104
+ return err
105
+ }
106
+
107
+ // Get all tags from source
108
+ log .Info ().
109
+ Str ("image" , image .Source ).
110
+ Msg ("Fetching tags" )
111
+
112
+ var srcTags []string
113
+
114
+ for srcLister .HasNext () {
115
+ tags , err := srcLister .Next (ctx )
116
+ if err != nil {
117
+ return err
118
+ }
119
+
120
+ srcTags = append (srcTags , tags .Tags ... )
121
+ }
122
+
123
+ log .Info ().
124
+ Str ("image" , image .Source ).
125
+ Int ("tags" , len (srcTags )).
126
+ Msg ("Found tags" )
127
+
128
+ // Get all tags from targets
129
+ var dstTags []string
130
+
131
+ for _ , dst := range image .Targets {
132
+ log .Info ().
133
+ Str ("image" , image .Source ).
134
+ Str ("target" , dst ).
135
+ Msg ("Fetching destination tags" )
136
+
137
+ dstRepo , err := name .NewRepository (dst )
138
+ if err != nil {
139
+ return err
140
+ }
141
+
142
+ pushAuth , _ := getAuth (image .GetRegistry (dst ), image .GetRepository (dst ))
143
+
144
+ dstPuller , err := remote .NewPuller (pushAuth )
145
+ if err != nil {
146
+ return err
147
+ }
148
+
149
+ dstLister , err := dstPuller .Lister (ctx , dstRepo )
150
+ if err != nil {
151
+ return err
152
+ }
153
+
154
+ for dstLister .HasNext () {
155
+ tags , err := dstLister .Next (ctx )
156
+ if err != nil {
157
+ return err
38
158
}
39
159
40
- return nil
41
- }, backoff .NewExponentialBackOff (
42
- backoff .WithInitialInterval (1 * time .Minute ),
43
- )); err != nil {
44
- errs := multierr .Errors (err )
45
- if len (errs ) > 0 {
46
- telemetry .Errors .Add (ctx , int64 (len (errs )),
47
- metric .WithAttributes (
48
- attribute.KeyValue {
49
- Key : "image" ,
50
- Value : attribute .StringValue (image .Source ),
51
- },
52
- attribute.KeyValue {
53
- Key : "tag" ,
54
- Value : attribute .StringValue (tag ),
55
- },
56
- ),
57
- )
58
- log .Error ().
59
- Errs ("errors" , errs ).
60
- Msg ("Failed to sync tag" )
61
-
62
- merr = multierr .Append (merr , err )
63
- continue
160
+ for _ , tag := range tags .Tags {
161
+ dstTags = append (dstTags , fmt .Sprintf ("%s:%s" , dst , tag ))
64
162
}
65
163
}
164
+
165
+ log .Info ().
166
+ Str ("image" , image .Source ).
167
+ Str ("target" , dst ).
168
+ Int ("tags" , len (dstTags )).
169
+ Msg ("Found destination tags" )
170
+ }
171
+
172
+ // Sync tags
173
+ for _ , tag := range srcTags {
174
+ g , ctx := errgroup .WithContext (ctx )
175
+ g .SetLimit (config .SyncMaxErrors .Int ())
176
+
177
+ log .Info ().
178
+ Str ("image" , image .Source ).
179
+ Str ("tag" , tag ).
180
+ Strs ("targets" , image .Targets ).
181
+ Msg ("Syncing tag" )
182
+
183
+ if err := func () error {
184
+ tag := tag
185
+
186
+ desc , err := pull (ctx , puller , image , tag )
187
+ if err != nil {
188
+ return err
189
+ }
190
+
191
+ for _ , dst := range image .Targets {
192
+ g .Go (func () error {
193
+ if slices .Contains (dstTags , fmt .Sprintf ("%s:%s" , dst , tag )) {
194
+ log .Info ().
195
+ Str ("image" , image .Source ).
196
+ Str ("tag" , tag ).
197
+ Str ("target" , dst ).
198
+ Msg ("Tag already exists, skipping" )
199
+ return nil
200
+ }
201
+ if err := push (ctx , image , desc , dst , tag ); err != nil {
202
+ log .Error ().Err (err ).Msg ("Failed to push tag" )
203
+ }
204
+ return err
205
+ })
206
+ }
207
+
208
+ return g .Wait ()
209
+ }(); err != nil {
210
+ log .Error ().
211
+ Err (err ).
212
+ Str ("image" , image .Source ).
213
+ Str ("tag" , tag ).
214
+ Msg ("Failed to sync tag" )
215
+
216
+ telemetry .Errors .Add (ctx , 1 ,
217
+ metric .WithAttributes (
218
+ attribute.KeyValue {
219
+ Key : "image" ,
220
+ Value : attribute .StringValue (image .Source ),
221
+ },
222
+ attribute.KeyValue {
223
+ Key : "tag" ,
224
+ Value : attribute .StringValue (tag ),
225
+ },
226
+ ),
227
+ )
228
+
229
+ return err
230
+ }
66
231
}
67
232
68
- return merr
233
+ return nil
69
234
}
0 commit comments