天天看點

從0到1開發自動化運維平台-接口文檔分頁視圖和權限配置

作者:CharlesLai89

安裝依賴

pip install djangorestframework-simplejwt
pip install django-filter
pip install coreapi
pip install drf-yasg           

配置swagger接口文檔

1、添加drf_yasg到settings.py

INSTALLED_APPS = [
...
    'rest_framework',
    'drf_yasg',
    'cmdb.apps.CmdbConfig',
]           

2、配置路由

from rest_framework import permissions
from drf_yasg.views import get_schema_view
from drf_yasg import openapi

schema_view = get_schema_view(
    openapi.Info(
        title="DevOps運維平台",
        default_version='v1',
        description="DevOps運維平台 接口文檔",
        terms_of_service="",
        contact=openapi.Contact(email="[email protected]"),
        license=openapi.License(name="BSD License"),
    ),
    public=True,
    permission_classes=(permissions.AllowAny,),
)
...
urlpatterns = [
    path('apidoc/', schema_view.with_ui('swagger', 
...
]           

restframework配置

# drf配置
REST_FRAMEWORK = {
    # 自定義分頁
    'DEFAULT_PAGINATION_CLASS': 'common.extends.pagination.CustomPagination',
    'PAGE_SIZE': 20,
    # 使用者登陸認證方式
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'rest_framework_simplejwt.authentication.JWTAuthentication',
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication',
    ),
    # 全局權限攔截
    'DEFAULT_PERMISSION_CLASSES': (
        'common.extends.permissions.RbacPermission',
    ),
    'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema',
    'DEFAULT_FILTER_BACKENDS': (
        'django_filters.rest_framework.DjangoFilterBackend',
        'rest_framework.filters.SearchFilter',
        'rest_framework.filters.OrderingFilter',
    ),
    'DEFAULT_RENDERER_CLASSES': ['rest_framework.renderers.JSONRenderer',
                                 'rest_framework.renderers.BrowsableAPIRenderer'] if DEBUG else [
        'rest_framework.rend           

分頁擴充

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author  :   Charles Lai
@file    :   pagination.py
@time    :   2023/03/26 15:22
@contact :   [email protected]
'''

# here put the import lib
from rest_framework.pagination import PageNumberPagination, LimitOffsetPagination
from rest_framework.response import Response
from rest_framework import status


class CustomPagination(PageNumberPagination):
    def get_paginated_response(self, data):
        return Response({'data': {'list': data, 'total': self.page.paginator.count, 'next': self.get_next_link(),
            'previous': self.get_previous_link()}, 'success': True, 'errorCode': 0, 'errorMessage': None}, status=status.HTTP_200_OK)
           

自定義公共視圖

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author  :   Charles Lai
@file    :   viewsets.py
@time    :   2023/03/26 14:42
@contact :   [email protected]
'''

# here put the import lib
import inspect
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from rest_framework import viewsets
from rest_framework import pagination
from rest_framework.settings import api_settings
from rest_framework.filters import OrderingFilter
from django.db.models.query import QuerySet
from django.db.models import ProtectedError
from django.core.cache import cache
import pytz
import logging

logger = logging.getLogger(__name__)


def ops_response(data, success=True, errorCode=0, errorMessage=None, status=status.HTTP_200_OK):
    """
    傳回自定義
    data清單資料格式:{
 list: [
 ],
 current?: number,
 pageSize?: number,
 total?: number,
}
    """
    return Response({'data': data, 'success': success, 'errorCode': errorCode, 'errorMessage': errorMessage}, status=status)


class AutoModelViewSet(viewsets.ModelViewSet):
    """
    A viewset that provides default `create()`, `retrieve()`, `update()`,
    `partial_update()`, `destroy()` and `list()` actions.
    """

    permission_classes = [IsAuthenticated]
    permission_classes_by_action = {}
    filter_backends = (OrderingFilter, )

    def __init__(self, *args, **kwargs):
        if not hasattr(self, 'queryset'):
            raise AttributeError('必須定義 類屬性 queryset')

        if not hasattr(self, 'serializer_class'):
            raise AttributeError('必須定義 類屬性 serializer_class')

        super().__init__(*args, **kwargs)

    def get_serializer(self, *args, **kwargs):
        """
        重寫 get_serializer 類,用來支援自動擷取不同的 serializer_class
        例子:  list 方法, 設定一個serializer_list_class, 則調用get_serializer的時候, 優先擷取
        命名格式 serializer_{call_func_name}_class
        :param args:
        :param kwargs:
        :return:
        """
        call_func_name = inspect.stack()[1][3]
        serializer_class = getattr(self, f'serializer_{call_func_name}_class', None)
        if not serializer_class:
            serializer_class = self.get_serializer_class()
        kwargs['context'] = self.get_serializer_context()
        return serializer_class(*args, **kwargs)

    def get_object(self):
        return super(AutoModelViewSet, self).get_object()

    def get_permissions(self):
        try:
            return [permission() for permission in self.permission_classes_by_action[self.action]]
        except KeyError:
            return [permission() for permission in self.permission_classes]
        
    def get_permission_from_role(self, request):
        try:
            perms = request.user.roles.values(
                'permissions__method',
            ).distinct()
            return [p['permissions__method'] for p in perms]
        except (AttributeError, TypeError):
            return []

    def extend_filter(self, queryset):
        return queryset

    def get_queryset(self):
        assert self.queryset is not None, (
                "'%s' should either include a `queryset` attribute, "
                "or override the `get_queryset()` method."
                % self.__class__.__name__
        )
        queryset = self.extend_filter(self.queryset)
        if isinstance(queryset, QuerySet):
            queryset = queryset.all()
        return queryset.distinct()

    def create(self, request, *args, **kwargs):
        try:
            request.data['name'] = request.data['name'].strip(' ').replace(' ', '-')
        except BaseException as e:
            logger.debug('exception ', str(e))
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return ops_response({}, success=False, errorCode=40000, errorMessage=serializer.errors)
        try:
            self.perform_create(serializer)
        except BaseException as e:
            return ops_response({}, success=False, errorCode=50000, errorMessage=str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        return ops_response(serializer.data)

    def list(self, request, pk=None, *args, **kwargs):
        queryset = self.filter_queryset(self.get_queryset())
        page_size = request.query_params.get('page_size', None)
        if not page_size:
            page_size = api_settings.PAGE_SIZE
        pagination.PageNumberPagination.page_size = page_size
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)
        serializer = self.get_serializer(queryset, many=True)
        return ops_response({'list': serializer.data, 'total': queryset.count()})

    def update(self, request, *args, **kwargs):
        instance = self.get_object()
        partial = kwargs.pop('partial', False)
        try:
            request.data['name'] = request.data['name'].strip(' ').replace(' ', '-')
        except BaseException as e:
            logger.warning(f'不包含name字段: {str(e)}')
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        if not serializer.is_valid():
            return ops_response({}, success=False, errorCode=40000, errorMessage=serializer.errors)
        try:
            self.perform_update(serializer)
        except BaseException as e:
            return ops_response({}, success=False, errorCode=50000, errorMessage=str(e))

        if getattr(instance, '_prefetched_objects_cache', None):
            instance._prefetched_objects_cache = {}
        data = {'data': serializer.data, 'status': 'success', 'code': 20000}
        return ops_response(serializer.data)

    def retrieve(self, request, *args, **kwargs):
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return ops_response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        instance = self.get_object()
        try:
            self.perform_destroy(instance)
        except ProtectedError:
            # 存在關聯資料,不可删除
            return ops_response({}, success=False, errorCode=40300, errorMessage='存在關聯資料,禁止删除!')
        except BaseException as e:
            logger.exception(f'删除資料發生錯誤 {e}, {e.__class__}')
            return ops_response({}, success=False, errorCode=50000, errorMessage=f'删除異常: {str(e)}')
        return ops_response('删除成功')


class AutoModelParentViewSet(AutoModelViewSet):

    def get_queryset(self):
        assert self.queryset is not None, (
                "'%s' should either include a `queryset` attribute, "
                "or override the `get_queryset()` method."
                % self.__class__.__name__
        )
        queryset = self.extend_filter(self.queryset)
        if self.action == 'list':
            if not self.request.query_params.get('search'):
                queryset = queryset.filter(parent__isnull=True)
        if isinstance(queryset, QuerySet):
            queryset = queryset.all()
        return queryset.distinct()
           

自定義權限校驗

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author  :   Charles Lai
@file    :   permissions.py
@time    :   2023/03/26 15:27
@contact :   [email protected]
'''

# here put the import lib
from rest_framework.permissions import BasePermission
from config import platform

import logging

logger = logging.getLogger(__name__)


class RbacPermission(BasePermission):
    """
    自定義權限
    """

    @classmethod
    def check_is_admin(cls, request):
        return request.user.is_authenticated and request.user.roles.filter(name='管理者').count() > 0

    @classmethod
    def get_permission_from_role(cls, request):
        try:
            perms = request.user.roles.values(
                'permissions__method',
            ).distinct()
            return [p['permissions__method'] for p in perms]
        except AttributeError:
            return []

    def _has_permission(self, request, view):
        """
        權限擷取方式
            從 perms_map 中擷取, 通過 request.method, http 請求方法來擷取對應權限點
            1. 預設格式
                perms_map = (
                    {'*': ('admin', '管理者')},
                    {'*': ('k8s_all', 'k8s管理')},
                    {'get': ('k8s_list', '檢視k8s')},
                    {'post': ('k8s_create', '建立k8s')},
                    {'put': ('k8s_edit', '編輯k8s')},
                    {'delete': ('k8s_delete', '删除k8s')}
                )
            2. 自定義方法格式
                perms_map = (
                    {'get_test_data': ('get_test_data', '擷取測試資料')},
                )
                此時 格式為  {http請求方法}_{ViewSet自定義action}

        :param request: rest_framework request 對象
        :param view: rest_framework view 對象
        :return:
        """
        _method = request._request.method.lower()
        url_whitelist = platform['whitelist'] if platform else []
        path_info = request.path_info
        for item in url_whitelist:
            url = item['url']
            if url in path_info:
                logger.debug(f'請求位址 {path_info} 命中白名單 {url}, 放行')
                return True
            
        is_superuser = request.user.is_superuser
        # 超級管理者 或者 白名單模式 直接放行
        if is_superuser:
            logger.debug(f'使用者 {request.user} 是超級管理者, 放行 is_superuser = {is_superuser}')
            return True

        is_admin = RbacPermission.check_is_admin(request)
        perms = self.get_permission_from_role(request)
        # 不是管理者 且 權限清單為空的情況下, 直接拒絕
        if not is_admin and not perms:
            logger.debug(f'使用者 {request.user} 不是管理者 且 權限清單為空, 直接拒絕')
            return False

        perms_map = view.perms_map
        # 未配置權限映射的視圖一律禁止通路
        if not hasattr(view, 'perms_map'):
            logger.debug(f'未配置權限映射的視圖一律禁止通路 {view}')
            return False

        # _custom_method = None
        # default_funcs = ['create', 'list', 'retrieve', 'update', 'destroy']
        action = view.action
        _custom_method = f'{_method}_{action}'
        for i in perms_map:
            logger.debug(f'perms_map item ===  {i}')
            for method, alias in i.items():
                # 如果是管理者, 判斷目前perms_map是否帶有 {'*': ('admin', '管理者')} 标記,如果有, 則目前 ViewSet 所有方法全放行
                if is_admin and (method == '*' and alias[0] == 'admin'):
                    logger.debug('管理者判斷通過, 放行')
                    return True
                # 如果帶有某個子產品的管理權限, 則目前子產品所有方法都放行
                if method == '*' and alias[0] in perms:
                    logger.debug('子產品管理權限 判斷通過, 放行')
                    return True

                # 判斷自定義action的情況
                # {'get_test_data': ('get_test_data', '擷取測試資料')},
                # {'*_test_data': ('get_test_data', '擷取測試資料')},
                if _custom_method and alias[0] in perms and (_custom_method == method or method == f'*_{action}'):
                    logger.debug('自定義action權限 判斷通過, 放行')
                    return True

                # 判斷是否擁有ViewSet 某個方法的權限, 有則放行
                # {'get': ('workflow_list', '檢視工單')},
                if _method == method and alias[0] in perms:
                    logger.debug(f'{method}方法權限 判斷通過, 放行')
                    return True
        logger.debug(f'{path_info} 沒有符合條件的, 則預設禁止通路')
        return False

    def has_permission(self, request, view):
        res = self._has_permission(request, view)
        # 記錄權限異常的操作
        if not res:
            pass
        return res


class AdminPermission(BasePermission):

    def has_permission(self, request, view):
        if RbacPermission.check_is_admin(request):
            return True
        return False


class ObjPermission(BasePermission):
    """
    密碼管理對象級權限控制
    """

    def has_object_permission(self, request, view, obj):
        perms = RbacPermission.get_permission_from_role(request)
        if 'admin' in perms:
            return True
        elif request.user.id == obj.uid_id:
            return True
           

更新cmdb子產品視圖

我們之前已經完成的view_cmdb.py檔案裡,有幾處需要更新:

1、将viewsets.ModelViewSet更改成公共視圖裡的AutoModelViewSet

2、将Response更改成ops_reponse,確定傳回内容格式一緻.

運作項目

(venv) ➜  ydevops-backend  cd /home/charles/ydevops-backend ; /usr/bin/env /home/charles/ydevops-backend/venv/bin/python /home/charles/.vscode-server/extensions/ms-python.python-2023.5.10791008/pythonFiles/lib/python/debugpy/adapter/../../debugpy/launcher 41223 -- /home/charles/ydevops-backend/manage.py runserver 0.0.0.0:9000 
Watching for file changes with StatReloader
Performing system checks...

System check identified no issues (0 silenced).
March 26, 2023 - 18:50:30
Django version 4.1.7, using settings 'devops_backend.settings'
Starting development server at http://0.0.0.0:9000/
Quit the server with CONTROL-C.           

通路接口文檔 http://localhost:9000/apidoc/

從0到1開發自動化運維平台-接口文檔分頁視圖和權限配置

在這個頁面,我們可以做一些CRUD操作,如檢視環境(由于資料量小,可以先把settings.py裡的預設PAGE_SIZE改為1)

從0到1開發自動化運維平台-接口文檔分頁視圖和權限配置

Okay,今天先到這吧...

繼續閱讀