|
1 | 1 | import asyncio
|
2 | 2 | from datetime import datetime
|
3 | 3 | from functools import cached_property
|
4 |
| -from typing import Any, Dict, List, Optional, Set, Union |
| 4 | +from typing import Any, Dict, List, Optional, Sequence, Set, Union |
5 | 5 |
|
6 | 6 | from fastapi_amis_admin import admin
|
7 | 7 | from fastapi_amis_admin.admin import AdminAction, AdminApp
|
|
16 | 16 |
|
17 | 17 | from fastapi_user_auth.admin.utils import get_schema_fields_name_label
|
18 | 18 | from fastapi_user_auth.auth.schemas import SystemUserEnum
|
19 |
| -from fastapi_user_auth.mixins.schemas import PermFieldsExclude, SelectPerm |
| 19 | +from fastapi_user_auth.mixins.schemas import FieldPermEnum, SelectPerm |
20 | 20 |
|
21 | 21 |
|
22 | 22 | class ReadOnlyModelAdmin(admin.ModelAdmin):
|
@@ -101,41 +101,31 @@ class BaseAuthFieldModelAdmin(admin.ModelAdmin):
|
101 | 101 | #todo 初步实现,未优化
|
102 | 102 | """
|
103 | 103 |
|
104 |
| - perm_fields_exclude: PermFieldsExclude = PermFieldsExclude() |
| 104 | + perm_fields_exclude: Dict[int, Sequence[str]] = None |
105 | 105 | """exclude指定的字段,不进行权限验证."""
|
106 | 106 |
|
107 | 107 | def __init__(self, app: "AdminApp"):
|
108 | 108 | super().__init__(app)
|
109 | 109 |
|
110 | 110 | def get_permission_fields(self, action: str) -> Dict[str, str]:
|
111 | 111 | """获取权限字段"""
|
112 |
| - all_exclude = self.perm_fields_exclude.all or [] |
113 |
| - if action == "list": |
114 |
| - list_exclude = self.perm_fields_exclude.list or [] |
115 |
| - return get_schema_fields_name_label( |
116 |
| - self.schema_list, prefix="列表展示-", exclude_required=True, exclude=[*all_exclude, *list_exclude] |
117 |
| - ) |
118 |
| - elif action == "filter": |
119 |
| - filter_exclude = self.perm_fields_exclude.filter or [] |
120 |
| - return get_schema_fields_name_label( |
121 |
| - self.schema_filter, prefix="列表筛选-", exclude_required=True, exclude=[*all_exclude, *filter_exclude] |
122 |
| - ) |
123 |
| - elif action == "create": |
124 |
| - create_exclude = self.perm_fields_exclude.create or [] |
125 |
| - return get_schema_fields_name_label( |
126 |
| - self.schema_create, prefix="新增-", exclude_required=True, exclude=[*all_exclude, *create_exclude] |
127 |
| - ) |
128 |
| - elif action == "read": |
129 |
| - read_exclude = self.perm_fields_exclude.read or [] |
130 |
| - return get_schema_fields_name_label( |
131 |
| - self.schema_read, prefix="查看-", exclude_required=True, exclude=[*all_exclude, *read_exclude] |
132 |
| - ) |
133 |
| - elif action == "update": |
134 |
| - update_exclude = self.perm_fields_exclude.update or [] |
135 |
| - return get_schema_fields_name_label( |
136 |
| - self.schema_update, prefix="更新-", exclude_required=True, exclude=[*all_exclude, *update_exclude] |
137 |
| - ) |
138 |
| - return {} |
| 112 | + if not self.perm_fields_exclude: |
| 113 | + return {} |
| 114 | + info = { |
| 115 | + "list": (self.schema_list, "列表展示-", FieldPermEnum.LIST), |
| 116 | + "filter": (self.schema_filter, "列表筛选-", FieldPermEnum.FILTER), |
| 117 | + "create": (self.schema_create, "新增-", FieldPermEnum.CREATE), |
| 118 | + "read": (self.schema_read, "查看-", FieldPermEnum.READ), |
| 119 | + "update": (self.schema_update, "更新-", FieldPermEnum.UPDATE), |
| 120 | + } |
| 121 | + if action not in info: |
| 122 | + return {} |
| 123 | + schema, prefix, perm = info[action] |
| 124 | + exlude = set() |
| 125 | + for k, fileds in self.perm_fields_exclude.items(): |
| 126 | + if (k & perm) == perm: |
| 127 | + exlude.update(set(fileds)) |
| 128 | + return get_schema_fields_name_label(schema, prefix=prefix, exclude_required=True, exclude=exlude) |
139 | 129 |
|
140 | 130 | @cached_property
|
141 | 131 | def create_permission_fields(self) -> Dict[str, str]:
|
|
0 commit comments