Skip to content

feat: add persistent retry queue for failed telemetry events #6031

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion packages/cloud/src/CloudService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class CloudService {
this.settingsService = cloudSettingsService
}

this.telemetryClient = new TelemetryClient(this.authService, this.settingsService)
this.telemetryClient = new TelemetryClient(this.authService, this.settingsService, this.context)

this.shareService = new ShareService(this.authService, this.settingsService, this.log)

Expand Down Expand Up @@ -181,6 +181,31 @@ export class CloudService {
this.telemetryClient!.capture(event)
}

public getTelemetryConnectionStatus(): boolean {
this.ensureInitialized()
return this.telemetryClient!.getConnectionStatus()
}

public async getTelemetryQueueMetadata() {
this.ensureInitialized()
return this.telemetryClient!.getQueueMetadata()
}

public async triggerTelemetryRetry(): Promise<void> {
this.ensureInitialized()
return this.telemetryClient!.triggerRetry()
}

public setTelemetryConnectionStatusCallback(callback: (isConnected: boolean) => void): void {
this.ensureInitialized()
this.telemetryClient!.setConnectionStatusCallback(callback)
}

public setTelemetryQueueSizeCallback(callback: (size: number, isAboveThreshold: boolean) => void): void {
this.ensureInitialized()
this.telemetryClient!.setQueueSizeCallback(callback)
}

// ShareService

public async shareTask(
Expand Down
133 changes: 123 additions & 10 deletions packages/cloud/src/TelemetryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@ import {
type ClineMessage,
} from "@roo-code/types"
import { BaseTelemetryClient } from "@roo-code/telemetry"
import * as vscode from "vscode"

import { getRooCodeApiUrl } from "./Config"
import type { AuthService } from "./auth"
import type { SettingsService } from "./SettingsService"
import { TelemetryQueue, TelemetryRetryManager } from "./telemetry"

export class TelemetryClient extends BaseTelemetryClient {
private queue?: TelemetryQueue
private retryManager?: TelemetryRetryManager
private connectionStatusCallback?: (isConnected: boolean) => void
private queueSizeCallback?: (size: number, isAboveThreshold: boolean) => void

constructor(
private authService: AuthService,
private settingsService: SettingsService,
private context?: vscode.ExtensionContext,
debug = false,
) {
super(
Expand All @@ -23,18 +31,93 @@ export class TelemetryClient extends BaseTelemetryClient {
},
debug,
)

// Initialize queue and retry manager if context is provided
if (context) {
// Initialize queue
this.queue = new TelemetryQueue(context, {
maxQueueSize: 1000,
maxRetries: 5,
queueSizeWarningThreshold: 100,
})

// Initialize retry manager
this.retryManager = new TelemetryRetryManager(
this.queue,
async (event) => {
// Send event without queueing on retry
await this.sendEventDirect(event)
},
{
retryIntervalMs: 30000, // 30 seconds
batchSize: 10,
onConnectionStatusChange: (isConnected) => {
if (this.connectionStatusCallback) {
this.connectionStatusCallback(isConnected)
}
},
onQueueSizeChange: (size, isAboveThreshold) => {
if (this.queueSizeCallback) {
this.queueSizeCallback(size, isAboveThreshold)
}
},
},
)

// Start the retry manager
this.retryManager.start()
}
}

/**
* Sets a callback for connection status changes
*/
public setConnectionStatusCallback(callback: (isConnected: boolean) => void): void {
this.connectionStatusCallback = callback
}

/**
* Sets a callback for queue size changes
*/
public setQueueSizeCallback(callback: (size: number, isAboveThreshold: boolean) => void): void {
this.queueSizeCallback = callback
}

/**
* Gets the current connection status
*/
public getConnectionStatus(): boolean {
return this.retryManager?.getConnectionStatus() ?? true
}

/**
* Gets the current queue metadata
*/
public async getQueueMetadata() {
if (!this.queue) {
return null
}
return this.queue.getQueueMetadata()
}

/**
* Manually triggers a retry of queued events
*/
public async triggerRetry(): Promise<void> {
if (this.retryManager) {
await this.retryManager.triggerRetry()
}
}

private async fetch(path: string, options: RequestInit) {
if (!this.authService.isAuthenticated()) {
return
throw new Error("Not authenticated")
}

const token = this.authService.getSessionToken()

if (!token) {
console.error(`[TelemetryClient#fetch] Unauthorized: No session token available.`)
return
throw new Error("No session token available")
}

const response = await fetch(`${getRooCodeApiUrl()}/api/${path}`, {
Expand All @@ -43,18 +126,35 @@ export class TelemetryClient extends BaseTelemetryClient {
})

if (!response.ok) {
console.error(
`[TelemetryClient#fetch] ${options.method} ${path} -> ${response.status} ${response.statusText}`,
)
throw new Error(`HTTP ${response.status}: ${response.statusText}`)
}

return response
}

/**
* Sends an event directly without queueing (used by retry manager)
*/
private async sendEventDirect(event: TelemetryEvent): Promise<void> {
const payload = {
type: event.event,
properties: event.properties || {},
}

const result = rooCodeTelemetryEventSchema.safeParse(payload)

if (!result.success) {
throw new Error(`Invalid telemetry event: ${result.error.message}`)
}

await this.fetch(`events`, { method: "POST", body: JSON.stringify(result.data) })
}

public override async capture(event: TelemetryEvent) {
if (!this.isTelemetryEnabled() || !this.isEventCapturable(event.event)) {
if (this.debug) {
console.info(`[TelemetryClient#capture] Skipping event: ${event.event}`)
}

return
}

Expand All @@ -73,14 +173,19 @@ export class TelemetryClient extends BaseTelemetryClient {
console.error(
`[TelemetryClient#capture] Invalid telemetry event: ${result.error.message} - ${JSON.stringify(payload)}`,
)

return
}

try {
await this.fetch(`events`, { method: "POST", body: JSON.stringify(result.data) })
} catch (error) {
console.error(`[TelemetryClient#capture] Error sending telemetry event: ${error}`)
const errorMessage = error instanceof Error ? error.message : String(error)
console.error(`[TelemetryClient#capture] Error sending telemetry event: ${errorMessage}`)

// Queue the event for retry if we have a queue
if (this.queue && this.retryManager) {
await this.retryManager.queueFailedEvent(event, errorMessage)
}
}
}

Expand Down Expand Up @@ -165,5 +270,13 @@ export class TelemetryClient extends BaseTelemetryClient {
return true
}

public override async shutdown() {}
public override async shutdown() {
// Stop the retry manager
if (this.retryManager) {
this.retryManager.stop()
}
// Clear callbacks to prevent memory leaks
this.connectionStatusCallback = undefined
this.queueSizeCallback = undefined
}
}
Loading