Skip to content

Commit 2f9d4a0

Browse files
committed
progress events for uploading xorbs
1 parent adbe363 commit 2f9d4a0

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

packages/hub/src/utils/createXorbs.ts

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,23 @@ const MAX_CHUNK_SIZE = 2 * TARGET_CHUNK_SIZE;
1313
const XORB_SIZE = 64 * 1024 * 1024;
1414
const MAX_XORB_CHUNKS = 8 * 1024;
1515

16-
export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGenerator<
16+
export async function* createXorbs(
17+
fileSources: AsyncGenerator<{ content: Blob; path: string; sha256: string }>
18+
): AsyncGenerator<
1719
| {
1820
type: "xorb";
1921
xorb: Uint8Array;
2022
hash: string;
2123
id: number;
2224
chunks: Array<{ hash: string; length: number; offset: number }>;
25+
files: Array<{
26+
path: string;
27+
progress: number;
28+
}>;
2329
}
2430
| {
2531
type: "file";
32+
path: string;
2633
hash: string;
2734
sha256: string;
2835
representation: Array<{
@@ -38,7 +45,6 @@ export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGene
3845
undefined
3946
> {
4047
const chunkModule = await import("../vendor/xet-chunk/chunker_wasm");
41-
const sha256Module = await import("../vendor/hash-wasm/sha256-wrapper");
4248
let xorbId = 0;
4349

4450
await chunkModule.init();
@@ -47,13 +53,15 @@ export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGene
4753
let xorb = new Uint8Array(XORB_SIZE);
4854
let xorbOffset = 0;
4955
let xorbChunks = Array<{ hash: string; length: number; offset: number }>();
56+
let xorbFiles: Record<string, number> = {};
5057

5158
try {
5259
for await (const fileSource of fileSources) {
5360
const initialXorbOffset = xorbOffset;
5461
const sourceChunks: Array<Uint8Array> = [];
5562

56-
const reader = fileSource.stream().getReader();
63+
const reader = fileSource.content.stream().getReader();
64+
let processedBytes = 0;
5765
const fileChunks: Array<{ hash: string; length: number }> = [];
5866
let currentChunkRangeBeginning = 0;
5967
const fileRepresentation: Array<{
@@ -64,9 +72,6 @@ export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGene
6472
rangeHash: string;
6573
}> = [];
6674

67-
const sha256 = await sha256Module.createSHA256();
68-
sha256.init();
69-
7075
const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
7176
for (const chunk of chunks) {
7277
let chunkOffset = xorbOffset;
@@ -98,11 +103,13 @@ export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGene
98103
hash: chunkModule.compute_xorb_hash(xorbChunks),
99104
chunks: [...xorbChunks],
100105
id: xorbId,
106+
files: Object.entries(xorbFiles).map(([path, progress]) => ({ path, progress })),
101107
};
102108
xorbId++;
103109
xorb = new Uint8Array(XORB_SIZE);
104110
chunkOffset = 0;
105111
xorbOffset = writeChunk(xorb, 0, chunkToCopy);
112+
xorbFiles = {};
106113

107114
if (xorbOffset === 0) {
108115
throw new Error("Failed to write chunk into xorb");
@@ -138,17 +145,20 @@ export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGene
138145
}
139146
}
140147
xorbChunks.push({ hash: chunk.hash, length: chunk.length, offset: chunkOffset });
148+
xorbFiles[fileSource.path] = processedBytes / fileSource.content.size;
141149
if (xorbChunks.length >= MAX_XORB_CHUNKS) {
142150
yield {
143151
type: "xorb" as const,
144152
xorb: xorb.subarray(0, xorbOffset),
145153
hash: chunkModule.compute_xorb_hash(xorbChunks),
146154
chunks: [...xorbChunks],
147155
id: xorbId,
156+
files: Object.entries(xorbFiles).map(([path, progress]) => ({ path, progress })),
148157
};
149158
xorbId++;
150159
xorbOffset = 0;
151160
xorbChunks = [];
161+
xorbFiles = {};
152162
xorb = new Uint8Array(XORB_SIZE);
153163
}
154164
}
@@ -160,8 +170,8 @@ export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGene
160170
yield* addChunks(chunker.finish());
161171
break;
162172
}
173+
processedBytes += value.length;
163174
sourceChunks.push(value);
164-
sha256.update(value);
165175
yield* addChunks(chunker.add_data(value));
166176
}
167177

@@ -174,9 +184,10 @@ export async function* createXorbs(fileSources: AsyncGenerator<Blob>): AsyncGene
174184

175185
yield {
176186
type: "file" as const,
187+
path: fileSource.path,
177188
hash: chunkModule.compute_file_hash(fileChunks),
178-
sha256: sha256.digest("hex"),
179189
representation: fileRepresentation,
190+
sha256: fileSource.sha256,
180191
};
181192
}
182193
} finally {

packages/hub/src/utils/uploadShards.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ interface UploadShardsParams {
6363
/**
6464
* Outputs the file sha256 after their xorbs/shards have been uploaded.
6565
*/
66-
export async function uploadShards(source: AsyncGenerator<Blob>, params: UploadShardsParams): Promise<string[]> {
66+
export async function* uploadShards(
67+
source: AsyncGenerator<{ content: Blob; path: string; sha256: string }>,
68+
params: UploadShardsParams
69+
): AsyncGenerator<
70+
{ type: "file"; path: string; sha256: string } | { type: "fileProgress"; path: string; progress: number }
71+
> {
6772
const xorbHashes: Array<string> = [];
6873

6974
const fileInfoSection = new Uint8Array(Math.floor(SHARD_MAX_SIZE - SHARD_HEADER_SIZE - SHARD_FOOTER_SIZE) / 2);
@@ -76,7 +81,6 @@ export async function uploadShards(source: AsyncGenerator<Blob>, params: UploadS
7681
let xorbTotalSize = 0n;
7782
let fileTotalSize = 0n;
7883
let xorbTotalUnpackedSize = 0n;
79-
const fileShas: Array<string> = [];
8084

8185
for await (const output of createXorbs(source)) {
8286
switch (output.type) {
@@ -111,10 +115,14 @@ export async function uploadShards(source: AsyncGenerator<Blob>, params: UploadS
111115

112116
await uploadXorb(output, params);
113117
//^ Todo: queue it and do not await it
118+
119+
for (const file of output.files) {
120+
yield { type: "fileProgress", path: file.path, progress: file.progress };
121+
}
114122
break;
115123
}
116124
case "file": {
117-
fileShas.push(output.sha256); // note: if yielding instead, maybe wait until shard is uploaded.
125+
yield { type: "file", path: output.path, sha256: output.sha256 }; // Maybe wait until shard is uploaded before yielding.
118126

119127
// todo: handle out of bounds
120128

@@ -292,8 +300,6 @@ export async function uploadShards(source: AsyncGenerator<Blob>, params: UploadS
292300
if (xorbViewOffset || fileViewOffset) {
293301
await uploadShard(createShard(), params);
294302
}
295-
296-
return fileShas;
297303
}
298304

299305
// Todo: switch from hex to non-hex when WASM switches. For now consider hash is hex

0 commit comments

Comments
 (0)