Skip to content

Split grpc connection out into node and web impls #306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5,326 changes: 3,512 additions & 1,814 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"nice-grpc": "^2.1.11",
"nice-grpc-client-middleware-retry": "^3.1.10",
"nice-grpc-common": "^2.0.2",
"nice-grpc-web": "^3.3.7",
"uuid": "^9.0.1"
},
"devDependencies": {
Expand Down Expand Up @@ -93,7 +94,7 @@
"ts-node": "^10.9.2",
"ts-proto": "^1.163.0",
"tsup": "^8.0.2",
"typedoc": "^0.25.12",
"typedoc": "^0.26.0",
"typedoc-plugin-extras": "^3.0.0",
"typescript": "^5.3.3"
},
Expand Down
4 changes: 2 additions & 2 deletions src/collections/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ export type {
IsPrimitiveField,
IsWeaviateField,
NestedKeys,
NonRefKeys,
NonReferenceInputs,
NonRefKeys,
PrimitiveKeys,
QueryNested,
QueryProperty,
QueryReference,
RefKeys,
ReferenceInput,
ReferenceInputs,
RefKeys,
} from './internal.js';
export * from './query.js';

Expand Down
2 changes: 1 addition & 1 deletion src/connection/grpc.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { isAbortError } from 'abort-controller-x';

import ConnectionGQL from './gql.js';
import { InternalConnectionParams } from './http.js';
import { InternalConnectionParams } from './transports/http.js';

import { ConsistencyLevel } from '../data/index.js';

Expand Down
281 changes: 3 additions & 278 deletions src/connection/http.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,8 @@
import { isAbortError } from 'abort-controller-x';
import { Agent } from 'http';

import OpenidConfigurationGetter from '../misc/openidConfigurationGetter.js';

import {
WeaviateInsufficientPermissionsError,
WeaviateInvalidInputError,
WeaviateRequestTimeoutError,
WeaviateUnauthenticatedError,
WeaviateUnexpectedStatusCodeError,
} from '../errors.js';
import {
ApiKey,
AuthAccessTokenCredentials,
AuthClientCredentials,
AuthUserPasswordCredentials,
OidcAuthenticator,
} from './auth.js';

/**
* You can only specify the gRPC proxy URL at this point in time. This is because ProxiesParams should be used to define tunnelling proxies
* and Weaviate does not support tunnelling proxies over HTTP/1.1 at this time.
*
* To use a forwarding proxy you should instead specify its URL as if it were the Weaviate instance itself.
*/
export type ProxiesParams = {
// http?: string;
// https?: string;
grpc?: string;
};

export type TimeoutParams = {
/** Define the configured timeout when querying data from Weaviate */
query?: number;
/** Define the configured timeout when mutating data to Weaviate */
insert?: number;
/** Define the configured timeout when initially connecting to Weaviate */
init?: number;
};

export type InternalConnectionParams = {
authClientSecret?: AuthClientCredentials | AuthAccessTokenCredentials | AuthUserPasswordCredentials;
apiKey?: ApiKey;
host: string;
scheme?: string;
headers?: HeadersInit;
// http1Agent?: Agent;
grpcProxyUrl?: string;
agent?: Agent;
timeout?: TimeoutParams;
skipInitChecks?: boolean;
};
import { WeaviateInvalidInputError } from '../errors.js';
import { OidcAuthenticator } from './auth.js';
import { HttpClient, InternalConnectionParams, httpClient } from './transports/http.js';

export interface ConnectionDetails {
host: string;
Expand Down Expand Up @@ -200,230 +152,3 @@ export default class ConnectionREST {
}

export * from './auth.js';

export interface HttpClient {
close: () => void;
patch: (path: string, payload: any, bearerToken?: string) => any;
head: (path: string, payload: any, bearerToken?: string) => any;
post: <B, T>(
path: string,
payload: B,
expectReturnContent: boolean,
bearerToken: string
) => Promise<T | undefined>;
get: <T>(path: string, expectReturnContent?: boolean, bearerToken?: string) => Promise<T>;
externalPost: (externalUrl: string, body: any, contentType: any) => any;
getRaw: (path: string, bearerToken?: string) => any;
delete: (path: string, payload: any, expectReturnContent?: boolean, bearerToken?: string) => any;
put: (path: string, payload: any, expectReturnContent?: boolean, bearerToken?: string) => any;
externalGet: (externalUrl: string) => Promise<any>;
}

const fetchWithTimeout = (
input: RequestInfo | URL,
timeout: number,
init?: RequestInit | undefined
): Promise<Response> => {
const controller = new AbortController();
// Set a timeout to abort the request
const timeoutId = setTimeout(() => controller.abort(), timeout * 1000);
return fetch(input, { ...init, signal: controller.signal })
.catch((error) => {
if (isAbortError(error)) {
throw new WeaviateRequestTimeoutError(`Request timed out after ${timeout}ms`);
}
throw error; // For other errors, rethrow them
})
.finally(() => clearTimeout(timeoutId));
};

export const httpClient = (config: InternalConnectionParams): HttpClient => {
const version = '/v1';
const baseUri = `${config.host}${version}`;
const url = makeUrl(baseUri);

return {
close: () => config.agent?.destroy(),
post: <B, T>(
path: string,
payload: B,
expectReturnContent: boolean,
bearerToken: string
): Promise<T | undefined> => {
const request = {
method: 'POST',
headers: {
...config.headers,
'content-type': 'application/json',
...getAuthHeaders(config, bearerToken),
},
body: JSON.stringify(payload),
agent: config.agent,
};
return fetchWithTimeout(url(path), config.timeout?.insert || 90, request).then(
checkStatus<T>(expectReturnContent)
);
},
put: <B, T>(
path: string,
payload: B,
expectReturnContent = true,
bearerToken = ''
): Promise<T | undefined> => {
const request = {
method: 'PUT',
headers: {
...config.headers,
'content-type': 'application/json',
...getAuthHeaders(config, bearerToken),
},
body: JSON.stringify(payload),
agent: config.agent,
};
return fetchWithTimeout(url(path), config.timeout?.insert || 90, request).then(
checkStatus<T>(expectReturnContent)
);
},
patch: <B, T>(path: string, payload: B, bearerToken = ''): Promise<T | undefined> => {
const request = {
method: 'PATCH',
headers: {
...config.headers,
'content-type': 'application/json',
...getAuthHeaders(config, bearerToken),
},
body: JSON.stringify(payload),
agent: config.agent,
};
return fetchWithTimeout(url(path), config.timeout?.insert || 90, request).then(checkStatus<T>(false));
},
delete: <B>(path: string, payload: B | null = null, expectReturnContent = false, bearerToken = '') => {
const request = {
method: 'DELETE',
headers: {
...config.headers,
'content-type': 'application/json',
...getAuthHeaders(config, bearerToken),
},
body: payload ? JSON.stringify(payload) : undefined,
agent: config.agent,
};
return fetchWithTimeout(url(path), config.timeout?.insert || 90, request).then(
checkStatus<undefined>(expectReturnContent)
);
},
head: <B>(path: string, payload: B | null = null, bearerToken = '') => {
const request = {
method: 'HEAD',
headers: {
...config.headers,
'content-type': 'application/json',
...getAuthHeaders(config, bearerToken),
},
body: payload ? JSON.stringify(payload) : undefined,
agent: config.agent,
};
return fetchWithTimeout(url(path), config.timeout?.query || 30, request).then(
handleHeadResponse<undefined>(false)
);
},
get: <T>(path: string, expectReturnContent = true, bearerToken = ''): Promise<T> => {
const request = {
method: 'GET',
headers: {
...config.headers,
...getAuthHeaders(config, bearerToken),
},
agent: config.agent,
};
return fetchWithTimeout(url(path), config.timeout?.query || 30, request).then(
checkStatus<any>(expectReturnContent)
);
},
getRaw: (path: string, bearerToken = '') => {
// getRaw does not handle the status leaving this to the caller
const request = {
method: 'GET',
headers: {
...config.headers,
...getAuthHeaders(config, bearerToken),
},
agent: config.agent,
};
return fetchWithTimeout(url(path), config.timeout?.query || 30, request);
},
externalGet: (externalUrl: string) => {
return fetch(externalUrl, {
method: 'GET',
headers: {
...config.headers,
},
}).then(checkStatus<any>(true));
},
externalPost: (externalUrl: string, body: any, contentType: any) => {
if (contentType == undefined || contentType == '') {
contentType = 'application/json';
}
const request = {
body: undefined,
method: 'POST',
headers: {
...config.headers,
'content-type': contentType,
},
};
if (body != null) {
request.body = body;
}
return fetch(externalUrl, request).then(checkStatus<any>(true));
},
};
};

const makeUrl = (basePath: string) => (path: string) => basePath + path;

const checkStatus =
<T>(expectResponseBody: boolean) =>
(res: Response) => {
if (res.status >= 400) {
return res.text().then((errText: string) => {
let err: string;
try {
// in case of invalid json response (like empty string)
err = JSON.stringify(JSON.parse(errText));
} catch (e) {
err = errText;
}
if (res.status === 401) {
return Promise.reject(new WeaviateUnauthenticatedError(err));
} else if (res.status === 403) {
return Promise.reject(new WeaviateInsufficientPermissionsError(403, err));
} else {
return Promise.reject(new WeaviateUnexpectedStatusCodeError(res.status, err));
}
});
}
if (expectResponseBody) {
return res.json() as Promise<T>;
}
return Promise.resolve(undefined);
};

const handleHeadResponse =
<T>(expectResponseBody: boolean) =>
(res: Response) => {
if (res.status == 200 || res.status == 204 || res.status == 404) {
return Promise.resolve(res.status == 200 || res.status == 204);
}
return checkStatus<T>(expectResponseBody)(res);
};

const getAuthHeaders = (config: InternalConnectionParams, bearerToken: string) =>
bearerToken
? {
Authorization: `Bearer ${bearerToken}`,
'X-Weaviate-Cluster-Url': config.host,
// keeping for backwards compatibility for older clusters for now. On newer clusters, Embedding Service reuses Authorization header.
'X-Weaviate-Api-Key': bearerToken,
}
: undefined;
Loading
Loading