
Fix token usage / cost often being underreported #6122


Open · wants to merge 2 commits into main
105 changes: 81 additions & 24 deletions src/core/task/Task.ts
@@ -1360,7 +1360,11 @@ export class Task extends EventEmitter<ClineEvents> {
this.isStreaming = true

try {
for await (const chunk of stream) {
const iterator = stream[Symbol.asyncIterator]()
let item = await iterator.next()
while (!item.done) {
const chunk = item.value
item = await iterator.next()
if (!chunk) {
// Sometimes chunk is undefined; no idea what can cause
// it, but this workaround seems to fix it.
@@ -1423,16 +1427,86 @@ export class Task extends EventEmitter<ClineEvents> {
break
}

// PREV: We need to let the request finish for openrouter to
// get generation details.
// UPDATE: It's better UX to interrupt the request at the
// cost of the API cost not being retrieved.
Author comment: The comment calls out OpenRouter specifically, but I think I've seen usage being misreported with other providers as well.
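
For readers skimming the diff, a minimal sketch of the pattern being applied may help (the chunk type and the break condition are illustrative stand-ins, not the actual Task.ts API): drive the iterator by hand so the foreground loop can exit early, then keep draining the same iterator in the background so a late usage chunk is still counted.

// Sketch only — illustrative types, not the real ApiStream.
type StreamChunk =
	| { type: "text"; text: string }
	| { type: "usage"; inputTokens: number; outputTokens: number }

async function consumeWithBackgroundDrain(stream: AsyncIterable<StreamChunk>): Promise<void> {
	// Drive the iterator by hand instead of `for await`, so it can
	// outlive a `break` in this loop.
	const iterator = stream[Symbol.asyncIterator]()
	let item = await iterator.next()

	while (!item.done) {
		const chunk = item.value
		item = await iterator.next()
		if (chunk.type === "text") {
			console.log(chunk.text)
			break // stand-in for the didAlreadyUseTool early exit
		}
	}

	// Providers typically emit usage at the very end of the stream, i.e.
	// after the early exit above; drain it in the background rather than
	// dropping it.
	void (async () => {
		let inputTokens = 0
		let outputTokens = 0
		while (!item.done) {
			const chunk = item.value
			item = await iterator.next()
			if (chunk.type === "usage") {
				inputTokens += chunk.inputTokens
				outputTokens += chunk.outputTokens
			}
		}
		console.log(`usage: in=${inputTokens}, out=${outputTokens}`)
	})()
}

The key design point is that only one consumer touches the iterator at a time: the foreground loop is done with it before the background drain starts, which is what lets updateApiReqMsg() move into the drain without racing the main flow.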

if (this.didAlreadyUseTool) {
assistantMessage +=
"\n\n[Response interrupted by a tool use result. Only one tool may be used at a time and should be placed at the end of the message.]"
break
}
}

const drainStreamInBackgroundToFindAllUsage = async () => {
const timeoutMs = 30000 // 30 second timeout
const startTime = Date.now()

try {
let usageFound = false
while (!item.done) {
// Check for timeout
if (Date.now() - startTime > timeoutMs) {
console.warn(`Background usage collection timed out after ${timeoutMs}ms`)
break
}

const chunk = item.value
item = await iterator.next()
if (chunk && chunk.type === "usage") {
usageFound = true
inputTokens += chunk.inputTokens
outputTokens += chunk.outputTokens
cacheWriteTokens += chunk.cacheWriteTokens ?? 0
cacheReadTokens += chunk.cacheReadTokens ?? 0
totalCost = chunk.totalCost
}
}
if (usageFound) {
updateApiReqMsg()
await this.saveClineMessages()
}
if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
TelemetryService.instance.captureLlmCompletion(this.taskId, {
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
cost:
totalCost ??
calculateApiCostAnthropic(
this.api.getModel().info,
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
),
})
} else {
const modelId = getModelId(this.apiConfiguration)
console.warn(
`Suspicious: request ${lastApiReqIndex} is complete, but no usage info was found. Model: ${modelId}`,
)
}
} catch (error) {
console.error("Error draining stream for usage data:", error)
// Still try to capture whatever usage data we have collected so far
if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
TelemetryService.instance.captureLlmCompletion(this.taskId, {
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
cost:
totalCost ??
calculateApiCostAnthropic(
this.api.getModel().info,
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
),
})
}
}
}
const backgroundDrainPromise = drainStreamInBackgroundToFindAllUsage() // Store promise reference
} catch (error) {
// Abandoned happens when extension is no longer waiting for the
// Cline instance to finish aborting (error is thrown here when
@@ -1466,24 +1540,6 @@ export class Task extends EventEmitter<ClineEvents> {
this.isStreaming = false
}

if (inputTokens > 0 || outputTokens > 0 || cacheWriteTokens > 0 || cacheReadTokens > 0) {
TelemetryService.instance.captureLlmCompletion(this.taskId, {
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
cost:
totalCost ??
calculateApiCostAnthropic(
this.api.getModel().info,
inputTokens,
outputTokens,
cacheWriteTokens,
cacheReadTokens,
),
})
}

// Need to call here in case the stream was aborted.
if (this.abort || this.abandoned) {
throw new Error(`[RooCode#recursivelyMakeRooRequests] task ${this.taskId}.${this.instanceId} aborted`)
@@ -1513,7 +1569,8 @@ export class Task extends EventEmitter<ClineEvents> {
presentAssistantMessage(this)
}

updateApiReqMsg()
// Note: updateApiReqMsg() is now called from within drainStreamInBackgroundToFindAllUsage
// to avoid race conditions with the background task
await this.saveClineMessages()
await this.providerRef.deref()?.postStateToWebview()

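
One more note for context: the telemetry capture above falls back to calculateApiCostAnthropic(...) whenever the stream never reported totalCost. A hedged sketch of what that kind of fallback computes, assuming per-million-token price fields on the model info (the real ModelInfo field names and cache accounting may differ):

// Sketch only — assumed field names, prices in USD per 1M tokens.
interface ModelPricing {
	inputPrice?: number
	outputPrice?: number
	cacheWritesPrice?: number
	cacheReadsPrice?: number
}

function estimateCost(
	info: ModelPricing,
	inputTokens: number,
	outputTokens: number,
	cacheWriteTokens: number,
	cacheReadTokens: number,
): number {
	// Each component is (price per 1M tokens) * (tokens used) / 1M.
	const component = (price: number | undefined, tokens: number) => ((price ?? 0) * tokens) / 1_000_000
	return (
		component(info.inputPrice, inputTokens) +
		component(info.outputPrice, outputTokens) +
		component(info.cacheWritesPrice, cacheWriteTokens) +
		component(info.cacheReadsPrice, cacheReadTokens)
	)
}

Under Anthropic-style accounting, inputTokens excludes cache reads and writes (they are priced separately), so providers that fold cached tokens into inputTokens would need a different breakdown.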