Skip to content

implement user define secret management api #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,77 @@ func (r *RpcServer) TriggerTask(ctx context.Context, payload *avsproto.UserTrigg
return r.engine.TriggerTask(user, payload)
}

func (r *RpcServer) CreateSecret(ctx context.Context, payload *avsproto.CreateOrUpdateSecretReq) (*wrapperspb.BoolValue, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.AuthenticationError, err.Error())
}

r.config.Logger.Info("process create secret",
"user", user.Address.String(),
"secret_name", payload.Name,
)

result, err := r.engine.CreateSecret(user, payload)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "")
}

return wrapperspb.Bool(result), nil
}

func (r *RpcServer) ListSecrets(ctx context.Context, payload *avsproto.ListSecretsReq) (*avsproto.ListSecretsResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.AuthenticationError, err.Error())
}

r.config.Logger.Info("process list secret",
"user", user.Address.String(),
)

return r.engine.ListSecrets(user, payload)
}

func (r *RpcServer) UpdateSecret(ctx context.Context, payload *avsproto.CreateOrUpdateSecretReq) (*wrapperspb.BoolValue, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.AuthenticationError, err.Error())
}

r.config.Logger.Info("process update secret",
"user", user.Address.String(),
"secret_name", payload.Name,
)

result, err := r.engine.UpdateSecret(user, payload)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "")
}

return wrapperspb.Bool(result), nil
}

func (r *RpcServer) DeleteSecret(ctx context.Context, payload *avsproto.DeleteSecretReq) (*wrapperspb.BoolValue, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.AuthenticationError, err.Error())
}

r.config.Logger.Info("process delete secret",
"user", user.Address.String(),
"secret_name", payload.Name,
)

result, err := r.engine.DeleteSecret(user, payload)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "")
}

return wrapperspb.Bool(result), nil

}

// Operator action
func (r *RpcServer) SyncMessages(payload *avsproto.SyncMessagesReq, srv avsproto.Node_SyncMessagesServer) error {
err := r.engine.StreamCheckToOperator(payload, srv)
Expand Down
5 changes: 5 additions & 0 deletions core/taskengine/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@ Storage can also be inspect with telnet:
telnet /tmp/ap.sock

Then issue `get <ket>` or `list <prefix>` or `list *` to inspect current keys in the storage.

**Secret Storage**
- currently org_id will always be _ because we haven't implemented it yet
- workflow_id will also be _ so we can search be prefix
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t quite get this line, - workflow_id will also be _ so we can search be prefix

Does this mean the search is not done at the workflow level, but only user level? I’m fine with that, but let’s clarify this comment.

Copy link
Member Author

@v9n v9n Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean the search is not done at the workflow level, but only user level?

No, that is to help us search at any level we want. Regardless org or user id or workflow id.

We used https://github.com/dgraph-io/badger and we employee prefix search https://dgraph.io/docs/badger/get-started/#prefix-scans

We organize the key in this format (they are in schema.go btw)

secret:<org_id>::<workflow_id>:

When a thing is at user level, then workflow_id will be empty. we re-present it with _. Doing so will allow us to perform prefix scan of all user level secret by doing https://dgraph.io/docs/badger/get-started/#prefix-scans over secret:_:0x12345:_

To search all secret at workflow level we scan secret:_:0x12345:<workflow-id>.
To search all secret belong to an org we scan secret:<org-id>:.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see. Makes sense 👌

secret:<org_id>:<eoa>:<workflow_id>:<name> -> value
*/
package taskengine
99 changes: 97 additions & 2 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
)

const (
JobTypeExecuteTask = "execute_task"
DefaultItemPerPage = 50
JobTypeExecuteTask = "execute_task"
DefaultItemPerPage = 50
MaxSecretNameLength = 255
)

var (
Expand Down Expand Up @@ -849,6 +850,100 @@ func (n *Engine) CancelTaskByUser(user *model.User, taskID string) (bool, error)
return true, nil
}

func (n *Engine) CreateSecret(user *model.User, payload *avsproto.CreateOrUpdateSecretReq) (bool, error) {
secret := &model.Secret{
User: user,
Name: payload.Name,
Value: payload.Secret,
OrgID: payload.OrgId,
WorkflowID: payload.WorkflowId,
}

updates := map[string][]byte{}
if strings.HasPrefix(strings.ToLower(payload.Name), "ap_") {
return false, grpcstatus.Errorf(codes.InvalidArgument, "secret name cannot start with ap_")
}

if len(payload.Name) == 0 || len(payload.Name) > MaxSecretNameLength {
return false, grpcstatus.Errorf(codes.InvalidArgument, "secret name lengh is invalid: should be 1-255 character")
}

key, _ := SecretStorageKey(secret)
updates[key] = []byte(payload.Secret)
err := n.db.BatchWrite(updates)
if err == nil {
return true, nil
}

return false, grpcstatus.Errorf(codes.Internal, "Cannot save data")
}

func (n *Engine) UpdateSecret(user *model.User, payload *avsproto.CreateOrUpdateSecretReq) (bool, error) {
updates := map[string][]byte{}
secret := &model.Secret{
User: user,
Name: payload.Name,
Value: payload.Secret,
OrgID: payload.OrgId,
WorkflowID: payload.WorkflowId,
}
key, _ := SecretStorageKey(secret)
if ok, err := n.db.Exist([]byte(key)); !ok || err != nil {
return false, grpcstatus.Errorf(codes.NotFound, "Secret not found")
}

updates[key] = []byte(payload.Secret)

err := n.db.BatchWrite(updates)
if err == nil {
return true, nil
}

return true, nil
}

// ListSecrets
func (n *Engine) ListSecrets(user *model.User, payload *avsproto.ListSecretsReq) (*avsproto.ListSecretsResp, error) {
prefixes := []string{
SecretStoragePrefix(user),
}

result := &avsproto.ListSecretsResp{
Items: []*avsproto.ListSecretsResp_ResponseSecret{},
}

secretKeys, err := n.db.ListKeysMulti(prefixes)
if err != nil {
return nil, err
}
for _, k := range secretKeys {
secretWithNameOnly := SecretNameFromKey(k)
item := &avsproto.ListSecretsResp_ResponseSecret{
Name: secretWithNameOnly.Name,
OrgId: secretWithNameOnly.OrgID,
WorkflowId: secretWithNameOnly.WorkflowID,
}

result.Items = append(result.Items, item)
}

return result, nil
}

func (n *Engine) DeleteSecret(user *model.User, payload *avsproto.DeleteSecretReq) (bool, error) {
// No need to check permission, the key is prefixed by user eoa already
secret := &model.Secret{
Name: payload.Name,
User: user,
OrgID: payload.OrgId,
WorkflowID: payload.WorkflowId,
}
key, _ := SecretStorageKey(secret)
err := n.db.Delete([]byte(key))

return err == nil, err
}

// A global counter for the task engine
func (n *Engine) NewSeqID() (string, error) {
num := uint64(0)
Expand Down
Loading
Loading