[Xet] Basic shard creation #1633

Open · wants to merge 11 commits into base: main
2 changes: 1 addition & 1 deletion packages/hub/package.json
@@ -42,7 +42,7 @@
 		"test": "vitest run",
 		"test:browser": "vitest run --browser.name=chrome --browser.headless --config vitest-browser.config.mts",
 		"check": "tsc",
-		"build:xet-wasm": "./scripts/build-xet-wasm.sh -t bundler -c -b hoytak/250714-eliminate-mdb-v1"
+		"build:xet-wasm": "./scripts/build-xet-wasm.sh -t bundler --clean"
 	},
 	"files": [
 		"src",
1 change: 1 addition & 0 deletions packages/hub/scripts/build-xet-wasm.sh
@@ -224,6 +224,7 @@ fi
 
 # copy the generated hf_xet_thin_wasm_bg.js to the hub package and hf_xet_thin_wasm_bg.wasm to the hub package
 cp "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.js" "./src/vendor/xet-chunk/chunker_wasm_bg.js"
+cp "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.wasm.d.ts" "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.d.ts"
 echo "// Generated by build-xet-wasm.sh" > "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
 echo "export const wasmBase64 = atob(\`" >> "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
 base64 "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.wasm" | fold -w 100 >> "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
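
For context, the generated chunker_wasm_bg.wasm.base64.ts only exports the decoded bytes as a binary string; whoever loads the chunker still has to turn that string into a byte array before instantiating the module. A minimal sketch of that step, assuming a hypothetical initChunkerWasm helper (the real wasm-bindgen glue in src/vendor/xet-chunk may wire this up differently and supplies its own import object):

    // Sketch only: turning the embedded base64 wasm into a usable module.
    // `wasmBase64` comes from the generated chunker_wasm_bg.wasm.base64.ts;
    // `initChunkerWasm` and the default empty import object are assumptions.
    import { wasmBase64 } from "./chunker_wasm_bg.wasm.base64";

    export async function initChunkerWasm(
      imports: WebAssembly.Imports = {}
    ): Promise<WebAssembly.Instance> {
      // The generated module already calls atob(), so wasmBase64 is a binary
      // string; map each character code to a byte before instantiation.
      const bytes = Uint8Array.from(wasmBase64, (c) => c.charCodeAt(0));
      const { instance } = await WebAssembly.instantiate(bytes, imports);
      return instance;
    }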
2 changes: 0 additions & 2 deletions packages/hub/src/utils/XetBlob.ts
@@ -662,8 +662,6 @@ async function getAccessToken(
 	const jwt = {
 		accessToken: json.accessToken,
 		expiresAt: new Date(json.exp * 1000),
-		initialAccessToken,
-		refreshUrl,
 		casUrl: json.casUrl,
 	};
 
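
With the two fields removed, the cached token only carries what is needed to reach the CAS endpoint. Roughly, the object built in getAccessToken() ends up shaped like this (the type name is made up for illustration; only the fields visible in the diff are certain):

    // Assumed shape of the token object after this change; `XetJwt` is a
    // hypothetical name, the fields mirror the diff above.
    interface XetJwt {
      accessToken: string;
      expiresAt: Date; // derived from the `exp` claim (seconds since epoch)
      casUrl: string; // content-addressed store endpoint for xorb data
    }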
199 changes: 150 additions & 49 deletions packages/hub/src/utils/createXorbs.ts
@@ -13,70 +13,171 @@ const MAX_CHUNK_SIZE = 2 * TARGET_CHUNK_SIZE;
 const XORB_SIZE = 64 * 1024 * 1024;
 const MAX_XORB_CHUNKS = 8 * 1024;
 
-export async function* createXorbs(
-	fileSource: Blob
-): AsyncGenerator<{ xorb: Uint8Array; hash: string }, void, undefined> {
+export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGenerator<
+	| {
+			type: "xorb";
+			xorb: Uint8Array;
+			hash: string;
+			id: number;
+			chunks: Array<{ hash: string; length: number; offset: number }>;
+	  }
+	| {
+			type: "file";
+			hash: string;
+			sha256: string;
+			representation: Array<{
+				xorbId: number;
+				offset: number;
+				endOffset: number;
+				/** Unpacked length */
+				length: number;
+				rangeHash: string;
+			}>;
+	  },
+	void,
+	undefined
+> {
 	const chunkModule = await import("../vendor/xet-chunk/chunker_wasm");
+	const sha256Module = await import("../vendor/hash-wasm/sha256-wrapper");
+	let xorbId = 0;
 
 	await chunkModule.init();
 	const chunker = new chunkModule.Chunker(TARGET_CHUNK_SIZE);
 
 	let xorb = new Uint8Array(XORB_SIZE);
-	const sourceChunks: Array<Uint8Array> = [];
+	let xorbOffset = 0;
+	let xorbChunks = Array<{ hash: string; length: number; offset: number }>();
 
 	try {
-		const reader = fileSource.stream().getReader();
-		let xorbOffset = 0;
-		let xorbChunks = Array<{ hash: string; length: number }>();
-
-		const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
-			for (const chunk of chunks) {
-				let chunkToCopy: Uint8Array;
-				if (chunk.length === sourceChunks[0].length) {
-					chunkToCopy = sourceChunks[0];
-					sourceChunks.shift();
-				} else if (chunk.length < sourceChunks[0].length) {
-					chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
-					sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
-				} else {
-					chunkToCopy = new Uint8Array(chunk.length);
-					let copyOffset = 0;
-					let index = 0;
-					while (copyOffset < chunk.length) {
-						chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
-						copyOffset += sourceChunks[index].length;
-						index++;
-					}
-					sourceChunks.splice(0, index);
-				}
-				xorbOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
-				if (xorbOffset === 0) {
-					// Failure to write chunk, maybe because it went over xorb size limit
-					yield { xorb: xorb.subarray(0, xorbOffset), hash: "" };
-					xorb = new Uint8Array(XORB_SIZE);
-					xorbOffset = writeChunk(xorb, 0, chunkToCopy);
-
-					if (xorbOffset === 0) {
-						throw new Error("Failed to write chunk into xorb");
-					}
-				}
-				xorbChunks.push(chunk);
-				if (xorbChunks.length >= MAX_XORB_CHUNKS) {
-					yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
-					xorbOffset = 0;
-					xorbChunks = [];
-					xorb = new Uint8Array(XORB_SIZE);
-				}
-			}
-		};
-
-		while (true) {
-			const { done, value } = await reader.read();
-			if (done) {
-				yield* addChunks(chunker.finish());
-				break;
-			}
-			sourceChunks.push(value);
-			yield* addChunks(chunker.add_data(value));
-		}
+		for await (const fileSource of fileSources) {
+			const initialXorbOffset = xorbOffset;
+			const sourceChunks: Array<Uint8Array> = [];
+
+			const reader = fileSource.stream().getReader();
+			const fileChunks: Array<{ hash: string; length: number }> = [];
+			let currentChunkRangeBeginning = 0;
+			const fileRepresentation: Array<{
+				xorbId: number;
+				offset: number;
+				endOffset: number;
+				length: number;
+				rangeHash: string;
+			}> = [];
+
+			const sha256 = await sha256Module.createSHA256();
+			sha256.init();
+
+			const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
+				for (const chunk of chunks) {
+					let chunkOffset = xorbOffset;
+					fileChunks.push({ hash: chunk.hash, length: chunk.length });
+					let chunkToCopy: Uint8Array;
+					if (chunk.length === sourceChunks[0].length) {
+						chunkToCopy = sourceChunks[0];
+						sourceChunks.shift();
+					} else if (chunk.length < sourceChunks[0].length) {
+						chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
+						sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
+					} else {
+						chunkToCopy = new Uint8Array(chunk.length);
+						let copyOffset = 0;
+						let index = 0;
+						while (copyOffset < chunk.length) {
+							chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
+							copyOffset += sourceChunks[index].length;
+							index++;
+						}
+						sourceChunks.splice(0, index);
+					}
+					xorbOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
+					if (xorbOffset === 0) {
+						// Failure to write chunk, maybe because it went over xorb size limit
+						yield {
+							type: "xorb" as const,
+							xorb: xorb.subarray(0, xorbOffset),
+							hash: chunkModule.compute_xorb_hash(xorbChunks),
+							chunks: [...xorbChunks],
+							id: xorbId,
+						};
+						xorbId++;
+						xorb = new Uint8Array(XORB_SIZE);
+						chunkOffset = 0;
+						xorbOffset = writeChunk(xorb, 0, chunkToCopy);
+
+						if (xorbOffset === 0) {
+							throw new Error("Failed to write chunk into xorb");
+						}
+					}
+					const lastRep = fileRepresentation.at(-1);
+
+					if (!lastRep) {
+						fileRepresentation.push({
+							xorbId,
+							offset: initialXorbOffset,
+							endOffset: xorbOffset - initialXorbOffset,
+							length: chunk.length,
+							rangeHash: "",
+						});
+						currentChunkRangeBeginning = fileChunks.length - 1;
+					} else {
+						if (lastRep.xorbId === xorbId) {
+							lastRep.endOffset = xorbOffset - lastRep.offset;
+							lastRep.length += chunk.length;
+						} else {
+							lastRep.rangeHash = chunkModule.compute_verification_hash(
+								fileChunks.slice(currentChunkRangeBeginning, -1).map((x) => x.hash)
+							);
+							fileRepresentation.push({
+								xorbId,
+								offset: 0,
+								endOffset: xorbOffset,
+								length: chunk.length,
+								rangeHash: "",
+							});
+							currentChunkRangeBeginning = fileChunks.length - 1;
+						}
+					}
+					xorbChunks.push({ hash: chunk.hash, length: chunk.length, offset: chunkOffset });
+					if (xorbChunks.length >= MAX_XORB_CHUNKS) {
+						yield {
+							type: "xorb" as const,
+							xorb: xorb.subarray(0, xorbOffset),
+							hash: chunkModule.compute_xorb_hash(xorbChunks),
+							chunks: [...xorbChunks],
+							id: xorbId,
+						};
+						xorbId++;
+						xorbOffset = 0;
+						xorbChunks = [];
+						xorb = new Uint8Array(XORB_SIZE);
+					}
+				}
+			};
+
+			while (true) {
+				const { done, value } = await reader.read();
+				if (done) {
+					yield* addChunks(chunker.finish());
+					break;
+				}
+				sourceChunks.push(value);
+				sha256.update(value);
+				yield* addChunks(chunker.add_data(value));
+			}
+
+			const lastRep = fileRepresentation.at(-1);
+			if (lastRep) {
+				lastRep.rangeHash = chunkModule.compute_verification_hash(
+					fileChunks.slice(currentChunkRangeBeginning).map((x) => x.hash)
+				);
+			}
+
+			yield {
+				type: "file" as const,
+				hash: chunkModule.compute_file_hash(fileChunks),
+				sha256: sha256.digest("hex"),
+				representation: fileRepresentation,
+			};
+		}
 	} finally {
 		chunker.free();
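
To make the new contract concrete, here is a rough consumption sketch; createXorbs and the fields it yields are taken from the diff above, while blobsToGenerator and uploadAll are hypothetical wrappers, not part of this PR:

    // Hedged usage sketch for the reworked generator.
    import { createXorbs } from "./createXorbs";

    async function* blobsToGenerator(blobs: Blob[]): AsyncGenerator<Blob> {
      for (const blob of blobs) {
        yield blob;
      }
    }

    async function uploadAll(blobs: Blob[]): Promise<void> {
      for await (const event of createXorbs(blobsToGenerator(blobs))) {
        if (event.type === "xorb") {
          // A packed xorb (up to 64 MiB or 8 * 1024 chunks) ready to upload;
          // event.chunks lists per-chunk hashes and offsets for shard metadata.
          console.log(`xorb #${event.id}: ${event.xorb.length} bytes, hash ${event.hash}`);
        } else {
          // A fully chunked file: xet file hash, plain sha256, and the xorb
          // ranges (each with a rangeHash) that reconstruct the file.
          console.log(`file sha256=${event.sha256}: ${event.representation.length} range(s)`);
        }
      }
    }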