Replies: 1 comment
To resolve the issue where batch processing inside the class and outside the class do not work together correctly, it helps to look at how the `batch` and `abatch` methods in question decide what to do with the config they receive:

```python
def batch(
    self,
    inputs: List[LanguageModelInput],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> List[Any]:
    config = config or None
    # If <= 1 config, use the underlying model's batch implementation.
    if config is None or isinstance(config, dict) or len(config) <= 1:
        if isinstance(config, list):
            config = config[0]
        return self._model(config).batch(
            inputs, config=config, return_exceptions=return_exceptions, **kwargs
        )
    # If multiple configs, default to Runnable.batch, which uses an executor
    # to invoke in parallel.
    else:
        return super().batch(
            inputs, config=config, return_exceptions=return_exceptions, **kwargs
        )


async def abatch(
    self,
    inputs: List[LanguageModelInput],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Optional[Any],
) -> List[Any]:
    config = config or None
    # If <= 1 config, use the underlying model's abatch implementation.
    if config is None or isinstance(config, dict) or len(config) <= 1:
        if isinstance(config, list):
            config = config[0]
        return await self._model(config).abatch(
            inputs, config=config, return_exceptions=return_exceptions, **kwargs
        )
    # If multiple configs, default to Runnable.abatch, which uses an executor
    # to invoke in parallel.
    else:
        return await super().abatch(
            inputs, config=config, return_exceptions=return_exceptions, **kwargs
        )
```

The key point to check is how many configs reach these methods: with zero or one config, the call is delegated to the underlying model's own `batch`/`abatch`, while a list of several configs falls back to `Runnable.batch`/`Runnable.abatch`, which uses an executor to invoke each input in parallel.
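As a minimal, self-contained illustration of the two config shapes (using a `RunnableLambda` stand-in rather than the actual chains from the question), `Runnable.batch` accepts either a single config shared by all inputs or a list with exactly one config per input:

```python
from langchain_core.runnables import RunnableConfig, RunnableLambda

# Stand-in runnable so the example runs on its own; in the real system this
# would be the llm_chain or retriever_chain.
chain = RunnableLambda(lambda x: f"answer to {x}")

inputs = ["sub-query 1", "sub-query 2", "sub-query 3"]

# One config: reused for every input.
shared: RunnableConfig = {"tags": ["subquery-batch"]}
print(chain.batch(inputs, config=shared))

# One config per input: each invocation gets its own config, and the list
# length must match the number of inputs.
per_input = [{"tags": [f"subquery-{i}"]} for i in range(len(inputs))]
print(chain.batch(inputs, config=per_input))
```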
Additionally, the `_batch_with_config` helper on `Runnable` shows how a single config (or a list of configs) is expanded into one config and one callback manager per input before the batch function is executed:

```python
def _batch_with_config(
    self,
    func: Union[
        Callable[[List[Input]], List[Union[Exception, Output]]],
        Callable[
            [List[Input], List[CallbackManagerForChainRun]],
            List[Union[Exception, Output]],
        ],
        Callable[
            [List[Input], List[CallbackManagerForChainRun], List[RunnableConfig]],
            List[Union[Exception, Output]],
        ],
    ],
    input: List[Input],
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
    *,
    return_exceptions: bool = False,
    run_type: Optional[str] = None,
    **kwargs: Optional[Any],
) -> List[Output]:
    """Helper method to transform an Input value to an Output value,
    with callbacks. Use this method to implement invoke() in subclasses."""
    if not input:
        return []
    configs = get_config_list(config, len(input))
    callback_managers = [get_callback_manager_for_config(c) for c in configs]
    run_managers = [
        callback_manager.on_chain_start(
            dumpd(self),
            input,
            run_type=run_type,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        for callback_manager, input, config in zip(
            callback_managers, input, configs
        )
    ]
    try:
        if accepts_config(func):
            kwargs["config"] = [
                patch_config(c, callbacks=rm.get_child())
                for c, rm in zip(configs, run_managers)
            ]
        if accepts_run_manager(func):
            kwargs["run_manager"] = run_managers
        output = func(input, **kwargs)  # type: ignore[call-arg]
    except BaseException as e:
        for run_manager in run_managers:
            run_manager.on_chain_error(e)
        if return_exceptions:
            return cast(List[Output], [e for _ in input])
        else:
            raise
    else:
        first_exception: Optional[Exception] = None
        for run_manager, out in zip(run_managers, output):
            if isinstance(out, Exception):
                first_exception = first_exception or out
                run_manager.on_chain_error(out)
            else:
                run_manager.on_chain_end(out)
        if return_exceptions or first_exception is None:
            return cast(List[Output], output)
        else:
            raise first_exception
```

By ensuring that the config reaching each `batch` call is either a single config or a list with exactly one config per input, and that the outer run's config is propagated into the nested `batch` calls, batch processing inside and outside the class can work together as expected.
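A sketch of what that propagation can look like in the setup described (the chain objects below are `RunnableLambda` stand-ins, not the actual code): LangGraph passes the current `RunnableConfig` to node functions that declare a `config` parameter, and forwarding it to the inner `batch` calls lets the nested runs inherit the callbacks and tags of the outer `app.batch(queries)` call instead of starting from a fresh config.

```python
from typing import List

from langchain_core.runnables import RunnableConfig, RunnableLambda

# Stand-ins for the real retriever_chain / llm_chain.
retriever_chain = RunnableLambda(lambda q: f"docs for {q}")
llm_chain = RunnableLambda(lambda d: f"answer using {d['context']}")


def answer_sub_queries(state: dict, config: RunnableConfig) -> dict:
    """Node that answers all sub-queries of one user query in parallel,
    forwarding the outer run's config to the inner batch calls."""
    sub_queries: List[str] = state["sub_queries"]
    docs = retriever_chain.batch(sub_queries, config=config)
    answers = llm_chain.batch(
        [{"question": q, "context": d} for q, d in zip(sub_queries, docs)],
        config=config,
    )
    return {"sub_answers": answers}
```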
---
### Description
I am building a SubQuery system using LangGraph. The system takes a user's query, generates multiple sub-queries, answers the sub-queries in parallel using batch processing, and then combines all the sub-answers. Inside the class, I use the `batch` method on both a `retriever_chain` and an `llm_chain` to process the sub-queries in parallel.

However, when I try to process a batch of queries outside the class using the `batch` method of the compiled workflow (e.g., `app.batch(queries)`), the batch processing inside the class and outside the class do not seem to work together correctly, and the results are not as expected. How can I resolve this so that batch processing works seamlessly both inside and outside the class?
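A minimal, runnable sketch of the workflow shape being described (the node bodies are trivial stand-ins, since the actual example code was not included):

```python
from typing import List

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END


class SubQueryState(TypedDict):
    question: str
    sub_queries: List[str]
    sub_answers: List[str]


# Trivial stand-in nodes; in the real system these call an LLM chain and a
# retriever chain, with the answering node using .batch() to process the
# sub-queries of one user query in parallel.
def generate_sub_queries(state: SubQueryState) -> dict:
    return {"sub_queries": [f"{state['question']} (part {i})" for i in range(2)]}


def answer_sub_queries(state: SubQueryState) -> dict:
    return {"sub_answers": [f"answer to {q}" for q in state["sub_queries"]]}


workflow = StateGraph(SubQueryState)
workflow.add_node("generate_sub_queries", generate_sub_queries)
workflow.add_node("answer_sub_queries", answer_sub_queries)
workflow.set_entry_point("generate_sub_queries")
workflow.add_edge("generate_sub_queries", "answer_sub_queries")
workflow.add_edge("answer_sub_queries", END)
app = workflow.compile()

# Outer batch: several user queries at once through the compiled workflow.
results = app.batch([{"question": q} for q in ["query A", "query B"]])
```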
### System Info
```text
langchain==0.1.20
langchain-community==0.0.38
langchain-core==0.1.52
langchain-mistralai==0.1.7
langchain-openai==0.1.5
langchain-text-splitters==0.0.2
langchainhub==0.1.15
```