天天看点

【Django】代码最佳实践【原创】

目录

参考

概要

目录结构

API返回格式

步骤

1. 建立虚拟环境

2. 安装和建立Django

3. 新建应用App

4. 修改settings.py

5. 测试

6. 增加函数库

7. 修改hello方法

8. 规范目录

9. 增加日志配置

10. 引入Sentry - 非必须

11. 引入Redis

12. 使用MySQL 

13. 建立超管

14. 引入Celery - 非必须

15. 引入rest-framework

参考

https://www.freebuf.com/column/165233.html

https://www.freebuf.com/column/165968.html

概要

代码规范可以参考【Python】代码规范【原创】

基于Django的项目最佳实践

注意:不支持Django3.0,会报错,主要是Celery那里会报错

  • 接入Sentry,实现错误日志管理
  • 使用rest-framework,实现API的restful风格
  • 使用数据库来作为存储
  • 支持使用Redis
  • 接入Celery,实现异步和定时任务
  • 定时任务支持由admin管理后台来控制
  • 增加日志配置

代码可以参考:代码仓库

目录结构

大概的目录结构如下:

项目名
|—— 项目名
|     |—— __init__.py
|     |—— settings.py
|     |—— urls.py
|     |—— wsgi.py
|     |—— views.py
|     |—— config
|           |—— __init__.py
|           |—— local.py
|           |—— dev.py
|           |—— prod.py
|—— 应用名
|     |—— migrations
|           |—— __init__.py
|     |—— models
|           |—— __init__.py
|           |—— base_model.py
|           |—— user.py
|     |—— views
|           |—— __init__.py
|           |—— user.py
|     |—— services
|           |—— __init__.py
|           |—— user_service.py
|     |—— serializers
|           |—— __init__.py
|           |—— user.py
|     |—— tasks
|           |—— __init__.py
|           |—— xxx_task.py
|     |—— __init__.py
|     |—— tests.py
|     |—— apps.py
|     |—— admin.py
|     |—— urls.py
|—— sql
|     |—— init.sql
|     |—— 20190808_create_table.sql
|—— common
|     |—— __init__.py
|     |—— functions.py
|—— docs
|—— nginx
|     |—— xxxx.conf
|—— supervisor
|     |—— __init__.py
|     |—— gunicorn_conf.conf
|     |—— local.conf
|     |—— dev.conf
|     |—— prod.conf
|—— collectedstatic
|     |—— admin
|           |—— css
|           	 |—— vendor
|           	 |—— xxx.css
|           |—— fonts
|           |—— img
|           	 |—— xxx.jpg
|           |—— js
|           	 |—— vendor
|           	 |—— xxx.js
|     |—— 应用名
|           |—— css
|           	 |—— vendor
|           	 |—— xxx.css
|           |—— fonts
|           |—— img
|           	 |—— xxx.jpg
|           |—— js
|           	 |—— vendor
|           	 |—— xxx.js
|—— requirements.txt
|—— manage.py
|—— README.md
           

注意:应用目录下的tasks目录是Celery任务需要的,非必须

原则

应用的views层里面不写具体的业务逻辑,只从请求中获取参数以及返回数据给前端即可,具体的业务逻辑都写在服务层(services),通过服务层去调用模型层(model)的数据,进行筛选和处理然后返回给views层,views层再返回给前端

API返回格式

API返回Json格式,需要返回三个字段,data的格式并不限制,一般是字符串、字典或列表

{
    "code": 0,
    "message": "success",
    "data": {}
}
           

步骤

1. 建立虚拟环境

注意:怎么安装和使用虚拟环境可以参考:【Virtualenv】Python的虚拟环境Virtualenv和Virtualenvwrapper【原创】

mkvirtualenv 项目名
workon 项目名
           

2. 安装和建立Django

pip install django
django-admin startprojrct 项目名
           

注意:建议使用Django 2.2版本,不使用最新的Django3.0版本,会存在admin管理页面进不去(Django 3.0.3和Python 3.7有可能会冲突导致Django停止运行)以及和Celeat-beat冲突

3. 新建应用App

cd 项目名
python manage.py startapp app
           

4. 修改settings.py

增加config目录,config目录里面新增3个文件:local.py,dev.py,prod.py,除了prod.py的Debug模式需要关闭外其他都要开启

local.py:

# -*- coding:UTF-8 -*-

# debug模式为开启
DEBUG = True
           

修改settings.py:

INSTALLED_APPS = [
    ...
    'app'
]

# 允许其他的机器访问该服务器的Django应用
ALLOWED_HOSTS = ['*']

TIME_ZONE = 'Asia/Shanghai'

USE_I18N = True

USE_L10N = True

USE_TZ = False

# 获取环境变量
ENV_PROFILE = os.getenv("ENV")

# 根据环境来加载不同的配置文件
if ENV_PROFILE == "prod":
    from .config.prod import *
elif ENV_PROFILE == "dev":
    from .config.dev import *
else:
    ENV_PROFILE = 'local'
    from .config.local import *
    
           

注意:注释掉settings.py中的DEBUG

5. 测试

app/views.py:

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt


@csrf_exempt
def hello(request):
    result = {
        'code': 0,
        'message': 'success',
        'data': [],
    }
    return JsonResponse(result)
           

app目录增加urls.py:

from django.urls import path
from app import views

urlpatterns = [
    path('hello', views.hello),
]
           

urls.py增加app的路由:

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/app/', include('app.urls'))
]
           

浏览器访问127.0.0.1:8000/api/app/hello来测试

6. 增加函数库

先下载相关依赖:

pip install requests
pip install IPy
           

增加common目录,common目录下新增functions.py

具体的函数库代码可以参考:【Django】函数库

7. 修改hello方法

app/views.py:

from common.functions import render_json
from django.views.decorators.csrf import csrf_exempt

@csrf_exempt
def hello(request):
    return render_json(0, 'success', 'Hello')
           

8. 规范目录

增加sql、logs目录

增加requirements.txt文件:

pip freeze > requirement.txt
           

app目录下增加models包目录和services包目录,app下删掉models.py文件

app/models目录下新增:base_model.py:

from django.db import models


class BaseModel(models.Model):
    """
    模型基类(抽象类)- 所有的模型都应该继承
    """
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    deleted_at = models.DateTimeField(blank=True, null=True)

    class Meta:
        abstract = True
           

注意:deleted_at字段是做软删除用,如有的表不需要该字段则这里可以去掉,由每个表的model自己控制

9. 增加日志配置

修改settings.py:

# 日志位置
LOG_DIR = BASE_DIR + '/logs/'

# 日志设置
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(name)s:%(lineno)d] [%(module)s:%(funcName)s] [%(levelname)s]- %(message)s'
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': os.path.join(LOG_DIR, 'django.log'),
            'backupCount': 30,
            'encoding': 'utf8',
            'formatter': 'standard',
            'when': 'midnight',
            'interval': 1,
        },
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
        'request_handler': {
            'level': 'INFO',
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': os.path.join(LOG_DIR, 'django_request.log'),
            'backupCount': 30,
            'encoding': 'utf8',
            'formatter': 'standard',
            'when': 'midnight',
            'interval': 1,
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'include_html': True,
        },
        'app_handler': {
            'level': 'INFO',
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': os.path.join(LOG_DIR, 'app.log'),
            'backupCount': 30,
            'encoding': 'utf8',
            'formatter': 'standard',
            'when': 'midnight',
            'interval': 1,
        },
    },
    'loggers': {
        'django': {
            'handlers': ['default', 'console'],
            'level': 'INFO',
            'propagate': False
        },
        'django.request': {
            'handlers': ['request_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'app': {
            'handlers': ['app_handler'],
            'level': 'INFO',
            'propagate': False
        },
    }
}
           

10. 引入Sentry - 非必须

目的是为了实时获取事件日志,具体可参考:【Sentry】实时事件日志平台【原创】

安装sentry:

pip install sentry-sdk
           

修改settings.py:

import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration

# Sentry的设置
sentry_sdk.init(
    dsn="https://[email protected]/1831747",
    integrations=[DjangoIntegration()],
    environment=ENV_PROFILE
)
           

11. 引入Redis

安装依赖:

pip install django-redis
pip install redis
           

修改local.py、dev.py、prod.py,增加以下代码:

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/1",
        "OPTIONS": {
            "CONNECTION_POOL_KWARGS": {"max_connections": 100},
            "CONNECTION_POOL_KWARGS": {"max_connections": 100, 'decode_responses': True},
            # "PASSWORD": "密码",
        },
        # 前缀
        "KEY_PREFIX": "DjangoDemo"
    }
}
           

注意:max_connections是连接池,如不需要可去掉

注意:前缀看个人需要,如不需要统一加前缀可以去掉

注意:如果decode_responses不设置为True的话,使用get_redis_connection读取的数据是bytes,需要decode为utf-8,加上该设置的话那么读取出去就不需要自己decode了,然而如果decode_responses设置为True的话,那使用下面的第二种方法cache来读取是会报错的

注意:如有多个Redis的话,可以在CACHE里面添加一个新的,区别于default即可

两种方式使用Redis:

from django_redis import get_redis_connection

# 这里可以传使用哪个redis,不传默认是default
redis = get_redis_connection()
redis.get(key)
redis.set(key, value)
           

或者是:

from django.core.cache import cache

cache.get(key)
cache.set(key, value)
           

推荐使用第一种方式

具体可以参考:【Django】Redis的使用

12. 使用MySQL 

安装依赖:

pip install mysqlclient
           

修改local.py、dev.py、prod.py,增加以下代码:

# 数据库配置
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'djangodemo',
        'USER': 'root',
        'PASSWORD': '123456',
        'HOST': '127.0.0.1',
        'PORT': '3306',
        'OPTIONS': {'charset': 'utf8mb4'}
    }
}
           

数据库迁移:

python manage.py makemigrations
python manage.py migrate
           

注意:数据库的charset可以是多种,比如utf8、utf8mb4等,我这里是utf8mb4

注意:如果有多个数据库的话,可以在DATABASES里面添加一个新的,区别于default即可

13. 建立超管 - 非必须

python manage.py createsuperuser
           

14. 引入Celery - 非必须

目的是为了实现定时任务和异步任务,具体可参考:【异步/定时任务】Django中使用Celery实现异步和定时任务【原创】

安装依赖:

pip install celery
pip install django-celery-results
pip install django-celery-beat
           

settings.py同级目录新增celery.py文件:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 获取当前文件夹名,即为该Django的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
project_settings = '%s.settings' % project_name

# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_settings)

# 实例化Celery
app = Celery(project_name)

# 使用django的settings文件配置celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# Celery加载所有注册的应用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
           

settings.py同级目录__init__.py修改:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']
           

修改settings.py:

# celery结果返回,可用于跟踪结果,默认是存储到redis,需要安装django_celery_results,这里就可以使用db存储
CELERY_RESULT_BACKEND = 'django-db'

# 设置worker的并发数量
CELERY_CONCURRENCY = 4

# celery内容等消息的格式设置
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# celery时区设置,使用settings中TIME_ZONE同样的时区
CELERY_TIMEZONE = TIME_ZONE

# 导入task所在的文件路径
CELERY_IMPORTS = ('app.tasks.tasks',)

# 定时任务来源从数据库中读取
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# 避免时区的问题
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
           

修改local.py、dev.py、prod.py:

# celery中间人 redis://redis服务所在的ip地址:端口/数据库号
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/3'
           

app目录下新建tasks目录,tasks目录下新建tasks.py文件:

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import datetime


@shared_task
def add(x, y):
    res = x + y
    time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('当前时间为:' + time_format + ' ,两个数相加的结果为:')
    print(res)
    return res


@shared_task
def mul(x, y):
    res = x * y
    time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('当前时间为:' + time_format + ' ,两个数相乘的结果为:')
    print(res)
    return res


@shared_task
def xsum(numbers):
    res = sum(numbers)
    print(res)
    return res
           

执行迁移:

python manage.py migrate
           

启动:

# 启动worker
celery -A 项目名 worker --pool=solo -l info

# 启动定时任务beat
celery -A 项目名 beat -l info
           

测试:

python manage.py shell


from app.tasks.tasks import add, mul, xsum

res = add.delay(2,3)
res.get()
           

也可以在admin管理页面中新建定时任务来进行测试,如图:

【Django】代码最佳实践【原创】

15. 引入rest-framework

目的是为了序列化数据库的时间字段,默认读取出来的时间字段是加了T或者是Z的,另外也可以控制输出的字段范围

为什么引入rest-framework,可以参考:【模型】Django数据库的数据转成Json返回【原创】

安装依赖:

pip install djangorestframework
           

app目录下新增模型:

app\models\devices_info.py

from django.db.models import Q
from app.models.base_model import BaseModel
from django.db import models

STATUS_VALUE = (
    (0, u'代运营'),
    (1, u'隔离中'),
    (2, u'在线'),
    (3, u'离线'),
    (4, u'下架'),
)

POWER_STATE = (
    (0, u'pending'),
    (1, u'running'),
    (2, u'paused'),
    (3, u'shutdown'),
    (4, u'crashed'),
)


class DevicesInfo(BaseModel):
    name = models.CharField(max_length=50, default='', verbose_name=u'名称', blank=True)
    description = models.CharField(max_length=1000, default='', verbose_name=u'描述')
    ip_version = models.IntegerField(default=0, verbose_name=u'IP地址类型,值为4或6,4:IPv4,6:IPv6', blank=True)
    lan_ip = models.CharField(max_length=16, default='', verbose_name=u'内网IP', blank=True)
    wan_ip = models.CharField(max_length=16, default='', verbose_name=u'外网IP', blank=True)
    city_id = models.IntegerField(default=0, verbose_name=u'城市ID', blank=True)
    city_name = models.CharField(max_length=10, default='', verbose_name=u'城市名', blank=True)
    area_id = models.IntegerField(default=0, verbose_name=u'可用区ID', blank=True)
    area_name = models.CharField(max_length=10, default='', verbose_name=u'可用区名', blank=True)
    device_class = models.CharField(max_length=20, default='', verbose_name=u'机型', blank=True)
    owner_main = models.CharField(max_length=100, default='', verbose_name=u'主负责人', blank=True)
    owner_back = models.CharField(max_length=100, default='', verbose_name=u'备负责人', blank=True)
    device_system = models.CharField(max_length=60, default='', verbose_name=u'操作系统', blank=True)
    status = models.IntegerField(default=0, choices=STATUS_VALUE, verbose_name=u'状态值', blank=True)
    issued_time = models.DateTimeField(auto_now_add=True, verbose_name=u'上架时间', blank=True)
    created_user = models.IntegerField(default=0, verbose_name=u'创建用户id', blank=True)
    availability_zone = models.CharField(max_length=30, default='', verbose_name=u'可用区编码', blank=True)
    server_id = models.CharField(max_length=100, default='', verbose_name=u'云服务器唯一标识ID', blank=True)
    power_state = models.IntegerField(default=0, choices=POWER_STATE, verbose_name=u'云服务器电源状态', blank=True)
    host = models.CharField(max_length=50, default='', verbose_name=u'云服务器宿主名称', blank=True)
    image_id = models.CharField(max_length=100, default='', verbose_name=u'云服务器镜像ID', blank=True)
    server_created = models.DateTimeField(null=True, verbose_name=u'云服务器创建时间', blank=True)
    security_groups = models.CharField(max_length=50, default='', verbose_name=u'云服务器所属安全组名称或者uuid', blank=True)
    server_launched_at = models.DateTimeField(null=True, verbose_name=u'云服务器启动时间', blank=True)
    server_updated = models.DateTimeField(null=True, verbose_name=u'云服务器上一次更新时间', blank=True)
    server_status = models.CharField(max_length=20, default='', verbose_name=u'云服务器当前状态信息', blank=True)
    server_flavor = models.CharField(max_length=20, default='', verbose_name=u'云服务器规格名称', blank=True)
    server_vcpus = models.IntegerField(default=0, verbose_name=u'云服务器规格对应的CPU核数', blank=True)
    server_ram = models.IntegerField(default=0, verbose_name=u'云服务器规格对应的内存大小,单位为MB', blank=True)
    disk_used_percent = models.CharField(max_length=15, default='', verbose_name=u'磁盘使用率', blank=True)

    @staticmethod
    def get_one(id):
        return DevicesInfo.objects.filter(id=id)

    @staticmethod
    def get_list(data_filter):
        """
        获取特定条件的事件列表,如为空则获取全部
        :param data_filter:
        :return:
        """
        # 获取筛选条件
        con = DevicesInfo.get_query(data_filter)

        # 获取记录(倒序)
        res = DevicesInfo.objects.filter(con).order_by('-id')
        return res

    @staticmethod
    def add(data):
        """
        创建一条记录,先获取,如存在则更新,如不存在则新建
        :param data:  dict 数据
        :return:
        """
        server_id = data['server_id']

        res = DevicesInfo.objects.update_or_create(server_id=server_id, defaults=data)

        return 0, 'success', res

    @staticmethod
    def get_query(data_filter):
        """
        获取搜索条件
        :param data_filter:
        :return:
        """
        con = Q()
        # 状态搜索
        if 'status' in data_filter:
            con.children.append(('status', data_filter['status']))

        # id搜索
        if 'id' in data_filter:
            con.children.append(('id', data_filter['id']))

        # 根据app_id搜索device
        if 'device_id_list' in data_filter:
            con.children.append(('id__in', data_filter['device_id_list']))

        #  主机名模糊搜索(单个主机或多个主机模糊搜索)
        if 'name' in data_filter:
            con.children.append(('name__contains', data_filter['name']))
        elif 'name_list' in data_filter:
            if data_filter['name_list']:
                q1 = Q()
                q1.connector = 'OR'
                for single_name in data_filter['name_list']:
                    q1.children.append(('name__contains', single_name))
                con.add(q1, 'AND')

        # ip模糊搜索
        if 'ip' in data_filter:
            q1 = Q()
            q1.connector = 'OR'
            q1.children.append(('lan_ip__contains', data_filter['ip']))
            q1.children.append(('wan_ip__contains', data_filter['ip']))
            con.add(q1, 'AND')

        #  区域名搜索
        if 'area_name' in data_filter:
            con.children.append(('area_name', data_filter['area_name']))

        # 事件时间筛选,筛选出时间>=from的
        if 'time_from' in data_filter:
            con.children.append(('server_created__gte', data_filter['time_from']))

        # 事件时间筛选,筛选出时间<=to+1的
        if 'time_to' in data_filter:
            data_filter['time_to'] += ' 23:59:59'
            con.children.append(('server_created__lte', data_filter['time_to']))

        # 默认搜索delete_at为空的数据
        con.children.append(('deleted_at', None))

        return con

    class Meta:
        db_table = 'devices_info'
           

更多模型的复杂查询可以参考:【数据库】Django数据库的详细文档【原创】

app目录下新增serializers目录,新增文件:

app\serializers\devices_info_serializers.py

from rest_framework import serializers
from app.models.devices_info import DevicesInfo


class DevicesInfoSerializer(serializers.ModelSerializer):
    """
    序列化DevicesInfo表数据
    """
    created_at = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")
    updated_at = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")
    issued_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")
    server_created = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")
    server_launched_at = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")
    server_updated = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")

    class Meta:
        model = DevicesInfo
        fields = '__all__'
           

app目录下services目录新增文件:

app\services\devices_service.py

import logging
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage
from app.models.devices_info import DevicesInfo
from app.serializers.devices_info_serializers import DevicesInfoSerializer

logger = logging.getLogger(__name__)


class DeviceService:
    def __init__(self):
        pass

    @staticmethod
    def get_list(data_filter, is_page=0, page_num=1, page_size=15):
        """
        获取主机列表
        :param data_filter:
        :param is_page: 是否分页
        :param page_num:
        :param page_size:
        :return:
        """
        data = DevicesInfo.get_list(data_filter)
        count = data.count()
        pages_num = 1

        # 是否分页
        if is_page:
            paginator = Paginator(data, page_size)

            try:
                data = paginator.page(page_num)
            except PageNotAnInteger:
                # 如果请求的页数不是整数,返回第一页。
                data = paginator.page(1)
            except EmptyPage:
                # 如果请求的页数不在合法的页数范围内,返回结果的最后一页。
                data = paginator.page(paginator.num_pages)

            count = paginator.count
            pages_num = paginator.num_pages

        # 序列化数据库数据
        res = DevicesInfoSerializer(data, many=True)

        data = {
            'data': res.data,
            'count': count,
            'pages_num': pages_num
        }
        return data
           

app的views.py:

from common.functions import render_json
from django.views.decorators.csrf import csrf_exempt
from app.services.devices_service import DeviceService
import json


@csrf_exempt
def hello(request):
    return render_json(0, 'success', [])


@csrf_exempt
def get_devices(request):
    """
    获取列表
    :param request:
    :return:
    """
    if request.method == 'POST':
        if request.body:
            request_data = json.loads(request.body.decode())
            page_size = request_data.get('pageSize', 15)
            page_num = request_data.get('pageNo', 1)
            is_page = request_data.get('is_page', 0)
            status = request_data.get('status', '')
            name = request_data.get('name', '')
            server_id = request_data.get('server_id', '')
            name_list = request_data.get('name_list', [])

            data_filter = {}
            if status:
                data_filter['status'] = status
            if name:
                data_filter['name'] = name
            if server_id:
                data_filter['server_id'] = server_id
            if name_list:
                data_filter['name_list'] = name_list

            res = DeviceService.get_list(data_filter, is_page, page_num, page_size)

            data = {
                'data': res['data'],
                'pageSize': int(page_size),
                'pageNo': int(page_num),
                'totalPage': res['pages_num'],
                'totalCount': res['count'],
            }

            return render_json(data=data)
        else:
            return render_json(code=200, msg='请求数据不能为空')
    else:
        return render_json(code=100, msg='请求方式错误')
           

app/urls.py:

from django.urls import path
from app import views

urlpatterns = [
    path('hello', views.hello),
    path('get_devices', views.get_devices),
]
           

这样就实现了一个简单的API接口,列表接口,支持分页,并且返回的字段是有经过序列化的(主要是时间字段),支持复杂的查询

可以通过代码仓库中的sql目录来导入测试的数据进行测试