Skip to content

Commit 46eb504

Browse files
authored
feat[0.3.0]: add plugin runner functionality (#144)
* feat: add integration configuration and plugin runner functionality - Introduced `IntegrationConfig` class for managing plugin testing configurations, including validation for the `dify_cli_path`. - Added `PluginInvokeRequest` and `PluginGenericResponse` models to handle plugin invocation requests and responses. - Implemented `PluginRunner` class to facilitate running plugins locally, including asynchronous message handling and process management. - Merged access actions into a single `PluginAccessAction` for streamlined access control. * apply ruff * feat: enhance plugin runner with new response handling and exception management - Added `PLUGIN_INVOKE_END` response type to manage the end of plugin invocations. - Introduced `PluginStopped` exception to handle plugin termination gracefully. - Updated `PluginRunner` to support stopping the plugin process and handling asynchronous message reading with improved error management. - Modified message queue to accommodate `None` responses for completed invocations. * refactor: rename PluginStopped exception to PluginStoppedError - Renamed `PluginStopped` to `PluginStoppedError` for clarity and consistency. - Updated references in the `run.py` file to reflect the new exception name. - Added a docstring to `PluginStoppedError` to describe its purpose. * fix: update version check for dify CLI in IntegrationConfig - Modified the version check in the IntegrationConfig class to require a minimum version of 0.1.0 instead of 0.4.0. - Ensured that the version validation logic is clearer and more robust. * chore: update dependencies and lock file - Updated the `content_hash` in `pdm.lock` to reflect changes in dependencies. - Added `packaging>=25.0` to the dependencies in `pyproject.toml` to ensure compatibility with the latest packaging features. * feat: enhance IntegrationConfig with improved CLI path validation - Added a list of potential plugin names to search for the dify CLI executable. - Updated the validation logic to check for the CLI path against multiple plugin names. - Changed error handling to use ValidationError for invalid CLI path and version checks. * feat: add setup script for dify-plugin-cli installation - Introduced a new script to automate the installation of dify-plugin-cli, including OS and architecture detection. - Updated the pull request workflow to include the setup of the dify-plugin-cli before installing dependencies. - Refactored error handling in IntegrationConfig to use ValueError instead of ValidationError for CLI path and version checks. * feat: add integration test for invoking LLM with mocked OpenAI server - Introduced a new integration test in `test_invoke_llm.py` to validate the invocation of the LLM plugin using a mocked OpenAI server. - Implemented a mock server to simulate OpenAI's chat completions API, supporting both streaming and non-streaming responses. - The test downloads the latest `langgenius-openai` plugin, runs the plugin, and asserts the expected response from the LLM. * fix: add timeout to requests in LLM integration test - Updated the `test_invoke_llm.py` file to include a timeout of 10 seconds for both POST and GET requests to the marketplace API. - This change aims to improve the reliability of the integration test by preventing indefinite blocking on network calls. * feat: enhance dify-plugin-cli setup script with installation verification and PATH configuration - Added verification to check if dify-plugin is available in the user's PATH after installation. - Implemented logic to add dify-plugin to the system PATH for all users when run as root, or to the user's profile for non-root users. - Created symlinks in common bin directories and provided usage instructions for the dify-plugin commands. * refactor: update dify-plugin-cli setup script for improved PATH management - Enhanced the script to add the installation directory to the GITHUB_PATH for GitHub Actions. - Modified the PATH export for local development to ensure the installation directory is prioritized. - Removed redundant installation verification and symlink creation logic, streamlining the setup process. * test: add SSH support to pull request workflow - Integrated SSH functionality into the pull request workflow using the mxschmitt/action-tmate@v3 action. - This addition allows for interactive debugging and terminal access during pull request checks. * chore: remove SSH step from pull request workflow and fix import in integration test - Eliminated the SSH step from the pull request workflow to streamline the process. - Fixed the import statement for the `requests` library in `test_invoke_llm.py` to ensure proper functionality during the test execution. * wtf * feat: implement OpenAI mock server for integration testing - Added a new mock server in `python/tests/__mock_server` to simulate OpenAI's chat completions API, supporting both streaming and non-streaming responses. - Updated the pull request workflow to install the `uv` tool and launch the mock server during testing. - Refactored `test_invoke_llm.py` to utilize the mock server, ensuring reliable integration tests without external dependencies. * apply ruff * refactor: streamline mock server launch in pull request workflow - Replaced inline mock server launch commands in the pull request workflow with a dedicated script `launch_mock_server.sh` for improved readability and maintainability. - The new script handles the execution of the mock server, ensuring a cleaner workflow configuration. * fix: update mock server launch path in pull request workflow - Changed the path for launching the mock server script from `./scripts/launch_mock_server.sh` to `./python/scripts/launch_mock_server.sh` to ensure correct execution within the workflow. * fix: run mock server in background during pull request workflow - Updated the pull request workflow to launch the mock server script in the background by appending `&` to the command. This change allows subsequent steps to execute without waiting for the mock server to terminate. * chore: clean up dify-plugin-cli setup script by removing outdated installation instructions - Removed outdated comments regarding the download and installation of dify-plugin-cli, streamlining the script for clarity. * feat: enhance PluginRunner with graceful shutdown and thread safety - Introduced a stop flag and a lock to ensure thread-safe access to the stop flag in the PluginRunner class. - Implemented a _close method to handle graceful termination of the plugin process, including sending a SIGTERM signal and closing pipes. - Refactored the _read_async method to ensure proper cleanup and handling of messages, maintaining the integrity of the message queue.
1 parent e380493 commit 46eb504

File tree

13 files changed

+572
-2
lines changed

13 files changed

+572
-2
lines changed

.github/workflows/pull-request.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,18 @@ jobs:
4242
python-version: ${{ matrix.python-version }}
4343
cache: true
4444
cache-dependency-path: python/pdm.lock
45+
- name: Setup dify-plugin-cli
46+
run: ./scripts/setup-dify-plugin-cli.sh
47+
# setup uv and get uv path to env
48+
- name: Install uv
49+
run: |
50+
curl -LsSf https://astral.sh/uv/install.sh | sh
51+
echo "UV_PATH=$(which uv)" >> $GITHUB_ENV
4552
- name: Install dependencies
4653
run: ./python/scripts/install-deps.sh
4754
- name: Run linter
4855
run: ./python/scripts/lint.sh
56+
- name: Launch mock server
57+
run: ./python/scripts/launch_mock_server.sh &
4958
- name: Run tests
5059
run: ./python/scripts/test.sh
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""
2+
This file is used to hold the integration config for plugin testing.
3+
"""
4+
5+
import shutil
6+
import subprocess
7+
8+
from packaging.version import Version
9+
from pydantic import Field, field_validator
10+
from pydantic_settings import BaseSettings, SettingsConfigDict
11+
12+
_PLUGIN_NAMES = [
13+
"dify",
14+
"dify.exe",
15+
"dify-plugin",
16+
"dify-plugin.exe",
17+
"dify-plugin-darwin-amd64",
18+
"dify-plugin-darwin-arm64",
19+
"dify-plugin-linux-amd64",
20+
"dify-plugin-linux-arm64",
21+
"dify-plugin-windows-amd64.exe",
22+
"dify-plugin-windows-arm64.exe",
23+
]
24+
25+
26+
class IntegrationConfig(BaseSettings):
27+
dify_cli_path: str = Field(default="", description="The path to the dify cli")
28+
29+
@field_validator("dify_cli_path")
30+
@classmethod
31+
def validate_dify_cli_path(cls, v):
32+
# find the dify cli path
33+
if not v:
34+
for plugin_name in _PLUGIN_NAMES:
35+
v = shutil.which(plugin_name)
36+
if v:
37+
break
38+
39+
if not v:
40+
raise ValueError("dify cli not found")
41+
42+
# check dify version
43+
version = subprocess.check_output([v, "version"]).decode("utf-8") # noqa: S603
44+
45+
try:
46+
version = Version(version)
47+
except Exception as e:
48+
raise ValueError("dify cli version is not valid") from e
49+
50+
if version < Version("0.1.0"):
51+
raise ValueError("dify cli version must be greater than 0.1.0 to support plugin run")
52+
53+
return v
54+
55+
model_config = SettingsConfigDict(env_file=".env")

python/dify_plugin/core/entities/plugin/request.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ class EndpointActions(Enum):
5151
InvokeEndpoint = "invoke_endpoint"
5252

5353

54+
# merge all the access actions
55+
PluginAccessAction = AgentActions | ToolActions | ModelActions | EndpointActions
56+
57+
5458
class PluginAccessRequest(BaseModel):
5559
type: PluginInvokeType
5660
user_id: str
@@ -109,6 +113,9 @@ def convert_prompt_messages(cls, v):
109113
raise ValueError("prompt_messages must be a list")
110114

111115
for i in range(len(v)):
116+
if isinstance(v[i], PromptMessage):
117+
continue
118+
112119
if v[i]["role"] == PromptMessageRole.USER.value:
113120
v[i] = UserPromptMessage(**v[i])
114121
elif v[i]["role"] == PromptMessageRole.ASSISTANT.value:
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from collections.abc import Mapping
2+
from enum import StrEnum
3+
from typing import Any, Generic, TypeVar
4+
5+
from pydantic import BaseModel
6+
7+
from dify_plugin.core.entities.plugin.request import (
8+
AgentActions,
9+
EndpointActions,
10+
ModelActions,
11+
PluginInvokeType,
12+
ToolActions,
13+
)
14+
15+
T = TypeVar("T", bound=BaseModel)
16+
17+
18+
class PluginInvokeRequest(BaseModel, Generic[T]):
19+
invoke_id: str
20+
type: PluginInvokeType
21+
action: AgentActions | ToolActions | ModelActions | EndpointActions
22+
request: T
23+
24+
25+
class ResponseType(StrEnum):
26+
INFO = "info"
27+
ERROR = "error"
28+
PLUGIN_RESPONSE = "plugin_response"
29+
PLUGIN_READY = "plugin_ready"
30+
PLUGIN_INVOKE_END = "plugin_invoke_end"
31+
32+
33+
class PluginGenericResponse(BaseModel):
34+
invoke_id: str
35+
type: ResponseType
36+
37+
response: Mapping[str, Any]

python/dify_plugin/integration/exc.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class PluginStoppedError(Exception):
2+
"""
3+
The plugin has stopped.
4+
"""
5+
6+
pass

python/dify_plugin/integration/run.py

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
import logging
2+
import os
3+
import signal
4+
import subprocess
5+
import threading
6+
import uuid
7+
from collections.abc import Generator
8+
from queue import Queue
9+
from threading import Lock, Semaphore
10+
from typing import TypeVar
11+
12+
from gevent.os import tp_read
13+
from pydantic import BaseModel, ValidationError
14+
15+
from dify_plugin.config.integration_config import IntegrationConfig
16+
from dify_plugin.core.entities.plugin.request import (
17+
PluginAccessAction,
18+
PluginInvokeType,
19+
)
20+
from dify_plugin.integration.entities import PluginGenericResponse, PluginInvokeRequest, ResponseType
21+
from dify_plugin.integration.exc import PluginStoppedError
22+
23+
T = TypeVar("T")
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
class PluginRunner:
29+
"""
30+
A class that runs a plugin locally.
31+
32+
Usage:
33+
```python
34+
with PluginRunner(
35+
config=IntegrationConfig(),
36+
plugin_package_path="./langgenius-agent_0.0.14.difypkg",
37+
) as runner:
38+
for result in runner.invoke(
39+
PluginInvokeType.Agent,
40+
AgentActions.InvokeAgentStrategy,
41+
payload=request.AgentInvokeRequest(
42+
user_id="hello",
43+
agent_strategy_provider="agent",
44+
agent_strategy="function_calling",
45+
agent_strategy_params=agent_strategy_params,
46+
),
47+
response_type=AgentInvokeMessage,
48+
):
49+
assert result
50+
```
51+
"""
52+
53+
R = TypeVar("R", bound=BaseModel)
54+
55+
def __init__(self, config: IntegrationConfig, plugin_package_path: str, extra_args: list[str] | None = None):
56+
self.config = config
57+
self.plugin_package_path = plugin_package_path
58+
self.extra_args = extra_args or []
59+
60+
# create pipe to communicate with the plugin
61+
self.stdout_pipe_read, self.stdout_pipe_write = os.pipe()
62+
self.stderr_pipe_read, self.stderr_pipe_write = os.pipe()
63+
self.stdin_pipe_read, self.stdin_pipe_write = os.pipe()
64+
65+
# stdin write lock
66+
self.stdin_write_lock = Lock()
67+
68+
# setup stop flag
69+
self.stop_flag = False
70+
self.stop_flag_lock = Lock()
71+
72+
logger.info(f"Running plugin from {plugin_package_path}")
73+
74+
self.process = subprocess.Popen( # noqa: S603
75+
[config.dify_cli_path, "plugin", "run", plugin_package_path, "--response-format", "json", *self.extra_args],
76+
stdout=self.stdout_pipe_write,
77+
stderr=self.stderr_pipe_write,
78+
stdin=self.stdin_pipe_read,
79+
)
80+
81+
logger.info(f"Plugin process created with pid {self.process.pid}")
82+
83+
# wait for plugin to be ready
84+
self.ready_semaphore = Semaphore(0)
85+
86+
# create a thread to read the stdout and stderr
87+
self.stdout_reader = threading.Thread(target=self._message_reader, args=(self.stdout_pipe_read,))
88+
try:
89+
self.stdout_reader.start()
90+
except Exception as e:
91+
raise e
92+
93+
self.q = dict[str, Queue[PluginGenericResponse | None]]()
94+
self.q_lock = Lock()
95+
96+
# wait for the plugin to be ready
97+
self.ready_semaphore.acquire()
98+
99+
logger.info("Plugin ready")
100+
101+
def _close(self):
102+
with self.stop_flag_lock:
103+
if self.stop_flag:
104+
return
105+
106+
# stop the plugin
107+
self.stop_flag = True
108+
109+
# send signal SIGTERM to the plugin, so it can exit gracefully
110+
# do collect garbage like removing temporary files
111+
os.kill(self.process.pid, signal.SIGTERM)
112+
113+
# wait for the plugin to exit
114+
self.process.wait()
115+
116+
# close the pipes
117+
os.close(self.stdout_pipe_write)
118+
os.close(self.stderr_pipe_write)
119+
os.close(self.stdin_pipe_read)
120+
121+
def _read_async(self, fd: int) -> bytes:
122+
# read data from stdin using tp_read in 64KB chunks.
123+
# the OS buffer for stdin is usually 64KB, so using a larger value doesn't make sense.
124+
b = tp_read(fd, 65536)
125+
if not b:
126+
raise PluginStoppedError()
127+
return b
128+
129+
def _message_reader(self, pipe: int):
130+
# create a scanner to read the message line by line
131+
"""Read messages line by line from the pipe."""
132+
buffer = b""
133+
try:
134+
while True:
135+
try:
136+
data = self._read_async(pipe)
137+
except PluginStoppedError:
138+
break
139+
140+
if not data:
141+
continue
142+
143+
buffer += data
144+
145+
# if no b"\n" is in data, skip to the next iteration
146+
if data.find(b"\n") == -1:
147+
continue
148+
149+
# process line by line and keep the last line if it is not complete
150+
lines = buffer.split(b"\n")
151+
buffer = lines[-1]
152+
153+
lines = lines[:-1]
154+
for line in lines:
155+
line = line.strip()
156+
if not line:
157+
continue
158+
159+
self._publish_message(line.decode("utf-8"))
160+
finally:
161+
self._close()
162+
163+
def _publish_message(self, message: str):
164+
# parse the message
165+
try:
166+
parsed_message = PluginGenericResponse.model_validate_json(message)
167+
except ValidationError:
168+
return
169+
170+
if not parsed_message.invoke_id:
171+
if parsed_message.type == ResponseType.PLUGIN_READY:
172+
self.ready_semaphore.release()
173+
elif parsed_message.type == ResponseType.ERROR:
174+
raise ValueError(parsed_message.response)
175+
elif parsed_message.type == ResponseType.INFO:
176+
logger.info(parsed_message.response)
177+
return
178+
179+
with self.q_lock:
180+
if parsed_message.invoke_id not in self.q:
181+
return
182+
if parsed_message.type == ResponseType.PLUGIN_INVOKE_END:
183+
self.q[parsed_message.invoke_id].put(None)
184+
else:
185+
self.q[parsed_message.invoke_id].put(parsed_message)
186+
187+
def _write_to_pipe(self, data: bytes):
188+
# split the data into chunks of 4096 bytes
189+
chunks = [data[i : i + 4096] for i in range(0, len(data), 4096)]
190+
with (
191+
self.stdin_write_lock
192+
): # a lock is needed to avoid race condition when facing multiple threads writing to the pipe.
193+
for chunk in chunks:
194+
os.write(self.stdin_pipe_write, chunk)
195+
196+
def invoke(
197+
self,
198+
access_type: PluginInvokeType,
199+
access_action: PluginAccessAction,
200+
payload: BaseModel,
201+
response_type: type[R],
202+
) -> Generator[R, None, None]:
203+
with self.stop_flag_lock:
204+
if self.stop_flag:
205+
raise PluginStoppedError()
206+
207+
invoke_id = uuid.uuid4().hex
208+
request = PluginInvokeRequest(
209+
invoke_id=invoke_id,
210+
type=access_type,
211+
action=access_action,
212+
request=payload,
213+
)
214+
215+
q = Queue[PluginGenericResponse | None]()
216+
with self.q_lock:
217+
self.q[invoke_id] = q
218+
219+
try:
220+
# send invoke request to the plugin
221+
self._write_to_pipe(request.model_dump_json().encode("utf-8") + b"\n")
222+
223+
# wait for events
224+
while message := q.get():
225+
if message.invoke_id == invoke_id:
226+
if message.type == ResponseType.PLUGIN_RESPONSE:
227+
yield response_type.model_validate(message.response)
228+
elif message.type == ResponseType.ERROR:
229+
raise ValueError(message.response)
230+
else:
231+
raise ValueError("Invalid response type")
232+
else:
233+
raise ValueError("Invalid invoke id")
234+
finally:
235+
with self.q_lock:
236+
del self.q[invoke_id]
237+
238+
def __enter__(self):
239+
return self
240+
241+
def __exit__(self, exc_type, exc_value, traceback):
242+
self._close()

0 commit comments

Comments
 (0)