
Commit 214f714

registry: Introduce hash-wasm so we can store the hash state and continue hashing from where we left off

1 parent 9511163 · commit 214f714

File tree: 3 files changed (+97, -10 lines)
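For context on the approach: hash-wasm exposes incremental hashers whose internal state can be serialized with save() and restored with load(), which is what lets a digest be resumed across separate requests instead of re-reading everything uploaded so far. A minimal sketch of that round trip, using the same fork and wasm import as this commit (the chunk contents are made up for illustration):

```ts
import { createSHA256, loadWasm } from "@taylorzane/hash-wasm";
// @ts-expect-error: No declaration file for module
import sha256Wasm from "@taylorzane/hash-wasm/wasm/sha256.wasm";

// Request 1: hash the first chunk, then persist the hasher state (e.g. as base64 in the upload state).
loadWasm({ sha256: sha256Wasm });
const first = await createSHA256();
first.init();
first.update(new TextEncoder().encode("chunk-1"));
const savedState = first.save(); // Uint8Array snapshot of the SHA-256 state

// Request 2: restore the snapshot and continue hashing where we left off.
const second = await createSHA256();
second.load(savedState);
second.update(new TextEncoder().encode("chunk-2"));
const digest = second.digest("hex"); // equals sha256("chunk-1" + "chunk-2") done in one pass
```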

package.json (+1)

```diff
@@ -11,6 +11,7 @@
   },
   "dependencies": {
     "@cfworker/base64url": "^1.12.5",
+    "@taylorzane/hash-wasm": "^0.0.11",
     "@tsndr/cloudflare-worker-jwt": "^2.5.1",
     "itty-router": "^4.0.27",
     "zod": "^3.22.4"
```

pnpm-lock.yaml (+8)

Generated lockfile changes are not rendered.

src/registry/r2.ts (+88, -10)

```diff
@@ -1,3 +1,4 @@
+import { createSHA256, IHasher, loadWasm } from "@taylorzane/hash-wasm";
 import { Env } from "../..";
 import jwt from "@tsndr/cloudflare-worker-jwt";
 import {
```
```diff
@@ -30,6 +31,38 @@ import {
 import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector";
 import { ManifestSchema, manifestSchema } from "../manifest";
 import { DigestInvalid, RegistryResponseJSON } from "../v2-responses";
+// @ts-expect-error: No declaration file for module
+import sha256Wasm from "@taylorzane/hash-wasm/wasm/sha256.wasm";
+
+export async function hash(readableStream: ReadableStream | null, state?: Uint8Array): Promise<IHasher> {
+  loadWasm({ sha256: sha256Wasm });
+  let hasher = await createSHA256();
+  if (state !== undefined) {
+    hasher.load(state);
+  } else {
+    hasher = hasher.init();
+  }
+
+  const reader = readableStream?.getReader({ mode: "byob" });
+  while (reader !== undefined) {
+    // Read at most 5MB at a time so we don't buffer too much memory (the Workers runtime has constraints around tee() if the other side of the stream is consumed very slowly)
+    const array = new Uint8Array(1024 * 1024 * 5);
+    const value = await reader.read(array);
+    if (value.done) break;
+    hasher.update(value.value);
+  }
+
+  return hasher;
+}
+
+export function hashStateToUint8Array(hashState: string): Uint8Array {
+  const buffer = Buffer.from(hashState, "base64");
+  return new Uint8Array(buffer);
+}
+
+export function intoBase64FromUint8Array(array: Uint8Array): string {
+  return Buffer.from(array).toString("base64");
+}

 export type Chunk =
   | {
```
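Taken together, the helpers above are meant to be chained per chunk: decode the previous state (if any), feed the chunk stream to hash(), and re-encode the new state. A rough usage sketch (hashChunk is a hypothetical wrapper, not part of the commit):

```ts
// Hypothetical wrapper showing how the helpers compose for one incoming chunk.
async function hashChunk(chunk: ReadableStream, previousState?: string): Promise<string> {
  const restored = previousState !== undefined ? hashStateToUint8Array(previousState) : undefined;
  const hasher = await hash(chunk, restored); // consumes the stream in 5MB BYOB reads
  return intoBase64FromUint8Array(hasher.save()); // new state, stored until the next chunk arrives
}
```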
```diff
@@ -66,6 +99,7 @@ export type State = {
   registryUploadId: string;
   byteRange: number;
   name: string;
+  hashState?: string;
 };

 export function getRegistryUploadsPath(state: { registryUploadId: string; name: string }): string {
```
```diff
@@ -686,12 +720,48 @@ export class R2Registry implements Registry {
       };
     }

-    const res = await appendStreamKnownLength(stream, length);
+    let hasherPromise: Promise<IHasher> | undefined;
+    if (
+      length <= MAXIMUM_CHUNK &&
+      // if starting, or already started.
+      (state.parts.length === 0 || (state.parts.length > 0 && state.hashState !== undefined))
+    ) {
+      const [s1, s2] = stream.tee();
+      stream = s1;
+      let bytes: undefined | Uint8Array;
+      if (state.hashState !== undefined) {
+        bytes = hashStateToUint8Array(state.hashState);
+      }
+
+      hasherPromise = hash(s2, bytes);
+    } else {
+      state.hashState = undefined;
+    }
+
+    const [res, hasherResponse] = await Promise.allSettled([appendStreamKnownLength(stream, length), hasherPromise]);
     state.byteRange += length;
-    if (res instanceof RangeError)
+    if (res.status === "rejected") {
       return {
-        response: res,
+        response: new InternalError(),
       };
+    }
+
+    if (res.value instanceof RangeError) {
+      return {
+        response: res.value,
+      };
+    }
+
+    if (hasherPromise !== undefined && hasherResponse !== undefined) {
+      if (hasherResponse.status === "rejected") {
+        throw hasherResponse.reason;
+      }
+
+      if (hasherResponse.value === undefined) throw new Error("unreachable");
+
+      const value = hasherResponse.value.save();
+      state.hashState = intoBase64FromUint8Array(value);
+    }

     const hashedJwtState = await encodeState(state, env);
     return {
```
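The core pattern in this hunk is stream.tee(): one branch of the chunk body goes to R2 via appendStreamKnownLength while the other feeds the incremental hasher, and both are awaited together with Promise.allSettled so neither side of the tee is left unconsumed. A stripped-down sketch of that shape, reusing the hash() helper from this commit (the upload callback is a stand-in, not the real R2 call):

```ts
// Sketch: upload and hash the same stream concurrently, returning the serialized hash state.
async function uploadAndHash(
  stream: ReadableStream,
  upload: (s: ReadableStream) => Promise<void>, // stand-in for appendStreamKnownLength
  previousState?: Uint8Array
): Promise<Uint8Array> {
  const [uploadBranch, hashBranch] = stream.tee();
  const [uploadResult, hashResult] = await Promise.allSettled([upload(uploadBranch), hash(hashBranch, previousState)]);
  if (uploadResult.status === "rejected") throw uploadResult.reason;
  if (hashResult.status === "rejected") throw hashResult.reason;
  return hashResult.value.save(); // persist (e.g. base64) so the next chunk can resume
}
```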
```diff
@@ -758,16 +828,24 @@ export class R2Registry implements Registry {
       };
     }

-    const target = `${namespace}/blobs/${expectedSha}`;
     const MAXIMUM_SIZE_R2_OBJECT = 5 * 1000 * 1000 * 1000;
+    if (obj.size >= MAXIMUM_SIZE_R2_OBJECT && state.hashState === undefined) {
+      console.error(`The maximum size of an R2 object is 5gb, multipart uploads don't
+ have an sha256 option. Please try to use a push tool that chunks the layers if your layer is above 5gb`);
+      return {
+        response: new InternalError(),
+      };
+    }
+
+    const target = `${namespace}/blobs/${expectedSha}`;
     // If layer surpasses the maximum size of an R2 upload, we need to calculate the digest
     // stream and create a reference from the blobs path to the
-    // upload path. In R2, moving objects mean copying the stream, which
-    // doesn't really work if it's above 5GB due to R2 limits.
-    if (obj.size >= MAXIMUM_SIZE_R2_OBJECT) {
-      const compressionStream = new crypto.DigestStream("SHA-256");
-      obj.body.pipeTo(compressionStream);
-      const digest = hexToDigest(await compressionStream.digest);
+    // upload path. That's why we need hash-wasm, as it allows you to store the state.
+    if (state.hashState !== undefined) {
+      const stateEncoded = hashStateToUint8Array(state.hashState);
+      const hasher = await hash(null, stateEncoded);
+      const digest = hasher.digest("hex");
+
       if (digest !== expectedSha) {
         return { response: new RegistryResponseJSON(JSON.stringify(DigestInvalid(expectedSha, digest))) };
       }
```
