From 9c186267caa14f8978869bb52c3b354b54883a44 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 30 Jan 2024 19:47:53 -0800 Subject: [PATCH] WIP --- package-lock.json | 15 +++++++++ package.json | 1 + src/client.js | 65 ++++++++++++++++++++++++++++++-------- src/types.js | 12 ++++++-- src/utils/car.js | 59 +++++++++++++++++++++++------------ test/car.spec.js | 39 ++++++++++++++--------- test/fallback.spec.js | 72 +++++++++++++++++++++++++++++-------------- test/index.spec.js | 3 +- 8 files changed, 194 insertions(+), 72 deletions(-) diff --git a/package-lock.json b/package-lock.json index 55f87f3..78da02d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "@ipld/dag-pb": "^2.1.18", "@multiformats/blake2": "^1.0.11", "browser-readablestream-to-it": "^2.0.4", + "content-range": "^2.0.2", "hashring": "^3.2.0", "idb": "^7.1.1", "ipfs-unixfs-exporter": "https://gitpkg.now.sh/filecoin-saturn/js-ipfs-unixfs/packages/ipfs-unixfs-exporter?build", @@ -1574,6 +1575,15 @@ "resolved": "https://registry.npmjs.org/connection-parse/-/connection-parse-0.0.7.tgz", "integrity": "sha512-bTTG28diWg7R7/+qE5NZumwPbCiJOT8uPdZYu674brDjBWQctbaQbYlDKhalS+4i5HxIx+G8dZsnBHKzWpp01A==" }, + "node_modules/content-range": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/content-range/-/content-range-2.0.2.tgz", + "integrity": "sha512-ayHd/VQMRfWFBLVXyYvNhbbR+vq5OgLJnViGV4arzQiX9odRhf1spmXF7pFEHiR8htli29Hbii0URCNfUTN4vQ==", + "funding": { + "type": "github", + "url": "https://github.com/sponsors/gregberge" + } + }, "node_modules/cookie": { "version": "0.4.2", "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", @@ -6929,6 +6939,11 @@ "resolved": "https://registry.npmjs.org/connection-parse/-/connection-parse-0.0.7.tgz", "integrity": "sha512-bTTG28diWg7R7/+qE5NZumwPbCiJOT8uPdZYu674brDjBWQctbaQbYlDKhalS+4i5HxIx+G8dZsnBHKzWpp01A==" }, + "content-range": { + "version": "2.0.2", + "resolved": 
"https://registry.npmjs.org/content-range/-/content-range-2.0.2.tgz", + "integrity": "sha512-ayHd/VQMRfWFBLVXyYvNhbbR+vq5OgLJnViGV4arzQiX9odRhf1spmXF7pFEHiR8htli29Hbii0URCNfUTN4vQ==" + }, "cookie": { "version": "0.4.2", "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", diff --git a/package.json b/package.json index ce2c58a..98a385f 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "@ipld/dag-pb": "^2.1.18", "@multiformats/blake2": "^1.0.11", "browser-readablestream-to-it": "^2.0.4", + "content-range": "^2.0.2", "hashring": "^3.2.0", "idb": "^7.1.1", "ipfs-unixfs-exporter": "https://gitpkg.now.sh/filecoin-saturn/js-ipfs-unixfs/packages/ipfs-unixfs-exporter?build", diff --git a/src/client.js b/src/client.js index 43e6c10..750181a 100644 --- a/src/client.js +++ b/src/client.js @@ -3,7 +3,7 @@ import { CID } from 'multiformats' import pLimit from 'p-limit' -import { extractVerifiedContent } from './utils/car.js' +import { extractVerifiedContent, extractVerifiedEntity, normalizeContentRange } from './utils/car.js' import { asAsyncIterable, asyncIteratorToBuffer } from './utils/itr.js' import { randomUUID } from './utils/uuid.js' import { memoryStorage } from './storage/index.js' @@ -12,11 +12,13 @@ import { parseUrl, addHttpPrefix } from './utils/url.js' import { isBrowserContext } from './utils/runtime.js' import HashRing from 'hashring' import { isErrorUnavoidable } from './utils/errors.js' +import { parse as parseRange } from "content-range" const MAX_NODE_WEIGHT = 100 /** * @typedef {import('./types.js').Node} Node * @typedef {import('./types.js').FetchOptions} FetchOptions + * @typedef {import('./types.js').Response} Response */ export class Saturn { @@ -250,7 +252,7 @@ export class Saturn { * * @param {string} cidPath * @param {FetchOptions} [opts={}] - * @returns {Promise>} + * @returns {Promise} */ async * fetchContentWithFallback (cidPath, opts = {}) { const upstreamController = opts.controller @@ -276,8 +278,8 @@ export class 
Saturn { } let byteCount = 0 const fetchOptions = Object.assign(opts, options) - const byteChunks = await this.fetchContent(cidPath, fetchOptions) - for await (const chunk of byteChunks) { + const response = await this.fetchContent(cidPath, fetchOptions) + for await (const chunk of response.body) { // avoid sending duplicate chunks if (byteCount < byteCountCheckpoint) { // checks for overlapping chunks @@ -366,9 +368,9 @@ export class Saturn { * * @param {string} cidPath * @param {FetchOptions} [opts={}] - * @returns {Promise>} + * @returns {Promise} */ - async * fetchContent (cidPath, opts = {}) { + async fetchContent (cidPath, opts = {}) { let res, controller, log opts = Object.assign({}, this.config, opts) @@ -386,21 +388,57 @@ export class Saturn { yield chunk } } + + const self = this + function withErrorHandling(itr) { + return (async function * () { + try { + yield * itr + } catch (err) { + log.error = err.message + controller.abort() + throw err + } finally { + self._finalizeLog(log) + } + })() + } try { const itr = metricsIterable(asAsyncIterable(res.body)) + const self = this if (!opts.format) { - yield * itr + const body = withErrorHandling(itr) + let totalSize = parseInt(res.headers.get('Content-Length'), 10) + let range + if (res.headers.has('Content-Range')) { + const parsed = parseRange(res.headers.get('Content-Range')) + totalSize = parsed?.size || totalSize + range = { + rangeStart: parsed?.start, + rangeEnd: parsed?.end + } + } + return { + totalSize, range, body + } } else { - yield * extractVerifiedContent(cidPath, itr, opts.range || {}) + const node = await extractVerifiedEntity(cidPath, itr) + let range + if (opts.range) { + range = normalizeContentRange(node, opts.range || {}) + } + return { + totalSize: Number(node.size), + range, + body: withErrorHandling(extractVerifiedContent(node, opts.range || {})) + } } } catch (err) { log.error = err.message controller.abort() - - throw err - } finally { this._finalizeLog(log) + throw err } } @@ 
-411,7 +449,8 @@ export class Saturn { * @returns {Promise} */ async fetchContentBuffer (cidPath, opts = {}) { - return await asyncIteratorToBuffer(this.fetchContent(cidPath, opts)) + const response = await this.fetchContent(cidPath, opts) + return await asyncIteratorToBuffer(response.body) } /** @@ -646,4 +685,4 @@ export class Saturn { this.nodes = nodes this.storage.set(Saturn.nodesListKey, nodes) } -} +} \ No newline at end of file diff --git a/src/types.js b/src/types.js index a23e98f..a6f51cb 100644 --- a/src/types.js +++ b/src/types.js @@ -34,8 +34,16 @@ * Options for a range request * * @typedef {object} ContentRange - * @property {number} [rangeStart] - * @property {number} [rangeEnd] + * @property {number | null } [rangeStart] + * @property {number | null } [rangeEnd] + */ + +/** + * Response to fetchContent + * @typedef {object} Response + * @property {number} totalSize + * @property {ContentRange | undefined} range + * @property {AsyncIterable} body */ export {} diff --git a/src/utils/car.js b/src/utils/car.js index 60c8ff7..d9a3b77 100644 --- a/src/utils/car.js +++ b/src/utils/car.js @@ -85,49 +85,70 @@ export async function verifyBlock (cid, bytes) { } /** - * Verifies and extracts the raw content from a CAR stream. 
+ * Verifies a car stream represents a valid IPLD entity and returns it * * @param {string} cidPath * @param {ReadableStream|AsyncIterable} carStream - * @param {import('../types.js').ContentRange} options */ -export async function * extractVerifiedContent (cidPath, carStream, options = {}) { +export async function extractVerifiedEntity(cidPath, carStream) { const getter = await CarBlockGetter.fromStream(carStream) - const node = await unixfs.exporter(cidPath, getter) + return await unixfs.exporter(cidPath, getter) +} - for await (const chunk of contentGenerator(node, options)) { +/** + * Extracts raw content of a verified IPLD entity + * + * @param {unixfs.UnixFSEntry} node + * @param {import('../types.js').ContentRange} options + */ +export async function * extractVerifiedContent(node, options = {}) { + const normalizedRange = normalizeContentRange(node, options) + const length = normalizedRange.rangeEnd + 1 - normalizedRange.rangeStart + for await (const chunk of node.content({ offset: normalizedRange.rangeStart, length })) { yield chunk } } /** - * + * Returns the request content range normalized to the file size * @param {unixfs.UnixFSEntry} node * @param {import('../types.js').ContentRange} options + * @returns {import('../types.js').ContentRange} */ -function contentGenerator(node, options = {}) { - +export function normalizeContentRange(node, options = {}) { let rangeStart = options.rangeStart ?? 
0 if (rangeStart < 0) { rangeStart = rangeStart + Number(node.size) if (rangeStart < 0) { rangeStart = 0 } - + } else if (rangeStart > 0) { + if (rangeStart >= Number(node.size)) { + throw new Error("range start greater than content length") + } } + if (options.rangeEnd === null || options.rangeEnd === undefined) { - return node.content({offset: rangeStart}) + return { + rangeStart, + rangeEnd: Number(node.size) - 1 + } } - let rangeEnd = options.rangeEnd + let rangeEnd = options.rangeEnd + if (rangeEnd < 0) { + rangeEnd = rangeEnd + Number(node.size) - 1 if (rangeEnd < 0) { - rangeEnd = rangeEnd + Number(node.size) - } else { - rangeEnd = rangeEnd+1 + throw new Error("range end is too small") } - const toRead = rangeEnd - rangeStart - if (toRead < 0) { - throw new Error("range end must be greater than range start") + } else { + if (rangeEnd >= Number(node.size)) { + rangeEnd = Number(node.size) - 1 } - return node.content({offset: rangeStart, length: toRead}) -} \ No newline at end of file + } + + if (rangeEnd - rangeStart < 0) { + throw new Error("range end must be greater than or equal to range start") + } + return { rangeStart, rangeEnd } +} diff --git a/test/car.spec.js b/test/car.spec.js index 36217c8..eb43b52 100644 --- a/test/car.spec.js +++ b/test/car.spec.js @@ -6,7 +6,7 @@ import { getFixturePath, concatChunks } from './test-utils.js' import { CarReader, CarWriter } from '@ipld/car' import { CID } from 'multiformats/cid' -import { extractVerifiedContent } from '#src/utils/car.js' +import { extractVerifiedContent, extractVerifiedEntity } from '../src/index.js' describe('CAR Verification', () => { it('should extract content from a valid CAR', async () => { @@ -15,7 +15,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('hello.car') const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = 
extractVerifiedContent(node) const buffer = await concatChunks(contentItr) const actualContent = String.fromCharCode(...buffer) const expectedContent = 'hello world\n' @@ -29,7 +30,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('hello.car') const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream, {rangeStart: 1, rangeEnd: 3}) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node, { rangeStart: 1, rangeEnd: 3 }) const buffer = await concatChunks(contentItr) const actualContent = String.fromCharCode(...buffer) const expectedContent = 'ell' @@ -43,7 +45,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('hello.car') const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream, {rangeStart: 1}) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node, { rangeStart: 1 }) const buffer = await concatChunks(contentItr) const actualContent = String.fromCharCode(...buffer) const expectedContent = 'ello world\n' @@ -57,7 +60,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('hello.car') const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream, {rangeStart: 1, rangeEnd: -1}) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node, { rangeStart: 1, rangeEnd: -1 }) const buffer = await concatChunks(contentItr) const actualContent = String.fromCharCode(...buffer) const expectedContent = 'ello world' @@ -71,7 +75,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('hello.car') const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream, {rangeStart: -5, rangeEnd: -1}) + const node = await 
extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node, {rangeStart: -5, rangeEnd: -1}) const buffer = await concatChunks(contentItr) const actualContent = String.fromCharCode(...buffer) const expectedContent = 'orld' @@ -83,8 +88,8 @@ describe('CAR Verification', () => { const cidPath = 'QmStvUMCtXxEb8wRjNSUqWwqHBEDhmnEd5nHp5siV7bm1Z' const filepath = getFixturePath('multi_block_filtered.car') const carStream = fs.createReadStream(filepath) - - const contentItr = await extractVerifiedContent(cidPath, carStream, { rangeStart: 300, rangeEnd: 349 }) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node, { rangeStart: 300, rangeEnd: 349 }) const buffer = await concatChunks(contentItr) const actualContent = Buffer.from(buffer).toString('base64') @@ -102,7 +107,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('subdir.car') const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node) const buffer = await concatChunks(contentItr) const actualContent = String.fromCharCode(...buffer) const expectedContent = 'hello world\n' @@ -116,7 +122,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('dag-cbor-with-identity.car') const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node) const itr = contentItr[Symbol.asyncIterator]() const actualContent = (await itr.next()).value const expectedContent = { asdf: 324 } @@ -130,7 +137,8 @@ describe('CAR Verification', () => { const filepath = getFixturePath('dag-cbor-traversal.car') const carStream = fs.createReadStream(filepath) - const contentItr = await 
extractVerifiedContent(cidPath, carStream) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node) const itr = contentItr[Symbol.asyncIterator]() const actualContent = (await itr.next()).value const expectedContent = { hello: 'this is not a link' } @@ -144,7 +152,8 @@ describe('CAR Verification', () => { const filepath = './fixtures/dag-json-traversal.car' const carStream = fs.createReadStream(filepath) - const contentItr = await extractVerifiedContent(cidPath, carStream) + const node = await extractVerifiedEntity(cidPath, carStream) + const contentItr = extractVerifiedContent(node) const itr = contentItr[Symbol.asyncIterator]() const actualContent = (await itr.next()).value const expectedContent = { hello: 'this is not a link' } @@ -169,7 +178,8 @@ describe('CAR Verification', () => { await assert.rejects( async () => { - for await (const _ of extractVerifiedContent(cidPath, out)) {} + const node = await extractVerifiedEntity(cidPath, out) + for await (const _ of extractVerifiedContent(node)) {} }, { name: 'VerificationError', @@ -207,7 +217,8 @@ describe('CAR Verification', () => { await assert.rejects( async () => { - for await (const _ of extractVerifiedContent(cidPath, out)) {} + const node = await extractVerifiedEntity(cidPath, out) + for await (const _ of extractVerifiedContent(node)) {} }, { name: 'VerificationError', diff --git a/test/fallback.spec.js b/test/fallback.spec.js index be4236c..4ab515d 100644 --- a/test/fallback.spec.js +++ b/test/fallback.spec.js @@ -267,9 +267,14 @@ describe('Client Fallback', () => { server.listen(MSW_SERVER_OPTS) const saturn = new Saturn({ ...options }) - const fetchContentMock = mock.fn(async function * (cidPath, opts) { - yield Buffer.from('chunk1') - yield Buffer.from('chunk2') + const fetchContentMock = mock.fn(async function (cidPath, opts) { + const body = async function * () { + yield Buffer.from('chunk1') + yield Buffer.from('chunk2') + } + return { + body: 
body() + } }) saturn.fetchContent = fetchContentMock const content = await saturn.fetchContentWithFallback('some-cid-path') @@ -295,7 +300,7 @@ describe('Client Fallback', () => { server.listen(MSW_SERVER_OPTS) const saturn = new Saturn({ ...options }) - const fetchContentMock = mock.fn(async function * (cidPath, opts) { throw new Error('Fetch error') }) // eslint-disable-line + const fetchContentMock = mock.fn(async function (cidPath, opts) { throw new Error('Fetch error') }) // eslint-disable-line saturn.fetchContent = fetchContentMock let error @@ -394,10 +399,13 @@ describe('Client Fallback', () => { await saturn.loadNodesPromise let callCount = 0 - const fetchContentMock = mock.fn(async function * (cidPath, opts) { + const fetchContentMock = mock.fn(async function (cidPath, opts) { callCount++ - yield '' - throw new Error('file does not exist') + const body = async function * () { + yield '' + throw new Error('file does not exist') + } + return { body: body() } }) saturn.fetchContent = fetchContentMock @@ -429,15 +437,19 @@ describe('Client Fallback', () => { const saturn = new Saturn({ ...options }) let callCount = 0 - const fetchContentMock = mock.fn(async function * (cidPath, opts) { + const fetchContentMock = mock.fn(async function (cidPath, opts) { callCount++ + let body if (callCount === 1) { throw new Error('First call error') } if (callCount === 2) { - yield Buffer.from('chunk1-overlap') - yield Buffer.from('chunk2') + body = async function * () { + yield Buffer.from('chunk1-overlap') + yield Buffer.from('chunk2') + } } + return { body: body() } }) saturn.fetchContent = fetchContentMock @@ -468,16 +480,22 @@ describe('Client Fallback', () => { const saturn = new Saturn({ ...options }) let callCount = 0 - let fetchContentMock = mock.fn(async function * (cidPath, opts) { + let fetchContentMock = mock.fn(async function (cidPath, opts) { callCount++ + let body if (callCount === 1) { - yield Buffer.from('chunk1-overlap') - throw new Error('First call 
error') + body = async function * () { + yield Buffer.from('chunk1-overlap') + throw new Error('First call error') + } } if (callCount === 2) { - yield Buffer.from('chunk1-overlap') - yield Buffer.from('chunk2') + body = async function * () { + yield Buffer.from('chunk1-overlap') + yield Buffer.from('chunk2') + } } + return { body: body() } }) saturn.fetchContent = fetchContentMock @@ -492,21 +510,29 @@ describe('Client Fallback', () => { assert.strictEqual(fetchContentMock.mock.calls.length, 2) callCount = 0 - fetchContentMock = mock.fn(async function * (cidPath, opts) { + fetchContentMock = mock.fn(async function (cidPath, opts) { callCount++ + let body if (callCount === 1) { - yield Buffer.from('chunk1-') - throw new Error('First call error') + body = async function * () { + yield Buffer.from('chunk1-') + throw new Error('First call error') + } } if (callCount === 2) { - yield Buffer.from('chunk1') - yield Buffer.from('-overlap') - throw new Error('Second call error') + body = async function * () { + yield Buffer.from('chunk1') + yield Buffer.from('-overlap') + throw new Error('Second call error') + } } if (callCount === 3) { - yield Buffer.from('chunk1-overlap') - yield Buffer.from('chunk2') + body = async function * () { + yield Buffer.from('chunk1-overlap') + yield Buffer.from('chunk2') + } } + return { body: body() } }) saturn.fetchContent = fetchContentMock diff --git a/test/index.spec.js b/test/index.spec.js index 1f4b808..2e56311 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -91,7 +91,8 @@ describe('Saturn client', () => { it('should create a log on fetch success', async () => { client.reportingLogs = true - for await (const _ of client.fetchContent(HELLO_CID)) {} // eslint-disable-line + const response = await client.fetchContent(HELLO_CID) + for await (const _ of response.body) {} // eslint-disable-line const log = client.logs.pop()