這段時間在開發SQL上線工單,N久沒來更新文章了。
今天主要是整理下代碼,然後上傳到github倉庫
編寫RBAC權限
class RbacPermission(BasePermission):
"""
自定義權限
"""
@classmethod
def check_is_admin(cls, request):
return request.user.is_authenticated and request.user.roles.filter(name='管理者').count() > 0
@classmethod
def get_permission_from_role(cls, request):
try:
perms = request.user.roles.values(
'permissions__method',
).distinct()
return [p['permissions__method'] for p in perms]
except AttributeError:
return []
def _has_permission(self, request, view):
"""
權限擷取方式
從 perms_map 中擷取, 通過 request.method, http 請求方法來擷取對應權限點
1. 預設格式
perms_map = (
{'*': ('admin', '管理者')},
{'*': ('k8s_all', 'k8s管理')},
{'get': ('k8s_list', '檢視k8s')},
{'post': ('k8s_create', '建立k8s')},
{'put': ('k8s_edit', '編輯k8s')},
{'delete': ('k8s_delete', '删除k8s')}
)
2. 自定義方法格式
perms_map = (
{'get_test_data': ('get_test_data', '擷取測試資料')},
)
此時 格式為 {http請求方法}_{ViewSet自定義action}
:param request: rest_framework request 對象
:param view: rest_framework view 對象
:return:
"""
_method = request._request.method.lower()
url_whitelist = PLATFORM_CONFIG['whitelist'] if PLATFORM_CONFIG.get(
'whitelist', None) else []
path_info = request.path_info
for item in url_whitelist:
url = item['url']
if url in path_info:
logger.debug(f'請求位址 {path_info} 命中白名單 {url}, 放行')
return True
is_superuser = request.user.is_superuser
# 超級管理者 或者 白名單模式 直接放行
if is_superuser:
logger.debug(
f'使用者 {request.user} 是超級管理者, 放行 is_superuser = {is_superuser}')
return True
is_admin = RbacPermission.check_is_admin(request)
perms = self.get_permission_from_role(request)
# 不是管理者 且 權限清單為空的情況下, 直接拒絕
if not is_admin and not perms:
logger.debug(f'使用者 {request.user} 不是管理者 且 權限清單為空, 直接拒絕')
return False
perms_map = view.perms_map
# 未配置權限映射的視圖一律禁止通路
if not hasattr(view, 'perms_map'):
logger.debug(f'未配置權限映射的視圖一律禁止通路 {view}')
return False
# _custom_method = None
# default_funcs = ['create', 'list', 'retrieve', 'update', 'destroy']
action = view.action
_custom_method = f'{_method}_{action}'
for i in perms_map:
logger.debug(f'perms_map item === {i}')
for method, alias in i.items():
# 如果是管理者, 判斷目前perms_map是否帶有 {'*': ('admin', '管理者')} 标記,如果有, 則目前 ViewSet 所有方法全放行
if is_admin and (method == '*' and alias[0] == 'admin'):
logger.debug('管理者判斷通過, 放行')
return True
# 如果帶有某個子產品的管理權限, 則目前子產品所有方法都放行
if method == '*' and alias[0] in perms:
logger.debug('子產品管理權限 判斷通過, 放行')
return True
# 判斷自定義action的情況
# {'get_test_data': ('get_test_data', '擷取測試資料')},
# {'*_test_data': ('get_test_data', '擷取測試資料')},
if _custom_method and alias[0] in perms and (_custom_method == method or method == f'*_{action}'):
logger.debug('自定義action權限 判斷通過, 放行')
return True
# 判斷是否擁有ViewSet 某個方法的權限, 有則放行
# {'get': ('workflow_list', '檢視工單')},
if _method == method and alias[0] in perms:
logger.debug(f'{method}方法權限 判斷通過, 放行')
return True
logger.debug(f'{path_info} 沒有符合條件的, 則預設禁止通路')
return False
def has_permission(self, request, view):
res = self._has_permission(request, view)
# 記錄權限異常的操作
if not res:
pass
return res
更新自定義分頁器
重寫自定義分頁器,變更傳回的資料格式
class CustomPagination(PageNumberPagination):
def get_paginated_response(self, data):
return Response({'data': {'list': data, 'total': self.page.paginator.count, 'next': self.get_next_link(),
'previous': self.get_previous_link()}, 'code': 20000, 'message': None}, status=status.HTTP_200_OK)
自定義認證
建立檔案common/extends/jwt_auth.py
from rest_framework_simplejwt.authentication import JWTAuthentication as BaseJWTAuthentication
api_settings = APISettings(
getattr(settings, 'SIMPLE_JWT', None), DEFAULTS, IMPORT_STRINGS)
class JWTAuthentication(BaseJWTAuthentication):
def get_validated_token(self, raw_token):
"""
Validates an encoded JSON web token and returns a validated token
wrapper object.
"""
messages = []
for AuthToken in api_settings.AUTH_TOKEN_CLASSES:
try:
return AuthToken(raw_token)
except TokenError as e:
messages.append(
{
"token_class": AuthToken.__name__,
"token_type": AuthToken.token_type,
"message": e.args[0],
}
)
raise CustomInvalidToken(
{
"detail": 'Token不合法或者已經過期,請重新登入.',
"code": 40100
}
)
編輯settings.py,更新REST_FRAMEWORK段
# drf配置
REST_FRAMEWORK = {
# 自定義分頁
'DEFAULT_PAGINATION_CLASS': 'common.extends.pagination.CustomPagination',
'PAGE_SIZE': 10,
# 使用者登陸認證方式
'DEFAULT_AUTHENTICATION_CLASSES': (
'common.extends.jwt_auth.JWTAuthentication',
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication',
),
大概這些吧,其它一些變更,大家可以檢視github上的代碼
https://github.com/qitan/ydevops-backend