-
Notifications
You must be signed in to change notification settings - Fork 340
remove async storage from graphql #5966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
62677f6
287508b
9c1ed62
adc6040
51a2c6d
c17708e
5b86f73
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,7 @@ | |
|
||
const { | ||
addHook, | ||
channel, | ||
AsyncResource | ||
channel | ||
} = require('./helpers/instrument') | ||
const shimmer = require('../../datadog-shimmer') | ||
|
||
|
@@ -89,10 +88,8 @@ function wrapParse (parse) { | |
return parse.apply(this, arguments) | ||
} | ||
|
||
const asyncResource = new AsyncResource('bound-anonymous-fn') | ||
|
||
return asyncResource.runInAsyncScope(() => { | ||
parseStartCh.publish() | ||
const ctx = {} | ||
return parseStartCh.runStores(ctx, () => { | ||
let document | ||
try { | ||
document = parse.apply(this, arguments) | ||
|
@@ -107,11 +104,12 @@ function wrapParse (parse) { | |
return document | ||
} catch (err) { | ||
err.stack | ||
parseErrorCh.publish(err) | ||
ctx.error = err | ||
parseErrorCh.publish(ctx) | ||
|
||
throw err | ||
} finally { | ||
parseFinishCh.publish({ source, document, docSource: documentSources.get(document) }) | ||
parseFinishCh.publish({ source, document, docSource: documentSources.get(document), ...ctx }) | ||
} | ||
}) | ||
} | ||
|
@@ -123,25 +121,24 @@ function wrapValidate (validate) { | |
return validate.apply(this, arguments) | ||
} | ||
|
||
const asyncResource = new AsyncResource('bound-anonymous-fn') | ||
|
||
return asyncResource.runInAsyncScope(() => { | ||
validateStartCh.publish({ docSource: documentSources.get(document), document }) | ||
|
||
const ctx = { docSource: documentSources.get(document), document } | ||
return validateStartCh.runStores(ctx, () => { | ||
let errors | ||
try { | ||
errors = validate.apply(this, arguments) | ||
if (errors && errors[0]) { | ||
validateErrorCh.publish(errors && errors[0]) | ||
ctx.error = errors && errors[0] | ||
validateErrorCh.publish(ctx) | ||
} | ||
return errors | ||
} catch (err) { | ||
err.stack | ||
validateErrorCh.publish(err) | ||
ctx.error = err | ||
validateErrorCh.publish(ctx) | ||
|
||
throw err | ||
} finally { | ||
validateFinishCh.publish({ document, errors }) | ||
validateFinishCh.publish({ errors, ...ctx }) | ||
} | ||
}) | ||
} | ||
|
@@ -155,15 +152,15 @@ function wrapExecute (execute) { | |
return exe.apply(this, arguments) | ||
} | ||
|
||
const asyncResource = new AsyncResource('bound-anonymous-fn') | ||
return asyncResource.runInAsyncScope(() => { | ||
const args = normalizeArgs(arguments, defaultFieldResolver) | ||
const schema = args.schema | ||
const document = args.document | ||
const source = documentSources.get(document) | ||
const contextValue = args.contextValue | ||
const operation = getOperation(document, args.operationName) | ||
const args = normalizeArgs(arguments, defaultFieldResolver) | ||
const schema = args.schema | ||
const document = args.document | ||
const source = documentSources.get(document) | ||
const contextValue = args.contextValue | ||
const operation = getOperation(document, args.operationName) | ||
|
||
const ctx = { operation, args, docSource: documentSources.get(document) } | ||
return startExecuteCh.runStores(ctx, () => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous code would skip this channel if |
||
if (contexts.has(contextValue)) { | ||
return exe.apply(this, arguments) | ||
} | ||
|
@@ -173,26 +170,22 @@ function wrapExecute (execute) { | |
wrapFields(schema._mutationType) | ||
} | ||
|
||
startExecuteCh.publish({ | ||
operation, | ||
args, | ||
docSource: documentSources.get(document) | ||
}) | ||
|
||
const context = { source, asyncResource, fields: {}, abortController: new AbortController() } | ||
const context = { source, fields: {}, abortController: new AbortController(), ...ctx } | ||
|
||
contexts.set(contextValue, context) | ||
|
||
return callInAsyncScope(exe, asyncResource, this, arguments, context.abortController, (err, res) => { | ||
return callInAsyncScope(exe, this, arguments, context.abortController, (err, res) => { | ||
if (finishResolveCh.hasSubscribers) finishResolvers(context) | ||
|
||
const error = err || (res && res.errors && res.errors[0]) | ||
|
||
if (error) { | ||
executeErrorCh.publish(error) | ||
ctx.error = error | ||
executeErrorCh.publish(ctx) | ||
} | ||
|
||
finishExecuteCh.publish({ res, args, context }) | ||
ctx.res = res | ||
finishExecuteCh.publish(ctx) | ||
}) | ||
}) | ||
} | ||
|
@@ -211,8 +204,8 @@ function wrapResolve (resolve) { | |
|
||
const field = assertField(context, info, args) | ||
|
||
return callInAsyncScope(resolve, field.asyncResource, this, arguments, context.abortController, (err) => { | ||
updateFieldCh.publish({ field, info, err }) | ||
return callInAsyncScope(resolve, this, arguments, context.abortController, (err) => { | ||
updateFieldCh.publish({ field, info, err, ...field.ctx }) | ||
wconti27 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}) | ||
} | ||
|
||
|
@@ -221,32 +214,30 @@ function wrapResolve (resolve) { | |
return resolveAsync | ||
} | ||
|
||
function callInAsyncScope (fn, aR, thisArg, args, abortController, cb) { | ||
function callInAsyncScope (fn, thisArg, args, abortController, cb) { | ||
cb = cb || (() => {}) | ||
|
||
return aR.runInAsyncScope(() => { | ||
if (abortController?.signal.aborted) { | ||
cb(null, null) | ||
throw new AbortError('Aborted') | ||
} | ||
if (abortController?.signal.aborted) { | ||
cb(null, null) | ||
throw new AbortError('Aborted') | ||
} | ||
|
||
try { | ||
const result = fn.apply(thisArg, args) | ||
if (result && typeof result.then === 'function') { | ||
// bind callback to this scope | ||
result.then( | ||
aR.bind(res => cb(null, res)), | ||
aR.bind(err => cb(err)) | ||
) | ||
} else { | ||
cb(null, result) | ||
} | ||
return result | ||
} catch (err) { | ||
cb(err) | ||
throw err | ||
try { | ||
const result = fn.apply(thisArg, args) | ||
if (result && typeof result.then === 'function') { | ||
// bind callback to this scope | ||
result.then( | ||
res => cb(null, res), | ||
err => cb(err) | ||
) | ||
} else { | ||
cb(null, result) | ||
} | ||
}) | ||
return result | ||
} catch (err) { | ||
cb(err) | ||
throw err | ||
} | ||
} | ||
|
||
function pathToArray (path) { | ||
|
@@ -271,27 +262,15 @@ function assertField (context, info, args) { | |
|
||
if (!field) { | ||
const parent = getParentField(context, path) | ||
|
||
// we want to spawn the new span off of the parent, not a new async resource | ||
parent.asyncResource.runInAsyncScope(() => { | ||
/* this child resource will run a branched scope off of the parent resource, which | ||
accesses the parent span from the storage unit in its own scope */ | ||
const childResource = new AsyncResource('bound-anonymous-fn') | ||
|
||
childResource.runInAsyncScope(() => { | ||
startResolveCh.publish({ | ||
info, | ||
context, | ||
args | ||
}) | ||
}) | ||
|
||
field = fields[pathString] = { | ||
parent, | ||
asyncResource: childResource, | ||
error: null | ||
} | ||
}) | ||
// we need to pass the parent span to the field if it exists for correct span parenting | ||
rochdev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// of nested fields | ||
const ctx = { info, context, args, childOf: parent?.ctx?.currentStore?.span } | ||
startResolveCh.publish(ctx) | ||
field = fields[pathString] = { | ||
parent, | ||
error: null, | ||
ctx | ||
} | ||
} | ||
|
||
return field | ||
|
@@ -349,20 +328,16 @@ function wrapFieldType (field) { | |
function finishResolvers ({ fields }) { | ||
Object.keys(fields).reverse().forEach(key => { | ||
const field = fields[key] | ||
const asyncResource = field.asyncResource | ||
asyncResource.runInAsyncScope(() => { | ||
if (field.error) { | ||
resolveErrorCh.publish(field.error) | ||
} | ||
finishResolveCh.publish(field.finishTime) | ||
}) | ||
const ctx = { field, finishTime: field.finishTime, ...field.ctx } | ||
if (field.error) { | ||
ctx.error = field.error | ||
resolveErrorCh.publish(ctx) | ||
} | ||
finishResolveCh.publish(ctx) | ||
}) | ||
} | ||
|
||
addHook({ name: '@graphql-tools/executor', file: 'cjs/execution/execute.js', versions: ['>=0.0.14'] }, execute => { | ||
shimmer.wrap(execute, 'execute', wrapExecute(execute)) | ||
return execute | ||
}) | ||
addHook({ name: '@graphql-tools/executor', file: 'cjs/execution/execute.js', versions: ['>=0.0.14'] }, execute => {}) | ||
|
||
addHook({ name: 'graphql', file: 'execution/execute.js', versions: ['>=0.10'] }, execute => { | ||
shimmer.wrap(execute, 'execute', wrapExecute(execute)) | ||
|
Uh oh!
There was an error while loading. Please reload this page.