Skip to content

Commit d884cab

Browse files
authored
feat: add plugin reinstallation functionality and admin API support (#285)
* feat: add plugin reinstallation functionality and admin API support - Implemented ReinstallToAWSFromPkg method to allow reinstallation of plugins on AWS Lambda, updating function URL and name. - Added clearServerlessRuntimeCache method to manage serverless runtime cache. - Enhanced LaunchPlugin to support an ignoreIdempotent flag for forced reinstallation. - Introduced admin API endpoints for plugin reinstallation, secured with an API key validation middleware. - Updated configuration to include AdminApiEnabled and AdminApiKey settings. * refactor: update plugin reinstallation endpoint and improve unauthorized response - Changed the plugin reinstallation endpoint from "/plugins/reinstall" to "/plugin/serverless/reinstall" for better clarity. - Modified the unauthorized response in the AdminAPIKey middleware to return a more descriptive JSON message.
1 parent fa2ac6d commit d884cab

File tree

9 files changed

+231
-28
lines changed

9 files changed

+231
-28
lines changed

internal/core/plugin_manager/install_to_serverless.go

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ func (p *PluginManager) InstallToAWSFromPkg(
3434
return nil, err
3535
}
3636

37-
response, err := serverless.LaunchPlugin(originalPackager, decoder, p.serverlessConnectorLaunchTimeout)
37+
// serverless.LaunchPlugin will check if the plugin has already been launched, if so, it returns directly
38+
response, err := serverless.LaunchPlugin(originalPackager, decoder, p.serverlessConnectorLaunchTimeout, false)
3839
if err != nil {
3940
return nil, err
4041
}
@@ -119,3 +120,120 @@ func (p *PluginManager) InstallToAWSFromPkg(
119120

120121
return newResponse, nil
121122
}
123+
124+
/*
125+
* Reinstall a plugin to AWS Lambda, update function url and name
126+
*/
127+
func (p *PluginManager) ReinstallToAWSFromPkg(
128+
originalPackager []byte,
129+
decoder decoder.PluginDecoder,
130+
) (
131+
*stream.Stream[PluginInstallResponse], error,
132+
) {
133+
checksum, err := decoder.Checksum()
134+
if err != nil {
135+
return nil, err
136+
}
137+
// check valid manifest
138+
_, err = decoder.Manifest()
139+
if err != nil {
140+
return nil, err
141+
}
142+
uniqueIdentity, err := decoder.UniqueIdentity()
143+
if err != nil {
144+
return nil, err
145+
}
146+
147+
// check if serverless runtime exists
148+
serverlessRuntime, err := db.GetOne[models.ServerlessRuntime](
149+
db.Equal("plugin_unique_identifier", uniqueIdentity.String()),
150+
)
151+
if err == db.ErrDatabaseNotFound {
152+
return nil, fmt.Errorf("plugin not exists")
153+
}
154+
if err != nil {
155+
return nil, err
156+
}
157+
158+
response, err := serverless.LaunchPlugin(
159+
originalPackager,
160+
decoder,
161+
p.serverlessConnectorLaunchTimeout,
162+
true, // ignoreIdempotent, true means always reinstall
163+
)
164+
if err != nil {
165+
return nil, err
166+
}
167+
168+
newResponse := stream.NewStream[PluginInstallResponse](128)
169+
routine.Submit(map[string]string{
170+
"module": "plugin_manager",
171+
"function": "ReinstallToAWSFromPkg",
172+
"checksum": checksum,
173+
"unique_identity": uniqueIdentity.String(),
174+
}, func() {
175+
defer func() {
176+
newResponse.Close()
177+
}()
178+
179+
functionUrl := ""
180+
functionName := ""
181+
182+
response.Async(func(r serverless.LaunchFunctionResponse) {
183+
if r.Event == serverless.Info {
184+
newResponse.Write(PluginInstallResponse{
185+
Event: PluginInstallEventInfo,
186+
Data: "Installing...",
187+
})
188+
} else if r.Event == serverless.Done {
189+
if functionUrl == "" || functionName == "" {
190+
newResponse.Write(PluginInstallResponse{
191+
Event: PluginInstallEventError,
192+
Data: "Internal server error, failed to get lambda url or function name",
193+
})
194+
return
195+
}
196+
197+
// update serverless runtime
198+
serverlessRuntime.FunctionURL = functionUrl
199+
serverlessRuntime.FunctionName = functionName
200+
err = db.Update(&serverlessRuntime)
201+
if err != nil {
202+
newResponse.Write(PluginInstallResponse{
203+
Event: PluginInstallEventError,
204+
Data: "Failed to update serverless runtime",
205+
})
206+
return
207+
}
208+
209+
// clear cache
210+
err = p.clearServerlessRuntimeCache(uniqueIdentity)
211+
if err != nil {
212+
newResponse.Write(PluginInstallResponse{
213+
Event: PluginInstallEventError,
214+
Data: "Failed to clear serverless runtime cache",
215+
})
216+
return
217+
}
218+
219+
newResponse.Write(PluginInstallResponse{
220+
Event: PluginInstallEventDone,
221+
Data: "Installed",
222+
})
223+
} else if r.Event == serverless.Error {
224+
newResponse.Write(PluginInstallResponse{
225+
Event: PluginInstallEventError,
226+
Data: "Internal server error",
227+
})
228+
} else if r.Event == serverless.FunctionUrl {
229+
functionUrl = r.Message
230+
} else if r.Event == serverless.Function {
231+
functionName = r.Message
232+
} else {
233+
newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
234+
}
235+
})
236+
})
237+
238+
return newResponse, nil
239+
}

internal/core/plugin_manager/serverless.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,10 @@ func (p *PluginManager) getServerlessPluginRuntimeModel(
9393

9494
return runtime, nil
9595
}
96+
97+
func (p *PluginManager) clearServerlessRuntimeCache(
98+
identity plugin_entities.PluginUniqueIdentifier,
99+
) error {
100+
_, err := cache.Del(p.getServerlessRuntimeCacheKey(identity))
101+
return err
102+
}

internal/core/plugin_manager/serverless_connector/launch.go

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func LaunchPlugin(
1919
originPackage []byte,
2020
decoder decoder.PluginDecoder,
2121
timeout int, // in seconds
22+
ignoreIdempotent bool, // if true, never check if the plugin has launched
2223
) (*stream.Stream[LaunchFunctionResponse], error) {
2324
checksum, err := decoder.Checksum()
2425
if err != nil {
@@ -36,28 +37,30 @@ func LaunchPlugin(
3637
return nil, err
3738
}
3839

39-
function, err := FetchFunction(manifest, checksum)
40-
if err != nil {
41-
if err != ErrFunctionNotFound {
42-
return nil, err
40+
if !ignoreIdempotent {
41+
function, err := FetchFunction(manifest, checksum)
42+
if err != nil {
43+
if err != ErrFunctionNotFound {
44+
return nil, err
45+
}
46+
} else {
47+
// found, return directly
48+
response := stream.NewStream[LaunchFunctionResponse](3)
49+
response.Write(LaunchFunctionResponse{
50+
Event: FunctionUrl,
51+
Message: function.FunctionURL,
52+
})
53+
response.Write(LaunchFunctionResponse{
54+
Event: Function,
55+
Message: function.FunctionName,
56+
})
57+
response.Write(LaunchFunctionResponse{
58+
Event: Done,
59+
Message: "",
60+
})
61+
response.Close()
62+
return response, nil
4363
}
44-
} else {
45-
// found, return directly
46-
response := stream.NewStream[LaunchFunctionResponse](3)
47-
response.Write(LaunchFunctionResponse{
48-
Event: FunctionUrl,
49-
Message: function.FunctionURL,
50-
})
51-
response.Write(LaunchFunctionResponse{
52-
Event: Function,
53-
Message: function.FunctionName,
54-
})
55-
response.Write(LaunchFunctionResponse{
56-
Event: Done,
57-
Message: "",
58-
})
59-
response.Close()
60-
return response, nil
6164
}
6265

6366
response, err := SetupFunction(manifest, checksum, bytes.NewReader(originPackage), timeout)

internal/server/constants/constants.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package constants
22

33
const (
4-
X_PLUGIN_ID = "X-Plugin-ID"
5-
X_API_KEY = "X-Api-Key"
4+
X_PLUGIN_ID = "X-Plugin-ID"
5+
X_API_KEY = "X-Api-Key"
6+
X_ADMIN_API_KEY = "X-Admin-Api-Key"
67

78
CONTEXT_KEY_PLUGIN_INSTALLATION = "plugin_installation"
89
CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER = "plugin_unique_identifier"

internal/server/controllers/plugins.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ func UploadPlugin(app *app.Config) gin.HandlerFunc {
3535

3636
tenantId := c.Param("tenant_id")
3737
if tenantId == "" {
38-
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("Tenant ID is required")).ToResponse())
38+
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("tenant ID is required")).ToResponse())
3939
return
4040
}
4141

4242
if difyPkgFileHeader.Size > app.MaxPluginPackageSize {
43-
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("File size exceeds the maximum limit")).ToResponse())
43+
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("file size exceeds the maximum limit")).ToResponse())
4444
return
4545
}
4646

@@ -67,12 +67,12 @@ func UploadBundle(app *app.Config) gin.HandlerFunc {
6767

6868
tenantId := c.Param("tenant_id")
6969
if tenantId == "" {
70-
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("Tenant ID is required")).ToResponse())
70+
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("tenant ID is required")).ToResponse())
7171
return
7272
}
7373

7474
if difyBundleFileHeader.Size > app.MaxBundlePackageSize {
75-
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("File size exceeds the maximum limit")).ToResponse())
75+
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("file size exceeds the maximum limit")).ToResponse())
7676
return
7777
}
7878

@@ -140,6 +140,16 @@ func InstallPluginFromIdentifiers(app *app.Config) gin.HandlerFunc {
140140
}
141141
}
142142

143+
func ReinstallPluginFromIdentifier(app *app.Config) gin.HandlerFunc {
144+
return func(c *gin.Context) {
145+
BindRequest(c, func(request struct {
146+
PluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier `json:"plugin_unique_identifier" validate:"required,plugin_unique_identifier"`
147+
}) {
148+
service.ReinstallPluginFromIdentifier(c, app, request.PluginUniqueIdentifier)
149+
})
150+
}
151+
}
152+
143153
func FetchPluginInstallationTasks(c *gin.Context) {
144154
BindRequest(c, func(request struct {
145155
TenantID string `uri:"tenant_id" validate:"required"`

internal/server/http_server.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,17 @@ func (app *App) server(config *app.Config) func() {
3434
pluginGroup := engine.Group("/plugin/:tenant_id")
3535
pprofGroup := engine.Group("/debug/pprof")
3636

37+
if config.AdminApiEnabled {
38+
if len(config.AdminApiKey) < 10 {
39+
log.Panic("length of admin api key must be greater than 10")
40+
}
41+
42+
adminGroup := engine.Group("/admin")
43+
adminGroup.Use(app.AdminAPIKey(config.AdminApiKey))
44+
45+
app.adminGroup(adminGroup, config)
46+
}
47+
3748
if config.SentryEnabled {
3849
// setup sentry for all groups
3950
sentryGroup := []*gin.RouterGroup{
@@ -169,6 +180,10 @@ func (app *App) pluginManagementGroup(group *gin.RouterGroup, config *app.Config
169180
group.GET("/agent_strategy", controllers.GetAgentStrategy)
170181
}
171182

183+
func (app *App) adminGroup(group *gin.RouterGroup, config *app.Config) {
184+
group.POST("/plugin/serverless/reinstall", controllers.ReinstallPluginFromIdentifier(config))
185+
}
186+
172187
func (app *App) pluginAssetGroup(group *gin.RouterGroup) {
173188
group.GET("/:id", controllers.GetAsset)
174189
}

internal/server/middleware.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,14 @@ func (app *App) InitClusterID() gin.HandlerFunc {
170170
ctx.Next()
171171
}
172172
}
173+
174+
func (app *App) AdminAPIKey(key string) gin.HandlerFunc {
175+
return func(ctx *gin.Context) {
176+
if ctx.GetHeader(constants.X_ADMIN_API_KEY) != key {
177+
ctx.AbortWithStatusJSON(401, gin.H{"message": "unauthorized"})
178+
return
179+
}
180+
181+
ctx.Next()
182+
}
183+
}

internal/service/install_plugin.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/gin-gonic/gin"
89
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
910
"github.com/langgenius/dify-plugin-daemon/internal/db"
1011
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
@@ -335,6 +336,38 @@ func InstallPluginFromIdentifiers(
335336
return entities.NewSuccessResponse(response)
336337
}
337338

339+
/*
340+
* Reinstall a plugin from a given identifier, no tenant_id is needed
341+
*/
342+
func ReinstallPluginFromIdentifier(
343+
ctx *gin.Context,
344+
config *app.Config,
345+
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
346+
) {
347+
baseSSEService(func() (*stream.Stream[plugin_manager.PluginInstallResponse], error) {
348+
if config.Platform != app.PLATFORM_SERVERLESS {
349+
return nil, fmt.Errorf("reinstall is only supported on serverless platform")
350+
}
351+
352+
manager := plugin_manager.Manager()
353+
pkgFile, err := manager.GetPackage(pluginUniqueIdentifier)
354+
if err != nil {
355+
return nil, errors.Join(err, errors.New("failed to get package"))
356+
}
357+
358+
zipDecoder, err := decoder.NewZipPluginDecoder(pkgFile)
359+
if err != nil {
360+
return nil, errors.Join(err, errors.New("failed to create zip decoder"))
361+
}
362+
stream, err := manager.ReinstallToAWSFromPkg(pkgFile, zipDecoder)
363+
if err != nil {
364+
return nil, errors.Join(err, errors.New("failed to reinstall plugin"))
365+
}
366+
367+
return stream, nil
368+
}, ctx, 1800)
369+
}
370+
338371
func UpgradePlugin(
339372
config *app.Config,
340373
tenant_id string,

internal/types/app/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package app
22

33
import (
44
"fmt"
5+
56
"github.com/langgenius/dify-plugin-daemon/internal/oss"
67

78
"github.com/go-playground/validator/v10"
@@ -12,6 +13,10 @@ type Config struct {
1213
ServerPort uint16 `envconfig:"SERVER_PORT" validate:"required"`
1314
ServerKey string `envconfig:"SERVER_KEY" validate:"required"`
1415

16+
// admin api enable
17+
AdminApiEnabled bool `envconfig:"ADMIN_API_ENABLED" default:"false"`
18+
AdminApiKey string `envconfig:"ADMIN_API_KEY"`
19+
1520
// dify inner api
1621
DifyInnerApiURL string `envconfig:"DIFY_INNER_API_URL" validate:"required"`
1722
DifyInnerApiKey string `envconfig:"DIFY_INNER_API_KEY" validate:"required"`

0 commit comments

Comments
 (0)