diff --git a/examples/mixture-of-agents/index.html b/examples/mixture-of-agents/index.html index a35d275..e971b73 100644 --- a/examples/mixture-of-agents/index.html +++ b/examples/mixture-of-agents/index.html @@ -165,7 +165,7 @@ "GPT-4o Mini", "Llama 3.1 8B", "Mixtral 8x7B", - ] + ]; const individualResults = "{{ individual }}"; const aggResults = "{{ summaries }}"; @@ -190,7 +190,7 @@ contentArea.textContent = individualResults[currentLayer][currentIndex].trim(); - cardTitle.textContent = `${modelNames[currentIndex]} - Layer ${ currentLayer + 1 }`; + cardTitle.textContent = `${modelNames[currentIndex]} - Layer ${currentLayer + 1}`; } else { contentArea.textContent = aggResults[currentLayer].trim(); cardTitle.textContent = `MoA Layer ${currentLayer + 1}`; diff --git a/examples/module.ts b/examples/module.ts new file mode 100755 index 0000000..8411089 --- /dev/null +++ b/examples/module.ts @@ -0,0 +1,62 @@ +#!/usr/bin/env -S npx ts-node --transpileOnly + +import { Substrate, Box, Module, sb } from "substrate"; + +async function main() { + const SUBSTRATE_API_KEY = process.env["SUBSTRATE_API_KEY"]; + const substrate = new Substrate({ apiKey: SUBSTRATE_API_KEY }); + + const x = sb.var({ type: "string", default: "hello" }); + const y = sb.var({ type: "string" }); + const z = sb.var({ type: "object", properties: {} }); + + const a = new Box({ value: { a: x, z: z, array: [x, x, x] } }, { id: "A" }); + const b = new Box( + { value: { b: sb.interpolate`x=${a.future.value.get("a")}, y=${y}` } }, + { id: "B" }, + ); + + // publish the module on substrate.run + const publication = await substrate.module.publish({ + name: "my reusable graph", + nodes: [a, b], + inputs: { x, y, z }, + }); + console.log("published:", publication.json); + + // using the module from JSON + const mod = new Module({ + module_json: substrate.module.serialize({ + nodes: [a, b], + inputs: { x, y, z }, + }), + inputs: { + // when commented will use "hello" because it is defined as the default above + // 
x: 123, + y: "yyy", + z: { + arr: ["123"], + }, + }, + }); + + // using the module from publication/module id + // const mod = new Module({ + // module_id: publication.id, + // inputs: { y: "yyy", z: { arr: ["123"] } }, + // }); + + const c = new Box( + { + value: { + "1": mod.future.get("A.value.z.arr[0]"), + "2": mod.future.get("B.value.b"), + }, + }, + { id: "C" }, + ); + + const res = await substrate.run(mod, c); + console.log(JSON.stringify(res.json, null, 2)); +} +main(); diff --git a/package-lock.json b/package-lock.json index c11e188..f8879ac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "120240617.1.7", "license": "MIT", "dependencies": { + "@types/json-schema": "^7.0.15", "@types/node-fetch": "^2.6.11", "node-fetch": "2.7.0", "pako": "^2.1.0" @@ -798,6 +799,11 @@ "integrity": "sha512-/kYRxGDLWzHOB7q+wtSUQlFrtcdUccpfy+X+9iMBpHK8QLLhx2wIPYuS5DYtR9Wa/YlZAbIovy7qVdB1Aq6Lyw==", "dev": true }, + "node_modules/@types/json-schema": { + "version": "7.0.15", + "resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.15.tgz", + "integrity": "sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA==" + }, "node_modules/@types/node": { "version": "20.14.11", "resolved": "https://registry.npmjs.org/@types/node/-/node-20.14.11.tgz", diff --git a/package.json b/package.json index f778af3..234dbe5 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "vitest": "^1.0.4" }, "dependencies": { + "@types/json-schema": "^7.0.15", "@types/node-fetch": "^2.6.11", "node-fetch": "2.7.0", "pako": "^2.1.0" diff --git a/src/Future.ts b/src/Future.ts index 75559a5..601f81c 100644 --- a/src/Future.ts +++ b/src/Future.ts @@ -1,5 +1,6 @@ import { idGenerator } from "substrate/idGenerator"; import { Node } from "substrate/Node"; +import { type JSONSchema7 } from "json-schema"; type Accessor = "item" | "attr"; type TraceOperation = { @@ -122,7 +123,7 @@ export class JQ extends Directive { 
rawValue: (val: JQCompatible) => ({ future_id: null, val }), }; - override next(...items: TraceProp[]) { + override next(..._items: TraceProp[]) { return new JQ(this.query, this.target); } @@ -315,3 +316,39 @@ export class FutureAnyObject extends Future { return super._result(); } } + +export class Variable extends Directive { + items: any[]; // NOTE: unused field (will remove this from directive in a later refactor) + name: string | null; + + constructor(items: any[]) { + super(); + this.items = items; + } + + override next(...args: any[]) { + return new Variable(args); + } + + override async result(): Promise { + return; + } + + override toJSON() { + return { + type: "variable", + source: "input", + name: this.name, + }; + } +} + +export class FutureVariable extends Future { + declare _directive: Variable; + schema: JSONSchema7; + + constructor(schema?: JSONSchema7, id: string = newFutureId()) { + super(new Variable([]), id); + this.schema = schema ?? {}; + } +} diff --git a/src/Module.ts b/src/Module.ts new file mode 100644 index 0000000..9950e50 --- /dev/null +++ b/src/Module.ts @@ -0,0 +1,36 @@ +import { Node, Options } from "substrate/Node"; +import { FutureVariable } from "substrate/Future"; + +type ModuleId = `mod_${string}`; + +export type SerializableModule = { + nodes: Node[]; + inputs: ModuleInputs; +}; + +export type NewModule = SerializableModule & { name: string }; +export type UpdateModule = NewModule & { id: ModuleId }; +export type PublishableModule = NewModule; // | UpdateModule; (TODO: implement update module) +export type PublishedModule = { + id: ModuleId; + uri: string; +}; + +export type ModuleInputs = Record; + +type ModuleIn = + | { + module_json: any; + inputs: Record; + } + | { + module_id: ModuleId; + inputs: Record; + }; + +export class Module extends Node { + constructor(args: ModuleIn, options?: Options) { + super(args, options); + this.node = "Module"; + } +} diff --git a/src/Substrate.ts b/src/Substrate.ts index e123c9f..6523dd6
100644 --- a/src/Substrate.ts +++ b/src/Substrate.ts @@ -1,13 +1,17 @@ import { SubstrateError, RequestTimeoutError } from "substrate/Error"; import { VERSION } from "substrate/version"; import OpenAPIjson from "substrate/openapi.json"; -import { SubstrateResponse } from "substrate/SubstrateResponse"; +import { + asSubstratePublishedModuleResponse, + SubstrateResponse, +} from "substrate/SubstrateResponse"; import { SubstrateStreamingResponse } from "substrate/SubstrateStreamingResponse"; import { Node } from "substrate/Node"; import { Future } from "substrate/Future"; import { getPlatformProperties } from "substrate/Platform"; import { deflate } from "pako"; import { randomString } from "substrate/idGenerator"; +import { SerializableModule, PublishableModule } from "substrate/Module"; type Configuration = { /** @@ -291,4 +295,83 @@ export class Substrate { return headers; } + + module = { + /** + * Returns an object that represents a publishable "module" or code that can be used to construct + * a `Module` node. 
+ */ + serialize: ({ nodes, inputs }: SerializableModule) => { + const inputIdToName = {}; + const inputNameToSchema = {}; + + for (let name in inputs) { + let input = inputs[name]; + // @ts-ignore + inputIdToName[input._id] = name; + // @ts-ignore + inputNameToSchema[name] = input.schema; + } + + const dag = Substrate.serialize(...nodes); + + // update variable name bindings in dag using inputs + dag.futures = dag.futures.map((future: any) => { + if (future.directive.type === "variable" && !future.directive.name) { + // @ts-ignore + future.directive.name = inputIdToName[future.id]; + } + return future; + }); + + return { + dag, + inputs: inputNameToSchema, + api_version: this.apiVersion, + }; + }, + + /** + * Publishes a module on substrate.run + * + */ + publish: async ( + publishable: PublishableModule, + endpoint: string = "https://www.substrate.run/api/modules", + ) => { + /** + * NOTE: Because the Module publishing API lives in another app and subdomain, the `baseUrl` configuration + * will not be applied to this request like we do with `.run` + */ + const serialized = this.module.serialize({ + nodes: publishable.nodes, + inputs: publishable.inputs, + }); + + const body = { + module: { name: publishable.name }, + module_version: serialized, + }; + + const requestOptions = { + method: "POST", + headers: this.headers(), + body: JSON.stringify(body), + }; + + const request = new Request(endpoint, requestOptions); + const requestId = request.headers.get("x-substrate-request-id"); + const apiResponse = await fetch(request); + + if (apiResponse.ok) { + const json = await apiResponse.json(); + const res = new SubstrateResponse(request, apiResponse, json); + return asSubstratePublishedModuleResponse(res); + } else { + throw new SubstrateError( + `[Request failed] status=${apiResponse.status} statusText=${apiResponse.statusText} requestId=${requestId}`, + ); + } + }, + }; } diff --git a/src/SubstrateResponse.ts b/src/SubstrateResponse.ts index 65d00cf..61f67e6 100644 
--- a/src/SubstrateResponse.ts +++ b/src/SubstrateResponse.ts @@ -1,5 +1,6 @@ import { AnyNode, NodeOutput } from "substrate/Nodes"; import { NodeError } from "substrate/Error"; +import { PublishedModule } from "substrate/Module"; /** * Response to a run request. @@ -39,3 +40,19 @@ export class SubstrateResponse { return node.output() as NodeOutput; } } + +// TODO: create an alternate SubstrateResponse for non-compose responses +// For now using a type assertion and modifying the object. +export type SubstratePublishModuleResponse = Omit< + SubstrateResponse, + "get" | "getError" +> & { json: PublishedModule }; +export const asSubstratePublishedModuleResponse = ( + res: SubstrateResponse, +): SubstratePublishModuleResponse => { + // @ts-ignore + delete res.get; + // @ts-ignore + delete res.getError; + return res as SubstratePublishModuleResponse; +}; diff --git a/src/index.ts b/src/index.ts index bf57663..2120dd7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -51,6 +51,8 @@ export { DeleteVectors, } from "substrate/Nodes"; +export { Module } from "substrate/Module"; + export { sb } from "substrate/sb"; export { Substrate }; import { Substrate } from "substrate/Substrate"; diff --git a/src/sb.ts b/src/sb.ts index f94848c..9e41861 100644 --- a/src/sb.ts +++ b/src/sb.ts @@ -1,10 +1,27 @@ -import { FutureAnyObject, FutureString } from "substrate/Future"; +import { + FutureAnyObject, + FutureVariable, + FutureString, +} from "substrate/Future"; import { StreamingResponse } from "substrate/SubstrateStreamingResponse"; export const sb = { concat: FutureString.concat, jq: FutureAnyObject.jq, interpolate: FutureString.interpolate, + /** + * `var` is used to specify a variable that can be bound to a name when creating a `module` (re-usable Substrate Graph) + * * Input types and validation parameters may optionally be described using a JSON Schema object. + * * Default values may also be specified here and will be used if user input is not provided for this input.
+ */ + var: (schema: FutureVariable["schema"]) => { + // NOTE: using `any` as the return type here for now to ease using + // this in general node input args or helper functions. + // + // Once we ship our Future type reorganization work, we can just + // use this as-is (Future) + return new FutureVariable(schema) as any; + }, streaming: { fromSSEResponse: StreamingResponse.fromReponse, },