天天看點

從0到1開發自動化運維平台-分頁器權限認證

作者:CharlesLai89

這段時間在開發SQL上線工單,N久沒來更新文章了。

今天主要是整理下代碼,然後上傳到github倉庫

編寫RBAC權限

class RbacPermission(BasePermission):
    """
    自定義權限
    """

    @classmethod
    def check_is_admin(cls, request):
        return request.user.is_authenticated and request.user.roles.filter(name='管理者').count() > 0

    @classmethod
    def get_permission_from_role(cls, request):
        try:
            perms = request.user.roles.values(
                'permissions__method',
            ).distinct()
            return [p['permissions__method'] for p in perms]
        except AttributeError:
            return []

    def _has_permission(self, request, view):
        """
        權限擷取方式
            從 perms_map 中擷取, 通過 request.method, http 請求方法來擷取對應權限點
            1. 預設格式
                perms_map = (
                    {'*': ('admin', '管理者')},
                    {'*': ('k8s_all', 'k8s管理')},
                    {'get': ('k8s_list', '檢視k8s')},
                    {'post': ('k8s_create', '建立k8s')},
                    {'put': ('k8s_edit', '編輯k8s')},
                    {'delete': ('k8s_delete', '删除k8s')}
                )
            2. 自定義方法格式
                perms_map = (
                    {'get_test_data': ('get_test_data', '擷取測試資料')},
                )
                此時 格式為  {http請求方法}_{ViewSet自定義action}

        :param request: rest_framework request 對象
        :param view: rest_framework view 對象
        :return:
        """
        _method = request._request.method.lower()
        url_whitelist = PLATFORM_CONFIG['whitelist'] if PLATFORM_CONFIG.get(
            'whitelist', None) else []
        path_info = request.path_info
        for item in url_whitelist:
            url = item['url']
            if url in path_info:
                logger.debug(f'請求位址 {path_info} 命中白名單 {url}, 放行')
                return True

        is_superuser = request.user.is_superuser
        # 超級管理者 或者 白名單模式 直接放行
        if is_superuser:
            logger.debug(
                f'使用者 {request.user} 是超級管理者, 放行 is_superuser = {is_superuser}')
            return True

        is_admin = RbacPermission.check_is_admin(request)
        perms = self.get_permission_from_role(request)
        # 不是管理者 且 權限清單為空的情況下, 直接拒絕
        if not is_admin and not perms:
            logger.debug(f'使用者 {request.user} 不是管理者 且 權限清單為空, 直接拒絕')
            return False

        perms_map = view.perms_map
        # 未配置權限映射的視圖一律禁止通路
        if not hasattr(view, 'perms_map'):
            logger.debug(f'未配置權限映射的視圖一律禁止通路 {view}')
            return False

        # _custom_method = None
        # default_funcs = ['create', 'list', 'retrieve', 'update', 'destroy']
        action = view.action
        _custom_method = f'{_method}_{action}'
        for i in perms_map:
            logger.debug(f'perms_map item ===  {i}')
            for method, alias in i.items():
                # 如果是管理者, 判斷目前perms_map是否帶有 {'*': ('admin', '管理者')} 标記,如果有, 則目前 ViewSet 所有方法全放行
                if is_admin and (method == '*' and alias[0] == 'admin'):
                    logger.debug('管理者判斷通過, 放行')
                    return True
                # 如果帶有某個子產品的管理權限, 則目前子產品所有方法都放行
                if method == '*' and alias[0] in perms:
                    logger.debug('子產品管理權限 判斷通過, 放行')
                    return True

                # 判斷自定義action的情況
                # {'get_test_data': ('get_test_data', '擷取測試資料')},
                # {'*_test_data': ('get_test_data', '擷取測試資料')},
                if _custom_method and alias[0] in perms and (_custom_method == method or method == f'*_{action}'):
                    logger.debug('自定義action權限 判斷通過, 放行')
                    return True

                # 判斷是否擁有ViewSet 某個方法的權限, 有則放行
                # {'get': ('workflow_list', '檢視工單')},
                if _method == method and alias[0] in perms:
                    logger.debug(f'{method}方法權限 判斷通過, 放行')
                    return True
        logger.debug(f'{path_info} 沒有符合條件的, 則預設禁止通路')
        return False

    def has_permission(self, request, view):
        res = self._has_permission(request, view)
        # 記錄權限異常的操作
        if not res:
            pass
        return res
           

更新自定義分頁器

重寫自定義分頁器,變更傳回的資料格式

class CustomPagination(PageNumberPagination):
    def get_paginated_response(self, data):
        return Response({'data': {'list': data, 'total': self.page.paginator.count, 'next': self.get_next_link(),
                                  'previous': self.get_previous_link()}, 'code': 20000, 'message': None}, status=status.HTTP_200_OK)
           

自定義認證

建立檔案common/extends/jwt_auth.py

from rest_framework_simplejwt.authentication import JWTAuthentication as BaseJWTAuthentication

api_settings = APISettings(
    getattr(settings, 'SIMPLE_JWT', None), DEFAULTS, IMPORT_STRINGS)


class JWTAuthentication(BaseJWTAuthentication):

    def get_validated_token(self, raw_token):
        """
        Validates an encoded JSON web token and returns a validated token
        wrapper object.
        """
        messages = []
        for AuthToken in api_settings.AUTH_TOKEN_CLASSES:
            try:
                return AuthToken(raw_token)
            except TokenError as e:
                messages.append(
                    {
                        "token_class": AuthToken.__name__,
                        "token_type": AuthToken.token_type,
                        "message": e.args[0],
                    }
                )

        raise CustomInvalidToken(
            {
                "detail": 'Token不合法或者已經過期,請重新登入.',
                "code": 40100
            }
        )           

編輯settings.py,更新REST_FRAMEWORK段

# drf配置
REST_FRAMEWORK = {
    # 自定義分頁
    'DEFAULT_PAGINATION_CLASS': 'common.extends.pagination.CustomPagination',
    'PAGE_SIZE': 10,
    # 使用者登陸認證方式
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'common.extends.jwt_auth.JWTAuthentication',
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication',
    ),
           

大概這些吧,其它一些變更,大家可以檢視github上的代碼

https://github.com/qitan/ydevops-backend

繼續閱讀