Skip to content

[Xet] Basic shard creation #1633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions packages/hub/scripts/build-xet-wasm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ fi

# copy the generated JS glue, its .wasm type declarations, and a base64-encoded copy of the .wasm into the hub package's vendored xet-chunk directory
cp "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.js" "./src/vendor/xet-chunk/chunker_wasm_bg.js"
cp "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.wasm.d.ts" "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.d.ts"
echo "// Generated by build-xet-wasm.sh" > "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
echo "export const wasmBase64 = atob(\`" >> "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
base64 "$CLONE_DIR/$PACKAGE/pkg/hf_xet_thin_wasm_bg.wasm" | fold -w 100 >> "./src/vendor/xet-chunk/chunker_wasm_bg.wasm.base64.ts"
Expand Down
176 changes: 126 additions & 50 deletions packages/hub/src/utils/createXorbs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,70 +13,146 @@ const MAX_CHUNK_SIZE = 2 * TARGET_CHUNK_SIZE;
const XORB_SIZE = 64 * 1024 * 1024;
const MAX_XORB_CHUNKS = 8 * 1024;

export async function* createXorbs(
fileSource: Blob
): AsyncGenerator<{ xorb: Uint8Array; hash: string }, void, undefined> {
export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGenerator<
| {
type: "xorb";
xorb: Uint8Array;
hash: string;
id: number;
chunks: Array<{ hash: string; length: number; offset: number }>;
}
| {
type: "file";
hash: string;
verificationHash: string;
sha256: string;
representation: Array<{
xorbId: number;
offset: number;
endOffset: number;
/** Unpacked length */
length: number;
}>;
},
void,
undefined
> {
const chunkModule = await import("../vendor/xet-chunk/chunker_wasm");
const sha256Module = await import("../vendor/hash-wasm/sha256-wrapper");
let xorbId = 0;

await chunkModule.init();
const chunker = new chunkModule.Chunker(TARGET_CHUNK_SIZE);

let xorb = new Uint8Array(XORB_SIZE);
const sourceChunks: Array<Uint8Array> = [];
let xorbOffset = 0;
let xorbChunks = Array<{ hash: string; length: number; offset: number }>();

try {
const reader = fileSource.stream().getReader();
let xorbOffset = 0;
let xorbChunks = Array<{ hash: string; length: number }>();

const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
for (const chunk of chunks) {
let chunkToCopy: Uint8Array;
if (chunk.length === sourceChunks[0].length) {
chunkToCopy = sourceChunks[0];
sourceChunks.shift();
} else if (chunk.length < sourceChunks[0].length) {
chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
} else {
chunkToCopy = new Uint8Array(chunk.length);
let copyOffset = 0;
let index = 0;
while (copyOffset < chunk.length) {
chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
copyOffset += sourceChunks[index].length;
index++;
}
sourceChunks.splice(0, index);
}
xorbOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
if (xorbOffset === 0) {
// Failure to write chunk, maybe because it went over xorb size limit
yield { xorb: xorb.subarray(0, xorbOffset), hash: "" };
xorb = new Uint8Array(XORB_SIZE);
xorbOffset = writeChunk(xorb, 0, chunkToCopy);
for await (const fileSource of fileSources) {
const initialXorbOffset = xorbOffset;
const sourceChunks: Array<Uint8Array> = [];

const reader = fileSource.stream().getReader();
const fileChunks: Array<{ hash: string; length: number }> = [];
const fileRepresentation: Array<{ xorbId: number; offset: number; endOffset: number; length: number }> = [];

const sha256 = await sha256Module.createSHA256();
sha256.init();

const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
for (const chunk of chunks) {
let chunkOffset = xorbOffset;
fileChunks.push({ hash: chunk.hash, length: chunk.length });
let chunkToCopy: Uint8Array;
if (chunk.length === sourceChunks[0].length) {
chunkToCopy = sourceChunks[0];
sourceChunks.shift();
} else if (chunk.length < sourceChunks[0].length) {
chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
} else {
chunkToCopy = new Uint8Array(chunk.length);
let copyOffset = 0;
let index = 0;
while (copyOffset < chunk.length) {
chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
copyOffset += sourceChunks[index].length;
index++;
}
sourceChunks.splice(0, index);
}
xorbOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
if (xorbOffset === 0) {
throw new Error("Failed to write chunk into xorb");
// Failure to write chunk, maybe because it went over xorb size limit
yield {
type: "xorb" as const,
xorb: xorb.subarray(0, xorbOffset),
hash: chunkModule.compute_xorb_hash(xorbChunks),
chunks: [...xorbChunks],
id: xorbId,
};
xorbId++;
xorb = new Uint8Array(XORB_SIZE);
chunkOffset = 0;
xorbOffset = writeChunk(xorb, 0, chunkToCopy);

if (xorbOffset === 0) {
throw new Error("Failed to write chunk into xorb");
}
}
const lastRep = fileRepresentation.at(-1);

if (!lastRep) {
fileRepresentation.push({
xorbId,
offset: initialXorbOffset,
endOffset: xorbOffset - initialXorbOffset,
length: chunk.length,
});
} else {
if (lastRep.xorbId === xorbId) {
lastRep.endOffset = xorbOffset - lastRep.offset;
lastRep.length += chunk.length;
} else {
fileRepresentation.push({ xorbId, offset: 0, endOffset: xorbOffset, length: chunk.length });
}
}
xorbChunks.push({ hash: chunk.hash, length: chunk.length, offset: chunkOffset });
if (xorbChunks.length >= MAX_XORB_CHUNKS) {
yield {
type: "xorb" as const,
xorb: xorb.subarray(0, xorbOffset),
hash: chunkModule.compute_xorb_hash(xorbChunks),
chunks: [...xorbChunks],
id: xorbId,
};
xorbId++;
xorbOffset = 0;
xorbChunks = [];
xorb = new Uint8Array(XORB_SIZE);
}
}
xorbChunks.push(chunk);
if (xorbChunks.length >= MAX_XORB_CHUNKS) {
yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
xorbOffset = 0;
xorbChunks = [];
xorb = new Uint8Array(XORB_SIZE);
};

while (true) {
const { done, value } = await reader.read();
if (done) {
yield* addChunks(chunker.finish());
break;
}
sourceChunks.push(value);
sha256.update(value);
yield* addChunks(chunker.add_data(value));
}
};

while (true) {
const { done, value } = await reader.read();
if (done) {
yield* addChunks(chunker.finish());
break;
}
sourceChunks.push(value);
yield* addChunks(chunker.add_data(value));
yield {
type: "file" as const,
hash: chunkModule.compute_file_hash(fileChunks),
verificationHash: chunkModule.compute_range_verification_hash(fileChunks.map((x) => x.hash)),
sha256: sha256.digest("hex"),
representation: fileRepresentation,
};
}
} finally {
chunker.free();
Expand Down
Loading
Loading