天天看点

Flask扩展:Flask-SQLAlchemy用法Flask扩展:Flask-SQLAlchemy用法

Flask扩展:Flask-SQLAlchemy用法

【注意】本文在python3.7下环境下实际验证通过,操作过程中请注意python版本。

1、扩展介绍

SQLAlchemy

是Python SQL工具箱和对象关系映射器,为应用程序提供了SQL的全部功能和灵活性。

SQLAlchemy

提供了一整套知名的企业级持久性模型,旨在高效,高性能的访问数据库,并被适配为简单的

Pythonic

语言。

SQLAlchemy

是Python编程语言下的一款ORM框架,该框架建立在数据库API至上,使用关系映射进行数据库操作,简而言之:将对象转换成SQL,然后使用数据库API执行SQL并获取执行结果,下图为

SQLAlchemy

模型:

Flask扩展:Flask-SQLAlchemy用法Flask扩展:Flask-SQLAlchemy用法

ORM方法论的三个核心原则;

  • 简单:已最基本的形式建数据模型;
  • 传达性:数据库结构被任何人都能理解的语言文档化;
  • 精确性:基于数据库模型创建正确标准化的结构。

可以通过三种方式来使用

SQLAlchemy

  • 使用SQL Expression,通过

    SQLAlchemy

    提供的方法写sql表达式,简介的写sql;
  • 使用原生SQL,直接写sql语句;
  • 使用ORM对象映射,将类映射到数据库,通过对象来操作数据库(本文重点说明)。

之所以先介绍

SQLAlchemy

,是因为本文的主角

Flask-SQLAlchemy

是一个简化了的

SQLAlchemy

操作的flask扩展,先理解

SQLAlchemy

,能够更好的学习

Flask-SQLAlchemy

2、操作方法介绍

字段类型说明

类型名 Python中的类型 说明
Integer int 普通整数,一般为32位
SmallInteger int 取值范围小的整数,一般是16位
BigInteger int或long 不限制精度的整数
Float float 浮点数
Numeric decimal.Decimal 普通整数,一般是32位
String str 变长字符串
Text str 变长字符串,对较长或不限长度的字符串做了优化
Unicode unicode 变长Unicode字符串
UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串做了优化
Boolean bool 布尔值
Date datetime.date 时间
Time datetime.datetime 日期和时间
LargeBinary str 二进制文件

常用的SQLAlchemy列选项

选项名 说明
primary_key 如果为True,表示主键
unique 如果为True,代表这列不允许出现重复的值
index 如果为True,为这列创建索引,提高查询效率
nullable 如果为True,允许有空值,如果为False,不允许有空值
default 为这列定义默认值

常用的SQLAlchemy关系选项

选项名 说明
backref 在关系的另一模型中添加反向引用,用于设置外键名称
primary join 明确指定两个模型之间使用的联结条件
uselist 如果为False,不使用列表,而是用标量值
order_by 指定关系中记录的排序方式
secondary 指定多对多关系中关系表的名字
secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件

数据库基本操作

  • Flask-SQLAlchemy

    中,插入、修改、删除操作,均有数据库会话管理。会话用

    db.session

    表示,在准备把数据写入数据库前,要先将数据添加到会话中然后调用

    commit()

    方法提交会话。
  • Flask-SQLAlchemy

    中,查询操作通过

    query

    对象操作数据,最基本的查询操作是返回表中的所有数据,可以通过过滤器进行更精确的数据库查询。

常用的SQLAlchemy查询过滤器

过滤器 说明
filter() 把过滤器添加到原查询上,返回一个新查询
filter_by() 把等值过滤器添加到原查询上,返回一个新查询
limit() 使用指定的值限定原查询返回的结果
offset() 偏移原查询返回的结果,返回一个新查询
order_by() 根据指定条件对原查询结果进行排序,返回一个新查询
group_by() 根据指定条件对原查询结果进行分组,返回一个新查询

常用的SQLAlchemy查询结果的方法

方法 说明
all() 以列表形式返回查询的所有结果
first() 返回查询的第一个结果,如果无数据,返回None
first_or_404() 返回查询的第一个结果,如果无数据,返回404
get() 返回指定主键对应的行,如不存在,返回None
get_or_404() 返回指定主键对应的行,如不存在,返回404
count() 返回查询结果的数量
paginate() 返回一个Paginate对象,它包含指定范围内的结果

重点介绍一下paginate对象的常用方法:

  • has_next:是否还有下一页
  • has_prev:是否还有上一页
  • items:以列表形式返回当前页的所有内容
  • next(error_out=False):返回上一页的Pagination对象
  • prev(error_out=False):返回下一页的Pagination对象
  • page:当前页的页码(从1开始)
  • pages:总页数
  • per_page:每页显示的数量
  • prev_num:上一页页码数
  • next_num:下一页页码数
  • query:返回创建这个Pagination对象的查询对象
  • total:查询返回的记录总数

3、安装

Flask-SQLAlchemy

使用以下语句安装扩展:

sudo pip install flask-sqlalchemy
           

SQLAlchemy

本身无法操作数据库,必须通过

pymysql

等第三方插件,本文使用的为

pymysql

,所以还需要安装:

sudo pip install pymysql
           

SQLAlchemy

模型中的

Dialect

用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,以下为使用不同插件连接数据库的URL语句:

# '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
# pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
# cx_Oracle
oracle+cx_oracle://user:[email protected]:port/dbname[?key=value&key=value...]
           

Flask-SQLAlchemy

中,也是用同样的方式,数据库使用URL指定,而且程序使用的数据库必须保存到Flask配置对象的

SQLALCHEMY_DATABASE_URI

中:

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]'

# 其他设置
# 动态追踪修改设置,如未设置只会提示警告
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
           

4、基本操作

使用官方示例代码,使用过程中注意将数据库连接语句修改为自己的数据库:

  • test1.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]'
# 动态追踪修改设置,如未设置只会提示警告
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)

# 创建的用户模型,有id、username、email三个字段
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
	
    # 便于查看
    def __repr__(self):
        return '<User %r>' % self.username
           

表操作

创建所有表(注意:该方法需要放在模型之后 ,创建完成之后需要去掉,否则可能会报错,但是在

Flask

中不建议这种建库方式,推荐使用

Flask-Migrate

扩展):

# 将上述例程文件保存为test1.py  然后使用python进入python命令行,注意为python3环境
# 导入删除文件,没有报错表示成功,否则需要检查test1.py文件是否正确
from test1 import db
# 创建表 操作完成后会在对应数据库中创建一个名称为User的数据表
db.create_all()
           

删除所有表:

插入数据

# 导入User模型
from test1 import User

# 单条数据添加 执行完成会在数据库中看到添加的数据
admin = User(username='admin', email='[email protected]')
db.session.add(admin)# 添加
db.session.commit()# 提交

# 多条数据添加
user1 = User(username='user1', email='[email protected]')
user2 = User(username='user2', email='[email protected]')
user3 = User(username='user3', email='[email protected]')
db.session.add_all([user1,user2,user3])# 添加
db.session.commit()# 提交
           

查询数据

# 查询到的数据一般以列表形式展示,如果查找了5条数据,需要获取第二条数据的用户名:
# users[1].username

# 返回查询到的所有对象
User.query.all()
# [<User 'admin'>, <User 'user1'>, <User 'user2'>, <User 'user3'>]

# 使用filter_by精确查找姓名为“admin”的用户
User.query.filter_by(username='admin').all()
#[<User 'admin'>]

# 返回查询到的第一个对象
User.query.first()
#<User 'admin'>

# filter模糊查询,返回名字结尾字符为“1”的所有数据。
User.query.filter(User.username.endswith('1')).all()
#[<User 'user1'>]

# 参数为主键,如果主键不存在没有返回内容
User.query.get(1)
#<User 'admin'>

# 逻辑非,返回名字不等于“admin”的所有数据
User.query.filter(User.username!='admin').all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]

# not_ 相当于取反
from sqlalchemy import not_
User.query.filter(not_(User.username=='admin')).all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]

# 逻辑与,需要导入and,返回and()条件满足的所有数据
from sqlalchemy import and_
User.query.filter(and_(User.username!='admin',User.email.endswith('example.com'))).all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]

# 逻辑或,需要导入or_
from sqlalchemy import or_
User.query.filter(or_(User.username!='admin',User.email.endswith('example.com'))).all()
#[<User 'admin'>, <User 'user1'>, <User 'user2'>, <User 'user3'>]
           

删除数据

# 获取所有数据
User.query.all()
# [<User 'admin'>, <User 'user1'>, <User 'user2'>, <User 'user3'>]

# 查询第一条数据
user = User.query.first()
# 删除
db.session.delete(user)
# 提交
db.session.commit()
# 重新查询
User.query.all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]
           

更新数据

# 更新数据 修改查询的第一条数据姓名为“sworld”
user = User.query.first()
user.username = 'sworld'
db.session.commit()
User.query.first()
#<User 'sworld'>
           

【注意】:在官方文档中提及了一点,是关于为什们在

User

类中没有定义

__init__

方法,解释原因是

SQLAlchemy

向所有模型类添加了一个隐式构造函数,该构造函数为其所有列和关系接受关键字参数。如果出于任何原因决定重写构造函数,请确保继续接受`** kwargs

并使用这些

** kwargs``调用超级构造函数以保留此行为:

# Foo 为类名
class Foo(db.Model):
    # ...
    def __init__(**kwargs):
        super(Foo, self).__init__(**kwargs)
        # 需要增加的部分
        # ...
           

5、进阶操作

分页查询

# 分页查询 接着上一步
# 第一页,每页20条数据。 默认第一页。
# 参数:error_out 设为True表示页数不是int或超过总页数时,会报错,并返回404状态码。 默认True
paginate_obj = User.query.paginate(page=1, per_page=20, error_out=False)  

# 获取查询出来的数据。 (error_out设为False,页数不合法时会返回空列表)
user_list = paginate_obj.items
# print(user_list)
# [<User 'sworld'>, <User 'user2'>, <User 'user3'>]
 
# 获取总页数
total_page = paginate_obj.pages
# print(total_page)
# 1
           

关联查询

还是使用官方例程,在同级文件夹中添加下述文件:

  • test2.py
from datetime import datetime
# 引入test1.py中的db和User
from test1 import db,User

class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(80), nullable=False)
    body = db.Column(db.Text, nullable=False)
    pub_date = db.Column(db.DateTime, nullable=False,
        default=datetime.utcnow)
	
    # 添加外键声明
    category_id = db.Column(db.Integer, db.ForeignKey('category.id'),
        nullable=False)
    # 添加关联,其中backref表示在关系的另一模型中添加反向引用,用于设置外键名称
    category = db.relationship('Category',
        backref=db.backref('posts', lazy=True))

    def __repr__(self):
        return '<Post %r>' % self.title

class Category(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), nullable=False)

    def __repr__(self):
        return '<Category %r>' % self.name
           

关联的使用方法如下:

# 退出重新进入python命令行,引入test2.py,在test2.py中,会引入test1.py中的内容
from test2 import db,User,Post,Category
# 重新创建表
db.create_all()

# 分下述六步演示关联使用
py = Category(name='Python')
Post(title='Hello Python!', body='Python is pretty cool', category=py)
p = Post(title='Snakes', body='Ssssssss')
py.posts.append(p)
db.session.add(py)
db.session.commit()
           

这是官方提供的比较经典的关联用法,分六步进行说明:

  • 第一步创建了一个

    name='Python'

    Category

    对象;
  • 第二步使用关联创建了一个

    Post

    对象,关联上一个创建的

    py

  • 第三步又创建了一个名为

    p

    Post

    对象;
  • 第四步调用

    py

    对象的

    posts

    方法将第三步创建的

    p

    添加至

    py

    posts

    在Post类中添加了反向引用);
  • 第五步通过会话添加;
  • 第六步提交会话。

通过上面几个步骤就在

Category

表中添加一条数据,在

Post

表中添加两条数据,查询可得:

# 查询Category表全部数据
Category.query.all()
# [<Category 'Python'>]

# 查询Post表全部数据
Post.query.all()
# [<Post 'Hello Python!'>, <Post 'Snakes'>]

# 也可以使用Category模型中的posts关联获取对应的Post数据
Category.query.first().posts
# [<Post 'Hello Python!'>, <Post 'Snakes'>]
           

多表查询

join查询

还是继续上文中的示例,多表查询可以使用join进行查询,以下展示的是使用filter过滤查询:

db.session.query(Category,Post).filter(Category.id==Post.category_id).all()
# [(<Category 'Python'>, <Post 'Hello Python!'>), (<Category 'Python'>, <Post 'Snakes'>)]

# for打印出来
for ctg,pst in db.session\
	.query(Category,Post)\
	.filter(Category.id==Post.category_id).all():
    print(ctg,pst)
# <Category 'Python'> <Post 'Hello Python!'>
# <Category 'Python'> <Post 'Snakes'>
           

使用

join

示例如下:

# 指定外键情况下可适用下述方法
db.session.query(Category,Post).join(Post).all()
# [(<Category 'Python'>, <Post 'Hello Python!'>), (<Category 'Python'>, <Post 'Snakes'>)]

# 没有指定外键则需要在join后面添加过滤参数
db.session.query(Category,Post).join(Post,Category.id==Post.category_id).all()
# [(<Category 'Python'>, <Post 'Hello Python!'>), (<Category 'Python'>, <Post 'Snakes'>)]
           

子查询(subquery)

子查询示例如下:

# 需要查询Category某条数据在Post表中对应的数据条数:
# 对Post进行子查询
# 添加引用
from sqlalchemy.sql import func
spst=db.session\
	.query(Post.category_id,func.count('*')\
    .label('post_count'))\
    .group_by(Post.category_id).subquery()
db.session\
	.query(Category,spst.c.post_count)\
    .outerjoin(spst,Category.id==spst.c.category_id).all()
# [(<Category 'Python'>, 2)]
           

使用别名(aliased)

SQLAlchemy 使用

aliased()

方法表示别名,当我们需要把同一张表连接多次的时候,常常需要用到别名。以下为其他文章的用法,可以做一个参考:

from sqlalchemy.orm import aliased

# 把 Address 表分别设置别名
adalias1 = aliased(Address)
adalias2 = aliased(Address)

for username, email1, email2 in \
    session.query(User.name, adalias1.email_address, adalias2.email_address).\
    join(adalias1, User.addresses).\
    join(adalias2, User.addresses).\
    filter(adalias1.email_address=='[email protected]').\
    filter(adalias2.email_address=='[email protected]'):
    
    print(username, email1, email2)

# 执行结果
jack [email protected].com [email protected].com
           

【参考链接】

  • Python Flask,数据库,SQLAlchemy,数据库查询,分页,分页查询paginate
  • Flask-SQLAlchemy中的分页操作
  • SQLAlchemy —— 多表查询
  • python3 flask 使用Mysql数据库
  • flask数据库操作
  • 官方文档
  • 20.Python笔记之SqlAlchemy使用
  • python里比较流行的ORM框架:sqlalchemy
  • SQLAlchemy的基本使用
  • python中使用SQLAlchemy
  • SQLAlchemy 1.4 Documentation