feat: scheduled tasks #42
base: main
Conversation
…-connection fix deep research disconnect issue
Walkthrough

This PR adds Temporal-backed task scheduling and a Task service with HTTP handlers under /api/v1/tasks (create, list, delete). Temporal client initialization is conditional on config (TemporalEndpoint, TemporalNamespace, TemporalAPIKey) and is closed on shutdown. DeepResearch gains a SessionManager, message persistence APIs, and adjusted freemium/premium session handling. Database migrations and sqlc-generated code add tasks and deep_research_messages tables and queries. REST and WebSocket wiring were updated to accept the session manager and to register task routes only when the task service is available. Logging around task/session initialization and route registration was added.

Sequence Diagram(s)

```mermaid
sequenceDiagram
%% Task creation flow (conditional Temporal)
actor User
participant API as Task Handler
participant Svc as Task Service
participant DB as Postgres
participant Temporal as Temporal Server
User->>API: POST /api/v1/tasks
API->>Svc: CreateTask(userID, req)
Svc->>DB: INSERT task record
DB-->>Svc: task row
alt Temporal available
Svc->>Temporal: Create schedule (cron/one-time)
Temporal-->>Svc: schedule created
Svc->>DB: UPDATE task status -> active
else Temporal disabled
Svc-->>API: return task with pending status
end
Svc-->>API: created Task
API-->>User: 201 Created
```

```mermaid
sequenceDiagram
%% DeepResearch WebSocket + SessionManager (premium vs freemium)
participant Client as WebSocket Client
participant API as DeepResearch Handler
participant Svc as DeepResearch Service
participant SM as SessionManager
participant DB as Message Storage
Client->>API: WebSocket connect
API->>Svc: validateFreemiumAccess(user)
alt Premium user
Svc->>SM: CreateSession(user, chat)
SM-->>Svc: session created (parallel allowed)
else Freemium user
Svc->>SM: HasActiveBackend(user)
SM-->>Svc: activeCount
alt Limit exceeded
Svc-->>API: error (suggest upgrade)
API-->>Client: disconnect/error
else Allowed
Svc->>SM: CreateSession(user, chat)
SM-->>Svc: session created
end
end
alt Reconnection
Svc->>DB: Replay unsent messages
DB-->>Svc: messages
end
API-->>Client: connection established / messages streamed
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Files/areas to inspect closely:
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed)
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
internal/deepr/session_manager.go (3)
120-133: Also close BackendConn during RemoveSession. Canceling the context doesn’t guarantee the backend websocket closes. Close it explicitly to release FDs.
```diff
@@
 	// Cancel context
 	if session.CancelFunc != nil {
 		session.CancelFunc()
 	}
+	// Close backend connection
+	if session.BackendConn != nil {
+		_ = session.BackendConn.Close()
+		session.BackendConn = nil
+	}
```
298-303: Return a real error when the backend is unavailable. Returning nil here can mislead callers into assuming success.
```diff
@@
 	if session.BackendConn == nil {
 		sm.logger.WithComponent("deepr-session").Warn("backend connection closed, cannot write",
 			slog.String("user_id", userID),
 			slog.String("chat_id", chatID))
-		return nil // Backend connection closed
+		return fmt.Errorf("backend connection is not available")
 	}
```
Add import: fmt.
206-233: Prune dead client sockets on broadcast failures and add write deadlines. Failed writes accumulate in the map, causing repeated errors and socket leaks. Collect failed clientIDs during the loop, then remove them after unlocking. Also set write deadlines to prevent indefinite blocking.
Suggested implementation:
```go
var toDelete []string
session.mu.RLock()
for clientID, conn := range session.clientConns {
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
		sm.logger.WithComponent("deepr-session").Error("failed to broadcast to client", /* ... */)
		toDelete = append(toDelete, clientID)
		failedCount++
	} else {
		sentCount++
	}
}
session.mu.RUnlock()

if len(toDelete) > 0 {
	session.mu.Lock()
	for _, id := range toDelete {
		if c := session.clientConns[id]; c != nil {
			_ = c.Close()
		}
		delete(session.clientConns, id)
	}
	session.mu.Unlock()
}
```
internal/deepr/service.go (1)
632-642: Escape path segments when building wsURL. userID/chatID are path segments; if they ever contain ‘/’ or special chars, wsURL becomes invalid or misroutes. Use url.PathEscape on each segment.
Apply:
```diff
-wsURL := url.URL{
-	Scheme: "ws",
-	Host:   deepResearchHost,
-	Path:   "/deep_research/" + userID + "/" + chatID + "/",
-}
+wsURL := url.URL{
+	Scheme: "ws",
+	Host:   deepResearchHost,
+	Path:   "/deep_research/" + url.PathEscape(userID) + "/" + url.PathEscape(chatID) + "/",
+}
```
🧹 Nitpick comments (23)
internal/config/config.go (1)
87-94: Group Temporal configuration fields together. The Temporal configuration fields (TemporalAPIKey, TemporalEndpoint, TemporalNamespace) are split and interleaved with Message Storage fields, making the configuration harder to maintain. Group all three Temporal fields consecutively.
Apply this diff to group the Temporal fields:
```diff
 	// Temporal
 	TemporalAPIKey string
+	TemporalEndpoint  string
+	TemporalNamespace string
+
 	// Message Storage
 	MessageStorageEnabled           bool // Enable/disable encrypted message storage to Firestore
 	MessageStorageRequireEncryption bool // If true, refuse to store messages when encryption fails (strict E2EE mode). If false, fallback to plaintext storage (default: graceful degradation)
-	TemporalEndpoint  string
-	TemporalNamespace string
 	MessageStorageWorkerPoolSize int // Number of worker goroutines processing message queue (higher = more concurrent Firestore writes)
```
Also update the initialization in LoadConfig (lines 209-218) to match:
```diff
 	// Temporal
 	TemporalAPIKey:   getEnvOrDefault("TEMPORAL_API_KEY", ""),
 	TemporalEndpoint: getEnvOrDefault("TEMPORAL_ENDPOINT", ""),
+	TemporalNamespace: getEnvOrDefault("TEMPORAL_NAMESPACE", ""),
+
 	// Message Storage
 	MessageStorageEnabled:           getEnvOrDefault("MESSAGE_STORAGE_ENABLED", "true") == "true",
 	MessageStorageRequireEncryption: getEnvOrDefault("MESSAGE_STORAGE_REQUIRE_ENCRYPTION", "false") == "true",
 	MessageStorageWorkerPoolSize:    getEnvAsInt("MESSAGE_STORAGE_WORKER_POOL_SIZE", 5),
 	MessageStorageBufferSize:        getEnvAsInt("MESSAGE_STORAGE_BUFFER_SIZE", 500),
 	MessageStorageTimeoutSeconds:    getEnvAsInt("MESSAGE_STORAGE_TIMEOUT_SECONDS", 30),
-	TemporalNamespace: getEnvOrDefault("TEMPORAL_NAMESPACE", ""),
 	MessageStorageCacheSize: getEnvAsInt("MESSAGE_STORAGE_CACHE_SIZE", 1000),
```
internal/storage/pg/queries/tasks.sql (1)
10-19: Prefer explicit column lists over SELECT * to reduce coupling. Using SELECT * ties sqlc models to future schema changes and can silently break consumers. List columns explicitly in GetTaskByID/GetTasksByUserID/GetTasksByChatID/GetAllActiveTasks.
Also applies to: 29-32
cmd/server/main.go (4)
137-143: Avoid logging the raw Temporal endpoint/namespace in production logs. Config values can be sensitive operational metadata. Log presence booleans only, or guard under debug.
```diff
-	log.Info("checking temporal configuration",
-		slog.Bool("has_endpoint", config.AppConfig.TemporalEndpoint != ""),
-		slog.Bool("has_namespace", config.AppConfig.TemporalNamespace != ""),
-		slog.Bool("has_api_key", config.AppConfig.TemporalAPIKey != ""),
-		slog.String("endpoint", config.AppConfig.TemporalEndpoint),
-		slog.String("namespace", config.AppConfig.TemporalNamespace))
+	log.Info("checking temporal configuration",
+		slog.Bool("has_endpoint", config.AppConfig.TemporalEndpoint != ""),
+		slog.Bool("has_namespace", config.AppConfig.TemporalNamespace != ""),
+		slog.Bool("has_api_key", config.AppConfig.TemporalAPIKey != ""))
```
144-166: Trim config inputs before checks/creation. Protect against trailing spaces in env vars.
```diff
-	if config.AppConfig.TemporalEndpoint != "" && config.AppConfig.TemporalNamespace != "" && config.AppConfig.TemporalAPIKey != "" {
+	if strings.TrimSpace(config.AppConfig.TemporalEndpoint) != "" &&
+		strings.TrimSpace(config.AppConfig.TemporalNamespace) != "" &&
+		strings.TrimSpace(config.AppConfig.TemporalAPIKey) != "" {
```
(Import strings if not already in this scope.)
340-345: Capture and log the Close() error on shutdown. If Close() returns an error, log it.
```diff
-	if taskService != nil {
-		taskService.Close()
-		log.Info("task service shutdown complete")
-	}
+	if taskService != nil {
+		if err := taskService.Close(); err != nil {
+			log.Error("task service shutdown error", slog.String("error", err.Error()))
+		} else {
+			log.Info("task service shutdown complete")
+		}
+	}
```
460-473: Consider returning a clear 501/404 for /api/v1/tasks when disabled. Right now the routes are omitted. If clients still call them, they’ll get a 404 from the router. Optionally mount the group and return 501 Not Implemented with guidance.
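A minimal sketch of that fallback, assuming gin wiring similar to the rest of cmd/server/main.go; the ":task_id" parameter name and the env-var hint in the message are illustrative, not taken from the PR:

```go
package server

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// registerDisabledTaskRoutes mounts the task paths with a 501 responder for
// the case where the task service could not be initialized, so callers get an
// explicit "feature disabled" answer instead of a router-level 404.
func registerDisabledTaskRoutes(api *gin.RouterGroup) {
	disabled := func(c *gin.Context) {
		c.JSON(http.StatusNotImplemented, gin.H{
			"error": "task scheduling is disabled; set TEMPORAL_ENDPOINT, TEMPORAL_NAMESPACE and TEMPORAL_API_KEY to enable it",
		})
	}
	api.POST("/tasks", disabled)
	api.GET("/tasks", disabled)
	api.DELETE("/tasks/:task_id", disabled)
}
```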
internal/storage/pg/migrations/007_create_tasks.sql (1)
1-18: Add CHECK constraints and time modeling to prevent invalid rows and simplify scheduling.
- Enforce enums:
- type ∈ ('recurring','one_time')
- status ∈ ('pending','active','paused','deleted')
- Model one-time execution as TIMESTAMPTZ; keep cron expression separate.
Example adjustments:
```diff
 CREATE TABLE IF NOT EXISTS tasks (
     task_id    TEXT PRIMARY KEY,
     user_id    TEXT NOT NULL,
     chat_id    TEXT NOT NULL,
     task_name  TEXT NOT NULL,
     task_text  TEXT NOT NULL,
-    type       TEXT NOT NULL, -- 'recurring' or 'one_time'
-    time       TEXT NOT NULL, -- cron format for both types
-    status     TEXT NOT NULL DEFAULT 'pending',
+    type         TEXT NOT NULL, -- 'recurring' or 'one_time'
+    cron_expr    TEXT,          -- for recurring
+    scheduled_at TIMESTAMPTZ,   -- for one-time
+    status       TEXT NOT NULL DEFAULT 'pending',
     created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
     updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
 );
+
+ALTER TABLE tasks
+    ADD CONSTRAINT tasks_type_check CHECK (type IN ('recurring','one_time')),
+    ADD CONSTRAINT tasks_status_check CHECK (status IN ('pending','active','paused','deleted')),
+    ADD CONSTRAINT tasks_time_check CHECK (
+        (type = 'recurring' AND cron_expr IS NOT NULL AND scheduled_at IS NULL) OR
+        (type = 'one_time' AND scheduled_at IS NOT NULL AND cron_expr IS NULL)
+    );
```
Also consider composite indexes to match query patterns:
```sql
CREATE INDEX IF NOT EXISTS idx_tasks_user_created_at   ON tasks (user_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_tasks_chat_created_at   ON tasks (chat_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_tasks_status_created_at ON tasks (status, created_at DESC);
```
internal/deepr/models.go (2)
5-12: Consider json.RawMessage for Message.Data to avoid double-encoding. If Data may contain JSON, use json.RawMessage and omit it when empty.
```diff
-import "time"
+import (
+	"encoding/json"
+	"time"
+)
@@
-	Data string `json:"data,omitempty"`
+	Data json.RawMessage `json:"data,omitempty"`
```
27-36: Define and reuse typed constants for MessageType. To avoid drift (“status”, “error”, “final”, etc.), declare consts in this package and validate on write paths.
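A possible shape for those constants; the value set is taken from this review comment and is illustrative rather than exhaustive:

```go
package deepr

// MessageType enumerates the message type strings mentioned in this review.
type MessageType string

const (
	MessageTypeStatus MessageType = "status"
	MessageTypeError  MessageType = "error"
	MessageTypeFinal  MessageType = "final"
)

// IsValid lets write paths reject unknown types before persisting or broadcasting.
func (t MessageType) IsValid() bool {
	switch t {
	case MessageTypeStatus, MessageTypeError, MessageTypeFinal:
		return true
	default:
		return false
	}
}
```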
internal/storage/pg/sqlc/models.go (2)
12-22: Avoid exposing sql.NullTime and DB models over JSONDeepResearchMessage uses sql.NullTime with a JSON tag. If this type is ever marshaled in API responses, it serializes as an object {Time, Valid}, not a nullable timestamp. Keep sqlc models internal to persistence and map to API types using *time.Time. Also consider dropping JSON tags in generated models to avoid accidental exposure.
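One way to do that mapping, sketched as a small helper (the function name is hypothetical):

```go
package deepr

import (
	"database/sql"
	"time"
)

// nullTimeToPtr converts a sqlc sql.NullTime into the *time.Time an API model
// would expose, so a NULL column serializes as JSON null instead of the
// {"Time": ..., "Valid": ...} object produced by sql.NullTime.
func nullTimeToPtr(nt sql.NullTime) *time.Time {
	if !nt.Valid {
		return nil
	}
	t := nt.Time
	return &t
}
```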
58-69: Align Task model naming and typing with API; clarify schedule semantics
- JSON keys here are camelCase (taskId, userId, …) while API models use snake_case (task_id, …). Mixing both increases integration risk if a DB model leaks into responses.
- Field Time is a free-form string; prefer a stronger type or documented format per type (“recurring” vs “one_time”).
Consider mapping sqlc.Task → task.Task in the service layer only, and ensure only API models are serialized. Also define explicit time semantics (see models.go comment).
internal/task/handlers.go (2)
67-74: Don’t leak internal errors to clients; map service errors to 4xx where applicable. Currently the handler returns {error, details: err.Error()} with 500. Return a generic message to clients and log the details; map known validation/ownership errors from the service to 400/403/404.
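A sketch of that mapping; the sentinel error names are assumptions about what the service could expose (ErrNotFound also appears in the deletion suggestion later in this review):

```go
package task

import (
	"errors"
	"net/http"

	"github.com/gin-gonic/gin"
)

// Hypothetical sentinel errors the service layer would need to return for the
// mapping below to work.
var (
	ErrValidation = errors.New("invalid task input")
	ErrForbidden  = errors.New("task does not belong to user")
	ErrNotFound   = errors.New("task not found")
)

// writeServiceError maps known service errors to 4xx responses and hides
// everything else behind a generic 500; the raw error should be logged
// server-side rather than echoed to the client.
func writeServiceError(c *gin.Context, err error) {
	switch {
	case errors.Is(err, ErrValidation):
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
	case errors.Is(err, ErrForbidden):
		c.JSON(http.StatusForbidden, gin.H{"error": "forbidden"})
	case errors.Is(err, ErrNotFound):
		c.JSON(http.StatusNotFound, gin.H{"error": "task not found"})
	default:
		c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
	}
}
```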
59-64: Avoid logging sensitive payloads; keep IDs but omit free-form text. Logging chat_id/task_name/type/time on every request may expose user intent/titles. Log IDs and type; avoid task_name/time at info level or mask them.
internal/deepr/service.go (4)
81-214: Freemium gating has race conditions; add an atomic guard. Two simultaneous “new session” attempts for the same freemium user can pass the active-session check and both start. Use an atomic guard:
- Gate via sessionManager (StartIfAbsent returning created bool), or
- Use a Firebase transaction/compare-and-set to create a “lock/lease” doc with TTL before opening backend, releasing on completion.
Also, JSON error writes ignore failures; consider best-effort logging around WriteMessage.
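A minimal in-process sketch of the StartIfAbsent idea; the struct fields and key format are stand-ins for the real SessionManager, and (as noted further down in this thread) an in-memory guard only covers a single proxy instance:

```go
package deepr

import "sync"

// Minimal stand-ins for the real types; the actual Session carries the backend
// connection, client connections and cancel func described elsewhere in this PR.
type Session struct{}

type SessionManager struct {
	mu       sync.Mutex
	sessions map[string]*Session
}

// StartIfAbsent reserves the (userID, chatID) slot atomically and reports
// whether this caller created it, closing the check-then-create race for
// freemium users within a single process.
func (sm *SessionManager) StartIfAbsent(userID, chatID string) bool {
	key := userID + ":" + chatID // key format is an assumption

	sm.mu.Lock()
	defer sm.mu.Unlock()

	if sm.sessions == nil {
		sm.sessions = make(map[string]*Session)
	}
	if _, exists := sm.sessions[key]; exists {
		return false // another connect already holds the single freemium slot
	}
	sm.sessions[key] = &Session{} // reserve now; fill in connections later
	return true
}
```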
684-693: Close all client connections when the session ends to avoid goroutine/socket leaks. The session uses a background sessionCtx; when it’s canceled, the backend loop exits but client reader goroutines continue until clients disconnect. Have sessionManager close all clientConns on cancel (and notify clients).
Example:
```go
defer func() {
	cancel()
	s.sessionManager.CloseAllClientConnections(userID, chatID) // new helper; safe, idempotent
}()
```
364-525: Reconnection: duplicate client removal. You both defer RemoveClientConnection and also call it on a read error in the goroutine. If Remove is idempotent this is fine; otherwise make one path authoritative.
780-813: Storage/broadcast ordering is solid; add a tie-breaker to the “sent” semantics. messageSent := broadcastErr == nil && clientCount > 0 is fine. For stable ordering across equal created_at timestamps, persist a monotonic sequence (e.g., ULID or created_at + id ordering) so replay order matches broadcast order.
internal/task/models.go (1)
5-17: Strengthen typing for schedule fields. Use the declared enums in the struct to prevent arbitrary strings:
```diff
 type Task struct {
 	...
-	Type string `json:"type" db:"type"`
+	Type TaskType `json:"type" db:"type"`
-	Time string `json:"time" db:"time"`
+	Time string `json:"time" db:"time"`
 }
```
Keep the DB column as text, but constrain the API via TaskType.
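The enum itself could look like this; the constant names match those used in the handler suggestion later in this review and may already exist in the package:

```go
package task

// TaskType constrains the "recurring"/"one_time" strings used throughout the PR.
type TaskType string

const (
	TaskTypeRecurring TaskType = "recurring"
	TaskTypeOneTime   TaskType = "one_time"
)

// Valid reports whether t is one of the supported schedule types, giving
// handlers and the service a single validation point.
func (t TaskType) Valid() bool {
	return t == TaskTypeRecurring || t == TaskTypeOneTime
}
```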
internal/storage/pg/sqlc/deep_research_messages.sql.go (2)
69-76: Add deterministic ordering for stable replays. ORDER BY created_at ASC can produce nondeterministic order for equal timestamps. Add id as a tiebreaker.
```diff
-ORDER BY created_at ASC
+ORDER BY created_at ASC, id ASC
```
109-120: Indexes to support hot paths. Queries filter by session_id (and sent = false). Add indexes in a migration:
- CREATE INDEX CONCURRENTLY idx_drm_session_created_at ON deep_research_messages (session_id, created_at);
- CREATE INDEX CONCURRENTLY idx_drm_session_sent ON deep_research_messages (session_id) WHERE sent = FALSE;
This will speed GetSessionMessages/GetUnsentMessages and MarkAllMessagesAsSent.
Also applies to: 162-171, 173-182
internal/task/service.go (3)
84-97: Consider adding cron expression validation. Currently, invalid cron expressions will cause the task to be created in the database, then fail during Temporal schedule creation (requiring cleanup at line 178). While Temporal's validation is thorough, pre-validating the cron format would prevent unnecessary database writes and improve error feedback.
75-214: Consider reducing log verbosity in CreateTask. The function contains approximately 15 log statements tracking every step of execution. While this detail can be helpful during development, it may impact performance in production and create noisy logs. Consider keeping only critical decision points (validation failures, external calls) and errors, removing intermediate progress logs like "validating task type", "generated task ID", "preparing to create temporal schedule", etc.
247-272: Consider distinguishing "not found" from actual errors in schedule deletion. When Temporal schedule deletion fails (line 254), the code logs a warning and continues. This is reasonable since the database is the source of truth. However, if the schedule doesn't exist (already deleted or never created), the warning might be misleading. Consider checking if the error indicates "not found" and logging at a lower level in that case.
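A sketch of that distinction, assuming the serviceerror package that ships with the Temporal Go SDK (go.temporal.io/api/serviceerror); verify the type against the SDK version pinned in go.mod, and note that *slog.Logger stands in for the project's logger wrapper:

```go
package task

import (
	"errors"
	"log/slog"

	"go.temporal.io/api/serviceerror"
)

// logScheduleDeleteError demotes "schedule not found" to debug level and keeps
// the warning for real failures, so an already-deleted schedule doesn't look
// like a problem in the logs.
func logScheduleDeleteError(log *slog.Logger, taskID string, err error) {
	var notFound *serviceerror.NotFound
	if errors.As(err, &notFound) {
		log.Debug("temporal schedule already absent", slog.String("task_id", taskID))
		return
	}
	log.Warn("failed to delete temporal schedule",
		slog.String("task_id", taskID),
		slog.String("error", err.Error()))
}
```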
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (22)
- cmd/server/main.go (7 hunks)
- go.mod (6 hunks)
- internal/auth/firebase_client.go (12 hunks)
- internal/config/config.go (2 hunks)
- internal/deepr/db_storage.go (7 hunks)
- internal/deepr/handlers.go (2 hunks)
- internal/deepr/models.go (2 hunks)
- internal/deepr/service.go (15 hunks)
- internal/deepr/session_manager.go (8 hunks)
- internal/deepr/storage.go (1 hunks)
- internal/iap/handler.go (1 hunks)
- internal/iap/service.go (1 hunks)
- internal/search/service.go (1 hunks)
- internal/storage/pg/migrations/007_create_tasks.sql (1 hunks)
- internal/storage/pg/queries/tasks.sql (1 hunks)
- internal/storage/pg/sqlc/deep_research_messages.sql.go (1 hunks)
- internal/storage/pg/sqlc/models.go (2 hunks)
- internal/storage/pg/sqlc/querier.go (2 hunks)
- internal/storage/pg/sqlc/tasks.sql.go (1 hunks)
- internal/task/handlers.go (1 hunks)
- internal/task/models.go (1 hunks)
- internal/task/service.go (1 hunks)
🔇 Additional comments (11)
go.mod (2)
31-142: Significant indirect dependency expansion expected from the Temporal SDK. The ~25 new indirect dependencies (lines 53, 57, 69, 71, 76, 77, 95, 99, 100, 106, 107, 124, etc.) are typical transitive dependencies pulled in by go.temporal.io/sdk and related packages. This is expected and not a concern: the Temporal SDK brings its own dependency tree (gRPC, protobuf, testing utilities, etc.).
6-6: Firestore is directly used; gRPC appears to be an unintended transitive dependency promotion. Verification confirms cloud.google.com/go/firestore is intentionally used across three files (internal/auth/firebase_client.go, internal/messaging/firestore.go, internal/messaging/service.go) for Firebase UUID and session management. The go.temporal.io/sdk v1.37.0 addition is justified for task orchestration.
However, google.golang.org/grpc v1.72.0 has no direct imports or usage in the codebase; it appears to be a transitive dependency from the Temporal SDK. Consider removing it from direct require statements unless there's a specific reason to pin it.
All three versions have no known security advisories.
internal/deepr/service.go (3)
19-26: Good addition: sessionManager on Service. Session manager integration is clear and keeps the backend alive independent of clients.
28-41: State mapping is simple and readable. Defaults to in_progress; fine for forward-compat with new event types.
714-724: Good: session-scoped logging and clean termination. Cancellation-path logs include duration and message count, which helps ops.
internal/task/models.go (1)
52-55: Response DTOs look good. Lightweight and future-proof.
internal/task/service.go (3)
24-66: LGTM: Temporal client initialization is well-structured. The constructor properly validates configuration, uses secure TLS settings (MinVersion: TLS 1.2), and handles errors appropriately. The API key redaction in error messages is a good security practice.
68-73: LGTM: Proper resource cleanup. The defensive nil check ensures safe cleanup.
216-245: LGTM: Straightforward query implementation. The method correctly queries and maps database results to task objects.
internal/storage/pg/sqlc/tasks.sql.go (1)
1-223: LGTM: Generated code follows correct patterns. The sqlc-generated queries follow standard best practices with proper resource cleanup (deferred rows.Close()), error checking (rows.Err()), and result scanning. The SQL statements use appropriate indexes (task_id, user_id, chat_id) and timestamps are set with NOW().
internal/storage/pg/sqlc/querier.go (1)
11-54: LGTM: Interface signatures align with implementations. The new method signatures for task management (CreateTask, DeleteTask, GetAllActiveTasks, GetTaskByID, GetTasksByChatID, GetTasksByUserID, UpdateTaskStatus) and deep research messages correctly match their corresponding implementations in the generated code.
```go
// Parse request body
var req CreateTaskRequest
if err := c.ShouldBindJSON(&req); err != nil {
	log.Error("failed to bind request", slog.String("error", err.Error()))
	c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body", "details": err.Error()})
	return
}
```
🛠️ Refactor suggestion | 🟠 Major
Validate type and schedule; reject bad input early
Add server-side validation for Type (one of: recurring, one_time) and Time (cron for recurring; timestamp for one_time or enforce “run-once” semantics). Return 400 on validation errors instead of deferring to the service.
Apply after binding:
```diff
 var req CreateTaskRequest
 if err := c.ShouldBindJSON(&req); err != nil {
 	...
 }
+// Validate type
+switch req.Type {
+case string(TaskTypeRecurring), string(TaskTypeOneTime):
+default:
+	c.JSON(http.StatusBadRequest, gin.H{"error": "invalid type", "allowed": []string{string(TaskTypeRecurring), string(TaskTypeOneTime)}})
+	return
+}
+// Validate time
+if req.Type == string(TaskTypeRecurring) {
+	if _, err := cron.ParseStandard(req.Time); err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid cron spec", "details": err.Error()})
+		return
+	}
+} else {
+	if _, err := time.Parse(time.RFC3339, req.Time); err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid one_time timestamp (RFC3339 required)", "details": err.Error()})
+		return
+	}
+}
```
(Assumes robfig/cron/v3 is in use; otherwise validate in the service.)
Committable suggestion skipped: line range outside the PR's diff.
internal/task/handlers.go (Outdated)
```go
// TODO: Verify that the task belongs to the user before deleting
// For now, we'll delete without verification, but in production you should verify ownership

// Delete the task
err := h.service.DeleteTask(c.Request.Context(), taskID)
if err != nil {
	log.Error("failed to delete task",
		slog.String("error", err.Error()),
		slog.String("task_id", taskID),
		slog.String("user_id", userID))
```
Authorization flaw: task deletion without ownership check
Any authenticated user can delete any task by ID. Pass userID to the service and enforce WHERE task_id = $1 AND user_id = $2; return 404 if no row deleted.
Apply:
```diff
-// Delete the task
-err := h.service.DeleteTask(c.Request.Context(), taskID)
+// Delete the task with ownership enforcement
+err := h.service.DeleteTask(c.Request.Context(), userID, taskID)
 if err != nil {
 	...
 }
```
And in the service layer:
```diff
-func (s *Service) DeleteTask(ctx context.Context, taskID string) error {
+func (s *Service) DeleteTask(ctx context.Context, userID, taskID string) error {
-	res, err := s.repo.DeleteTask(ctx, taskID)
+	res, err := s.repo.DeleteTaskByOwner(ctx, userID, taskID) // DELETE ... WHERE task_id=$1 AND user_id=$2
 	if err != nil { return err }
 	if res.RowsAffected() == 0 { return ErrNotFound }
 	return nil
 }
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In internal/task/handlers.go around lines 152 to 161, the handler deletes tasks
without verifying ownership; modify the handler to pass the current userID into
the service DeleteTask call, and update error handling to treat a “not found /
no rows deleted” result as a 404. Then change the service DeleteTask
implementation to execute deletion with WHERE task_id = $1 AND user_id = $2 (or
equivalent ORM call), return a specific not-found error when no row was
affected, and preserve other errors; the handler should map that not-found error
to a 404 response and only log/return 500 for other failures.
```go
// CreateTaskRequest represents the request to create a new task.
type CreateTaskRequest struct {
	ChatID   string `json:"chat_id" binding:"required"`
	TaskName string `json:"task_name" binding:"required"`
	TaskText string `json:"task_text" binding:"required"`
	Type     string `json:"type" binding:"required"` // "recurring" or "one_time"
	Time     string `json:"time" binding:"required"` // cron format for both types (e.g., "0 9 * * *" for daily at 9am, "30 14 20 8 *" for one-time on Aug 20 at 14:30)
}
```
Clarify “one_time” semantics; cron example is not one-time
Comment shows “30 14 20 8 *” which fires annually. For one_time, accept RFC3339 timestamp (e.g., 2025-08-20T14:30:00Z) or enforce “run once then auto-paused” semantics server-side. Also add binding oneof validation for Type.
Apply:
```diff
-	Type string `json:"type" binding:"required"` // "recurring" or "one_time"
-	Time string `json:"time" binding:"required"` // cron format for both types (e.g., "0 9 * * *" for daily at 9am, "30 14 20 8 *" for one-time on Aug 20 at 14:30)
+	Type string `json:"type" binding:"required,oneof=recurring one_time"`
+	Time string `json:"time" binding:"required"` // cron for recurring; RFC3339 timestamp for one_time
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 // CreateTaskRequest represents the request to create a new task.
 type CreateTaskRequest struct {
 	ChatID   string `json:"chat_id" binding:"required"`
 	TaskName string `json:"task_name" binding:"required"`
 	TaskText string `json:"task_text" binding:"required"`
-	Type     string `json:"type" binding:"required"` // "recurring" or "one_time"
-	Time     string `json:"time" binding:"required"` // cron format for both types (e.g., "0 9 * * *" for daily at 9am, "30 14 20 8 *" for one-time on Aug 20 at 14:30)
+	Type     string `json:"type" binding:"required,oneof=recurring one_time"`
+	Time     string `json:"time" binding:"required"` // cron for recurring; RFC3339 timestamp for one_time
 }
```
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/deepr/session_manager.go (1)
74-80: Consider holding backendWriteMu when closing the backend connection. While gorilla/websocket handles concurrent Close calls safely, acquiring existingSession.backendWriteMu before closing the backend connection (line 79) would maintain consistency with the write-serialization pattern and prevent potential write attempts to the closing connection.
Apply this diff to improve synchronization consistency:
```diff
 // Proactively cancel and close sockets to avoid leaks
 if existingSession.CancelFunc != nil {
 	existingSession.CancelFunc()
 }
+// Acquire write lock to prevent concurrent writes during close
+existingSession.backendWriteMu.Lock()
 if existingSession.BackendConn != nil {
 	_ = existingSession.BackendConn.Close()
 }
+existingSession.backendWriteMu.Unlock()
 existingSession.mu.Lock()
 for _, c := range existingSession.clientConns {
 	_ = c.Close()
 }
 existingSession.clientConns = make(map[string]*websocket.Conn)
 existingSession.mu.Unlock()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- internal/deepr/session_manager.go (8 hunks)
🔇 Additional comments (3)
internal/deepr/session_manager.go (3)
19-20: LGTM! Good addition of write serialization. The backendWriteMu field correctly serializes writes to the backend WebSocket connection, which is required by gorilla/websocket. The mutex is properly used in WriteToBackend (lines 310-311) to ensure only one goroutine writes at a time.
61-87: Critical issue from the past review has been addressed correctly. The data race and resource leak concerns from the previous review have been properly fixed:
- Lines 64-66: Client count is safely read with proper locking
- Lines 75-80: Existing session's context is canceled and backend connection is closed
- Lines 81-86: All client connections are closed and the map is cleared under lock
116-119: LGTM! Logging improvements enhance observability. The added logging in RemoveSession (lines 116-119) and HasActiveBackend (lines 274-289) provides valuable diagnostic information for tracking session lifecycle and state. The client count in HasActiveBackend is safely read under lock (lines 278-280), avoiding potential data races.
Also applies to: 274-289
Actionable comments posted: 0
🧹 Nitpick comments (2)
internal/task/service.go (2)
92-98: Consider adding cron format validation for recurring tasks. While Temporal validates cron expressions, early validation would provide clearer error messages to users. The cron parser is already available for one-time tasks (lines 149-150).
Apply this diff to add early validation:
```diff
 // Validate cron format (both types use cron)
 log.Info("validating cron format", slog.String("time", req.Time))
-// Basic cron validation - Temporal will do more thorough validation
 if req.Time == "" {
 	log.Error("time is empty")
 	return nil, fmt.Errorf("time cannot be empty")
 }
+// Validate cron expression format early for clearer error messages
+parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
+if _, err := parser.Parse(req.Time); err != nil {
+	log.Error("invalid cron expression format",
+		slog.String("error", err.Error()),
+		slog.String("cron_expression", req.Time))
+	return nil, fmt.Errorf("invalid cron expression: %w", err)
+}
```
156-159: Log cleanup errors for better debugging. While cleanup failures in error paths are secondary, logging them would provide valuable debugging information when investigating task creation issues.
Apply this pattern to log cleanup errors:
```diff
 // Clean up the database entry
 log.Info("cleaning up database entry due to schedule creation failure")
-_, _ = s.queries.DeleteTask(ctx, pgdb.DeleteTaskParams{
+if _, delErr := s.queries.DeleteTask(ctx, pgdb.DeleteTaskParams{
 	TaskID: taskID,
 	UserID: userID,
-})
+}); delErr != nil {
+	log.Warn("cleanup: failed to delete task from database",
+		slog.String("error", delErr.Error()),
+		slog.String("task_id", taskID))
+}
 return nil, fmt.Errorf("failed to create schedule: %w", err)
```
Apply similar logging to the other cleanup locations (lines 156-159 and 221-224).
Also applies to: 193-196, 221-224
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (9)
- cmd/server/main.go (7 hunks)
- go.mod (6 hunks)
- internal/proxy/handlers.go (1 hunks)
- internal/storage/pg/queries/tasks.sql (1 hunks)
- internal/storage/pg/sqlc/querier.go (2 hunks)
- internal/storage/pg/sqlc/tasks.sql.go (1 hunks)
- internal/task/handlers.go (1 hunks)
- internal/task/service.go (1 hunks)
- internal/title_generation/generator.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- internal/storage/pg/sqlc/querier.go
- internal/storage/pg/queries/tasks.sql
- internal/task/handlers.go
- go.mod
🔇 Additional comments (10)
internal/title_generation/generator.go (1)
39-40: Verify the max_tokens value aligns with the 4-word constraint, or clarify the intent. Your observation is accurate: max_tokens is set to 1000, but the system prompt explicitly states "MAXIMUM 4 WORDS IN YOUR ANSWER". A 4-word title typically requires only 10–20 tokens.
While the system prompt constraint will enforce the 4-word limit regardless, the 1000-token value is unnecessarily high and may:
- Increase API costs if pricing is based on requested max_tokens rather than tokens consumed.
- Impact latency depending on the provider.
Clarify whether this is intentional (e.g., safety buffer for future prompt changes) or can be reduced to a more reasonable value like 20–50 for efficiency.
internal/storage/pg/sqlc/tasks.sql.go (1)
1-228: LGTM - Generated code with proper patterns. This sqlc-generated file implements a complete CRUD layer for tasks with appropriate error handling, parameterized queries, and resource cleanup. The query methods follow standard sqlc conventions and the JSON tags on parameter structs enable seamless API integration.
internal/proxy/handlers.go (1)
114-115: Good practice - decoupling the async task from the request lifecycle. Using context.Background() ensures the title generation goroutine completes independently of the HTTP request. The inline comment clearly documents the intent.
cmd/server/main.go (4)
135-146: Good graceful degradation pattern. Initializing the task service with a graceful fallback allows the server to operate even when Temporal is unavailable. The logging clearly indicates feature availability.
150-150: Session manager initialization looks correct.
440-452: Conditional route registration with good visibility. The nil check prevents route registration when the task service is unavailable, and the logging provides clear operational feedback.
320-324: Proper cleanup with nil check. The shutdown sequence correctly closes the task service before shutting down the servers.
internal/task/service.go (3)
26-67: Robust service initialization with proper validation. The constructor validates required configuration, establishes a secure TLS connection to Temporal Cloud with API key authentication, and provides clear error messages and logging.
146-170: One-time task scheduling correctly fixed. The code now properly parses the cron expression to calculate the next run time and sets EndAt accordingly, addressing the critical issue from previous reviews. The 5-minute buffer after the expected run provides adequate margin for execution.
205-226: Status update failure properly handled. The code now ensures consistency by cleaning up both the Temporal schedule and the database entry when the status update fails, preventing the critical inconsistency flagged in previous reviews.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- cmd/server/main.go (7 hunks)
🔇 Additional comments (4)
cmd/server/main.go (4)
249-251: LGTM! The addition of taskHandler and deeprSessionManager to the restServerInput struct correctly wires the new services into the REST server setup.
Also applies to: 360-362
316-320: LGTM! The shutdown logic correctly checks for nil before closing the task service, properly handling the case where task service initialization may have failed.
436-448: LGTM with a dependency on fixing line 192. The conditional registration of task routes with appropriate logging is well-implemented. This code will work correctly once the task handler initialization at line 192 is made conditional on a non-nil task service.
451-451: Handler signature correctly updated and matches the call site. The DeepResearchHandler signature at internal/deepr/handlers.go:21 correctly includes sessionManager *SessionManager as the fifth parameter, and the call at line 451 passes all parameters in the correct order. The sessionManager is also properly used within the handler body at line 78 when creating the service instance.
```go
taskService, err := task.NewService(
	config.AppConfig.TemporalEndpoint,
	config.AppConfig.TemporalNamespace,
	config.AppConfig.TemporalAPIKey,
	db.Queries,
	logger.WithComponent("task"),
)
if err != nil {
	log.Error("failed to initialize task service", slog.String("error", err.Error()))
	// Don't exit, just disable task scheduling feature
	log.Warn("task scheduling feature disabled")
}
```
🧩 Analysis chain
Verify nil handling in downstream task handler initialization.
The task service initialization allows graceful degradation by continuing execution when Temporal is unavailable. However, taskService will be nil after this error path, and line 192 unconditionally creates a taskHandler using this potentially nil service.
Run the following script to verify if task.NewHandler safely handles a nil service parameter:
🏁 Script executed:
```shell
#!/bin/bash
# Description: Check if task.NewHandler handles nil service parameter
# Search for the NewHandler function implementation
ast-grep --pattern $'func NewHandler($_, $_) $_ {
  $$$
}'
# Also check for nil checks in the handler
rg -nP --type=go -A 10 'func NewHandler' internal/task/
```
Length of output: 2110
I need to verify if the handler methods perform nil checks before using the service.
I need to check if the handler methods use the service and whether they check for nil.
Let me search the codebase to find how the service is actually used in the handler methods.
🏁 Script executed:
```shell
#!/bin/bash
# Search for h.service usage patterns in task handlers
rg -nP --type=go 'h\.service\.' internal/task/handlers.go | head -20
# Also search for nil checks that might protect service usage
rg -nP --type=go 'if.*nil' internal/task/handlers.go | head -10
```
Length of output: 131
Let me find the correct location of task handler files.
---
🏁 Script executed:
```shell
#!/bin/bash
# Find all Go files related to task handling
fd -e go -p 'task'
# Also search for files containing NewHandler function
rg -l "func NewHandler" --type=go | grep -i task
```
Length of output: 138
Now let me examine the actual handler implementation.
---
🏁 Script executed:
```shell
#!/bin/bash
# View the task handlers file to see method implementations
cat internal/task/handlers.go
# Also search for nil checks in handlers
rg -nP --type=go '(h\.service|if.*nil)' internal/task/handlers.go
```
Length of output: 5608
Now I need to check how the task handler routes are registered in main.go to see if there's proper protection.
---
🏁 Script executed:
```shell
#!/bin/bash
# Find the task handler route registration in main.go
rg -nP --type=go -B 5 -A 10 'taskHandler|task.*route' cmd/server/main.go | head -60
# Also check if taskService is passed/checked before taskHandler creation
rg -nP --type=go -B 3 -A 10 'NewHandler.*task' cmd/server/main.go
```
Length of output: 3323
Check taskService before creating handler to prevent nil pointer panics at runtime.
The handler is created unconditionally at line 192 even when taskService is nil. Since NewHandler doesn't validate the service parameter, it returns a valid Handler with a nil service. This means:
- The route registration check at line 437 (if input.taskHandler != nil) will always pass because the Handler itself is never nil
- When routes are called, handler methods (CreateTask, GetTasks, DeleteTask) directly invoke methods on the nil service without checking, causing runtime panics
var taskHandler *task.Handler
if taskService != nil {
taskHandler = task.NewHandler(taskService, logger.WithComponent("task"))
}Then the route check at line 437 will correctly guard against nil handlers.
🤖 Prompt for AI Agents
In cmd/server/main.go around lines 135-146 (where taskService is initialized)
and line 192 (where taskHandler is created) the code always constructs a
task.Handler even when taskService initialization failed, leading to a non-nil
Handler with a nil service and runtime panics when handler methods are invoked;
change the code so you only instantiate taskHandler when taskService != nil
(i.e., declare var taskHandler *task.Handler and set it inside an if taskService
!= nil block calling task.NewHandler with the logger), so the later route
registration check (line 437) correctly prevents registering task routes when
the handler is absent.
```go
iapHandler := iap.NewHandler(iapService, logger.WithComponent("iap"))
mcpHandler := mcp.NewHandler(mcpService)
searchHandler := search.NewHandler(searchService, logger.WithComponent("search"))
taskHandler := task.NewHandler(taskService, logger.WithComponent("task"))
```
Task handler created with potentially nil service.
taskHandler is created unconditionally, but taskService may be nil if initialization failed at lines 142-146. This could lead to a nil pointer dereference when the handler methods are invoked.
Consider making the handler initialization conditional:
```diff
-taskHandler := task.NewHandler(taskService, logger.WithComponent("task"))
+var taskHandler *task.Handler
+if taskService != nil {
+	taskHandler = task.NewHandler(taskService, logger.WithComponent("task"))
+}
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-taskHandler := task.NewHandler(taskService, logger.WithComponent("task"))
+var taskHandler *task.Handler
+if taskService != nil {
+	taskHandler = task.NewHandler(taskService, logger.WithComponent("task"))
+}
```
🤖 Prompt for AI Agents
In cmd/server/main.go around line 192, the task handler is created
unconditionally even though taskService may be nil from earlier initialization
(lines 142–146); update the code to check taskService for nil before creating
task.NewHandler — if taskService is nil, either log an error and exit/fail fast,
or create the handler only after a successful service initialization (or inject
a noop/fallback service implementation), ensuring no handler is constructed with
a nil service to avoid nil pointer dereferences.
```go
	config.AppConfig.TemporalEndpoint,
	config.AppConfig.TemporalNamespace,
	config.AppConfig.TemporalAPIKey,
	db.Queries,
	logger.WithComponent("task"),
)
if err != nil {
	log.Error("failed to initialize task service", slog.String("error", err.Error()))
	// Don't exit, just disable task scheduling feature
	log.Warn("task scheduling feature disabled")
```
I think we should panic here, much easier to debug, no reason to launch the service if it's not fully working
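One possible shape of that change, as a diff against the hunk quoted above (assumes fmt is already imported in main.go):

```diff
 if err != nil {
 	log.Error("failed to initialize task service", slog.String("error", err.Error()))
-	// Don't exit, just disable task scheduling feature
-	log.Warn("task scheduling feature disabled")
+	// Fail fast: a misconfigured Temporal setup should stop startup entirely
+	panic(fmt.Sprintf("task service initialization failed: %v", err))
 }
```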
```go
api.POST("/exa/search", input.searchHandler.PostExaSearchHandler) // POST /api/v1/exa/search (Exa AI)

// Task API routes (protected)
if input.taskHandler != nil {
```
We don't need this check either
```go
if existingSession, exists := sm.sessions[key]; exists {
	// Safely read client count
	existingSession.mu.RLock()
	existingClientCount := len(existingSession.clientConns)
	existingSession.mu.RUnlock()

	sm.logger.WithComponent("deepr-session").Warn("OVERWRITING existing session",
		slog.String("user_id", userID),
		slog.String("chat_id", chatID),
		slog.String("session_key", key),
		slog.Int("existing_client_count", existingClientCount))

	// Proactively cancel and close sockets to avoid leaks
	if existingSession.CancelFunc != nil {
		existingSession.CancelFunc()
	}
	if existingSession.BackendConn != nil {
		_ = existingSession.BackendConn.Close()
```
If this is stored in memory, then it might break since there's multiple instances of proxy running
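Since the session map is per-process, a cross-instance guard would need shared state. One possible shape, sketched against the Firestore client the project already uses elsewhere; the collection name, field names, and TTL here are invented for illustration, not taken from the codebase:

```go
package deepr

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/firestore"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// acquireSessionLease creates (or takes over) a lease document inside a
// Firestore transaction and returns true only when this proxy instance won it,
// so concurrent connects on different instances cannot both start a session.
func acquireSessionLease(ctx context.Context, fs *firestore.Client, userID, chatID string) (bool, error) {
	ref := fs.Collection("deepr_session_leases").Doc(userID + "_" + chatID)
	const ttl = 10 * time.Minute

	won := false
	err := fs.RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error {
		won = false // reset: the transaction body may be retried

		snap, err := tx.Get(ref)
		switch {
		case status.Code(err) == codes.NotFound:
			// No lease yet: fall through and create one.
		case err != nil:
			return err
		default:
			// A lease exists; honour it only while it has not expired.
			if v, derr := snap.DataAt("expires_at"); derr == nil {
				if exp, ok := v.(time.Time); ok && time.Now().Before(exp) {
					return nil // live lease held by another instance
				}
			}
		}

		won = true
		return tx.Set(ref, map[string]interface{}{
			"user_id":    userID,
			"chat_id":    chatID,
			"expires_at": time.Now().Add(ttl),
		})
	})
	if err != nil {
		return false, fmt.Errorf("acquire session lease: %w", err)
	}
	return won, nil
}
```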
```diff
 // Queue async title generation (non-blocking)
-go titleService.QueueTitleGeneration(c.Request.Context(), title_generation.TitleGenerationRequest{
+// Use background context since this runs async and shouldn't be tied to request lifecycle
+go titleService.QueueTitleGeneration(context.Background(), title_generation.TitleGenerationRequest{
```
Bit weird place to generate title, might be good idea to refactor in the future
```sql
CREATE INDEX IF NOT EXISTS idx_tasks_user_id ON tasks (user_id);
CREATE INDEX IF NOT EXISTS idx_tasks_chat_id ON tasks (chat_id);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks (status);
```
Missing joint index on user_id, task_id. This will perform full table scan
```diff
@@ -0,0 +1,323 @@
 package task
```
The service seems to only wrap database operations, so it's a bit of an overkill. Also, it's a good idea to initialise the Temporal client in main, as other features will likely depend on it, but we can refactor later so as not to block this.
Summary by CodeRabbit
New Features
Improvements