Skip to content

Commit 289c8ba

Browse files
committed
registry: Fixes to allow layer uploads above 5GB
As we need to copy a stream of the uploaded layer (random uuid) to a `blobs/.../<digest>` path, that might surpass the maximum size of a monolithic upload to R2 without using multipart uploads. To circumvent this, we will calculate the digest from the uploaded path and compare. Then, we will create a reference in the original `blobs/.../digest` path pointing to the upload path.
1 parent 7f27e14 commit 289c8ba

File tree

5 files changed

+162
-12
lines changed

5 files changed

+162
-12
lines changed

src/registry/garbage-collector.ts

+8
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
// Unreferenced will delete all blobs that are not referenced by any manifest.
33
// Untagged will delete all blobs that are not referenced by any manifest and are not tagged.
44

5+
import { unknown } from "zod";
56
import { ServerError } from "../errors";
67
import { ManifestSchema } from "../manifest";
8+
import { isReference } from "./r2";
79

810
export type GarbageCollectionMode = "unreferenced" | "untagged";
911
export type GCOptions = {
@@ -219,6 +221,12 @@ export class GarbageCollector {
219221
await this.list(`${options.name}/blobs/`, async (object) => {
220222
const hash = object.key.split("/").pop();
221223
if (hash && !referencedBlobs.has(hash)) {
224+
const key = isReference(object);
225+
// also push the underlying reference object
226+
if (key) {
227+
unreferencedKeys.push(key);
228+
}
229+
222230
unreferencedKeys.push(object.key);
223231
if (unreferencedKeys.length > deleteThreshold) {
224232
if (!(await this.checkIfGCCanContinue(options.name, mark))) {

src/registry/r2.ts

+114-11
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from "../chunk";
1212
import { InternalError, ManifestError, RangeError, ServerError } from "../errors";
1313
import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user";
14-
import { readableToBlob, readerToBlob, wrap } from "../utils";
14+
import { errorString, readableToBlob, readerToBlob, wrap } from "../utils";
1515
import { BlobUnknownError, ManifestUnknownError } from "../v2-errors";
1616
import {
1717
CheckLayerResponse,
@@ -29,6 +29,7 @@ import {
2929
} from "./registry";
3030
import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector";
3131
import { ManifestSchema, manifestSchema } from "../manifest";
32+
import { DigestInvalid, RegistryResponseJSON } from "../v2-responses";
3233

3334
export type Chunk =
3435
| {
@@ -101,6 +102,10 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string
101102
return { jwt: jwtSignature, hash: await getSHA256(jwtSignature, "") };
102103
}
103104

105+
export const referenceHeader = "X-Serverless-Registry-Reference";
106+
export const digestHeaderInReference = "X-Serverless-Registry-Digest";
107+
export const registryUploadKey = "X-Serverless-Registry-Upload";
108+
104109
export async function getUploadState(
105110
name: string,
106111
uploadId: string,
@@ -127,6 +132,15 @@ export async function getUploadState(
127132
return { state: stateObject, stateStr: stateStr, hash: stateStrHash };
128133
}
129134

135+
export function isReference(r2Object: R2Object): false | string {
136+
if (r2Object.customMetadata === undefined) return false;
137+
const value = r2Object.customMetadata[referenceHeader];
138+
if (value !== undefined) {
139+
return value;
140+
}
141+
return false;
142+
}
143+
130144
export class R2Registry implements Registry {
131145
private gc: GarbageCollector;
132146

@@ -196,6 +210,11 @@ export class R2Registry implements Registry {
196210
// name format is:
197211
// <path>/<'blobs' | 'manifests'>/<name>
198212
const parts = object.key.split("/");
213+
// maybe an upload.
214+
if (parts.length === 1) {
215+
return;
216+
}
217+
199218
const repository = parts.slice(0, parts.length - 2).join("/");
200219
if (parts[parts.length - 2] === "blobs") return;
201220

@@ -389,15 +408,39 @@ export class R2Registry implements Registry {
389408
};
390409
}
391410

411+
const key = isReference(res);
412+
let [digest, size] = ["", 0];
413+
if (key) {
414+
const [res, err] = await wrap(this.env.REGISTRY.head(key));
415+
if (err) {
416+
return wrapError("layerExists", err);
417+
}
418+
419+
if (!res) {
420+
return { exists: false };
421+
}
422+
423+
if (!res.customMetadata) throw new Error("unreachable");
424+
if (!res.customMetadata[digestHeaderInReference]) throw new Error("unreachable");
425+
const possibleDigest = res.customMetadata[digestHeaderInReference];
426+
if (!possibleDigest) throw new Error("unreachable, no digest");
427+
428+
digest = possibleDigest;
429+
size = res.size;
430+
} else {
431+
digest = hexToDigest(res.checksums.sha256!);
432+
size = res.size;
433+
}
434+
392435
return {
393-
digest: hexToDigest(res.checksums.sha256!),
394-
size: res.size,
436+
digest,
437+
size,
395438
exists: true,
396439
};
397440
}
398441

399442
async getLayer(name: string, digest: string): Promise<RegistryError | GetLayerResponse> {
400-
const [res, err] = await wrap(this.env.REGISTRY.get(`${name}/blobs/${digest}`));
443+
let [res, err] = await wrap(this.env.REGISTRY.get(`${name}/blobs/${digest}`));
401444
if (err) {
402445
return wrapError("getLayer", err);
403446
}
@@ -408,9 +451,24 @@ export class R2Registry implements Registry {
408451
};
409452
}
410453

454+
const id = isReference(res);
455+
if (id) {
456+
[res, err] = await wrap(this.env.REGISTRY.get(id));
457+
if (err) {
458+
return wrapError("getLayer", err);
459+
}
460+
461+
if (!res) {
462+
// not a 500, because garbage collection deletes the underlying layer first
463+
return {
464+
response: new Response(JSON.stringify(BlobUnknownError), { status: 404 }),
465+
};
466+
}
467+
}
468+
411469
return {
412470
stream: res.body!,
413-
digest: hexToDigest(res.checksums.sha256!),
471+
digest,
414472
size: res.size,
415473
};
416474
}
@@ -419,7 +477,9 @@ export class R2Registry implements Registry {
419477
// Generate a unique ID for this upload
420478
const uuid = crypto.randomUUID();
421479

422-
const upload = await this.env.REGISTRY.createMultipartUpload(uuid);
480+
const upload = await this.env.REGISTRY.createMultipartUpload(uuid, {
481+
customMetadata: { [registryUploadKey]: "true" },
482+
});
423483
const state = {
424484
uploadId: upload.uploadId,
425485
parts: [],
@@ -691,12 +751,55 @@ export class R2Registry implements Registry {
691751
// TODO: Handle one last buffer here
692752
await upload.complete(state.parts);
693753
const obj = await this.env.REGISTRY.get(uuid);
694-
const put = this.env.REGISTRY.put(`${namespace}/blobs/${expectedSha}`, obj!.body, {
695-
sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN),
696-
});
754+
if (obj === null) {
755+
console.error("unreachable, obj is null when we just created upload");
756+
return {
757+
response: new InternalError(),
758+
};
759+
}
697760

698-
await put;
699-
await this.env.REGISTRY.delete(uuid);
761+
const target = `${namespace}/blobs/${expectedSha}`;
762+
const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000;
763+
// If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
764+
// stream and create a reference from the blobs path to the
765+
// upload path. In R2, moving objects mean copying the stream, which
766+
// doesn't really work if it's above 5GB due to R2 limits.
767+
if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) {
768+
const compressionStream = new crypto.DigestStream("SHA-256");
769+
obj.body.pipeTo(compressionStream);
770+
const digest = hexToDigest(await compressionStream.digest);
771+
if (digest !== expectedSha) {
772+
return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) };
773+
}
774+
775+
const [, err] = await wrap(
776+
this.env.REGISTRY.put(target, uuid, {
777+
customMetadata: {
778+
[referenceHeader]: uuid,
779+
[digestHeaderInReference]: digest,
780+
},
781+
}),
782+
);
783+
if (err !== null) {
784+
console.error("error uploading reference blob", errorString(err));
785+
await this.env.REGISTRY.delete(uuid);
786+
return {
787+
response: new InternalError(),
788+
};
789+
}
790+
} else {
791+
const put = this.env.REGISTRY.put(target, obj!.body, {
792+
sha256: (expectedSha as string).slice(SHA256_PREFIX_LEN),
793+
});
794+
const [, err] = await wrap(put);
795+
await this.env.REGISTRY.delete(uuid);
796+
if (err !== null) {
797+
console.error("error uploading blob", errorString(err));
798+
return {
799+
response: new InternalError(),
800+
};
801+
}
802+
}
700803
}
701804

702805
await this.env.REGISTRY.delete(getRegistryUploadsPath(state));

src/router.ts

+1
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ v2Router.put("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
455455
);
456456

457457
if (err) {
458+
console.error("Error uploading manifest", errorString(err))
458459
return new InternalError();
459460
}
460461

src/v2-responses.ts

+35
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,38 @@ export const ManifestTagsListTooBigError = {
77
},
88
],
99
};
10+
11+
export class RegistryResponse extends Response {
12+
constructor(body?: BodyInit | null, init?: ResponseInit) {
13+
super(body, {
14+
...init,
15+
headers: {
16+
...init?.headers,
17+
"Docker-Distribution-Api-Version": "registry/2.0",
18+
},
19+
});
20+
}
21+
}
22+
export class RegistryResponseJSON extends RegistryResponse {
23+
constructor(body?: BodyInit | null, init?: ResponseInit) {
24+
super(body, {
25+
...init,
26+
headers: {
27+
...init?.headers,
28+
"Content-Type": "application/json",
29+
},
30+
});
31+
}
32+
}
33+
export const DigestInvalid = (expected: string, got: string) => ({
34+
errors: [
35+
{
36+
code: "DIGEST_INVALID",
37+
message: "digests don't match",
38+
detail: {
39+
Expected: expected,
40+
Got: got,
41+
},
42+
},
43+
],
44+
});

test/index.test.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ async function generateManifest(name: string, schemaVersion: 1 | 2 = 2): Promise
2222
const res2 = await fetch(createRequest("PATCH", res.headers.get("location")!, stream, {}));
2323
expect(res2.ok).toBeTruthy();
2424
const last = await fetch(createRequest("PUT", res2.headers.get("location")! + "&digest=" + sha256, null, {}));
25-
expect(last.ok).toBeTruthy();
25+
if (!last.ok) {
26+
throw new Error(await last.text());
27+
}
28+
2629
return schemaVersion === 1
2730
? {
2831
schemaVersion,

0 commit comments

Comments
 (0)