1
- import asyncio
2
- from datetime import datetime
3
- from functools import cached_property
4
- from typing import Any , Dict , List , Optional , Sequence , Set , Union
5
-
6
- from fastapi_amis_admin import admin
7
- from fastapi_amis_admin .admin import AdminAction , AdminApp
8
- from fastapi_amis_admin .amis import FormItem , SchemaNode , TableColumn , TableCRUD
9
- from fastapi_amis_admin .crud .base import ItemListSchema , SchemaCreateT , SchemaFilterT , SchemaModelT , SchemaReadT , SchemaUpdateT
10
- from fastapi_amis_admin .crud .parser import TableModelT
11
- from fastapi_amis_admin .crud .schema import CrudEnum
12
- from fastapi_amis_admin .utils .pydantic import ModelField
13
- from sqlalchemy .engine import Result
1
+ from fastapi_amis_admin .admin import BaseAuthFieldModelAdmin , BaseAuthSelectModelAdmin
14
2
from sqlalchemy .sql import Select
15
3
from starlette .requests import Request
16
4
17
- from fastapi_user_auth .admin .utils import get_schema_fields_name_label
18
5
from fastapi_user_auth .auth .schemas import SystemUserEnum
19
- from fastapi_user_auth .mixins .schemas import FieldPermEnum , SelectPerm
20
-
21
-
22
- class ReadOnlyModelAdmin (admin .ModelAdmin ):
23
- """只读模型管理Mixin
24
- 移除了创建,更新,删除等操作
25
- """
26
-
27
- @cached_property
28
- def registered_admin_actions (self ) -> Dict [str , "AdminAction" ]:
29
- actions = super ().registered_admin_actions
30
- return {
31
- key : action
32
- for key , action in actions .items ()
33
- if key not in {"create" , "update" , "delete" , "bulk_delete" , "bulk_update" , "bulk_create" }
34
- }
35
-
36
- async def has_create_permission (self , request : Request , data : SchemaUpdateT , ** kwargs ) -> bool :
37
- return False
38
-
39
- async def has_update_permission (
40
- self ,
41
- request : Request ,
42
- item_id : List [str ],
43
- data : SchemaUpdateT ,
44
- ** kwargs ,
45
- ) -> bool :
46
- return False
47
-
48
- async def has_delete_permission (self , request : Request , item_id : List [str ], ** kwargs ) -> bool :
49
- return False
50
-
51
-
52
- class AutoTimeModelAdmin (admin .ModelAdmin ):
53
- """禁止修改时间Mixin,没有Id,创建时间,更新时间,删除时间等字段的创建和更新"""
54
-
55
- create_exclude = {
56
- "id" ,
57
- "create_time" ,
58
- "update_time" ,
59
- "delete_time" ,
60
- }
61
- update_exclude = {
62
- "id" ,
63
- "create_time" ,
64
- "update_time" ,
65
- "delete_time" ,
66
- }
67
-
68
-
69
- class SoftDeleteModelAdmin (AutoTimeModelAdmin ):
70
- """软删除模型Mixin.
71
- - 需要在模型中定义delete_time字段.如果delete_time字段为None,则表示未删除.
72
- """
73
-
74
- def __init__ (self , app : "AdminApp" ):
75
- super ().__init__ (app )
76
- assert hasattr (self .model , "delete_time" ), "SoftDeleteModelAdmin需要在模型中定义delete_time字段"
77
-
78
- async def get_select (self , request : Request ):
79
- sel = await super ().get_select (request )
80
- return sel .where (self .model .delete_time == None ) # noqa E711
81
-
82
- def delete_item (self , obj : SchemaModelT ) -> None :
83
- obj .delete_time = datetime .now ()
84
-
85
-
86
- class FootableModelAdmin (admin .ModelAdmin ):
87
- """为模型管理Amis表格添加底部展示(Footable)属性"""
88
-
89
- async def get_list_table (self , request : Request ) -> TableCRUD :
90
- table = await super ().get_list_table (request )
91
- table .footable = True
92
- return table
93
-
94
-
95
- class BaseAuthFieldModelAdmin (admin .ModelAdmin ):
96
- """字段级别权限控制模型管理.
97
- - xxx_permission_fields:
98
- 1.动作权限字段,可以通过覆盖这些属性来控制哪些字段需要进行权限验证.
99
- 2.未设置的字段,则不进行权限验证.
100
- 3.一旦类被实例化,则会缓存这些属性,禁止再次修改.
101
- #todo 初步实现,未优化
102
- """
103
-
104
- perm_fields_exclude : Dict [Union [FieldPermEnum , int ], Sequence [str ]] = None
105
- """exclude指定的字段,不进行权限验证."""
106
-
107
- def __init__ (self , app : "AdminApp" ):
108
- super ().__init__ (app )
109
-
110
- def get_permission_fields (self , action : str ) -> Dict [str , str ]:
111
- """获取权限字段"""
112
- if not self .perm_fields_exclude :
113
- return {}
114
- info = {
115
- "list" : (self .schema_list , "列表展示-" , FieldPermEnum .LIST ),
116
- "filter" : (self .schema_filter , "列表筛选-" , FieldPermEnum .FILTER ),
117
- "create" : (self .schema_create , "新增-" , FieldPermEnum .CREATE ),
118
- "read" : (self .schema_read , "查看-" , FieldPermEnum .READ ),
119
- "update" : (self .schema_update , "更新-" , FieldPermEnum .UPDATE ),
120
- }
121
- if action not in info :
122
- return {}
123
- schema , prefix , perm = info [action ]
124
- exlude = set ()
125
- for k , fileds in self .perm_fields_exclude .items ():
126
- if (k & perm ) == perm :
127
- exlude .update (set (fileds ))
128
- return get_schema_fields_name_label (schema , prefix = prefix , exclude_required = True , exclude = exlude )
129
-
130
- @cached_property
131
- def create_permission_fields (self ) -> Dict [str , str ]:
132
- """创建权限字段"""
133
- return self .get_permission_fields ("create" )
134
-
135
- @cached_property
136
- def read_permission_fields (self ) -> Dict [str , str ]:
137
- """读取权限字段"""
138
- return self .get_permission_fields ("read" )
139
-
140
- @cached_property
141
- def update_permission_fields (self ) -> Dict [str , str ]:
142
- """更新权限字段"""
143
- return self .get_permission_fields ("update" )
144
-
145
- @cached_property
146
- def list_permission_fields (self ) -> Dict [str , str ]:
147
- """列表权限字段"""
148
- return self .get_permission_fields ("list" )
149
-
150
- @cached_property
151
- def filter_permission_fields (self ) -> Dict [str , str ]:
152
- """过滤筛选权限字段"""
153
- return self .get_permission_fields ("filter" )
154
-
155
- async def has_field_permission (self , request : Request , field : str , action : str = "" ) -> bool :
156
- """判断用户是否有字段权限"""
157
- return True
158
-
159
- async def get_deny_fields (self , request : Request , action : str = None ) -> Set [str ]:
160
- """获取没有权限的字段"""
161
- cache_key = f"{ self .unique_id } _exclude_fields"
162
- request_cache = request .scope .get (cache_key , {})
163
- if action in request_cache :
164
- return request_cache [action ]
165
- check_fields = {}
166
- if action == "list" :
167
- check_fields = self .list_permission_fields .keys ()
168
- elif action == "filter" :
169
- check_fields = self .filter_permission_fields .keys ()
170
- elif action == "create" :
171
- check_fields = self .create_permission_fields .keys ()
172
- elif action == "update" :
173
- check_fields = self .update_permission_fields .keys ()
174
- elif action == "read" :
175
- check_fields = self .read_permission_fields .keys ()
176
- else :
177
- pass
178
- fields = {field for field in check_fields if not await self .has_field_permission (request , field , action )}
179
- request_cache [action ] = fields
180
- if cache_key not in request .scope :
181
- request .scope [cache_key ] = request_cache
182
- return fields
183
-
184
- async def on_list_after (self , request : Request , result : Result , data : ItemListSchema , ** kwargs ) -> ItemListSchema :
185
- """Parse the database data query result dictionary into schema_list."""
186
- exclude = await self .get_deny_fields (request , "list" ) # 过滤没有权限的字段
187
- data = await super ().on_list_after (request , result , data , ** kwargs )
188
- data .items = [item .dict (exclude = exclude ) for item in data .items ] # 过滤没有权限的字段
189
- return data
190
-
191
- async def on_filter_pre (self , request : Request , obj : Optional [SchemaFilterT ], ** kwargs ) -> Dict [str , Any ]:
192
- data = await super ().on_filter_pre (request , obj , ** kwargs )
193
- if not data :
194
- return data
195
- exclude = await self .get_deny_fields (request , "filter" ) # 过滤没有权限的字段
196
- return {k : v for k , v in data .items () if k not in exclude }
197
-
198
- async def create_items (self , request : Request , items : List [SchemaCreateT ]) -> List [TableModelT ]:
199
- """Create multiple data"""
200
- exclude = await self .get_deny_fields (request , "create" )
201
- items = [item .copy (exclude = exclude ) for item in items ] # 过滤没有权限的字段
202
- items = await super ().create_items (request , items )
203
- return items
204
-
205
- async def read_items (self , request : Request , item_id : List [str ]) -> List [SchemaReadT ]:
206
- """Read multiple data"""
207
- items = await super ().read_items (request , item_id )
208
- exclude = await self .get_deny_fields (request , "read" ) # 过滤没有权限的字段
209
- return [item .copy (exclude = exclude ) for item in items ]
210
-
211
- async def on_update_pre (
212
- self ,
213
- request : Request ,
214
- obj : SchemaUpdateT ,
215
- item_id : Union [List [str ], List [int ]],
216
- ** kwargs ,
217
- ) -> Dict [str , Any ]:
218
- exclude = await self .get_deny_fields (request , "update" ) # 过滤没有权限的字段
219
- obj = obj .copy (exclude = exclude ) # 过滤没有权限的字段
220
- data = await super ().on_update_pre (request , obj , item_id , ** kwargs )
221
- return data
222
-
223
- async def get_form_item (
224
- self , request : Request , modelfield : ModelField , action : CrudEnum
225
- ) -> Union [FormItem , SchemaNode , None ]:
226
- """过滤前端创建,更新,筛选表单字段"""
227
- # todo 优化筛选和列表动作的界定
228
- # action为list时,表示列表展示字段.否则为筛选表单字段
229
- act = "filter" if action == "list" else action
230
- exclude = await self .get_deny_fields (request , act ) # 获取没有权限的字段
231
- name = modelfield .alias or modelfield .name
232
- if name in exclude :
233
- return None
234
- form_item = await super ().get_form_item (request , modelfield , action )
235
- return form_item
236
-
237
- async def get_list_column (self , request : Request , modelfield : ModelField ) -> Optional [TableColumn ]:
238
- """过滤前端展示字段"""
239
- exclude = await self .get_deny_fields (request , "list" ) # 获取没有权限的字段
240
- name = modelfield .alias or modelfield .name
241
- if name in exclude :
242
- return None
243
- column = await super ().get_list_column (request , modelfield )
244
- return column
245
6
246
7
247
8
class AuthFieldModelAdmin (BaseAuthFieldModelAdmin ):
@@ -253,34 +14,6 @@ async def has_field_permission(self, request: Request, field: str, action: str =
253
14
return effect
254
15
255
16
256
- class BaseAuthSelectModelAdmin (admin .ModelAdmin ):
257
- """包含选择数据集权限控制的模型管理"""
258
-
259
- select_permissions : List [SelectPerm ] = []
260
- """需要进行权限控制的数据集列表"""
261
-
262
- async def has_select_permission (self , request : Request , name : str ) -> bool :
263
- """判断用户是否有数据集权限"""
264
- return True
265
-
266
- async def get_select (self , request : Request ) -> Select :
267
- sel = await super ().get_select (request )
268
- return await self .filter_select (request , sel )
269
-
270
- async def filter_select (self , request : Request , sel : Select ) -> Select :
271
- """在sel中添加权限过滤条件"""
272
- for permission in self .select_permissions :
273
- if not isinstance (permission , SelectPerm ):
274
- continue
275
- effect = await self .has_select_permission (request , permission .name )
276
- # 如果权限为反向权限,则判断用户是否没有权限
277
- if permission .reverse ^ effect :
278
- sel = permission .call (self , request , sel )
279
- if asyncio .iscoroutine (sel ):
280
- sel = await sel
281
- return sel
282
-
283
-
284
17
class AuthSelectModelAdmin (BaseAuthSelectModelAdmin ):
285
18
async def has_select_permission (self , request : Request , name : str ) -> bool :
286
19
"""判断用户是否有数据集权限"""
@@ -294,9 +27,3 @@ async def filter_select(self, request: Request, sel: Select) -> Select:
294
27
if subject == SystemUserEnum .ROOT :
295
28
return sel
296
29
return await super ().filter_select (request , sel )
297
-
298
-
299
- class BaseAuthFieldFormAdmin (admin .FormAdmin ):
300
- """#todo 字段级别权限控制表单管理"""
301
-
302
- pass
0 commit comments