天天看点

SQLAlchemy使用笔记一

安装

pip install sqlalchemy
           

引用

import sqlalchemy

# Print the installed SQLAlchemy version to confirm the import works.
# print() is called as a function so the line is valid on Python 3 and
# matches the print(...) calls used in the later snippets.
print(sqlalchemy.__version__)
           

连接

from sqlalchemy import create_engine

# Every URL below follows dialect[+driver]://user:password@host[:port]/database.
# "scott" / "tiger" and the host/database names are placeholders.
# Each assignment rebinds `engine`; in practice pick the one line you need.

# PostgreSQL (psycopg2 is the default driver)
# default driver (i.e. psycopg2)
engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')
# psycopg2, named explicitly
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')
# pg8000
engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')

# MySQL (mysql-python is the default driver)
# default driver (i.e. mysql-python)
engine = create_engine('mysql://scott:tiger@localhost/foo')
# mysql-python, named explicitly
engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
# MySQL-connector-python
engine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo')
# OurSQL
engine = create_engine('mysql+oursql://scott:tiger@localhost/foo')

# Oracle (cx_oracle is the default driver)
# default driver (i.e. cx_oracle)
engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
# cx_oracle, named explicitly, connecting through a TNS name
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')

# Microsoft SQL Server (pyodbc is the default driver)
# pyodbc, connecting through an ODBC DSN
engine = create_engine('mssql+pyodbc://scott:tiger@mydsn')
# pymssql ("hostname", "port" and "dbname" are placeholders to fill in)
engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname')

# SQLite
# The URL form is sqlite://<nohostname>/<path>; a relative path therefore
# has three slashes in total.
engine = create_engine('sqlite:///foo.db')
# Unix/Mac absolute path: four slashes in total (three from the URL form
# plus the leading slash of the path itself).
engine = create_engine('sqlite:////absolute/path/to/foo.db')
# Windows absolute path
engine = create_engine('sqlite:///C:\\path\\to\\foo.db')
# Windows absolute path, using a raw string to avoid doubling backslashes
engine = create_engine(r'sqlite:///C:\path\to\foo.db')
# In-memory database (both spellings are accepted)
engine = create_engine('sqlite://')
engine = create_engine('sqlite:///:memory:')
           

建表

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table, MetaData, Column, Integer, String, ForeignKey
from sqlalchemy.orm import mapper


# Create the declarative base class that mapped classes inherit from.
Base = declarative_base()


# Mapped class, Declarative style: table definition and class in one place.
class User(Base):
    """ORM class mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        """Return a debug string showing name, fullname and password."""
        shown = (self.name, self.fullname, self.password)
        return "<User(name='%s', fullname='%s', password='%s')>" % shown

# Mapped class, classical (non-Declarative) style: the table and the class
# are defined separately and then tied together with mapper().
metadata = MetaData()

user = Table(
    'user',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('fullname', String(50)),
    Column('password', String(12)),
)


# NOTE(review): this reuses the name ``User`` from the Declarative example;
# the two definitions are alternatives, not meant to coexist in one module.
class User(object):
    """Plain Python class that mapper() associates with the ``user`` table."""

    def __init__(self, name, fullname, password):
        """Store the three user attributes on the instance."""
        self.name, self.fullname, self.password = name, fullname, password


mapper(User, user)

# Create the tables registered on Base.metadata, using the engine.
Base.metadata.create_all(engine)
# Drop the tables registered on Base.metadata. Shown for reference only:
# executed right after create_all() it removes the tables just created.
Base.metadata.drop_all(engine)
           

会话

from sqlalchemy.orm import sessionmaker

# Create a session factory bound to the engine, then open a session.
# Equivalent two-step form when the engine is not known up front:
#   Session = sessionmaker()
#   Session.configure(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()

# Insert: stage a single object ...
ed_user = User(name='ed', fullname='Ed Jones', password='edspassword')
session.add(ed_user)

# ... or stage several objects in one call.
more_users = [
    User(name='wendy', fullname='Wendy Williams', password='foobar'),
    User(name='mary', fullname='Mary Contrary', password='xxg527'),
    User(name='fred', fullname='Fred Flinstone', password='blah'),
]
session.add_all(more_users)

# Commit the current transaction.
session.commit()

# Flush: send pending changes to the database without committing.
session.flush()

# Roll back the current transaction.
session.rollback()
           

读取

# A query automatically flushes pending changes first, so objects added
# to the session but not yet committed are visible to it.

# Iterate over full User objects, ordered by primary key.
for instance in session.query(User).order_by(User.id):
    print(instance.name, instance.fullname)

# Select only some columns; each row unpacks like a tuple.
for name, fullname in session.query(User.name, User.fullname):
    print(name, fullname)
# Mix an entity and a column; fields are reachable by name on the row.
for row in session.query(User, User.name).all():
    print(row.User, row.name)
# label() renames a column in the result row.
for row in session.query(User.name.label('name_label')).all():
    print(row.name_label)

# aliased() gives the mapped table an alias, e.g. for joining it to itself.
from sqlalchemy.orm import aliased
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias, user_alias.name).all():
    print(row.user_alias)

# filter_by() filters with keyword arguments on the queried entity.
# The trailing comma in "for name," unpacks each one-column row.
for name, in session.query(User.name).filter_by(fullname='Ed Jones'):
    print(name)

# filter() takes SQL expressions and is more flexible.
for name, in session.query(User.name).filter(User.fullname == 'Ed Jones'):
    print(name)

from sqlalchemy import and_, or_

# Catalogue of common filter operators.
# NOTE(review): `query` is not defined in this snippet; presumably it stands
# for a Query object such as session.query(User).
query.filter(User.name == 'ed')  # equals
query.filter(User.name != 'ed')  # not equals
query.filter(User.name.like('%ed%'))  # LIKE
query.filter(User.name.in_(['ed', 'wendy', 'jack']))  # IN (list of values)
# IN (subquery)
query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))))
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))  # NOT IN
# "== None" is deliberate here: the column overloads == to build IS NULL.
query.filter(User.name == None)  # IS NULL
query.filter(User.name.is_(None))  # IS NULL, explicit spelling
query.filter(User.name != None)  # IS NOT NULL
query.filter(User.name.isnot(None))  # IS NOT NULL, explicit spelling
# AND: three equivalent spellings.
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))  # AND
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
query.filter(or_(User.name == 'ed', User.name == 'wendy'))  # OR
query.filter(User.name.match('wendy'))  # MATCH

# Return all matching rows as a list.
session.query(User).filter(User.name.like('%ed%')).all()

# Fetch a single result. `query` stands for any Query object,
# e.g. session.query(User).
query.first()  # first row, or None if there are no rows
query.one()  # raises MultipleResultsFound if more than one row, NoResultFound if none
query.one_or_none()  # raises MultipleResultsFound if more than one row, returns None if none
# Same error/None rules as one_or_none(), but returns the first column of
# the row rather than the whole row.
query.scalar()

# Use raw SQL fragments. Literal SQL strings must be wrapped in text()
# before being passed to filter(), order_by() or from_statement().
from sqlalchemy import text

session.query(User).filter(text('id<200')).order_by(text('id')).all()

# Bind parameters are written as :name in the SQL and supplied via params().
session.query(User).filter(text('id<:value and name=:name')).params(
    value=224, name='fred').order_by(User.id).one()

# Run a complete SELECT and load the rows as User objects.
# NOTE: despite its name, `query` ends up holding the list returned by all().
query = session.query(User).from_statement(
    text('SELECT * FROM users where name=:name')).params(name='ed').all()

# Counting rows.
from sqlalchemy import func
# Number of rows matched by the query.
session.query(User).filter(User.name.like('%ed%')).count()
# Count per distinct name, using func.count() with GROUP BY.
session.query(func.count(User.name), User.name).group_by(User.name).all()
# COUNT(*) with the table named explicitly through select_from().
session.query(func.count('*')).select_from(User).scalar()
# Count over the primary-key column; scalar() returns the single value.
session.query(func.count(User.id)).scalar()