mysql之sqlalchemy
安裝
pip install SQLAlchemy
檢視安裝版本
import sqlalchemy
sqlalchemy.__version__
連接配接資料庫
from sqlalchemy import create_engine
import pymysql
pymysql.install_as_MySQLdb()
DB_CON_STR = 'mysql+mysqldb://<user>:<password>@<host_ip>/<database>?[params]'
engine = create_engine(DB_CON_STR, echo=False)
SQL表達式
定義和建立資料表
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
metadata = MetaData()
metadata = MetaData()
users = Table(
"users", metadata,
Column("id", Integer, primary_key=True),
Column("name", String(36)),
Column('fullname', String(36)),
)
address = Table(
"address", metadata,
Column("id", Integer, primary_key=True),
Column("user_id", Integer, ForeignKey('users.id')),
Column("email_address", String(120), nullable=False)
)
metadata.create_all(engine)
插入語句
ins = users.insert()
# 使用 str() 函數檢視生成的語句
str(ins)
# 使用 values()限定插入參數
ins = ins.values(name='jack', fullname='Jack Jones')
str(ins) # 放入的資料并沒有呈現出來
# 将語句編譯之後,通過 params檢視參數
ins.compile().params
執行
# 先擷取資料庫連接配接
conn = engine.connect()
result = conn.execute(ins)
# result(ResultProxy) 類似于遊标,我可以從中擷取重要資訊
result.inserted_primary_key # 擷取生成的主鍵
執行多條語句
# insert 可以不用和 values 一起使用
ins = users.insert()
conn.execute(ins, id=2, name='wendy', fullname='Wendy Williams')
conn.execute(ins, name='xiaoming', fullname='xiaoming.li')
# 使用字典清單插入多條資料
conn.execute(ins, [
{"name": 'xiaolu', 'fullname': "小璐.李"},
{"name": 'rongrong', 'fullname': "容.馬"}
])
conn.execute(address.insert(), [
{'user_id': 1, 'email_address' : '[email protected]'},
{'user_id': 1, 'email_address' : '[email protected]'},
{'user_id': 2, 'email_address' : '[email protected]'},
{'user_id': 2, 'email_address' : '[email protected]'},
])
查詢
from sqlalchemy.sql import select
s = select([users])
result = conn.execute(s) # result和遊标類似
# 疊代資料
for row in result:
print(row)
# 像通過列名通路row中的資料
row = result.fetchone()
print("name:", row['name'], "; fullname:", row['fullname'])
# 通過索引通路
print("name:", row[1], "; fullname:", row[2])
# 通過 Column對象通路
print("name:", row[users.c.name], "; fullname:", row[users.c.fullname])
# 用完之後該關閉 result
result.close()
# 指定 select 的列
s = select([users.c.name, users.c.fullname])
...
多表查詢
# 下面的語句會産生笛卡爾積
for row in conn.execute(select([users, address])):
print(row)
# where 條件
s = select([users, address]).where(users.c.id == address.c.user_id)
str(users.c.id == address.c.user_id) # > 'users.id = address.user_id'
運算符
print(users.c.id == addresses.c.user_id) # > users.id = addresses.user_id
print(users.c.id == 7) # users.id = :id_1
(users.c.id == 7).compile().params # > {u'id_1': 7}
print(users.c.id != 7) # users.id != :id_1
print(users.c.name == None) # users.name IS NULL
print('fred' > users.c.name) # users.name < :name_1
print(users.c.name + users.c.fullname) # users.name || users.fullname,字元串拼接
print((users.c.name + users.c.fullname).compile(bind=create_engine('mysql://')))
# concat(users.name, users.fullname),會自動轉換成對應驅動環境的語義。
#如果真有不可用的運算符 可以使用 Operators.op()方法
print(users.c.name.op('tiddlywinks')('foo')) # users.name tiddlywinks :name_1
users.c.id.op('&')(0xFF) # users.id & :id_1
conn.execute(select([users]).where(users.c.name.op('like')("%xiaolu%")))
連詞
from sqlalchemy.sql import and_, or_, not_
print(and_(
users.c.name.like('j%'),
users.c.id == address.c.user_id,
or_(
address.c.email_address == '[email protected]',
address.c.email_address == '[email protected]'
),
not_(users.c.id > 5)
)
)
# 使用 & |
print(users.c.name.like('j%') & (users.c.id == addresses.c.user_id) &
(
(addresses.c.email_address == '[email protected]') |
(addresses.c.email_address == '[email protected]')
)
& ~(users.c.id>5)
)
#users.name LIKE :name_1 AND users.id = address.user_id AND (address.email_address = :email_address_1 OR address.email_address = :email_address_2) AND users.id <= :id_1
# 使用 between()[産生一個BETWEEN子句] 和 label() [使用AS 關鍵字産生标簽]
print(select([users.c.fullname.label('n')]).where(users.c.id.between(10, 20)))
'''
SELECT users.fullname AS n
FROM users
WHERE users.id BETWEEN :id_1 AND :id_2
'''
别名和子查詢
a1 = addresses.alias()
a2 = addresses.alias()
s = select([users]).\
where(and_(
users.c.id == a1.c.user_id,
users.c.id == a2.c.user_id,
a1.c.email_address == '[email protected]',
a2.c.email_address == '[email protected]'
))
'''
SELECT users.id, users.name, users.fullname
FROM users, addresses AS addresses_1, addresses AS addresses_2
WHERE users.id = addresses_1.user_id
AND users.id = addresses_2.user_id
AND addresses_1.email_address = ?
AND addresses_2.email_address = ?
('[email protected]', '[email protected]')
'''
addresses_subq = s.alias()
s = select([users.c.name]).where(users.c.id == addresses_subq.c.id)
'''
SELECT users.name
FROM users,
(SELECT users.id AS id, users.name AS name, users.fullname AS fullname
FROM users, addresses AS addresses_1, addresses AS addresses_2
WHERE users.id = addresses_1.user_id AND users.id = addresses_2.user_id
AND addresses_1.email_address = ?
AND addresses_2.email_address = ?) AS anon_1
WHERE users.id = anon_1.id
('[email protected]', '[email protected]')
'''
使用連接配接
print(users.join(addresses)) # users JOIN addresses ON users.id = addresses.user_id
print(users.join(addresses, addresses.c.email_address.like(users.c.name + '%' )))
# users JOIN addresses ON addresses.email_address LIKE users.name || :name_1
綁定參數對象
from sqlalchemy.sql import bindparam
s = users.select(users.c.name == bindparam('username'))
conn.execute(s, username='wendy').fetchall()
排序,分組,限制,偏移…
stmt = select([users.c.name]).order_by(users.c.name)
stmt = select([users.c.name]).order_by(users.c.name.desc())
stmt = select([users.c.name, func.count(addresses.c.id)]).\
select_from(users.join(addresses)).\
group_by(users.c.name)
stmt = select([users.c.name, func.count(addresses.c.id)]).\
select_from(users.join(addresses)).\
group_by(users.c.name).\
having(func.length(users.c.name) > 4)
stmt = select([users.c.name]).\
where(addresses.c.email_address.
contains(users.c.name)).\
distinct()
stmt = select([users.c.name, addresses.c.email_address]).\
select_from(users.join(addresses)).\
limit(1).offset(1)
更新和删除
stmt = users.update().values(fullname="Fullname: " + users.c.name).where(xxx)
# 多次執行
stmt = users.insert().values(name=bindparam('_name') + " .. name")
conn.execute(stmt, [
{'id':4, '_name':'name1'}, # name1..name ,連接配接
{'id':5, '_name':'name2'},
{'id':6, '_name':'name3'},
])
stmt = users.update().\
where(users.c.name == bindparam('oldname')).\
values(name=bindparam('newname'))
conn.execute(stmt, [
{'oldname':'jack', 'newname':'ed'},
{'oldname':'wendy', 'newname':'mary'},
{'oldname':'jim', 'newname':'jake'},
])
相關更新
stmt = select([addresses.c.email_address]).\
where(addresses.c.user_id == users.c.id).\
limit(1)
conn.execute(users.update().values(fullname=stmt))
删除
比對的行數
result.rowcount
引用自: https://docs.sqlalchemy.org/en/13/core/tutorial.html
ORM
聲明一個映射
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
定義一個類
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(30))
fullname = Column(String(60))
nickname = Column(String(60))
def __repr__(self):
return "<User(name='%s', fullname='%s', nickname='%s')>" % (self.name, self.fullname, self.nickname)
Base.metadata.create_all(engine)
建立一個對象
建立會話
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
# 執行個體化一個Session
session = Session()
添加和更新對象
ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
session.add(ed_user)
# 我們可以User使用一次添加更多對象 add_all():
session.add_all([
User(name='wendy', fullname='Wendy Williams', nickname='windy'),
User(name='mary', fullname='Mary Contrary', nickname='mary'),
User(name='fred', fullname='Fred Flintstone', nickname='freddy')])
# 改名
ed_user.nickname = 'eddie'
print(ession.dirty) # 有一個對象被修改
print(session.new) # 有三個對象等待處理
session.commit() # 同步到資料庫
復原
ed_user.name = 'Edwardo'
fake_user = User(name='fakeuser', fullname='Invalid', nickname='12345')
session.add(fake_user)
session.rollback()
print(ed_user.name) # u'ed'
print(fake_user in session) # False
查詢
for instance in session.query(User).order_by(User.id):
print(instance.name, instance.fullname)
# ...
for name, fullname in session.query(User.name, User.fullname):
...
過濾器運算符
- equals
- not equals
- like/ilike
- in
- not in
- is null / is not null
query.filter(User.name == None) query.filter(User.name != None)
- AND
- OR
from sqlalchemy import or_ query.filter(or_(User.name == 'ed', User.name == 'wendy'))
傳回
- all()
query = session.query(User).filter(User.name.like('%ed')).order_by(User.id) query.all()
- first()
- one()
計數
關系
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship("User", back_populates="addresses")
def __repr__(self):
return "<Address(email_address='%s')>" % self.email_address
>>> User.addresses = relationship(
"Address", order_by=Address.id, back_populates="user")
Base.metadata.create_all(engine)
相關對象
jack = User(name='jack', fullname='Jack Bean', nickname='gjffdd')
jack.addresses = [
Address(email_address='[email protected]'),
Address(email_address='[email protected]')]
session.add(jack)
session.commit()
# 查詢
jack = session.query(User).filter_by(name='jack').one()
print(jack.addresses)
連接配接
for u, a in session.query(User, Address).filter(User.id==Address.user_id).filter(Address.email_address=='[email protected]').all():
...
删除
# 先查詢 --> 後删除
session.delete(jack)
session.commit()
多對多關系
class BlogPost(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
headline = Column(String(255), nullable=False)
body = Column(Text)
# many to many BlogPost<->Keyword
keywords = relationship('Keyword',
secondary=post_keywords,
back_populates='posts')
def __init__(self, headline, body, author):
self.author = author
self.headline = headline
self.body = body
def __repr__(self):
return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)
class Keyword(Base):
__tablename__ = 'keywords'
id = Column(Integer, primary_key=True)
keyword = Column(String(50), nullable=False, unique=True)
# many to many BlogPost<->Keyword
posts = relationship('BlogPost',
secondary=post_keywords,
back_populates='keywords')
def __init__(self, keyword):
self.keyword = keyword
引用自:https://docs.sqlalchemy.org/en/13/orm/tutorial.html