-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat: authentication #2906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: authentication #2906
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,26 +6,56 @@ | |
@date:2024/3/14 03:02 | ||
@desc: 用户认证 | ||
""" | ||
import datetime | ||
from functools import reduce | ||
|
||
from django.core.cache import cache | ||
from django.db.models import QuerySet | ||
from django.utils.translation import gettext_lazy as _ | ||
|
||
from common.auth.handle.auth_base_handle import AuthBaseHandle | ||
from common.constants.cache_version import Cache_Version | ||
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role | ||
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role, \ | ||
PermissionConstants | ||
from common.database_model_manage.database_model_manage import DatabaseModelManage | ||
from common.exception.app_exception import AppAuthenticationFailed | ||
from common.utils.common import group_by | ||
from system_manage.models.workspace_user_permission import WorkspaceUserPermission | ||
from users.models import User | ||
|
||
|
||
def get_permission_list(user_id, | ||
workspace_id, | ||
def get_permission(permission_id): | ||
if isinstance(permission_id, PermissionConstants): | ||
permission_id = permission_id.value | ||
return f"{permission_id}" | ||
|
||
|
||
def get_workspace_permission(permission_id, workspace_id): | ||
if isinstance(permission_id, PermissionConstants): | ||
permission_id = permission_id.value | ||
return f"{permission_id}:/WORKSPACE/{workspace_id}" | ||
|
||
|
||
def get_workspace_resource_permission_list(permission_id, workspace_id, workspace_user_permission_dict): | ||
workspace_user_permission_list = workspace_user_permission_dict.get(workspace_id) | ||
if workspace_user_permission_list is None: | ||
return [ | ||
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)] | ||
return [ | ||
f"{permission_id}:/WORKSPACE/{workspace_id}/{workspace_user_permission.auth_target_type}/{workspace_user_permission.target}" | ||
for workspace_user_permission in | ||
workspace_user_permission_list if workspace_user_permission.is_auth] + [ | ||
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)] | ||
|
||
|
||
def get_permission_list(user, | ||
workspace_user_role_mapping_model, | ||
workspace_model, | ||
role_model, | ||
role_permission_mapping_model): | ||
version, get_key = Cache_Version.PERMISSION_LIST.value | ||
key = get_key(user_id, workspace_id) | ||
user_id = user.id | ||
version = Cache_Version.PERMISSION_LIST.get_version() | ||
key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id) | ||
# 获取权限列表 | ||
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None | ||
permission_list = cache.get(key, version=version) | ||
|
@@ -37,71 +67,80 @@ def get_permission_list(user_id, | |
role_permission_mapping_list = QuerySet(role_permission_mapping_model).filter( | ||
role_id__in=[workspace_user_role_mapping.role_id for workspace_user_role_mapping in | ||
workspace_user_role_mapping_list]) | ||
permission_list = [role_model.id for role_model in role_permission_mapping_list] | ||
role_dict = group_by(role_permission_mapping_list, lambda item: item.get('role_id')) | ||
|
||
workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter( | ||
workspace_id__in=[workspace_user_role.workspace_id for workspace_user_role in | ||
workspace_user_role_mapping_list]) | ||
workspace_user_permission_dict = group_by(workspace_user_permission_list, | ||
key=lambda item: item.workspace_id) | ||
permission_list = [ | ||
get_workspace_resource_permission_list(role_permission_mapping.permission_id, | ||
role_dict.get(role_permission_mapping.role_id).workspace_id, | ||
workspace_user_permission_dict) | ||
for role_permission_mapping in | ||
role_permission_mapping_list] | ||
|
||
# 将二维数组扁平为一维 | ||
permission_list = reduce(lambda x, y: [*x, *y], permission_list, []) | ||
cache.set(key, permission_list, version=version) | ||
else: | ||
permission_list = get_default_permission_list_by_role(RoleConstants.ADMIN) | ||
workspace_id_list = ['default'] | ||
workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter( | ||
workspace_id__in=workspace_id_list) | ||
|
||
workspace_user_permission_dict = group_by(workspace_user_permission_list, | ||
key=lambda item: item.workspace_id) | ||
permission_list = get_default_permission_list_by_role(RoleConstants[user.role]) | ||
permission_list = [ | ||
get_workspace_resource_permission_list(permission, 'default', workspace_user_permission_dict) for | ||
permission | ||
in permission_list] | ||
# 将二维数组扁平为一维 | ||
permission_list = reduce(lambda x, y: [*x, *y], permission_list, []) | ||
cache.set(key, permission_list, version=version) | ||
return permission_list | ||
|
||
|
||
def get_workspace_list(user_id, | ||
workspace_id, | ||
workspace_user_role_mapping_model, | ||
workspace_model, | ||
role_model, | ||
role_permission_mapping_model): | ||
version, get_key = Cache_Version.WORKSPACE_LIST.value | ||
key = get_key(user_id) | ||
workspace_list = cache.get(key, version=version) | ||
# 获取权限列表 | ||
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None | ||
if workspace_list is None: | ||
if is_query_model: | ||
# 获取工作空间 用户 角色映射数据 | ||
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id) | ||
cache.set(key, [workspace_user_role_mapping.workspace_id for workspace_user_role_mapping in | ||
workspace_user_role_mapping_list], version=version) | ||
else: | ||
return ["default"] | ||
return workspace_list | ||
|
||
|
||
def get_role_list(user, | ||
workspace_id, | ||
workspace_user_role_mapping_model, | ||
workspace_model, | ||
role_model, | ||
role_permission_mapping_model): | ||
version, get_key = Cache_Version.ROLE_LIST.value | ||
key = get_key(user.id, workspace_id) | ||
""" | ||
获取当前用户的角色列表 | ||
""" | ||
version = Cache_Version.ROLE_LIST.get_version() | ||
key = Cache_Version.ROLE_LIST.get_key(user_id=user.id) | ||
workspace_list = cache.get(key, version=version) | ||
# 获取权限列表 | ||
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None | ||
if workspace_list is None: | ||
if is_query_model: | ||
# 获取工作空间 用户 角色映射数据 | ||
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user.id) | ||
cache.set(key, [workspace_user_role_mapping.role_id for workspace_user_role_mapping in | ||
workspace_user_role_mapping_list], version=version) | ||
cache.set(key, | ||
[f"{workspace_user_role_mapping.role_id}:/WORKSPACE/{workspace_user_role_mapping.workspace_id}" | ||
for | ||
workspace_user_role_mapping in | ||
workspace_user_role_mapping_list] + [user.role], version=version) | ||
else: | ||
cache.set(key, [user.role], version=version) | ||
return [user.role] | ||
return workspace_list | ||
|
||
|
||
def get_auth(user, workspace_id): | ||
def get_auth(user): | ||
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping") | ||
workspace_model = DatabaseModelManage.get_model("workspace_model") | ||
role_model = DatabaseModelManage.get_model("role_model") | ||
role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model") | ||
workspace_list = get_workspace_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model, | ||
role_model, role_permission_mapping_model) | ||
permission_list = get_permission_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model, | ||
|
||
permission_list = get_permission_list(user, workspace_user_role_mapping_model, workspace_model, | ||
role_model, role_permission_mapping_model) | ||
role_list = get_role_list(user, workspace_id, workspace_user_role_mapping_model, workspace_model, | ||
role_list = get_role_list(user, workspace_user_role_mapping_model, workspace_model, | ||
role_model, role_permission_mapping_model) | ||
return Auth(workspace_list, workspace_id, role_list, permission_list) | ||
return Auth(role_list, permission_list) | ||
|
||
|
||
class UserToken(AuthBaseHandle): | ||
|
@@ -117,8 +156,7 @@ def handle(self, request, token: str, get_token_details): | |
if cache_token is None: | ||
raise AppAuthenticationFailed(1002, _('Login expired')) | ||
auth_details = get_token_details() | ||
# 当前工作空间 | ||
current_workspace = auth_details['current_workspace'] | ||
cache.touch(token, timeout=datetime.timedelta(seconds=60 * 60 * 2).seconds, version=version) | ||
user = QuerySet(User).get(id=auth_details['id']) | ||
auth = get_auth(user, current_workspace) | ||
auth = get_auth(user) | ||
return user, auth | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Code Review ReportFile: Irregularities/Critical Issues:
Suggested Improvements:
# Caches the fetched data with the specified key and version number for fast retrieval and validation against cache expiration timestamps Overall Suggestions
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,10 @@ class Group(Enum): | |
""" | ||
USER = "USER" | ||
|
||
APPLICATION = "APPLICATION" | ||
|
||
KNOWLEDGE = "KNOWLEDGE" | ||
|
||
|
||
class Operate(Enum): | ||
""" | ||
|
@@ -38,29 +42,44 @@ class RoleGroup(Enum): | |
|
||
|
||
class Role: | ||
def __init__(self, name: str, decs: str, group: RoleGroup): | ||
def __init__(self, name: str, decs: str, group: RoleGroup, resource_path=None): | ||
self.name = name | ||
self.decs = decs | ||
self.group = group | ||
self.resource_path = resource_path | ||
|
||
def __str__(self): | ||
return self.name + ( | ||
(":" + self.resource_path) if self.resource_path is not None else '') | ||
|
||
def __eq__(self, other): | ||
return str(self) == str(other) | ||
|
||
|
||
class RoleConstants(Enum): | ||
ADMIN = Role("ADMIN", '超级管理员', RoleGroup.SYSTEM_USER) | ||
WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", '工作空间管理员', RoleGroup.SYSTEM_USER) | ||
USER = Role("USER", '普通用户', RoleGroup.SYSTEM_USER) | ||
|
||
def get_workspace_role(self): | ||
return lambda r, kwargs: Role(name=self.value.name, | ||
decs=self.value.decs, | ||
group=self.value.group, | ||
resource_path= | ||
f"/WORKSPACE/{kwargs.get('workspace_id')}") | ||
|
||
|
||
class Permission: | ||
""" | ||
权限信息 | ||
""" | ||
|
||
def __init__(self, group: Group, operate: Operate, dynamic_tag=None, role_list=None): | ||
def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None): | ||
if role_list is None: | ||
role_list = [] | ||
self.group = group | ||
self.operate = operate | ||
self.dynamic_tag = dynamic_tag | ||
self.resource_path = resource_path | ||
# 用于获取角色与权限的关系,只适用于没有权限管理的 | ||
self.role_list = role_list | ||
|
||
|
@@ -76,7 +95,7 @@ def new_instance(permission_str: str): | |
|
||
def __str__(self): | ||
return self.group.value + ":" + self.operate.value + ( | ||
(":" + self.dynamic_tag) if self.dynamic_tag is not None else '') | ||
(":" + self.resource_path) if self.resource_path is not None else '') | ||
|
||
def __eq__(self, other): | ||
return str(self) == str(other) | ||
|
@@ -91,6 +110,27 @@ class PermissionConstants(Enum): | |
USER_EDIT = Permission(group=Group.USER, operate=Operate.EDIT, role_list=[RoleConstants.ADMIN]) | ||
USER_DELETE = Permission(group=Group.USER, operate=Operate.DELETE, role_list=[RoleConstants.ADMIN]) | ||
|
||
def get_workspace_application_permission(self): | ||
return lambda r, kwargs: Permission(group=self.value.group, operate=self.value.operate, | ||
resource_path= | ||
f"/WORKSPACE/{kwargs.get('workspace_id')}/APPLICATION/{kwargs.get('application_id')}") | ||
|
||
def get_workspace_knowledge_permission(self): | ||
return lambda r, kwargs: Permission(group=self.value.group, operate=self.value.operate, | ||
resource_path= | ||
f"/WORKSPACE/{kwargs.get('workspace_id')}/KNOWLEDGE/{kwargs.get('knowledge_id')}") | ||
|
||
def get_workspace_permission(self): | ||
return lambda r, kwargs: Permission(group=self.value.group, operate=self.value.operate, | ||
resource_path= | ||
f"/WORKSPACE/{kwargs.get('workspace_id')}") | ||
|
||
def __eq__(self, other): | ||
if isinstance(other, PermissionConstants): | ||
return other == self | ||
else: | ||
return self.value == other | ||
|
||
|
||
def get_default_permission_list_by_role(role: RoleConstants): | ||
""" | ||
|
@@ -109,15 +149,9 @@ class Auth: | |
""" | ||
|
||
def __init__(self, | ||
work_space_list: List, | ||
current_workspace, | ||
current_role_list: List[Role], | ||
permission_list: List[PermissionConstants | Permission], | ||
**keywords): | ||
# 当前用户所有工作空间 | ||
self.work_space_list = work_space_list | ||
# 当前工作空间 | ||
self.current_workspace = current_workspace | ||
# 当前工作空间的所有权限+非工作空间权限 | ||
self.permission_list = permission_list | ||
# 当前工作空间角色列表 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Code ReviewThe provided code looks largely well-structured; however, there are a few areas that can be improved: 1. Resource Management
2. Method Overloading with Lambda Expressions
3. Equality Operators
4. Error Handling
Suggested Changes:
Here's an updated version incorporating some of these suggestions: from enum import Enum,.auto
from typing import Optional, List, Any, Callable
class Group(Enum):
USER = auto()
APPLICATION = auto()
KNOWLEDGE = auto()
class Operate(Enum):
EDIT = auto()
DELETE = auto()
class RoleGroup(Enum):
SYSTEM_USER = auto()
class Role:
def __init__(self, name: str, description: str, group: RoleGroup, resource_path: Optional[str] = None):
self.name = name
self.description = description
self.group = group
self.resource_path = resource_path
def __str__(self):
path_part = f" ({self.resource_path})" if self.resource_path else ""
return f"{self.group} {self.name}{path_part}"
def create_workspace_specific_permissions(self, workspace_id: str) -> Permission:
return self.create_resource_based_permission(f"/WORKSPACE/{workspace_id}")
def create_resource_based_permission(self, base_path: str) -> Permission:
return Permission(
group=group,
operate=operate,
resource_path=f"{base_path}/{self.name}"
)
class RoleConstants(Enum):
ADMIN = Role("ADMIN", "超级管理员", RoleGroup.SYSTEM_USER)
WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", "工作空间管理员", RoleGroup.SYSTEM_USER)
USER = Role("USER", "普通用户", RoleGroup.SYSTEM_USER)
@classmethod
def get_workspace_admin(cls) -> PermissionCallable:
return cls.WORKSPACE_MANAGE.create_workspace_specific_permissions
@classmethod
def get_user_edit(cls) -> PermissionCallable:
return cls.USER.create_workspace_specific_permissions(operate=Operate.EDIT)
class Permission:
"""权限信息"""
def __init__(
self,
group: Group,
operate: Operate,
resource_path: Optional[str] = None
):
self.group = group
self.operate = operate
self.resource_path = resource_path
def __str__(self):
path_part = f":{self.resource_path}" if self.resource_path else ""
return f"{self.group}:{self.operate}{path_part}"
@property
def has_common_access(self) -> bool:
common_groups = {Group.APPLICATION, Group.KNOWLEDGE}
return self.group in common_groups
@property
def is_write_operation(self) -> bool:
return self.operate == Operate.DELETE
permission_constants = {
'ADMIN': RoleConstants.ADMIN.get_workspace_admin(),
'USER_EDIT': RoleConstants.USER.edit(workspace_id='example'),
}
def get_default_permission_list_by_role(role: RoleConstants) -> List[Permission]:
role_name = role.name.upper()
default_permission_group_map = {
'WORKSPACE_ADMIN': ['edit'],
'USER_EDIT': ['delete']
}
groups_to_check = default_permission_group_map.get(role_name, [])
result = [
permission for perm in permission_constants.values()
if any(operation in [p.operate.value for p in groups_to_check])
]
return result This update improves readability and maintainability while maintaining the original functionality of the code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Overall, the code looks well-structured and follows best practices for class design and method behavior. However, here are some minor improvements and suggestions to enhance readability, maintainability, and functionality:
Here's a revised version incorporating some of these suggestions: from enum import Enum, unique
@unique
class Group(Enum):
"""Roles"""
USER = "USER"
APPLICATION = "APPLICATION"
KNOWLEDGE = "KNOWLEDGE"
@unique
class Operate(Enum):
"""Actions"""
@unique
class RoleGroup(Enum):
"""Workgroup groups"""
SYSTEM_USER = "SYSTEM_USER"
class Role:
def __init__(self, name: str, desc: str, group: RoleGroup, resource_path=None):
self.name = name
self.desc = desc
self.group = group
self.resource_path = resource_path
def __str__(self):
return f"{self.name}:{self.resource_path}" if self.resource_path else self.name
def __repr__(self):
return self.__str__()
def __eq__(self, other):
return hasattr(other, 'name') and self.name == other.name
class RoleConstants(Enum):
ADMIN = Role("ADMIN", 'Super Administrator', RoleGroup.SYSTEM_USER)
WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", 'Workspace Manager', RoleGroup.system_user)
USER = Role("USER", "Normal User", RoleGroup.SYSTEM_USER)
def get_resource(self, workspace_id=None, application_id=None, knowledge_id=None):
components = [f'/WORKSPACE/{workspace_id}' if workspace_id else '',
f'/APPLICATION/{application_id}' if application_id else '',
f'/KNOWLEDGE/{knowledge_id}' if knowledge_id else '']
return "".join(components).lstrip('/')
@classmethod
def build(cls, group_enum_value, operation_enum_value, *, role_name=None, role_desc='',
resource_type='WORKSPACE', resource_id=None):
try:
role_instance = cls[role_name]
except KeyError:
raise ValueError(f"Invalid role '{role_name}'. Valid choices: {cls.choices}")
resource_path = cls.get_resource(**vars(role_instance))
return Permission(
group=group_enum_value,
operate=operation_enum_value,
resource_path=f"/{resource_type}/{resource_path}",
role_list=[role_instance]
)
class Permission:
def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None):
if role_list is None:
role_list = []
self.group = group
self.operate = operate
self.resource_path = resource_path
self.role_list = role_list
def __str__(self):
components = [
f"{self.group.value}",
f":{self.operate.value}"
]
if self.resource_path:
components.append(f":{self.resource_path.strip('/')}")
return "/".join(filter(None, components))
def __eq__(self, other):
if isinstance(other, Permission):
return all(zip_longest(map(attrgetter('value'), self), map(attrgetter('value'), other)))
return False
from collections import OrderedDict
from typing import Callable
@unique
class AuthorizationCodeEnum:
VIEW_PERMISSION_CODE = ('view_permissions', PermissionConstants.VIEW.PERMISSION)
EDIT_PERMISSION_CODE = ('edit_permissions', PermissionConstants.EDIT PERMISSION)
DELETE_PERMISSION_CODE = ('delete_permissions', PermissionConstants.DELETE.PERMIS)
class Auth(object):
def __init__(
self,
current_workspace: str | None,
current_roles: list[Role] | None,
permissions: list[Callable[..., Permission]], # Function generating individual authorization objects
keywords={}
):
"""
Initialize an Authentication object.
:param current_workspace: Id of the currently active workspace.
:param current_roles: List of roles associated with the user account.
:param permissions: Optional custom permissions or actions to add/override defaults.
:param kwargs: Additional keyword args needed for permission creation (e.g., workspace_id).
"""
self.current_workspace = current_workspace
self.permissions = []
# Generate default permissions if no provided
default_perms = {
'GET_USERS': PermissionConstants.GRANT_GET_USERS,
'MODIFY_USERS': Permission Constants.MANUAL_MODIFICATION_OF_USER_LIST,
'DELETE_USERS': PermissionConstants.DELETE_USER
}
self.add_default_permissions(default_perms)
# Apply additional provided permissions
for perm_generator in permissions:
if callable(perm_generator): # Check if function is provided
new_perm = perm_generator(kwargs)
if new_perm:
self.permissions.append(new_perm)
# Append custom actions or operations here (not shown in sample) This version refactors the original code into smaller, manageable parts while introducing better organization and consistency. It also includes some improvements to improve error reporting and make the code more flexible for future expansion. |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from django.contrib import admin | ||
|
||
# Register your models here. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from django.apps import AppConfig | ||
|
||
|
||
class SystemManageConfig(AppConfig): | ||
default_auto_field = 'django.db.models.BigAutoField' | ||
name = 'system_manage' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regularities, potential issues, and optimization suggestions:
Use of
get_key
function: In multiple parts of the code (e.g., inCache_Version.PERMISSION_LIST
,Cache_Version.WORKSPACE_LIST
, etc.), there is an issue whereget_key
seems to be used directly without passing the necessary arguments when generating cache keys. Ensure that these functions properly accept parameters so they can generate consistent cache keys.Redundant checks for
is_query_model
: Multiple places have redundant checks like checking ifworkspace_user_role_mapping_model
, etc., are not none before proceeding. Simplify by removing unnecessary conditions unless they're needed elsewhere.Hardcoded roles and permissions: Hardcoding specific role constants (
RoleConstants.ADMIN
) or permission constants could make maintenance difficult. Instead, consider using more dynamic approach to reference these values from settings or other configuration files.Multiple calls to external services: The
group_by
function call appears three times across two different methods (get_workspace_resource_permission_list
and a helper method inside it withinget_workspace_list
). This could lead to repeated calculations which might be inefficient for large datasets. Consider caching results locally if applicable.Version management: There's no proper mechanism for maintaining or managing the cache versions. Implement a strategy to automatically update cache keys according to changes in data structure or logic.
Flatten lists unnecessarily: You manually flatten a list at one point but avoid flattening the final result set returned at several points throughout the logic. Consolidating this into one place would improve readability and efficiency.
Unused imports: Unused imports should be removed for better performance and cleaner code. Remove imports related to classes used only during type hinting or unused modules.
Here’s a revised version addressing some of these concerns succinctly:
This revision focuses on simplifying and cleaning up some aspects listed above while maintaining the core functionality.