Skip to content

Commit 26482b8

Browse files
committed
feat: workflow init
1 parent ecc2579 commit 26482b8

23 files changed

+434
-0
lines changed

apps/application/__init__.py

Whitespace-only changes.

apps/application/admin.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from django.contrib import admin
2+
3+
# Register your models here.

apps/application/apps.py

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from django.apps import AppConfig
2+
3+
4+
class ApplicationConfig(AppConfig):
5+
default_auto_field = 'django.db.models.BigAutoField'
6+
name = 'application'

apps/application/migrations/__init__.py

Whitespace-only changes.

apps/application/models/__init__.py

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: __init__.py
6+
@date:2025/5/7 15:14
7+
@desc:
8+
"""
+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: application.py
6+
@date:2025/5/7 15:29
7+
@desc:
8+
"""

apps/application/tests.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from django.test import TestCase
2+
3+
# Create your tests here.

apps/application/views/__init__.py

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: __init__.py
6+
@date:2025/5/9 18:51
7+
@desc:
8+
"""

apps/common/utils/common.py

+1
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ def parse_md_image(content: str):
265265
image_list = [match.group() for match in matches]
266266
return image_list
267267

268+
268269
def bulk_create_in_batches(model, data, batch_size=1000):
269270
if len(data) == 0:
270271
return

apps/workflow/__init__.py

Whitespace-only changes.

apps/workflow/admin.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from django.contrib import admin
2+
3+
# Register your models here.

apps/workflow/apps.py

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from django.apps import AppConfig
2+
3+
4+
class WorkflowConfig(AppConfig):
5+
default_auto_field = 'django.db.models.BigAutoField'
6+
name = 'workflow'

apps/workflow/migrations/__init__.py

Whitespace-only changes.

apps/workflow/models/__init__.py

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: __init__.py
6+
@date:2025/5/7 15:43
7+
@desc:
8+
"""

apps/workflow/models/workflow.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: workflow.py
6+
@date:2025/5/7 15:44
7+
@desc:
8+
"""
9+
from django.db import models
10+
import uuid_utils.compat as uuid
11+
12+
13+
class WorkflowType(models.TextChoices):
14+
# 应用
15+
APPLICATION = "APPLICATION"
16+
# 知识库
17+
KNOWLEDGE = "KNOWLEDGE"
18+
# ....
19+
20+
21+
class Workflow(models.Model):
22+
id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id")
23+
workflow = models.JSONField(verbose_name="工作流数据", default=dict)
24+
type = models.CharField(verbose_name="工作流类型", choices=WorkflowType.choices, default=WorkflowType.APPLICATION)
25+
create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
26+
update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True)
27+
28+
class Meta:
29+
db_table = "workflow"
30+
ordering = ['update_time']

apps/workflow/tests.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from django.test import TestCase
2+
3+
# Create your tests here.

apps/workflow/views/__init__.py

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: __init__.py.py
6+
@date:2025/5/7 15:43
7+
@desc:
8+
"""

apps/workflow/workflow/__init__.py

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: __init__.py
6+
@date:2025/5/7 16:15
7+
@desc:
8+
"""

apps/workflow/workflow/common.py

+214
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: workflow.py
6+
@date:2025/5/9 10:58
7+
@desc:
8+
"""
9+
from typing import List, Dict
10+
from queue import Queue, Empty
11+
12+
from common.utils.common import group_by
13+
14+
15+
class Content:
16+
def __init__(self, content: str, reasoning_content: str, **kwargs):
17+
"""
18+
内容
19+
@param content: ai响应内容
20+
@param reasoning_content:思考过程
21+
@param kwargs: 其他参数
22+
"""
23+
self.content = content
24+
self.reasoning_content = reasoning_content
25+
for key in kwargs:
26+
self.__setattr__(key, kwargs.get(key))
27+
28+
29+
class Chunk:
30+
31+
def __init__(self, runtime_id: str, node_id: str, node_name: str, content: Content, node_data, children, loop_index,
32+
**kwargs):
33+
"""
34+
35+
@param runtime_id: 运行时id
36+
@param node_id: 节点id
37+
@param node_name: 节点名称
38+
@param loop_index: 循环下标
39+
@param children: 子块
40+
@param node_data 节点数据
41+
@param content: 内容
42+
"""
43+
self.runtime_id = runtime_id
44+
self.node_id = node_id
45+
self.node_name = node_name
46+
self.loop_index = loop_index
47+
self.children = children
48+
self.content = content
49+
self.node_data = node_data
50+
for key in kwargs:
51+
self.__setattr__(key, kwargs.get(key))
52+
53+
54+
class Channel:
55+
"""
56+
对话管道
57+
"""
58+
messages = Queue()
59+
is_end = False
60+
61+
def write(self, message):
62+
if isinstance(message, Channel) | isinstance(message, Chunk):
63+
if self.is_end:
64+
raise "通道已关闭"
65+
self.messages.put(message)
66+
else:
67+
raise "不支持的管道参数"
68+
69+
def end(self):
70+
self.is_end = True
71+
return self.messages.put(None)
72+
73+
def pop(self):
74+
if self.is_end:
75+
return self.messages.get_nowait()
76+
return self.messages.get()
77+
78+
def generator(self):
79+
while True:
80+
try:
81+
message = self.pop()
82+
if message:
83+
if isinstance(message, Channel):
84+
for chunk in message.generator():
85+
yield chunk
86+
else:
87+
yield message
88+
except Empty:
89+
return
90+
91+
92+
class Node:
93+
94+
def __init__(self, _id: str, _type: str, x: int, y: int, properties: dict, **kwargs):
95+
"""
96+
97+
@param _id: 节点id
98+
@param _type: 类型
99+
@param x: 节点x轴位置
100+
@param y: 节点y轴位置
101+
@param properties:
102+
@param kwargs:
103+
"""
104+
self.id = _id
105+
self.type = _type
106+
self.x = x
107+
self.y = y
108+
self.properties = properties
109+
for keyword in kwargs:
110+
self.__setattr__(keyword, kwargs.get(keyword))
111+
112+
113+
class Edge:
114+
def __init__(self, _id: str, _type: str, sourceNodeId: str, targetNodeId: str, **keywords):
115+
"""
116+
线
117+
@param _id: 线id
118+
@param _type: 线类型
119+
@param sourceNodeId:
120+
@param targetNodeId:
121+
@param keywords:
122+
"""
123+
self.id = _id
124+
self.type = _type
125+
self.sourceNodeId = sourceNodeId
126+
self.targetNodeId = targetNodeId
127+
for keyword in keywords:
128+
self.__setattr__(keyword, keywords.get(keyword))
129+
130+
131+
class EdgeNode:
132+
edge: Edge
133+
node: Node
134+
135+
def __init__(self, edge, node):
136+
self.edge = edge
137+
self.node = node
138+
139+
140+
class Workflow:
141+
"""
142+
节点列表
143+
"""
144+
nodes: List[Node]
145+
"""
146+
线列表
147+
"""
148+
edges: List[Edge]
149+
"""
150+
节点id:node
151+
"""
152+
node_map: Dict[str, Node]
153+
"""
154+
节点id:当前节点id上面的所有节点
155+
"""
156+
up_node_map: Dict[str, List[EdgeNode]]
157+
"""
158+
节点id:当前节点id下面的所有节点
159+
"""
160+
next_node_map: Dict[str, List[EdgeNode]]
161+
162+
def __init__(self, nodes: List[Node], edges: List[Edge]):
163+
self.nodes = nodes
164+
self.edges = edges
165+
self.node_map = {node.id: node for node in nodes}
166+
167+
self.up_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.sourceNodeId)) for
168+
edge in edges] for
169+
key, edges in
170+
group_by(edges, key=lambda edge: edge.targetNodeId).items()}
171+
172+
self.next_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.targetNodeId)) for edge in edges] for
173+
key, edges in
174+
group_by(edges, key=lambda edge: edge.sourceNodeId).items()}
175+
176+
def get_node(self, node_id):
177+
"""
178+
根据node_id 获取节点信息
179+
@param node_id: node_id
180+
@return: 节点信息
181+
"""
182+
return self.node_map.get(node_id)
183+
184+
def get_up_edge_nodes(self, node_id) -> List[EdgeNode]:
185+
"""
186+
根据节点id 获取当前连接前置节点和连线
187+
@param node_id: 节点id
188+
@return: 节点连线列表
189+
"""
190+
return self.up_node_map.get(node_id)
191+
192+
def get_next_edge_nodes(self, node_id) -> List[EdgeNode]:
193+
"""
194+
根据节点id 获取当前连接目标节点和连线
195+
@param node_id: 节点id
196+
@return: 节点连线列表
197+
"""
198+
return self.next_node_map.get(node_id)
199+
200+
def get_up_nodes(self, node_id) -> List[Node]:
201+
"""
202+
根据节点id 获取当前连接前置节点
203+
@param node_id: 节点id
204+
@return: 节点列表
205+
"""
206+
return [en.node for en in self.up_node_map.get(node_id)]
207+
208+
def get_next_nodes(self, node_id) -> List[Node]:
209+
"""
210+
根据节点id 获取当前连接目标节点
211+
@param node_id: 节点id
212+
@return: 节点列表
213+
"""
214+
return [en.node for en in self.next_node_map.get(node_id, [])]

apps/workflow/workflow/i_node.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: i_node.py
6+
@date:2025/5/7 16:41
7+
@desc:
8+
"""
9+
import time
10+
from abc import abstractmethod
11+
12+
from common.utils.common import get_sha256_hash
13+
from workflow.workflow.common import Channel, Chunk
14+
15+
16+
class INode:
17+
# 当前节点支持的工作流类型
18+
supported_workflow_type_list = []
19+
# 节点类型
20+
type = None
21+
# 节点管道
22+
channel = Channel()
23+
24+
def __init__(self, node, workflow_manage, chunk: Chunk = None, up_node_id_list=None, loop_index=None):
25+
self.node = node
26+
self.chunk = chunk
27+
if chunk is not None:
28+
self.context = chunk.node_data | {}
29+
else:
30+
self.context = {}
31+
# 运行时id
32+
self.runtime_node_id = get_sha256_hash("".join(up_node_id_list | []) + node.id + str(loop_index | ""))
33+
self.workflow_manage = workflow_manage
34+
self.node_serializer = self.get_node_serializer()(data=node.properties.get('node_data'))
35+
self.is_valid()
36+
37+
def is_valid(self):
38+
self.node_serializer.is_valid(raise_exception=True)
39+
40+
def execute(self, **kwargs):
41+
pass
42+
43+
def run(self):
44+
start_time = time.time()
45+
self.context['start_time'] = start_time
46+
self._run()
47+
self.context['run_time'] = time.time() - start_time
48+
49+
def _run(self):
50+
return self.execute(**self.node_serializer.data)
51+
52+
@abstractmethod
53+
def get_node_serializer(self):
54+
pass

0 commit comments

Comments
 (0)