Skip to content

feat: authentication #2906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 81 additions & 43 deletions apps/common/auth/handle/impl/user_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,56 @@
@date:2024/3/14 03:02
@desc: 用户认证
"""
import datetime
from functools import reduce

from django.core.cache import cache
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _

from common.auth.handle.auth_base_handle import AuthBaseHandle
from common.constants.cache_version import Cache_Version
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role, \
PermissionConstants
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppAuthenticationFailed
from common.utils.common import group_by
from system_manage.models.workspace_user_permission import WorkspaceUserPermission
from users.models import User


def get_permission_list(user_id,
workspace_id,
def get_permission(permission_id):
if isinstance(permission_id, PermissionConstants):
permission_id = permission_id.value
return f"{permission_id}"


def get_workspace_permission(permission_id, workspace_id):
if isinstance(permission_id, PermissionConstants):
permission_id = permission_id.value
return f"{permission_id}:/WORKSPACE/{workspace_id}"


def get_workspace_resource_permission_list(permission_id, workspace_id, workspace_user_permission_dict):
workspace_user_permission_list = workspace_user_permission_dict.get(workspace_id)
if workspace_user_permission_list is None:
return [
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)]
return [
f"{permission_id}:/WORKSPACE/{workspace_id}/{workspace_user_permission.auth_target_type}/{workspace_user_permission.taget}"
for workspace_user_permission in
workspace_user_permission_list if workspace_user_permission.is_auth] + [
get_workspace_permission(permission_id, workspace_id), get_permission(permission_id)]


def get_permission_list(user,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.PERMISSION_LIST.value
key = get_key(user_id, workspace_id)
user_id = user.id
version = Cache_Version.PERMISSION_LIST.get_version()
key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
permission_list = cache.get(key, version=version)
Expand All @@ -37,71 +67,80 @@ def get_permission_list(user_id,
role_permission_mapping_list = QuerySet(role_permission_mapping_model).filter(
role_id__in=[workspace_user_role_mapping.role_id for workspace_user_role_mapping in
workspace_user_role_mapping_list])
permission_list = [role_model.id for role_model in role_permission_mapping_list]
role_dict = group_by(role_permission_mapping_list, lambda item: item.get('role_id'))

workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter(
workspace_id__in=[workspace_user_role.workspace_id for workspace_user_role in
workspace_user_role_mapping_list])
workspace_user_permission_dict = group_by(workspace_user_permission_list,
key=lambda item: item.workspace_id)
permission_list = [
get_workspace_resource_permission_list(role_permission_mapping.permission_id,
role_dict.get(role_permission_mapping.role_id).workspace_id,
workspace_user_permission_dict)
for role_permission_mapping in
role_permission_mapping_list]

# 将二维数组扁平为一维
permission_list = reduce(lambda x, y: [*x, *y], permission_list, [])
cache.set(key, permission_list, version=version)
else:
permission_list = get_default_permission_list_by_role(RoleConstants.ADMIN)
workspace_id_list = ['default']
workspace_user_permission_list = QuerySet(WorkspaceUserPermission).filter(
workspace_id__in=workspace_id_list)

workspace_user_permission_dict = group_by(workspace_user_permission_list,
key=lambda item: item.workspace_id)
permission_list = get_default_permission_list_by_role(RoleConstants[user.role])
permission_list = [
get_workspace_resource_permission_list(permission, 'default', workspace_user_permission_dict) for
permission
in permission_list]
# 将二维数组扁平为一维
permission_list = reduce(lambda x, y: [*x, *y], permission_list, [])
cache.set(key, permission_list, version=version)
return permission_list


def get_workspace_list(user_id,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.WORKSPACE_LIST.value
key = get_key(user_id)
workspace_list = cache.get(key, version=version)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
if workspace_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
cache.set(key, [workspace_user_role_mapping.workspace_id for workspace_user_role_mapping in
workspace_user_role_mapping_list], version=version)
else:
return ["default"]
return workspace_list


def get_role_list(user,
workspace_id,
workspace_user_role_mapping_model,
workspace_model,
role_model,
role_permission_mapping_model):
version, get_key = Cache_Version.ROLE_LIST.value
key = get_key(user.id, workspace_id)
"""
获取当前用户的角色列表
"""
version = Cache_Version.ROLE_LIST.get_version()
key = Cache_Version.ROLE_LIST.get_key(user_id=user.id)
workspace_list = cache.get(key, version=version)
# 获取权限列表
is_query_model = workspace_user_role_mapping_model is not None and workspace_model is not None and role_model is not None and role_permission_mapping_model is not None
if workspace_list is None:
if is_query_model:
# 获取工作空间 用户 角色映射数据
workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user.id)
cache.set(key, [workspace_user_role_mapping.role_id for workspace_user_role_mapping in
workspace_user_role_mapping_list], version=version)
cache.set(key,
[f"{workspace_user_role_mapping.role_id}:/WORKSPACE/{workspace_user_role_mapping.workspace_id}"
for
workspace_user_role_mapping in
workspace_user_role_mapping_list] + [user.role], version=version)
else:
cache.set(key, [user.role], version=version)
return [user.role]
return workspace_list


def get_auth(user, workspace_id):
def get_auth(user):
workspace_user_role_mapping_model = DatabaseModelManage.get_model("workspace_user_role_mapping")
workspace_model = DatabaseModelManage.get_model("workspace_model")
role_model = DatabaseModelManage.get_model("role_model")
role_permission_mapping_model = DatabaseModelManage.get_model("role_permission_mapping_model")
workspace_list = get_workspace_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
permission_list = get_permission_list(user.id, workspace_id, workspace_user_role_mapping_model, workspace_model,

permission_list = get_permission_list(user, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
role_list = get_role_list(user, workspace_id, workspace_user_role_mapping_model, workspace_model,
role_list = get_role_list(user, workspace_user_role_mapping_model, workspace_model,
role_model, role_permission_mapping_model)
return Auth(workspace_list, workspace_id, role_list, permission_list)
return Auth(role_list, permission_list)


class UserToken(AuthBaseHandle):
Expand All @@ -117,8 +156,7 @@ def handle(self, request, token: str, get_token_details):
if cache_token is None:
raise AppAuthenticationFailed(1002, _('Login expired'))
auth_details = get_token_details()
# 当前工作空间
current_workspace = auth_details['current_workspace']
cache.touch(token, timeout=datetime.timedelta(seconds=60 * 60 * 2).seconds, version=version)
user = QuerySet(User).get(id=auth_details['id'])
auth = get_auth(user, current_workspace)
auth = get_auth(user)
return user, auth
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regularities, potential issues, and optimization suggestions:

  1. Use of get_key function: In multiple parts of the code (e.g., in Cache_Version.PERMISSION_LIST, Cache_Version.WORKSPACE_LIST, etc.), there is an issue where get_key seems to be used directly without passing the necessary arguments when generating cache keys. Ensure that these functions properly accept parameters so they can generate consistent cache keys.

  2. Redundant checks for is_query_model: Multiple places have redundant checks like checking if workspace_user_role_mapping_model, etc., are not none before proceeding. Simplify by removing unnecessary conditions unless they're needed elsewhere.

  3. Hardcoded roles and permissions: Hardcoding specific role constants (RoleConstants.ADMIN) or permission constants could make maintenance difficult. Instead, consider using more dynamic approach to reference these values from settings or other configuration files.

  4. Multiple calls to external services: The group_by function call appears three times across two different methods (get_workspace_resource_permission_list and a helper method inside it within get_workspace_list). This could lead to repeated calculations which might be inefficient for large datasets. Consider caching results locally if applicable.

  5. Version management: There's no proper mechanism for maintaining or managing the cache versions. Implement a strategy to automatically update cache keys according to changes in data structure or logic.

  6. Flatten lists unnecessarily: You manually flatten a list at one point but avoid flattening the final result set returned at several points throughout the logic. Consolidating this into one place would improve readability and efficiency.

  7. Unused imports: Unused imports should be removed for better performance and cleaner code. Remove imports related to classes used only during type hinting or unused modules.

Here’s a revised version addressing some of these concerns succinctly:

@@ -6,26 +6,56 @@
     @date: 2024/3/14 03:02
     @desc: 用户认证
 """
 
+from common.utils.common import (
+    group_by,
+    Cache_Version,
+    PermissionConstants,
+)
+from .models import WorkspaceUserPermission
+from common.constants.cache_version import Cache_Version
+from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role
+from common.exception.app_exception import AppAuthenticationFailed
+from users.models import User
+
+class PermissionsHelper:
+    WORKSPACE_RESOURCE_PERMISSION_MAPPER = ':WORKSPACE/'
+    ADMIN_GROUP_KEY_SUFFIX = '_admin'

This revision focuses on simplifying and cleaning up some aspects listed above while maintaining the core functionality.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review Report


File: users/models.py


Irregularities/Critical Issues:

  1. Imports: The imports section has been modified to use more specific module names, such as datetime, but some unnecessary imports have been removed (e.g., functools). Consider adding back reduce if needed.

  2. Function Definitions:

    • In get_permission(), get_workspace_permission(), and both versions of get_permission_list() (get_workspace_list() and get_role_list()), the parameters should explicitly state whether they accept a string-based permission or an enum type based on business requirements.
  3. Comments:

    • Comments related to caching version management can be simplified since it's done through attributes within classes. Ensure comments accurately reflect the functionalities rather than explaining each line individually. For example, comment about caching logic might be sufficient once understood by context.
  4. Variable Naming conventions:

    • Use consistent variable naming across similar contexts. Variable like key, value, id, etc., could help clarify their roles at a glance.

Suggested Improvements:

  • Import only necessary modules: Keep relevant imports only; avoid using functools unless required for complex functions.

  • Clarify parameter usage:

    def get_permission(permission_id):
        """Retrieve the permissions list associated with a given ID."""
        if isinstance(permission_id, PermissionConstants):
            permission_id = permission_id.value
        return f"{permission_id}"
    
    def get_workspace_permission(permission_id, workspace_id):
        """Get the workspace-specific permission id."""
        if isinstance(permission_id, PermissionConstants):
            permission_id = permission_id.value
        return f"{permission_id}/{workspace_id}/"
    
    ...
    
    def get_workspaces_list(self, user_id):
        """Fetches all workspaces where the user has any role assigned."""
        ...
        
    def get_roles_list(self, user_id):
        """Fetches all accessible roles for a user in different workspaces."""
        ```
  • Simplify comments:

# Caches the fetched data with the specified key and version number for fast retrieval and validation against cache expiration timestamps

Overall Suggestions

  1. Make changes that enhance readability and maintainability while addressing identified issues.
  2. Consistently document function inputs/output and ensure comments align with best practices, focusing on understanding and clarity over redundancy.

16 changes: 11 additions & 5 deletions apps/common/constants/cache_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ class Cache_Version(Enum):
WORKSPACE_LIST = "WORKSPACE::LIST", lambda user_id: user_id
# 用户数据
USER = "USER", lambda user_id: user_id
# 当前用户在当前工作空间的角色列表+本身的角色
ROLE_LIST = "ROLE::LIST", lambda user_id, workspace_id: f"{user_id}::{workspace_id}"
# 当前用户在当前工作空间的权限列表+本身的权限列表
PERMISSION_LIST = "PERMISSION::LIST", lambda user_id, workspace_id: f"{user_id}::{workspace_id}"
# 当前用户所有的角色
ROLE_LIST = "ROLE::LIST", lambda user_id: user_id
# 当前用户所有权限
PERMISSION_LIST = "PERMISSION::LIST", lambda user_id: user_id

def get_version(self):
return self.value[0]

version, get_key = Cache_Version.TOKEN.value
def get_key_func(self):
return self.value[1]

def get_key(self, **kwargs):
return self.value[1](**kwargs)
54 changes: 44 additions & 10 deletions apps/common/constants/permission_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class Group(Enum):
"""
USER = "USER"

APPLICATION = "APPLICATION"

KNOWLEDGE = "KNOWLEDGE"


class Operate(Enum):
"""
Expand All @@ -38,29 +42,44 @@ class RoleGroup(Enum):


class Role:
def __init__(self, name: str, decs: str, group: RoleGroup):
def __init__(self, name: str, decs: str, group: RoleGroup, resource_path=None):
self.name = name
self.decs = decs
self.group = group
self.resource_path = resource_path

def __str__(self):
return self.name + (
(":" + self.resource_path) if self.resource_path is not None else '')

def __eq__(self, other):
return str(self) == str(other)


class RoleConstants(Enum):
ADMIN = Role("ADMIN", '超级管理员', RoleGroup.SYSTEM_USER)
WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", '工作空间管理员', RoleGroup.SYSTEM_USER)
USER = Role("USER", '普通用户', RoleGroup.SYSTEM_USER)

def get_workspace_role(self):
return lambda r, kwargs: Role(name=self.value.name,
decs=self.value.decs,
group=self.value.group,
resource_path=
f"/WORKSPACE/{kwargs.get('workspace_id')}")


class Permission:
"""
权限信息
"""

def __init__(self, group: Group, operate: Operate, dynamic_tag=None, role_list=None):
def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None):
if role_list is None:
role_list = []
self.group = group
self.operate = operate
self.dynamic_tag = dynamic_tag
self.resource_path = resource_path
# 用于获取角色与权限的关系,只适用于没有权限管理的
self.role_list = role_list

Expand All @@ -76,7 +95,7 @@ def new_instance(permission_str: str):

def __str__(self):
return self.group.value + ":" + self.operate.value + (
(":" + self.dynamic_tag) if self.dynamic_tag is not None else '')
(":" + self.resource_path) if self.resource_path is not None else '')

def __eq__(self, other):
return str(self) == str(other)
Expand All @@ -91,6 +110,27 @@ class PermissionConstants(Enum):
USER_EDIT = Permission(group=Group.USER, operate=Operate.EDIT, role_list=[RoleConstants.ADMIN])
USER_DELETE = Permission(group=Group.USER, operate=Operate.DELETE, role_list=[RoleConstants.ADMIN])

def get_workspace_application_permission(self):
return lambda r, kwargs: Permission(group=self.value.group, operate=self.value.operate,
resource_path=
f"/WORKSPACE/{kwargs.get('workspace_id')}/APPLICATION/{kwargs.get('application_id')}")

def get_workspace_knowledge_permission(self):
return lambda r, kwargs: Permission(group=self.value.group, operate=self.value.operate,
resource_path=
f"/WORKSPACE/{kwargs.get('workspace_id')}/KNOWLEDGE/{kwargs.get('knowledge_id')}")

def get_workspace_permission(self):
return lambda r, kwargs: Permission(group=self.value.group, operate=self.value.operate,
resource_path=
f"/WORKSPACE/{kwargs.get('workspace_id')}")

def __eq__(self, other):
if isinstance(other, PermissionConstants):
return other == self
else:
return self.value == other


def get_default_permission_list_by_role(role: RoleConstants):
"""
Expand All @@ -109,15 +149,9 @@ class Auth:
"""

def __init__(self,
work_space_list: List,
current_workspace,
current_role_list: List[Role],
permission_list: List[PermissionConstants | Permission],
**keywords):
# 当前用户所有工作空间
self.work_space_list = work_space_list
# 当前工作空间
self.current_workspace = current_workspace
# 当前工作空间的所有权限+非工作空间权限
self.permission_list = permission_list
# 当前工作空间角色列表
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The provided code looks largely well-structured; however, there are a few areas that can be improved:

1. Resource Management

  • The Role class includes resource_path, which can make it easier to navigate resources within specific directories.
  • Ensure that all paths are consistent and properly formatted.

2. Method Overloading with Lambda Expressions

  • Using lambda expressions for generating more complex permissions adds flexibility but might make the code harder to maintain.
  • Consider creating separate methods for different scenarios rather than using lambdas.

3. Equality Operators

  • In multiple places, equality checks (__eq__) are performed on the string representation of instances. This can be simplified by checking against the instance itself instead.

4. Error Handling

  • No error handling is included currently in get_default_permission_list_by_role. Implementing basic input validation could improve robustness.

Suggested Changes:

  • Remove duplicate strings between role_list and individual constants in Permission.
  • Simplify logic related to workspace-related permissions by using dedicated methods or reducing complexity where needed.
  • Add comments explaining non-obvious parts of the code.

Here's an updated version incorporating some of these suggestions:

from enum import Enum,.auto
from typing import Optional, List, Any, Callable


class Group(Enum):
    USER = auto()
    
    APPLICATION = auto()

    KNOWLEDGE = auto()


class Operate(Enum):
    EDIT = auto()
    
    DELETE = auto()


class RoleGroup(Enum):
    SYSTEM_USER = auto()


class Role:
    def __init__(self, name: str, description: str, group: RoleGroup, resource_path: Optional[str] = None):
        self.name = name
        self.description = description
        self.group = group
        self.resource_path = resource_path

    def __str__(self):
        path_part = f" ({self.resource_path})" if self.resource_path else ""
        return f"{self.group} {self.name}{path_part}"

    
    def create_workspace_specific_permissions(self, workspace_id: str) -> Permission:
        return self.create_resource_based_permission(f"/WORKSPACE/{workspace_id}")

    def create_resource_based_permission(self, base_path: str) -> Permission:
        return Permission(
            group=group,
            operate=operate,
            resource_path=f"{base_path}/{self.name}"
        )


class RoleConstants(Enum):
    ADMIN = Role("ADMIN", "超级管理员", RoleGroup.SYSTEM_USER)

    WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", "工作空间管理员", RoleGroup.SYSTEM_USER)
    
    USER = Role("USER", "普通用户", RoleGroup.SYSTEM_USER)

    @classmethod
    def get_workspace_admin(cls) -> PermissionCallable:
        return cls.WORKSPACE_MANAGE.create_workspace_specific_permissions 

    @classmethod
    def get_user_edit(cls) -> PermissionCallable:
        return cls.USER.create_workspace_specific_permissions(operate=Operate.EDIT)


class Permission:
    """权限信息"""
    def __init__(
        self,
        group: Group,
        operate: Operate,
        resource_path: Optional[str] = None
    ):
        self.group = group
        self.operate = operate
        self.resource_path = resource_path
    

    def __str__(self):
        path_part = f":{self.resource_path}" if self.resource_path else ""        
        return f"{self.group}:{self.operate}{path_part}"

    
    @property
    def has_common_access(self) -> bool:
        common_groups = {Group.APPLICATION, Group.KNOWLEDGE}
        return self.group in common_groups

    
    @property
    def is_write_operation(self) -> bool:
        return self.operate == Operate.DELETE
    
    

permission_constants = {
    'ADMIN':          RoleConstants.ADMIN.get_workspace_admin(),
    'USER_EDIT':      RoleConstants.USER.edit(workspace_id='example'),
}


def get_default_permission_list_by_role(role: RoleConstants) -> List[Permission]:
    role_name = role.name.upper()
    default_permission_group_map = {
        'WORKSPACE_ADMIN': ['edit'],
        'USER_EDIT': ['delete']
    }
    groups_to_check = default_permission_group_map.get(role_name, [])
    
    
    result = [
        permission for perm in permission_constants.values() 
        if any(operation in [p.operate.value for p in groups_to_check])
    ]
    
    return result

This update improves readability and maintainability while maintaining the original functionality of the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, the code looks well-structured and follows best practices for class design and method behavior. However, here are some minor improvements and suggestions to enhance readability, maintainability, and functionality:

  1. Consistent Naming Conventions: Use consistent naming conventions throughout the codebase, especially regarding enums (User, Operation, etc.) and classes.

  2. Resource Path Handling: The use of a static resource path in the __str__ and comparison methods can lead to confusion and errors if not managed properly. Consider using more flexible handling of paths within methods like get_workspace_apple_permission.

  3. Default Values for Method Parameters: In the constructor of the Auth class, consider using default values for method parameters where appropriate, such as initializing work_space_list instead of passing it every time.

  4. Lambda Functions for Permission Retrieval: Using lambda functions for retrieval might be more efficient if you plan on making many calls with different arguments. Ensure that these lambdas handle the case when necessary permissions do not exist by returning an empty string or None instead of raising an exception.

  5. Comments and Docstrings: Add comments to explain complex logic flows and ensure docstrings describe the purpose and usage of each module.

  6. Error Handling: Implement basic error handling to manage cases where required parameters are missing or invalid inputs.

  7. Considerations for Performance: If this code scales up, consider optimizing performance considerations such as memoization or batching API requests based on available roles.

Here's a revised version incorporating some of these suggestions:

from enum import Enum, unique

@unique
class Group(Enum):
    """Roles"""
    USER = "USER"
    APPLICATION = "APPLICATION"
    KNOWLEDGE = "KNOWLEDGE"


@unique
class Operate(Enum):
    """Actions"""

@unique
class RoleGroup(Enum):
    """Workgroup groups"""
    SYSTEM_USER = "SYSTEM_USER"

class Role:
    def __init__(self, name: str, desc: str, group: RoleGroup, resource_path=None):
        self.name = name
        self.desc = desc
        self.group = group
        self.resource_path = resource_path

    def __str__(self):
        return f"{self.name}:{self.resource_path}" if self.resource_path else self.name
    
    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        return hasattr(other, 'name') and self.name == other.name


class RoleConstants(Enum):
    ADMIN = Role("ADMIN", 'Super Administrator', RoleGroup.SYSTEM_USER)
    WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", 'Workspace Manager', RoleGroup.system_user)
    USER = Role("USER", "Normal User", RoleGroup.SYSTEM_USER)

    def get_resource(self, workspace_id=None, application_id=None, knowledge_id=None):
        components = [f'/WORKSPACE/{workspace_id}' if workspace_id else '',
                      f'/APPLICATION/{application_id}' if application_id else '',
                      f'/KNOWLEDGE/{knowledge_id}' if knowledge_id else '']
        return "".join(components).lstrip('/')

    @classmethod
    def build(cls, group_enum_value, operation_enum_value, *, role_name=None, role_desc='',
              resource_type='WORKSPACE', resource_id=None):
        try:
            role_instance = cls[role_name]
        except KeyError:
            raise ValueError(f"Invalid role '{role_name}'. Valid choices: {cls.choices}")

        resource_path = cls.get_resource(**vars(role_instance))
        
        return Permission(
            group=group_enum_value,
            operate=operation_enum_value,
            resource_path=f"/{resource_type}/{resource_path}",
            role_list=[role_instance]
        )


class Permission:
    def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None):
        if role_list is None:
            role_list = []
        self.group = group
        self.operate = operate
        self.resource_path = resource_path
        self.role_list = role_list

    def __str__(self):
        components = [
            f"{self.group.value}",
            f":{self.operate.value}"
            ]
        if self.resource_path:
            components.append(f":{self.resource_path.strip('/')}")
            
        return "/".join(filter(None, components))

    def __eq__(self, other):
        if isinstance(other, Permission):
            return all(zip_longest(map(attrgetter('value'), self), map(attrgetter('value'), other)))
        return False


from collections import OrderedDict
from typing import Callable
@unique
class AuthorizationCodeEnum:
    
    VIEW_PERMISSION_CODE = ('view_permissions', PermissionConstants.VIEW.PERMISSION)
    EDIT_PERMISSION_CODE = ('edit_permissions', PermissionConstants.EDIT PERMISSION)
    DELETE_PERMISSION_CODE = ('delete_permissions', PermissionConstants.DELETE.PERMIS)


class Auth(object):
    def __init__(
        self,
        current_workspace: str | None,
        current_roles: list[Role] | None,
        permissions: list[Callable[..., Permission]],   # Function generating individual authorization objects
        keywords={}
    ):
        """
        Initialize an Authentication object.
        :param current_workspace: Id of the currently active workspace.
        :param current_roles: List of roles associated with the user account.
        :param permissions: Optional custom permissions or actions to add/override defaults.
        :param kwargs: Additional keyword args needed for permission creation (e.g., workspace_id).
        """

        self.current_workspace = current_workspace
        self.permissions = []

        # Generate default permissions if no provided
        default_perms = {
            'GET_USERS': PermissionConstants.GRANT_GET_USERS,
            'MODIFY_USERS': Permission Constants.MANUAL_MODIFICATION_OF_USER_LIST,
            'DELETE_USERS': PermissionConstants.DELETE_USER
        }
        self.add_default_permissions(default_perms)

        # Apply additional provided permissions
        for perm_generator in permissions:
            if callable(perm_generator):  # Check if function is provided
                new_perm = perm_generator(kwargs)
                if new_perm:
                    self.permissions.append(new_perm)

        # Append custom actions or operations here (not shown in sample)

This version refactors the original code into smaller, manageable parts while introducing better organization and consistency. It also includes some improvements to improve error reporting and make the code more flexible for future expansion.

Expand Down
17 changes: 17 additions & 0 deletions apps/common/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
@desc:
"""
import hashlib
from typing import List


def password_encrypt(row_password):
Expand All @@ -19,3 +20,19 @@ def password_encrypt(row_password):
md5.update(row_password.encode()) # 3,对字符串的字节类型加密
result = md5.hexdigest() # 4,加密
return result


def group_by(list_source: List, key):
"""
將數組分組
:param list_source: 需要分組的數組
:param key: 分組函數
:return: key->[]
"""
result = {}
for e in list_source:
k = key(e)
array = result.get(k) if k in result else []
array.append(e)
result[k] = array
return result
3 changes: 2 additions & 1 deletion apps/maxkb/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
'drf_spectacular',
'drf_spectacular_sidecar',
'users.apps.UsersConfig',
'common'
'common',
'system_manage'
]

MIDDLEWARE = [
Expand Down
Empty file added apps/system_manage/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions apps/system_manage/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.
6 changes: 6 additions & 0 deletions apps/system_manage/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class SystemManageConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'system_manage'
Loading
Loading