Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/dbgpt-app/src/dbgpt_app/operators/rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def __init__(
space_id=space.id,
top_k=self._top_k,
rerank=reranker,
system_app=self.system_app,
)

async def map(self, query: str) -> HOContextBody:
Expand Down
3 changes: 3 additions & 0 deletions packages/dbgpt-core/src/dbgpt/core/awel/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def _get_type_cls(type_name: str) -> Type[Any]:

if type_name in _TYPE_REGISTRY:
return _TYPE_REGISTRY[type_name]
# Not registered, try to get the new class name from the compat config.
new_cls = get_new_class_name(type_name)
if new_cls and new_cls in _TYPE_REGISTRY:
return _TYPE_REGISTRY[new_cls]
Expand Down Expand Up @@ -1521,4 +1522,6 @@ def _register_resource(
alias_ids: Optional[List[str]] = None,
):
"""Register the operator."""
# Register the type
_ = _get_type_name(cls)
_OPERATOR_REGISTRY.register_flow(cls, resource_metadata, alias_ids)
3 changes: 3 additions & 0 deletions packages/dbgpt-core/src/dbgpt/core/awel/flow/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def _register(
def _register_flow_compat(
curr_version: str, last_support_version: str, metadata: FlowCompatMetadata
):
# We use type_name as the key
# For example, dbgpt.core.DefaultLLMOperator may be refactor to
# dbgpt_ext.DefaultLLMOperator, so we use DefaultLLMOperator as the key
_TYPE_NAME_TO_COMPAT_METADATA[metadata.type_name].append(
_FlowCompat(curr_version, last_support_version, metadata)
)
Expand Down
65 changes: 57 additions & 8 deletions packages/dbgpt-core/src/dbgpt/core/awel/flow/flow_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid
from contextlib import suppress
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Type, Union, cast
from typing import Any, Callable, Dict, List, Literal, Optional, Type, Union, cast

from typing_extensions import Annotated

Expand Down Expand Up @@ -1013,29 +1013,43 @@ def _build_mapper_operators(dag: DAG, mappers: List[str]) -> List[DAGNode]:
return tasks


def fill_flow_panel(flow_panel: FlowPanel):
def fill_flow_panel(
flow_panel: FlowPanel,
metadata_func: Callable[
[Union[ViewMetadata, ResourceMetadata]], Union[ViewMetadata, ResourceMetadata]
] = None,
ignore_options_error: bool = False,
update_id: bool = False,
):
"""Fill the flow panel with the latest metadata.

Args:
flow_panel (FlowPanel): The flow panel to fill.
"""
if not flow_panel.flow_data:
return
id_mapping = {}
for node in flow_panel.flow_data.nodes:
try:
parameters_map = {}
if node.data.is_operator:
data = cast(ViewMetadata, node.data)
key = data.get_operator_key()
operator_cls: Type[DAGNode] = _get_operator_class(key)
metadata = operator_cls.metadata
metadata = None
if metadata_func:
metadata = metadata_func(data)
if not metadata:
key = data.get_operator_key()
operator_cls: Type[DAGNode] = _get_operator_class(key)
metadata = operator_cls.metadata
if not metadata:
raise ValueError("Metadata is not set.")
input_parameters = {p.name: p for p in metadata.inputs}
output_parameters = {p.name: p for p in metadata.outputs}
for i in node.data.inputs:
if i.name in input_parameters:
new_param = input_parameters[i.name]
i.type_name = new_param.type_name
i.type_cls = new_param.type_cls
i.label = new_param.label
i.description = new_param.description
i.dynamic = new_param.dynamic
Expand All @@ -1045,6 +1059,8 @@ def fill_flow_panel(flow_panel: FlowPanel):
for i in node.data.outputs:
if i.name in output_parameters:
new_param = output_parameters[i.name]
i.type_name = new_param.type_name
i.type_cls = new_param.type_cls
i.label = new_param.label
i.description = new_param.description
i.dynamic = new_param.dynamic
Expand All @@ -1053,13 +1069,27 @@ def fill_flow_panel(flow_panel: FlowPanel):
i.mappers = new_param.mappers
else:
data = cast(ResourceMetadata, node.data)
key = data.get_origin_id()
metadata = _get_resource_class(key).metadata
metadata = None
if metadata_func:
metadata = metadata_func(data)
if not metadata:
key = data.get_origin_id()
metadata = _get_resource_class(key).metadata

for param in metadata.parameters:
parameters_map[param.name] = param

# Update the latest metadata.
if node.data.type_cls != metadata.type_cls:
old_type_cls = node.data.type_cls
node.data.type_cls = metadata.type_cls
node.data.type_name = metadata.type_name
if not node.data.is_operator and update_id:
# Update key
old_id = data.id
new_id = old_id.replace(old_type_cls, metadata.type_cls)
data.id = new_id
id_mapping[old_id] = new_id
node.data.label = metadata.label
node.data.description = metadata.description
node.data.category = metadata.category
Expand All @@ -1072,11 +1102,30 @@ def fill_flow_panel(flow_panel: FlowPanel):
new_param = parameters_map[param.name]
param.label = new_param.label
param.description = new_param.description
param.options = new_param.get_dict_options() # type: ignore
try:
param.options = new_param.get_dict_options() # type: ignore
except Exception as e:
if ignore_options_error:
logger.warning(
f"Unable to fill the options for the parameter: {e}"
)
else:
raise
param.type_cls = new_param.type_cls
param.default = new_param.default
param.placeholder = new_param.placeholder
param.alias = new_param.alias
param.ui = new_param.ui

except (FlowException, ValueError) as e:
logger.warning(f"Unable to fill the flow panel: {e}")

if not update_id:
return

for edge in flow_panel.flow_data.edges:
for old_id, new_id in id_mapping.items():
edge.source.replace(old_id, new_id)
edge.target.replace(old_id, new_id)
edge.source_handle.replace(old_id, new_id)
edge.target_handle.replace(old_id, new_id)
122 changes: 116 additions & 6 deletions packages/dbgpt-core/src/dbgpt/util/cli/flow_compat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import copy
import json
import os
from typing import Optional
from typing import List, Optional, Tuple, Union

import click

Expand Down Expand Up @@ -60,14 +62,24 @@ def tool_flow_cli_group():
"by one minor version."
),
)
@click.option(
"--update-template",
is_flag=True,
default=False,
required=False,
help=_("Update the template file."),
)
def gen_compat(
module: Optional[str],
output: Optional[str],
curr_version: Optional[str],
last_support_version: Optional[str],
update_template: bool = False,
):
"""Generate the compatibility flow mapping file."""
from ._module import _scan_awel_flow
from dbgpt.util.cli._module import _scan_awel_flow

lang = os.getenv("LANGUAGE")

modules = []
if module:
Expand Down Expand Up @@ -96,10 +108,108 @@ def gen_compat(
os.makedirs(output, exist_ok=True)
output_file = os.path.join(output, curr_version + "_compat_flow.json")
user_input = clog.ask(f"Output to {output_file}, do you want to continue?(y/n)")
save_compat = True
if not user_input or user_input.lower() != "y":
clog.info("Cancelled")
return
with open(output_file, "w") as f:
import json
save_compat = False
if save_compat:
with open(output_file, "w") as f:
import json

json.dump(output_dicts, f, ensure_ascii=False, indent=4)

if update_template:
_update_template(output_dicts, lang=lang)


def _update_template(
compat_data: dict, template_file: Optional[str] = None, lang: str = "en"
):
from dbgpt.core.awel.dag.base import DAGNode
from dbgpt.core.awel.flow.base import (
_TYPE_REGISTRY,
)
from dbgpt.core.awel.flow.compat import (
FlowCompatMetadata,
_register_flow_compat,
get_new_class_name,
)
from dbgpt.core.awel.flow.flow_factory import (
ResourceMetadata,
ViewMetadata,
fill_flow_panel,
)
from dbgpt_serve.flow.api.schemas import ServerResponse
from dbgpt_serve.flow.service.service import (
_get_flow_templates_from_files,
_parse_flow_template_from_json,
)

json.dump(output_dicts, f, ensure_ascii=False, indent=4)
last_support_version = compat_data["last_support_version"]
curr_version = compat_data["curr_version"]

for data in compat_data["compat"]:
metadata = FlowCompatMetadata(**data)
_register_flow_compat(curr_version, last_support_version, metadata)

templates: List[Tuple[str, ServerResponse]] = []
if template_file:
with open(template_file, "r") as f:
data = json.load(f)
templates.append((template_file, _parse_flow_template_from_json(data)))
else:
templates.extend(_get_flow_templates_from_files(lang))

new_templates: List[Tuple[str, ServerResponse]] = []

def metadata_func(old_metadata: Union[ViewMetadata, ResourceMetadata]):
type_cls = old_metadata.type_cls
if type_cls in _TYPE_REGISTRY:
return None
new_type_cls = None
try:
new_type_cls = get_new_class_name(type_cls)
if not new_type_cls or new_type_cls not in _TYPE_REGISTRY:
return None
obj_type = _TYPE_REGISTRY[new_type_cls]
if isinstance(old_metadata, ViewMetadata):
if not isinstance(obj_type, DAGNode):
return None
obj_type.metadata
elif isinstance(old_metadata, ResourceMetadata):
metadata_attr = f"_resource_metadata_{obj_type.__name__}"
return getattr(obj_type, metadata_attr)
else:
raise ValueError(f"Unknown metadata type: {type(old_metadata)}")
except Exception as e:
clog.warning(
f"Error get metadata for {type_cls}: {str(e)}, new_type_cls: "
f"{new_type_cls}"
)
return None

for template_file, template in templates:
new_flow_template = copy.deepcopy(template)
try:
fill_flow_panel(
new_flow_template,
metadata_func,
ignore_options_error=True,
update_id=True,
)
new_templates.append((template_file, new_flow_template))
except Exception as e:
import traceback

traceback.print_exc()
clog.warning(f"Error fill flow panel for {template_file}: {str(e)}")

user_input = clog.ask("Do you want to update the template file?(y/n)")
if not user_input or user_input.lower() != "y":
clog.info("Cancelled")
return
for template_file, flow in new_templates:
template_dict = {"flow": flow.model_dump()}
dag_json = json.dumps(template_dict, indent=4, ensure_ascii=False)
with open(template_file, "w") as f:
f.write(dag_json)
2 changes: 1 addition & 1 deletion packages/dbgpt-core/src/dbgpt/vis/tags/vis_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def render_prompt(self) -> Optional[str]:
"""Return the prompt for the vis protocol."""
return default_chart_type_prompt()

async def generate_param(self, **kwargs) -> Optional[Dict[str, Any]]:
def sync_generate_param(self, **kwargs) -> Optional[Dict[str, Any]]:
"""Generate the parameters required by the vis protocol."""
chart = kwargs.get("chart", None)
data_df = kwargs.get("data_df", None)
Expand Down
2 changes: 1 addition & 1 deletion packages/dbgpt-core/src/dbgpt/vis/tags/vis_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class VisDashboard(Vis):
"""Dashboard Vis Protocol."""

async def generate_param(self, **kwargs) -> Optional[Dict[str, Any]]:
def sync_generate_param(self, **kwargs) -> Optional[Dict[str, Any]]:
"""Generate the parameters required by the vis protocol."""
charts: Optional[dict] = kwargs.get("charts", None)
title: Optional[str] = kwargs.get("title", None)
Expand Down
Loading