Flask扩展:Flask-SQLAlchemy用法
【注意】本文在python3.7下环境下实际验证通过,操作过程中请注意python版本。
1、扩展介绍
SQLAlchemy
是Python SQL工具箱和对象关系映射器,为应用程序提供了SQL的全部功能和灵活性。
SQLAlchemy
提供了一整套知名的企业级持久性模型,旨在高效,高性能的访问数据库,并被适配为简单的
Pythonic
语言。
SQLAlchemy
是Python编程语言下的一款ORM框架,该框架建立在数据库API至上,使用关系映射进行数据库操作,简而言之:将对象转换成SQL,然后使用数据库API执行SQL并获取执行结果,下图为
SQLAlchemy
模型:
ORM方法论的三个核心原则;
- 简单:已最基本的形式建数据模型;
- 传达性:数据库结构被任何人都能理解的语言文档化;
- 精确性:基于数据库模型创建正确标准化的结构。
可以通过三种方式来使用
SQLAlchemy
:
- 使用SQL Expression,通过
提供的方法写sql表达式,简介的写sql;SQLAlchemy
- 使用原生SQL,直接写sql语句;
- 使用ORM对象映射,将类映射到数据库,通过对象来操作数据库(本文重点说明)。
之所以先介绍
SQLAlchemy
,是因为本文的主角
Flask-SQLAlchemy
是一个简化了的
SQLAlchemy
操作的flask扩展,先理解
SQLAlchemy
,能够更好的学习
Flask-SQLAlchemy
。
2、操作方法介绍
字段类型说明
类型名 | Python中的类型 | 说明 |
---|---|---|
Integer | int | 普通整数,一般为32位 |
SmallInteger | int | 取值范围小的整数,一般是16位 |
BigInteger | int或long | 不限制精度的整数 |
Float | float | 浮点数 |
Numeric | decimal.Decimal | 普通整数,一般是32位 |
String | str | 变长字符串 |
Text | str | 变长字符串,对较长或不限长度的字符串做了优化 |
Unicode | unicode | 变长Unicode字符串 |
UnicodeText | unicode | 变长Unicode字符串,对较长或不限长度的字符串做了优化 |
Boolean | bool | 布尔值 |
Date | datetime.date | 时间 |
Time | datetime.datetime | 日期和时间 |
LargeBinary | str | 二进制文件 |
常用的SQLAlchemy列选项
选项名 | 说明 |
---|---|
primary_key | 如果为True,表示主键 |
unique | 如果为True,代表这列不允许出现重复的值 |
index | 如果为True,为这列创建索引,提高查询效率 |
nullable | 如果为True,允许有空值,如果为False,不允许有空值 |
default | 为这列定义默认值 |
常用的SQLAlchemy关系选项
选项名 | 说明 |
---|---|
backref | 在关系的另一模型中添加反向引用,用于设置外键名称 |
primary join | 明确指定两个模型之间使用的联结条件 |
uselist | 如果为False,不使用列表,而是用标量值 |
order_by | 指定关系中记录的排序方式 |
secondary | 指定多对多关系中关系表的名字 |
secondary join | 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件 |
数据库基本操作
- 在
中,插入、修改、删除操作,均有数据库会话管理。会话用Flask-SQLAlchemy
表示,在准备把数据写入数据库前,要先将数据添加到会话中然后调用db.session
方法提交会话。commit()
- 在
中,查询操作通过Flask-SQLAlchemy
对象操作数据,最基本的查询操作是返回表中的所有数据,可以通过过滤器进行更精确的数据库查询。query
常用的SQLAlchemy查询过滤器
过滤器 | 说明 |
---|---|
filter() | 把过滤器添加到原查询上,返回一个新查询 |
filter_by() | 把等值过滤器添加到原查询上,返回一个新查询 |
limit() | 使用指定的值限定原查询返回的结果 |
offset() | 偏移原查询返回的结果,返回一个新查询 |
order_by() | 根据指定条件对原查询结果进行排序,返回一个新查询 |
group_by() | 根据指定条件对原查询结果进行分组,返回一个新查询 |
常用的SQLAlchemy查询结果的方法
方法 | 说明 |
---|---|
all() | 以列表形式返回查询的所有结果 |
first() | 返回查询的第一个结果,如果无数据,返回None |
first_or_404() | 返回查询的第一个结果,如果无数据,返回404 |
get() | 返回指定主键对应的行,如不存在,返回None |
get_or_404() | 返回指定主键对应的行,如不存在,返回404 |
count() | 返回查询结果的数量 |
paginate() | 返回一个Paginate对象,它包含指定范围内的结果 |
重点介绍一下paginate对象的常用方法:
- has_next:是否还有下一页
- has_prev:是否还有上一页
- items:以列表形式返回当前页的所有内容
- next(error_out=False):返回上一页的Pagination对象
- prev(error_out=False):返回下一页的Pagination对象
- page:当前页的页码(从1开始)
- pages:总页数
- per_page:每页显示的数量
- prev_num:上一页页码数
- next_num:下一页页码数
- query:返回创建这个Pagination对象的查询对象
- total:查询返回的记录总数
3、安装 Flask-SQLAlchemy
Flask-SQLAlchemy
使用以下语句安装扩展:
sudo pip install flask-sqlalchemy
SQLAlchemy
本身无法操作数据库,必须通过
pymysql
等第三方插件,本文使用的为
pymysql
,所以还需要安装:
sudo pip install pymysql
SQLAlchemy
模型中的
Dialect
用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,以下为使用不同插件连接数据库的URL语句:
# '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
# pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
# cx_Oracle
oracle+cx_oracle://user:[email protected]:port/dbname[?key=value&key=value...]
在
Flask-SQLAlchemy
中,也是用同样的方式,数据库使用URL指定,而且程序使用的数据库必须保存到Flask配置对象的
SQLALCHEMY_DATABASE_URI
中:
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]'
# 其他设置
# 动态追踪修改设置,如未设置只会提示警告
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
4、基本操作
使用官方示例代码,使用过程中注意将数据库连接语句修改为自己的数据库:
- test1.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]'
# 动态追踪修改设置,如未设置只会提示警告
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 查询时会显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
# 创建的用户模型,有id、username、email三个字段
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
# 便于查看
def __repr__(self):
return '<User %r>' % self.username
表操作
创建所有表(注意:该方法需要放在模型之后 ,创建完成之后需要去掉,否则可能会报错,但是在
Flask
中不建议这种建库方式,推荐使用
Flask-Migrate
扩展):
# 将上述例程文件保存为test1.py 然后使用python进入python命令行,注意为python3环境
# 导入删除文件,没有报错表示成功,否则需要检查test1.py文件是否正确
from test1 import db
# 创建表 操作完成后会在对应数据库中创建一个名称为User的数据表
db.create_all()
删除所有表:
插入数据
# 导入User模型
from test1 import User
# 单条数据添加 执行完成会在数据库中看到添加的数据
admin = User(username='admin', email='[email protected]')
db.session.add(admin)# 添加
db.session.commit()# 提交
# 多条数据添加
user1 = User(username='user1', email='[email protected]')
user2 = User(username='user2', email='[email protected]')
user3 = User(username='user3', email='[email protected]')
db.session.add_all([user1,user2,user3])# 添加
db.session.commit()# 提交
查询数据
# 查询到的数据一般以列表形式展示,如果查找了5条数据,需要获取第二条数据的用户名:
# users[1].username
# 返回查询到的所有对象
User.query.all()
# [<User 'admin'>, <User 'user1'>, <User 'user2'>, <User 'user3'>]
# 使用filter_by精确查找姓名为“admin”的用户
User.query.filter_by(username='admin').all()
#[<User 'admin'>]
# 返回查询到的第一个对象
User.query.first()
#<User 'admin'>
# filter模糊查询,返回名字结尾字符为“1”的所有数据。
User.query.filter(User.username.endswith('1')).all()
#[<User 'user1'>]
# 参数为主键,如果主键不存在没有返回内容
User.query.get(1)
#<User 'admin'>
# 逻辑非,返回名字不等于“admin”的所有数据
User.query.filter(User.username!='admin').all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]
# not_ 相当于取反
from sqlalchemy import not_
User.query.filter(not_(User.username=='admin')).all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]
# 逻辑与,需要导入and,返回and()条件满足的所有数据
from sqlalchemy import and_
User.query.filter(and_(User.username!='admin',User.email.endswith('example.com'))).all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]
# 逻辑或,需要导入or_
from sqlalchemy import or_
User.query.filter(or_(User.username!='admin',User.email.endswith('example.com'))).all()
#[<User 'admin'>, <User 'user1'>, <User 'user2'>, <User 'user3'>]
删除数据
# 获取所有数据
User.query.all()
# [<User 'admin'>, <User 'user1'>, <User 'user2'>, <User 'user3'>]
# 查询第一条数据
user = User.query.first()
# 删除
db.session.delete(user)
# 提交
db.session.commit()
# 重新查询
User.query.all()
#[<User 'user1'>, <User 'user2'>, <User 'user3'>]
更新数据
# 更新数据 修改查询的第一条数据姓名为“sworld”
user = User.query.first()
user.username = 'sworld'
db.session.commit()
User.query.first()
#<User 'sworld'>
【注意】:在官方文档中提及了一点,是关于为什们在
User
类中没有定义
__init__
方法,解释原因是
SQLAlchemy
向所有模型类添加了一个隐式构造函数,该构造函数为其所有列和关系接受关键字参数。如果出于任何原因决定重写构造函数,请确保继续接受`** kwargs
并使用这些
** kwargs``调用超级构造函数以保留此行为:
# Foo 为类名
class Foo(db.Model):
# ...
def __init__(**kwargs):
super(Foo, self).__init__(**kwargs)
# 需要增加的部分
# ...
5、进阶操作
分页查询
# 分页查询 接着上一步
# 第一页,每页20条数据。 默认第一页。
# 参数:error_out 设为True表示页数不是int或超过总页数时,会报错,并返回404状态码。 默认True
paginate_obj = User.query.paginate(page=1, per_page=20, error_out=False)
# 获取查询出来的数据。 (error_out设为False,页数不合法时会返回空列表)
user_list = paginate_obj.items
# print(user_list)
# [<User 'sworld'>, <User 'user2'>, <User 'user3'>]
# 获取总页数
total_page = paginate_obj.pages
# print(total_page)
# 1
关联查询
还是使用官方例程,在同级文件夹中添加下述文件:
- test2.py
from datetime import datetime
# 引入test1.py中的db和User
from test1 import db,User
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(80), nullable=False)
body = db.Column(db.Text, nullable=False)
pub_date = db.Column(db.DateTime, nullable=False,
default=datetime.utcnow)
# 添加外键声明
category_id = db.Column(db.Integer, db.ForeignKey('category.id'),
nullable=False)
# 添加关联,其中backref表示在关系的另一模型中添加反向引用,用于设置外键名称
category = db.relationship('Category',
backref=db.backref('posts', lazy=True))
def __repr__(self):
return '<Post %r>' % self.title
class Category(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), nullable=False)
def __repr__(self):
return '<Category %r>' % self.name
关联的使用方法如下:
# 退出重新进入python命令行,引入test2.py,在test2.py中,会引入test1.py中的内容
from test2 import db,User,Post,Category
# 重新创建表
db.create_all()
# 分下述六步演示关联使用
py = Category(name='Python')
Post(title='Hello Python!', body='Python is pretty cool', category=py)
p = Post(title='Snakes', body='Ssssssss')
py.posts.append(p)
db.session.add(py)
db.session.commit()
这是官方提供的比较经典的关联用法,分六步进行说明:
- 第一步创建了一个
的name='Python'
对象;Category
- 第二步使用关联创建了一个
对象,关联上一个创建的Post
;py
- 第三步又创建了一个名为
的p
对象;Post
- 第四步调用
对象的py
方法将第三步创建的posts
添加至p
(py
在Post类中添加了反向引用);posts
- 第五步通过会话添加;
- 第六步提交会话。
通过上面几个步骤就在
Category
表中添加一条数据,在
Post
表中添加两条数据,查询可得:
# 查询Category表全部数据
Category.query.all()
# [<Category 'Python'>]
# 查询Post表全部数据
Post.query.all()
# [<Post 'Hello Python!'>, <Post 'Snakes'>]
# 也可以使用Category模型中的posts关联获取对应的Post数据
Category.query.first().posts
# [<Post 'Hello Python!'>, <Post 'Snakes'>]
多表查询
join查询
还是继续上文中的示例,多表查询可以使用join进行查询,以下展示的是使用filter过滤查询:
db.session.query(Category,Post).filter(Category.id==Post.category_id).all()
# [(<Category 'Python'>, <Post 'Hello Python!'>), (<Category 'Python'>, <Post 'Snakes'>)]
# for打印出来
for ctg,pst in db.session\
.query(Category,Post)\
.filter(Category.id==Post.category_id).all():
print(ctg,pst)
# <Category 'Python'> <Post 'Hello Python!'>
# <Category 'Python'> <Post 'Snakes'>
使用
join
示例如下:
# 指定外键情况下可适用下述方法
db.session.query(Category,Post).join(Post).all()
# [(<Category 'Python'>, <Post 'Hello Python!'>), (<Category 'Python'>, <Post 'Snakes'>)]
# 没有指定外键则需要在join后面添加过滤参数
db.session.query(Category,Post).join(Post,Category.id==Post.category_id).all()
# [(<Category 'Python'>, <Post 'Hello Python!'>), (<Category 'Python'>, <Post 'Snakes'>)]
子查询(subquery)
子查询示例如下:
# 需要查询Category某条数据在Post表中对应的数据条数:
# 对Post进行子查询
# 添加引用
from sqlalchemy.sql import func
spst=db.session\
.query(Post.category_id,func.count('*')\
.label('post_count'))\
.group_by(Post.category_id).subquery()
db.session\
.query(Category,spst.c.post_count)\
.outerjoin(spst,Category.id==spst.c.category_id).all()
# [(<Category 'Python'>, 2)]
使用别名(aliased)
SQLAlchemy 使用
aliased()
方法表示别名,当我们需要把同一张表连接多次的时候,常常需要用到别名。以下为其他文章的用法,可以做一个参考:
from sqlalchemy.orm import aliased
# 把 Address 表分别设置别名
adalias1 = aliased(Address)
adalias2 = aliased(Address)
for username, email1, email2 in \
session.query(User.name, adalias1.email_address, adalias2.email_address).\
join(adalias1, User.addresses).\
join(adalias2, User.addresses).\
filter(adalias1.email_address=='[email protected]').\
filter(adalias2.email_address=='[email protected]'):
print(username, email1, email2)
# 执行结果
jack [email protected].com [email protected].com
【参考链接】
- Python Flask,数据库,SQLAlchemy,数据库查询,分页,分页查询paginate
- Flask-SQLAlchemy中的分页操作
- SQLAlchemy —— 多表查询
- python3 flask 使用Mysql数据库
- flask数据库操作
- 官方文档
- 20.Python笔记之SqlAlchemy使用
- python里比较流行的ORM框架:sqlalchemy
- SQLAlchemy的基本使用
- python中使用SQLAlchemy
- SQLAlchemy 1.4 Documentation