@@ -4,50 +4,81 @@ import { config } from '../config';
 import Logger from '../logger';
 import { generateText } from '../services/llmWrapper';
 import { initializedRagService } from '../services/ragService';
-import { GeminiQueryRequest, LLMChatResponse } from '../types';
+import { GeminiContent, GeminiQueryRequest, LLMChatResponse } from '../types';
 
 export const handleGeminiBatch = async (req: Request, res: Response) => {
   const model = req.params.model;
-  let userQuery = ''; // Define userQuery here to be accessible in the catch block
+  let userQueryForRAG = ''; // For RAG and general query representation
 
   try {
     const { contents } = req.body as GeminiQueryRequest;
 
-    // Validate contents structure
+    // Initial structural validation (can be enhanced to check all items)
     if (
       !contents ||
       !Array.isArray(contents) ||
       contents.length === 0 ||
-      !contents[0].parts ||
-      !Array.isArray(contents[0].parts) ||
-      contents[0].parts.length === 0 ||
-      !contents[0].parts[0].text ||
-      typeof contents[0].parts[0].text !== 'string' ||
-      contents[0].parts[0].text.trim() === ''
+      !contents.every(
+        (item) =>
+          item.parts &&
+          Array.isArray(item.parts) &&
+          item.parts.length > 0 &&
+          item.parts.every((part) => part.text && typeof part.text === 'string')
+      )
     ) {
       return res.status(400).json({
         error:
-          'Bad Request: contents is required and must be an array with at least one part containing a non-empty text string.',
+          'Bad Request: contents is required and must be an array of content items, each with at least one part containing a non-empty text string.',
       });
     }
-    userQuery = contents[0].parts[0].text; // Assign userQuery after validation
+
+    // Create userQueryForRAG from all parts of all content items
+    if (contents && Array.isArray(contents)) {
+      let messagesToConsider = contents;
+      const windowSize = config.ragConversationWindowSize;
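+      // A positive window focuses the RAG retrieval query on the most recent turns; 0 or unset uses the full history.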
+
+      if (windowSize && windowSize > 0 && windowSize < contents.length) {
+        messagesToConsider = contents.slice(-windowSize); // Get the last N messages
+        Logger.info(`RAG windowing: Using last ${windowSize} of ${contents.length} messages for RAG query.`);
+      } else if (windowSize && windowSize > 0 && windowSize >= contents.length) {
+        Logger.info(
+          `RAG windowing: Window size ${windowSize} is >= total messages ${contents.length}. Using all messages for RAG query.`
+        );
+        // messagesToConsider remains 'contents'
+      } else {
+        // windowSize is 0 or not set
+        Logger.info(`RAG windowing: Window size is 0 or not set. Using all ${contents.length} messages for RAG query.`);
+        // messagesToConsider remains 'contents'
+      }
+
+      userQueryForRAG = messagesToConsider
+        .flatMap((contentItem) => contentItem.parts.map((part) => part.text))
+        .join('\n');
+    }
+    // userQueryForErrorLogging removed
+
+    // Validate that the consolidated query is not empty
+    if (userQueryForRAG.trim() === '') {
+      return res.status(400).json({ error: 'Bad Request: Consolidated text from contents is empty.' });
+    }
 
     const rag_type = config.geminiRagType;
     const numberOfResults = config.geminiNResults;
 
     Logger.info(
-      `INFO: Gemini Batch Request. Model: ${model}. RAG Type (from config): ${rag_type}. N Results (from config): ${numberOfResults}.`
+      `INFO: Gemini Batch Request. Model: ${model}. RAG Type (from config): ${rag_type}. N Results (from config): ${numberOfResults}. Consolidated user query for RAG (first 100 chars): "${userQueryForRAG.substring(0, 100)}..."`
     );
 
     const ragService = await initializedRagService;
-    const chunks = await ragService.queryChunks(userQuery, numberOfResults);
+    const chunks = await ragService.queryChunks(userQueryForRAG, numberOfResults);
+
+    let contentsForLlm = JSON.parse(JSON.stringify(contents)) as GeminiContent[]; // Deep copy
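+    // Copying keeps req.body untouched when the last message is augmented with RAG context below.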
 
-    let augmentedPrompt: string;
     if (!chunks || chunks.length === 0) {
-      console.warn(
-        `No relevant chunks found for query: "${userQuery}" with model ${model}. Querying LLM directly without RAG context.`
+      Logger.warn(
+        `No relevant chunks found for query (first 100 chars): "${userQueryForRAG.substring(0, 100)}..." with model ${model}. Querying LLM directly without RAG context.`
       );
-      augmentedPrompt = userQuery;
+      // contentsForLlm remains as is (original user contents)
     } else {
       let contextContent: string[] = [];
       if (rag_type === 'advanced') {
@@ -72,24 +103,33 @@ export const handleGeminiBatch = async (req: Request, res: Response) => {
       }
 
       if (contextContent.length === 0) {
-        console.warn(
-          `Chunks were found for query "${userQuery}" (RAG Type from config: ${rag_type}, model: ${model}), but no relevant content could be extracted. Querying LLM directly.`
+        Logger.warn(
+          `Chunks were found for query (first 100 chars): "${userQueryForRAG.substring(0, 100)}..." (RAG Type from config: ${rag_type}, model: ${model}), but no relevant content could be extracted. Querying LLM directly.`
         );
-        augmentedPrompt = userQuery;
+        // contentsForLlm remains as is
       } else {
         const context = contextContent.join('\n---\n');
         const contextDescription =
           rag_type === 'advanced' ? 'Relevant Information from Parent Documents' : 'Relevant Text Chunks';
-        augmentedPrompt = `User Query: ${userQuery}\n\n${contextDescription}:\n---\n${context}\n---\nBased on the relevant information above, answer the user query.`;
+        const ragAugmentationPrefix = `Based on the relevant information below, answer the user query.\n${contextDescription}:\n---\n${context}\n---\nConsidering the above context and the conversation history, here is the latest user message: `;
+
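+        // Augment only the newest message so earlier conversation turns reach the LLM unchanged.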
+        const lastContentItem = contentsForLlm[contentsForLlm.length - 1];
+        if (lastContentItem && lastContentItem.parts && lastContentItem.parts.length > 0) {
+          lastContentItem.parts[0].text = ragAugmentationPrefix + lastContentItem.parts[0].text;
+        } else {
+          Logger.warn(
+            'Last content item for RAG augmentation is malformed or missing parts. RAG context might not be prepended as expected.'
+          );
+          // Fallback: if the last message is weird, but we have context, maybe put context in its own message?
+          // For now, the original plan is to modify the last message. If it's malformed, it won't be modified.
+        }
       }
     }
 
-    // Logger.info(`INFO: Gemini Batch Request. Model: ${model}. RAG Type: ${rag_type}.`); // Already logged above with more details
-
     try {
       const llmResponse = (await generateText({
         model: model,
-        query: augmentedPrompt,
+        contents: contentsForLlm, // Pass the (potentially RAG-augmented) GeminiContent[]
         stream: false,
       })) as LLMChatResponse;
       res.status(200).json(llmResponse);
@@ -100,7 +140,10 @@ export const handleGeminiBatch = async (req: Request, res: Response) => {
         .json({ details: llmError.message, error: `Failed to get response from LLM provider Gemini.` });
     }
   } catch (error: any) {
-    Logger.error(`Error in handleGeminiBatch for model ${model}, query "${userQuery}":`, error); // Use userQuery for logging
+    Logger.error(
+      `Error in handleGeminiBatch for model ${model}, consolidated query (first 100 chars): "${userQueryForRAG.substring(0, 100)}":`,
+      error
+    );
     if (error.message && error.message.includes('ChromaDB collection is not initialized')) {
       return res.status(503).json({ error: 'Service Unavailable: RAG service is not ready.' });
     }
@@ -113,65 +156,101 @@ export const handleGeminiBatch = async (req: Request, res: Response) => {
 
 export const handleGeminiStream = async (req: Request, res: Response) => {
   const model = req.params.model;
-  let userQuery = ''; // Define userQuery here to be accessible in the catch block
+  let userQueryForRAG = ''; // For RAG and general query representation
 
   try {
     const { contents } = req.body as GeminiQueryRequest;
 
     res.setHeader('Content-Type', 'text/event-stream');
     res.setHeader('Cache-Control', 'no-cache');
     res.setHeader('Connection', 'keep-alive');
-    // res.flushHeaders(); // Flush headers after initial validation
 
-    // Validate contents structure
+    // Initial structural validation (can be enhanced to check all items)
     if (
       !contents ||
       !Array.isArray(contents) ||
       contents.length === 0 ||
-      !contents[0].parts ||
-      !Array.isArray(contents[0].parts) ||
-      contents[0].parts.length === 0 ||
-      !contents[0].parts[0].text ||
-      typeof contents[0].parts[0].text !== 'string' ||
-      contents[0].parts[0].text.trim() === ''
+      !contents.every(
+        (item) =>
+          item.parts &&
+          Array.isArray(item.parts) &&
+          item.parts.length > 0 &&
+          item.parts.every((part) => part.text && typeof part.text === 'string')
+      )
     ) {
-      // If headers not sent, can send 400
       if (!res.headersSent) {
         return res.status(400).json({
           error:
-            'Bad Request: contents is required and must be an array with at least one part containing a non-empty text string.',
+            'Bad Request: contents is required and must be an array of content items, each with at least one part containing a non-empty text string.',
         });
       } else {
-        // Headers sent, write error to stream
         res.write(
           `data: ${JSON.stringify({
             error:
-              'Bad Request: contents is required and must be an array with at least one part containing a non-empty text string.',
+              'Bad Request: contents is required and must be an array of content items, each with at least one part containing a non-empty text string.',
           })}\n\n`
         );
         res.end();
         return;
       }
     }
-    userQuery = contents[0].parts[0].text; // Assign userQuery after validation
-    res.flushHeaders(); // Send headers now that initial validation passed
+
+    // Create userQueryForRAG from all parts of all content items
+    if (contents && Array.isArray(contents)) {
+      let messagesToConsider = contents;
+      const windowSize = config.ragConversationWindowSize;
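+      // A positive window focuses the RAG retrieval query on the most recent turns; 0 or unset uses the full history.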
+
+      if (windowSize && windowSize > 0 && windowSize < contents.length) {
+        messagesToConsider = contents.slice(-windowSize); // Get the last N messages
+        Logger.info(`RAG windowing: Using last ${windowSize} of ${contents.length} messages for RAG query (stream).`);
+      } else if (windowSize && windowSize > 0 && windowSize >= contents.length) {
+        Logger.info(
+          `RAG windowing: Window size ${windowSize} is >= total messages ${contents.length}. Using all messages for RAG query (stream).`
+        );
+        // messagesToConsider remains 'contents'
+      } else {
+        // windowSize is 0 or not set
+        Logger.info(
+          `RAG windowing: Window size is 0 or not set. Using all ${contents.length} messages for RAG query (stream).`
+        );
+        // messagesToConsider remains 'contents'
+      }
+
+      userQueryForRAG = messagesToConsider
+        .flatMap((contentItem) => contentItem.parts.map((part) => part.text))
+        .join('\n');
+    }
+    // userQueryForErrorLogging removed
+
+    // Validate that the consolidated query is not empty
+    if (userQueryForRAG.trim() === '') {
+      if (!res.headersSent) {
+        return res.status(400).json({ error: 'Bad Request: Consolidated text from contents is empty.' });
+      } else {
+        res.write(`data: ${JSON.stringify({ error: 'Bad Request: Consolidated text from contents is empty.' })}\n\n`);
+        res.end();
+        return;
+      }
+    }
+    res.flushHeaders(); // Send headers now that validation passed
 
     const rag_type = config.geminiRagType;
     const numberOfResults = config.geminiNResults;
 
     Logger.info(
-      `INFO: Gemini Stream Request. Model: ${model}. RAG Type (from config): ${rag_type}. N Results (from config): ${numberOfResults}.`
+      `INFO: Gemini Stream Request. Model: ${model}. RAG Type (from config): ${rag_type}. N Results (from config): ${numberOfResults}. Consolidated user query for RAG (first 100 chars): "${userQueryForRAG.substring(0, 100)}..."`
     );
 
     const ragService = await initializedRagService;
-    const chunks = await ragService.queryChunks(userQuery, numberOfResults);
+    const chunks = await ragService.queryChunks(userQueryForRAG, numberOfResults);
+
+    let contentsForLlm = JSON.parse(JSON.stringify(contents)) as GeminiContent[]; // Deep copy
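+    // Copying keeps req.body untouched when the last message is augmented with RAG context below.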
 
-    let augmentedPrompt: string;
     if (!chunks || chunks.length === 0) {
-      console.warn(
-        `No relevant chunks found for query: "${userQuery}" with model ${model} (stream). Querying LLM directly without RAG context.`
+      Logger.warn(
+        `No relevant chunks found for query (first 100 chars): "${userQueryForRAG.substring(0, 100)}..." with model ${model} (stream). Querying LLM directly without RAG context.`
       );
-      augmentedPrompt = userQuery;
+      // contentsForLlm remains as is
     } else {
       let contextContent: string[] = [];
       if (rag_type === 'advanced') {
@@ -196,47 +275,52 @@ export const handleGeminiStream = async (req: Request, res: Response) => {
       }
 
       if (contextContent.length === 0) {
-        console.warn(
-          `Chunks were found for query "${userQuery}" (RAG Type from config: ${rag_type}, model: ${model}, stream), but no relevant content could be extracted. Querying LLM directly.`
+        Logger.warn(
+          `Chunks were found for query (first 100 chars): "${userQueryForRAG.substring(0, 100)}..." (RAG Type from config: ${rag_type}, model: ${model}, stream), but no relevant content could be extracted. Querying LLM directly.`
         );
-        augmentedPrompt = userQuery;
+        // contentsForLlm remains as is
      } else {
        const context = contextContent.join('\n---\n');
        const contextDescription =
          rag_type === 'advanced' ? 'Relevant Information from Parent Documents' : 'Relevant Text Chunks';
-        augmentedPrompt = `User Query: ${userQuery}\n\n${contextDescription}:\n---\n${context}\n---\nBased on the relevant information above, answer the user query.`;
+        const ragAugmentationPrefix = `Based on the relevant information below, answer the user query.\n${contextDescription}:\n---\n${context}\n---\nConsidering the above context and the conversation history, here is the latest user message: `;
+
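+        // Augment only the newest message so earlier conversation turns reach the LLM unchanged.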
+        const lastContentItem = contentsForLlm[contentsForLlm.length - 1];
+        if (lastContentItem && lastContentItem.parts && lastContentItem.parts.length > 0) {
+          lastContentItem.parts[0].text = ragAugmentationPrefix + lastContentItem.parts[0].text;
+        } else {
+          Logger.warn(
+            'Last content item for RAG augmentation is malformed or missing parts (stream). RAG context might not be prepended as expected.'
+          );
+        }
       }
     }
 
-    // Logger.info(`INFO: Gemini Stream Request. Model: ${model}. RAG Type: ${rag_type}.`); // Already logged above
-
     try {
       await generateText({
         model: model,
-        // The onChunk callback now receives a raw SSE line string from llmWrapper.ts
         onChunk: (rawSseLine: string) => {
-          // Pass the raw SSE line, followed by a single newline, as per SSE spec.
           res.write(`${rawSseLine}\n`);
         },
-        query: augmentedPrompt,
+        contents: contentsForLlm, // Pass the (potentially RAG-augmented) GeminiContent[]
         stream: true,
       });
       res.end();
     } catch (llmError: any) {
       Logger.error(`Error calling llmWrapper for Gemini stream (model: ${model}):`, llmError);
       if (!res.writableEnded) {
-        // Check if stream is still open
         res.write(
           `data: ${JSON.stringify({ details: llmError.message, error: `Failed to get response from LLM provider Gemini.` })}\n\n`
         );
         res.end();
       }
     }
   } catch (error: any) {
-    Logger.error(`Error in handleGeminiStream for model ${model}, query "${userQuery}":`, error); // Use userQuery for logging
+    Logger.error(
+      `Error in handleGeminiStream for model ${model}, consolidated query (first 100 chars): "${userQueryForRAG.substring(0, 100)}":`,
+      error
+    );
     if (!res.headersSent) {
-      // This case should ideally not be reached if query validation is first.
-      // However, for other early errors (like RAG service init), this is a fallback.
       if (error.message && error.message.includes('ChromaDB collection is not initialized')) {
         res.status(503).json({ error: 'Service Unavailable: RAG service is not ready.' });
         return;
@@ -247,7 +331,6 @@ export const handleGeminiStream = async (req: Request, res: Response) => {
       }
       res.status(500).json({ details: error.message, error: 'Internal Server Error' });
     } else if (!res.writableEnded) {
-      // Headers sent, stream is open, write error to stream
       let errorMessage = 'Internal Server Error';
       if (error.message && error.message.includes('ChromaDB collection is not initialized')) {
         errorMessage = 'Service Unavailable: RAG service is not ready.';
@@ -257,6 +340,5 @@ export const handleGeminiStream = async (req: Request, res: Response) => {
       res.write(`data: ${JSON.stringify({ details: error.message, error: errorMessage })}\n\n`);
       res.end();
     }
-    // If res.writableEnded is true, can't do anything more.
   }
 };