天天看点

Django框架之DRF之认证组件,权限组件,频率组件,token

一 认证组件:

使用方法:

1.写一个认证类,新建文件:my_auth.py

class MyAuth(BaseAuthentication):
    def authenticate(self, request):
        from app01.views import produce_token
        #在url中发送token
        # token = request.GET.get("token")
        # name = request.GET.get('name')
        #在请求头中发送token
        username = request.META.get("HTTP_USERNAME")
        print("META is what", request.META)
        #从数据库中获取token信息
        token = request.META.get("HTTP_TOKEN")
        # token = request.POST.get("token")
        # username = request.POST.get("name")
        if token == produce_token(username):
            user_obj = models.User.objects.filter(token=token).first()
            return user_obj, token
        else:
            raise AuthenticationFailed("请先去登录")
           

局部使用:在视图类中添加认证:

class Book(APIView):
	#登录认证类为列表,可从源码中看
    authentication_classes = [MyAuth, ]
  #  permission_classes = [MyPerssion,]
  #  throttle_classes = [MyThrottle,]
    def get(self, request, *args, **kwargs):
        back_dic = {'code': 100, 'msg': "查询成功"}
        book_list = models.Book.objects.all()
        # print(book_list)
        book_ser = app01serilizer.BookSer(instance=book_list, many=True)
        back_dic['data'] = book_ser.data
        return Response(book_ser.data)
           

全局认证使用方法设置:

在settings.py中配置

#从哪里可以看到呢,APIVIEW点进去
#authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
REST_FRAMEWORK={
                "DEFAULT_AUTHENTICATION_CLASSES":["app01.my_examine.MyAuth",]
            }
           

全局认证已设置,视图中的所有类都会进行设置值的认证,这显然是不符合实际的,因为有些视图不能设置认证,比如注册,登录

所以需要局部对认证进行认证禁用,方法:

在局部认证的视图类位置添加下述代码,完成局部禁用

authentication_classes = []

小结:(通过分析源码)

-如果在项目的setting.py中配置了REST_FRAMEWORK,默认先从项目的setting中取
            -如果取不到,才去默认的drf配置文件中取
            -如果用户在视图类中配置了某个,先去用户配置的取
            视图类第一   然后settings配置文件   最后系统配置
            总结:
                先取视图类中配置的----》项目setting中取----》默认配置
           

token简单描述:

token实现过程:用户登录成功后会在服务端通过自己的加密算法算出用户相关的并且唯一的一串字符串(token)连同响应的数据一起发给前端,前端进行保存,在下次用户访问发送请求的时候会将token数据一并发给后端服务器,后端服务器首先就会对token进行校验(这个校验的地方可以是中间件也可以在视图中),校验的过程大致就是再次通过用户相关数据进行加密算法算出这个用户的token字符串,拿现在算出来的token与保存在前端用户发送过来的token进行匹配,如果一致则代表用户认证登录成功,当然这个token是可以设置有效时间的。在一定程度上缓解了服务器的压力。

token产生过程:

比如说我用HMAC-SHA256 算法,加上一个只有我才知道的密钥, 对数据做一个签名, 把这个签名和数据一起作为token , 由于密钥别人不知道, 就无法伪造token了。

Django框架之DRF之认证组件,权限组件,频率组件,token

这个token服务端不保存, 前端发送请求时候会捎带这个token,我再用同样的HMAC-SHA256 算法和同样的密钥,对数据再计算一次签名, 和token中的签名做个比较, 如果相同, 我就知道用户已经登录过了,并且可以直接取到用户的user id , 如果不相同, 数据部分肯定被人篡改过, 我就告诉前端用户: 对不起,没有认证。

Django框架之DRF之认证组件,权限组件,频率组件,token

Token 中的数据是明文保存的(虽然我会用Base64做下编码, 但那不是加密), 还是可以被别人看到的, 所以我不能在其中保存像密码这样的敏感信息。

当然, 如果一个人的token 被别人偷走了, 那我也没办法, 我也会认为小偷就是合法用户, 这其实和一个人的session id 被别人偷走是一样的。

点我查看更多token相关信息。

应用token编写登录接口:

models.py

from django.db import models

# from django.contrib import

# Create your models here.


class User(models.Model):
    name = models.CharField(max_length=32)
    password = models.CharField(max_length=32)
    token = models.CharField(max_length=32,null=True)
    user_type = models.CharField(max_length=6,choices=(('1',"超级用户"),("2","普通用户")),default="2")


class Book(models.Model):
    name = models.CharField(max_length=32)
    price = models.IntegerField()
    authors = models.ManyToManyField(to='Author')


class Author(models.Model):
    name = models.CharField(max_length=32)
    age = models.IntegerField()
    authordetail = models.OneToOneField(to='AuthorDetail')



class AuthorDetail(models.Model):
    phone = models.CharField(max_length=32,null=True)
    sex = models.CharField(max_length=32,choices=((1,"男"),(2,"女")))
           
views.py
from django.shortcuts import render

# Create your views here.
from rest_framework.views import APIView
from rest_framework.response import Response
from app01 import models
from app01 import app01serilizer
from rest_framework.exceptions import ValidationError


def produce_token(val):
    import hashlib
    if val:
        md = hashlib.md5()
        md.update(val.encode("utf-8"))
        return md.hexdigest()
    else:
        return None


class Register(APIView):
    authentication_classes = []
    permission_classes = []

    def post(self, request, *args, **kwargs):
        print(request.data)
        back_dic = {'code': 100, "msg": "注册成功"}
        name = request.data.get('name')
        password = request.data.get("password")
        user_obj = models.User.objects.filter(name=name)
        if not user_obj:
            # 反序列话
            user = app01serilizer.UserSer(data=request.data)
            if user.is_valid():
                user.save()
                back_dic['data'] = user.validated_data
                return Response(back_dic)
            else:
                back_dic['msg'] = '注册失败'
                back_dic['errors'] = user.errors
                return Response(back_dic)
        else:
            back_dic['code'] = 101
            back_dic['msg'] = "用户已存在"
            return Response(back_dic)


class Login(APIView):
    #  接收登录的信息然后返回token
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        back_dic = {'code': 100, 'msg': "登录成功"}
        name = request.data.get('name')
        password = request.data.get("password")
        user_obj = models.User.objects.filter(name=name, password=password)
        if user_obj:
            # 由于不需要的可以不传
            user_ops = app01serilizer.UserSer(data=request.data)
            if user_ops.is_valid():
                token = produce_token(name)
                # defaults给值是需要更新的值
                user_obj.update_or_create(name=name, password=password, defaults={"token": token})
                back_dic['token'] = token
                back_dic['data'] = user_ops.validated_data
                return Response(back_dic)
        return Response({"code": "101", "msg": "登录失败"})


from app01.Auth import MyAuth
from app01.Auth import MyPerssion
from app01.Auth import MyThrottle

class Book(APIView):
    authentication_classes = [MyAuth, ]
    permission_classes = [MyPerssion,]
    throttle_classes = [MyThrottle,]
    def get(self, request, *args, **kwargs):
        back_dic = {'code': 100, 'msg': "查询成功"}
        book_list = models.Book.objects.all()
        # print(book_list)
        book_ser = app01serilizer.BookSer(instance=book_list, many=True)
        back_dic['data'] = book_ser.data
        return Response(book_ser.data)

    def post(self, request, *args, **kwargs):
        back_dic = {'code': 100, 'msg': "创建ok"}
        book_ops = app01serilizer.BookSer(data=request.data)
        # print(book_ops)
        if book_ops.is_valid():
            book_ops.save()
            back_dic['data'] = book_ops.validated_data

            return Response(back_dic)
        else:
            back_dic['msg'] = '创建失败'
            back_dic['code'] = 101
            # Response是不是因为他能传输带对象的字典
            return Response(back_dic)


class BookDetail(APIView):
    authentication_classes = [MyAuth]
    permission_classes = [MyPerssion]
    throttle_classes = [MyThrottle,]

    def get(self,request,*args,**kwargs):
        # print(kwargs)
        # print("funck  ---------------------bookdetail")
        back_dic = {'code':100,'msg':"查询成功"}
        book_obj = models.Book.objects.filter(pk=kwargs['pk']).first()
        bookdetail_ser = app01serilizer.BookSer(instance=book_obj)
        back_dic['data'] = bookdetail_ser.data
        return Response(back_dic)


    def post(self,request,*args,**kwargs):
        print(kwargs)
        back_dic = {'code': 100, 'msg': "修改成功"}
        # print(request.data)
        book_ops = app01serilizer.BookSer(data=request.data)
        if book_ops.is_valid():
            book_ops.save()
            back_dic['data'] = book_ops.validated_data
            return Response(back_dic)
        else:
            back_dic['msg'] = "修改失败"
            return Response()

    def put(self,request,*args,**kwargs):
        print("put kwargs",kwargs)
        book_ops = app01serilizer.BookSer(data=request.data)
        print(book_ops)
        name = request.data.get("name")
        price = request.data.get("price")
        authors = request.data.get("authors")
        pk = kwargs.get("pk")
        print("authors*****",authors)
        pk = int(pk)
        book_obj = models.Book.objects.filter(id=pk)

        if book_ops.is_valid():
            print('haha')
        else:
            print('ggg')
            print("adsdasdsa",book_ops.errors)
            # book_ops.update(name=name,default={"price":price})
        print('pk is ',pk)
        if book_obj:
            book_obj.update(id=pk,name=name,price=price)
            #
            #全自动建立第三张表的关系,我就用
            book_obj.first().authors.set(authors)
            back_dic = {'code': 100, 'msg': "修改成功"}
            return Response(back_dic)
        else:
            back_dic = {'code': 100, 'msg': "用户不存在"}
            return Response(back_dic)





class Author(APIView):
    # authentication_classes = [MyAuth,]

    def post(self, request, *args, **kwargs):
        back_dic = {'code': 100, 'msg': "创建ok"}
        author_ops = app01serilizer.AuthorSer(data=request.data)
        if author_ops.is_valid():
            author_ops.save()
            back_dic['data'] = author_ops.validated_data

            return Response(back_dic)
        else:
            back_dic['msg'] = '创建失败'
            back_dic['code'] = 101
            # Response是不是因为他能传输带对象的字典
            return Response(back_dic)


           

认证权限访问频率

#注:登录成功返回了user对象到request表中,request.user表中有一个字段是用户的权限,因此可以取到值判断权限

Auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from app01 import models
from rest_framework.response import Response
from rest_framework.permissions import BasePermission
# from rest_framework.exceptions import


from rest_framework.request import Request


class MyAuth(BaseAuthentication):
    def authenticate(self, request):
        from app01.views import produce_token
        # token = request.GET.get("token")
        # name = request.GET.get('name')
        username = request.META.get("HTTP_USERNAME")
        print("META is what", request.META)
        token = request.META.get("HTTP_TOKEN")
        # token = request.POST.get("token")
        # username = request.POST.get("name")
        if token == produce_token(username):
            user_obj = models.User.objects.filter(token=token).first()
            return user_obj, token
        else:
            raise AuthenticationFailed("请先去登录")


class MyPerssion(BasePermission):
    # request对象,view是APIVIEW对象
    def has_permission(self, request, view):
        print(request.user)
        if request.user:
        #注:登录成功返回了user对象到request表中,request.user表中有一个字段是用户的权限,因此可以取到值判断权限
            if request.user.user_type == '1':
                return True
            else:
                return False
        else:
            return False


from rest_framework.throttling import SimpleRateThrottle


class MyThrottle(SimpleRateThrottle):
    scope = 'ww'
    # getattr("scope")
    def get_cache_key(self, request, view):
        # id = request.GET.get("")
        print("*****get_ident",self.get_ident(request))
      #  xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
     #   print("xff is ",xff)

        # num_proxies = api_settings.NUM_PROXIES
        return self.get_ident(request)

           

路由配置

from django.conf.urls import url
from django.contrib import admin
from app01 import views
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^register/', views.Register.as_view()),
    url(r'^login/', views.Login.as_view()),
    url(r'^books/$', views.Book.as_view()),
    url(r'^bookdetail/(?P<pk>\d+)', views.BookDetail.as_view()),

]

           

settings.py

REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES":['app01.Auth.MyAuth'],
    "DEFAULT_THROTTLE_RATES":
        {"ww":"3/minute"}
    # "DEFAULT_THROTTLE_RATES"
}

           

上方的程序为3合一的代码

二、权限组件

权限组件的使用方法和认证组件基本相同:

使用方法:

①写一个权限类,仍然在my_examine.py中:

from rest_framework.permissions import BasePermission
# 创建认证类,BasePermission
class MyPermission(BasePermission):
    message = '权限不足,无法查看'
    # 固定写一个has_permission方法用于定义具体权限内容
    def has_permission(self, request, view):
        # #因为权限在认证之后执行的,所有能取到reuqest.user
        if request.user.user_type == '1':
            return True
        else:
            return False
           

局部使用:

-在视图类中写
permission_classes=[MyPermision,]
           

全局使用:

在settings.py中配置
REST_FRAMEWORK={
    "DEFAULT_PERMISSION_CLASSES":["app01.my_examine.MyPermision",]
}
           

局部禁用:

-在视图类中写
permission_classes = []
           

这里可以设置添加一个代码让返回显示中文提示:

# 在MyPermision类下面添加
message = '权限不足,无法查看'
           

三、频率组件

使用方法:

①写一个频率类,仍然在my_examine.py中:

from rest_framework.throttling import SimpleRateThrottle
class Throttle(SimpleRateThrottle):
    scope = 'info'
    def get_cache_key(self, request, view):
        return self.get_ident(request)
           

要点:继承SimpleRateThrottle,重写get_cache_key,返回self.get_ident(request),必须配置一个scop=字符串

② 在settings.py中配置:

REST_FRAMEWORK = {
    # 一分钟内只能访问6次
    'DEFAULT_THROTTLE_RATES': {'info': '6/m'}
}
           

③使用:

局部使用: 在视图类中配置:

from app01.my_examine import Throttle
class PublishView(APIView):
    throttle_classes = [Throttle, ]
    def get(self, request):

        publish_list = models.Publish.objects.all()
        bs = PublishSer(publish_list, many=True)
        return Response(bs.data)
           

全局使用:在setting中配置

'DEFAULT_THROTTLE_CLASSES':['自己定义的频率类'],
           

局部禁用

throttle_classes=[]