@@ -1,10 +1,8 @@
-"""
-Generate answer_node
-"""
 import re
 import json
 from typing import List, Optional
 import requests
+import asyncio
 from tqdm import tqdm
 from langchain.prompts import PromptTemplate
 from langchain_core.output_parsers import JsonOutputParser
@@ -25,16 +23,6 @@ class GenerateAnswerNode(BaseNode):
     and the content extracted from a webpage. It constructs a prompt from the user's input
     and the scraped content, feeds it to the LLM, and parses the LLM's response to produce
     an answer.
-
-    Attributes:
-        llm_model: An instance of a language model client, configured for generating answers.
-        verbose (bool): A flag indicating whether to show print statements during execution.
-
-    Args:
-        input (str): Boolean expression defining the input keys needed from the state.
-        output (List[str]): List of output keys to be updated in the state.
-        node_config (dict): Additional configuration for the node.
-        node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
     """
 
     def __init__(
@@ -57,7 +45,33 @@ def __init__(
         self.additional_info = node_config.get("additional_info", "")
         self.api_key = node_config.get("config", {}).get("llm", {}).get("api_key", "")
 
+    async def _process_chunks_async(self, chunks, templates, user_prompt, format_instructions):
+        async def send_request(prompt):
+            url = "https://api.openai.com/v1/chat/completions"
+            headers = {
+                "Content-Type": "application/json",
+                "Authorization": f"Bearer {self.api_key}"
+            }
+            response = await asyncio.to_thread(requests.post, url, headers=headers, json={
+                "model": self.llm_model.model_name,
+                "messages": [{"role": "user", "content": prompt}],
+                "temperature": 0
+            }, timeout=10)
+            response_text = response.json()['choices'][0]['message']['content']
+            return parse_response_to_dict(response_text)
 
+        tasks = []
+        for i, chunk in enumerate(chunks):
+            prompt = templates['chunks'].format(
+                question=user_prompt,
+                context=chunk,
+                chunk_id=i + 1,
+                format_instructions=format_instructions
+            )
+            tasks.append(send_request(prompt))
+
+        results = await asyncio.gather(*tasks)
+        return results
 
     def execute(self, state: dict) -> dict:
         self.logger.info(f"--- Executing {self.node_name} Node ---")
@@ -76,63 +90,60 @@ def execute(self, state: dict) -> dict:
                 'merge': TEMPLATE_MERGE_MD
             }
 
-            url = "https://api.openai.com/v1/chat/completions"
-            headers = {
-                "Content-Type": "application/json",
-                "Authorization": f"Bearer {self.api_key}"
-            }
-
             if len(doc) == 1:
                 prompt = templates['no_chunks'].format(
                     question=user_prompt,
                     context=doc[0],
                     format_instructions=format_instructions
                 )
-                response = requests.post(url, headers=headers, json={
-                    "model": self.llm_model.model_name,
-                    "messages": [{"role": "user", "content": prompt}],
-                    "temperature": 0
-                }, timeout=10)
+                response = requests.post(
+                    url="https://api.openai.com/v1/chat/completions",
+                    headers={
+                        "Content-Type": "application/json",
+                        "Authorization": f"Bearer {self.api_key}"
+                    },
+                    json={
+                        "model": self.llm_model.model_name,
+                        "messages": [{"role": "user", "content": prompt}],
+                        "temperature": 0
+                    },
+                    timeout=10
+                )
 
                 response_text = response.json()['choices'][0]['message']['content']
                 cleaned_response = parse_response_to_dict(response_text)
                 state.update({self.output[0]: cleaned_response})
                 return state
 
-            chunks_responses = []
-            for i, chunk in enumerate(
-                    tqdm(doc, desc="Processing chunks",
-                         disable=not self.verbose)):
-                prompt = templates['chunks'].format(
+            else:
+                chunks_responses = asyncio.run(
+                    self._process_chunks_async(doc, templates, user_prompt, format_instructions)
+                )
+
+                merge_context = " ".join([json.dumps(chunk) for chunk in chunks_responses])
+                merge_prompt = templates['merge'].format(
                     question=user_prompt,
-                    context=chunk,
-                    chunk_id=i + 1,
+                    context=merge_context,
                     format_instructions=format_instructions
                 )
-                response = requests.post(url, headers=headers, json={
-                    "model": self.llm_model.model_name,
-                    "messages": [{"role": "user", "content": prompt}],
-                    "temperature": 0
-                }, timeout=10)
-                chunk_response = response.json()['choices'][0]['message']['content']
-                cleaned_chunk_response = parse_response_to_dict(chunk_response)
-                chunks_responses.append(cleaned_chunk_response)
-
-            merge_context = " ".join([json.dumps(chunk) for chunk in chunks_responses])
-            merge_prompt = templates['merge'].format(
-                question=user_prompt,
-                context=merge_context,
-                format_instructions=format_instructions
-            )
-            response = requests.post(url, headers=headers, json={
-                "model": self.llm_model.model_name,
-                "messages": [{"role": "user", "content": merge_prompt}],
-                "temperature": 0
-            }, timeout=10)
-            response_text = response.json()['choices'][0]['message']['content']
-            cleaned_response = parse_response_to_dict(response_text)
-            state.update({self.output[0]: cleaned_response})
-            return state
+                response = requests.post(
+                    url="https://api.openai.com/v1/chat/completions",
+                    headers={
+                        "Content-Type": "application/json",
+                        "Authorization": f"Bearer {self.api_key}"
+                    },
+                    json={
+                        "model": self.llm_model.model_name,
+                        "messages": [{"role": "user", "content": merge_prompt}],
+                        "temperature": 0
+                    },
+                    timeout=10
+                )
+
+                response_text = response.json()['choices'][0]['message']['content']
+                cleaned_response = parse_response_to_dict(response_text)
+                state.update({self.output[0]: cleaned_response})
+                return state
 
         else:
             templates = {
@@ -142,13 +153,15 @@ def execute(self, state: dict) -> dict:
             }
 
             if self.additional_info:
-                templates = {key: self.additional_info + template for key, template in templates.items()}
+                templates = {key: self.additional_info +
+                             template for key, template in templates.items()}
 
             if len(doc) == 1:
                 prompt = PromptTemplate(
                     template=templates['no_chunks'],
                     input_variables=["question"],
-                    partial_variables={"context": doc, "format_instructions": format_instructions}
+                    partial_variables={"context": doc[0],
+                                       "format_instructions": format_instructions}
                 )
                 chain = prompt | self.llm_model | output_parser
                 answer = chain.invoke({"question": user_prompt})
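
The main behavioral change above is replacing the sequential per-chunk loop (and its tqdm progress bar) with a concurrent fan-out via asyncio.gather. One pitfall worth noting: requests is a blocking library, so awaiting requests.post directly raises a TypeError; hence the asyncio.to_thread wrapper in send_request, which runs each blocking call in a worker thread. Below is a minimal standalone sketch of that pattern, assuming nothing beyond the public OpenAI chat-completions endpoint; the prompt wording, model name, and API_KEY placeholder are illustrative, not taken from this PR.

import asyncio
import requests

API_KEY = "sk-..."  # placeholder; supply a real key
URL = "https://api.openai.com/v1/chat/completions"

def post_chat(prompt: str) -> str:
    """Blocking call to the chat completions endpoint."""
    response = requests.post(
        URL,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {API_KEY}",
        },
        json={
            "model": "gpt-4o-mini",  # assumed model for the sketch
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0,
        },
        timeout=10,
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]

async def answer_chunks(chunks: list[str], question: str) -> list[str]:
    # Each blocking post_chat call runs in its own worker thread,
    # so the HTTP requests overlap instead of running back to back.
    tasks = [
        asyncio.to_thread(post_chat, f"Chunk {i + 1}: {chunk}\n\nQuestion: {question}")
        for i, chunk in enumerate(chunks)
    ]
    return await asyncio.gather(*tasks)

# Example: answers = asyncio.run(answer_chunks(["first chunk", "second chunk"], "What is discussed?"))

One caveat at the call site: asyncio.run in execute creates a fresh event loop and raises RuntimeError when invoked from code already running inside a loop (for example, a Jupyter notebook or an async web framework).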
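
The final hunk also fixes a small bug in the LangChain branch: partial_variables previously bound the whole doc list as context where the single document's text (doc[0]) was intended. A quick sketch of how PromptTemplate partials behave, with a made-up template for illustration:

from langchain.prompts import PromptTemplate

template = "Context: {context}\n{format_instructions}\nQuestion: {question}"

prompt = PromptTemplate(
    template=template,
    input_variables=["question"],   # supplied at format/invoke time
    partial_variables={             # bound once, up front
        "context": "scraped page text",
        "format_instructions": "Reply in JSON.",
    },
)

# Only the remaining variable is needed when formatting:
print(prompt.format(question="What is the page about?"))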