天天看点

SQLAlchemy---事务和连接管理

管理事务 (Managing Transactions)

在 1.4 版更改: 会话事务管理已修改为更清晰和更易于使用。 特别是,它现在具有“自动开始”操作,这意味着可以控制事务开始的点,而无需使用传统的“自动提交”模式。。

Session 使用名为 SessionTransaction 的对象一次跟踪单个“虚拟”事务的状态。 然后,该对象利用 Session 对象绑定到的一个或多个底层引擎,以便根据需要使用 Connection 对象启动真正的连接级事务。

这个“虚拟”事务会在需要时自动创建,或者可以使用 Session.begin() 方法启动。 在创建 Session 对象的级别以及维护 SessionTransaction 的范围方面,都尽可能地支持 Python 上下文管理器的使用。

下面,假设我们从一个 Session 开始:

from sqlalchemy.orm import Session
session = Session(engine)
           

现在,我们可以使用上下文管理器在划分的事务中运行操作:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# 最后提交事务,如果有则回滚
# 是引发的异常
           

在上述上下文结束时,假设没有引发异常,任何挂起的对象都将刷新到数据库并提交数据库事务。 如果在上述块中引发异常,则事务将回滚。 在这两种情况下,退出区块后的上述 Session 已准备好用于后续事务。

Session.begin() 方法是可选的,Session 也可以用于边执行边提交的方法,在这种方法中,它将根据需要自动开始事务;只需提交或回滚以下内容:

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute(< some select statement >)
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()   # rolls back still_another_object
           

Session本身具有一个 Session.close() 方法。如果 Session 在尚未提交或回滚的事务中开始,此方法将取消(即回滚)该事务,并删除会话对象状态中包含的所有对象。如果会话的使用方式是调用 Session.commit()或 Session.rollback()(例如,不在上下文管理器或类似的环境中),可以使用close方法来确保释放所有资源:\

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()
           

最后,会话构造/关闭过程本身可以通过上下文管理器运行。这是确保 Session 对象的使用范围在固定块内的最佳方法。首先通过会话构造函数进行说明:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute(<some SELECT statement>)

# remaining transactional state from the .execute() call is
# discarded
           

类似地,sessionmaker 也可以同样的方式使用:

Session = sesssionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session
           

sessionmaker 本身包括一个 sessionmaker.begin() 方法, 允许同时执行这两个操作:

with Session.begin() as session:
    session.add(some_object):
           

使用保存点 (Using SAVEPOINT)

保存点(savepoint)是事务过程中的一个逻辑点,用于取消部分事务,当结束事务时,会自动的删除该事务中所定义的所有保存点。当执行rollback时,通过指定保存点可以回退到指定的点。

如果使用基础引擎来实现 保存点事务,可以使用 Session.begin_nested():

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested() # 建立保存点  establish a savepoint
    session.add(u3)
    nested.rollback()  # 回滚 u3, u1 and u2  不变

# 提交 u1 、u2
           

每次调用 Session.begin_nested() 时,都会向数据库发出一个带有唯一标识符的新“BEGIN SAVEPOINT”命令。 当调用 SessionTransaction.commit() 时,会在数据库上发出“RELEASE SAVEPOINT”,如果改为调用 SessionTransaction.rollback(),则会发出“ROLLBACK TO SAVEPOINT”。

Session.begin_nested() 也可以使用上下文:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()
           

当 Session.begin_nested() 被调用时,会无条件地发出 Session.flush() (不管 autoflush 设置如何)。 这样一来,当此嵌套事务发生回滚时,会话的完整状态将过期,从而导致所有后续属性/实例访问在调用 Session.begin_nested() 之前引用 Session 的完整状态。

  • NestedTransaction - NestedTransaction 类是核心级别的构造,Session 内部使用它来生成 SAVEPOINT 块。

会话级与引擎级事务控制 (Session-level vs. Engine level transaction control)

从 SQLAlchemy 1.4 开始, sessionmaker 和 Core Engine 对象都支持 2.0 样式操作,通过使用 Session.future 标志和 create_engine.future 标志,使这两个对象采用 2.0 样式语义。

当使用未来模式时,两个包之间应该有等效的语义,在 sessionmaker 与 Engine 的级别,以及 Session 与 Connection 的级别。 以下部分基于以下方案详细介绍了这些场景:

ORM (using future Session)                    Core (using future engine)
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:
           
随手提交 (Commit as you go)

Session 和 Connection 都具有 Connection.commit() 和 Connection.rollback() 方法。 使用 SQLAlchemy 2.0 风格的操作,这些方法在所有情况下都会影响最外层的事务。

Engine:

engine = create_engine("postgresql://user:[email protected]/dbname", future=True)

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
    conn.commit()
           

Session:

Session = sessionmaker(engine, future=True)

with Session() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
    session.commit()
           
开始一次 (Begin Once)

sessionmaker 和 Engine 都有 Engine.begin() 方法,该方法都将获取一个新对象,用于执行 SQL 语句(分别为 Session 和 Connection),然后返回一个上下文管理器,该上下文管理器将维护该对象的 begin/commit/rollback 上下文。

Engine:

engine = create_engine("postgresql://user:[email protected]/dbname", future=True)

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
# 提交并自动关闭
           

Session:

Session = sessionmaker(engine, future=True)

with Session.begin() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
# 提交并自动关闭
           
嵌套事务 (Nested Transaction)

通过 Session.begin_nested() 或 Connection.begin_nested() 方法使用 SAVEPOINT 时,必须使用返回的事务对象来提交或回滚 SAVEPOINT。调用 Session.commit() 或 Connection.commit() 方法将始终提交最外层的事务;这是 SQLAlchemy 2.0 特定行为,与 1.x 系列相反。

Engine:

engine = create_engine("postgresql://user:[email protected]/dbname", future=True)

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
# 提交并自动关闭
           

Session:

Session = sessionmaker(engine, future=True)

with Session.begin() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
# 提交并自动关闭
           

事务显式开始 (Explicit Begin)

版本 1.4 中更改:SQLAlchemy 1.4 弃用了“自动提交模式”,该模式在历史上是使用 Session.autocommit 标志启用的。展望未来,允许使用 Session.begin() 方法的一种新方法是新的“自动开始”行为,以便现在可以在首次构造 Session 时调用该方法,或者在上一个事务结束之后和开始新事务之前调用该方法。

有关从依赖于 begin() /commit() 对嵌套的框架的“子事务”模式迁移的背景,请参阅下一节从“子事务”模式迁移。

Session 具有“自动开始”行为,这意味着一旦操作开始发生,它会确保存在 SessionTransaction 以跟踪正在进行的操作。 此事务在调用 Session.commit() 时完成。

通常需要控制“开始”操作发生的点,特别是在框架集成中。 为了适应这一点,Session 使用“autobegin”策略,这样可以直接为尚未开始事务的 Session 调用 Session.begin() 方法:

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'
    session.commit()
except:
    session.rollback()
    raise
           

从“子事务”模式迁移 (Migrating from the “subtransaction” pattern)

  • 自版本 1.4 起已弃用:已弃用 Session.begin.subtransactions 标志。虽然会话在内部仍使用“子事务”模式,但它不适合最终用户使用,因为它会导致混淆,此外,一旦删除了“自动提交”模式,它可能会从版本 2.0 中的会话本身中删除。

经常与自动提交模式一起使用的“子事务”模式在 1.4 中也已弃用。 这种模式允许在事务已经开始时使用 Session.begin() 方法,从而产生一个称为“子事务”的构造,它本质上是一个阻止 Session.commit() 方法实际提交的块。

这种模式在现实世界的应用程序中已被证明是令人困惑的,并且对于应用程序来说,最好确保使用单个开始/提交对执行最顶层的数据库操作。

为了为使用此模式的应用程序提供向后兼容性,可以使用以下上下文管理器或基于装饰器的类似实现:

import contextlib

@contextlib.contextmanager
def transaction(session):
    if not session.in_transaction():
        with session.begin():
            yield
    else:
        yield
           

上述上下文管理器的使用方式与“子事务”标志的工作方式相同,例如在以下示例中:

# method_a starts a transaction and calls method_b
def method_a(session):
    with transaction(session):
        method_b(session)

# method_b also starts a transaction, but when
# called from method_a participates in the ongoing
# transaction.
def method_b(session):
    with transaction(session):
        session.add(SomeObject('bat', 'lala'))

Session = sessionmaker(engine)

# create a Session and call method_a
with Session() as session:
    method_a(session)
           

为了与首选的惯用模式进行比较,开始块应位于最外层。 这消除了对单个函数或方法关注事务划分细节的需要:

def method_a(session):
    method_b(session)

def method_b(session):
    session.add(SomeObject('bat', 'lala'))

Session = sessionmaker(engine)

# create a Session and call method_a
with Session() as session:
    with session.begin():
        method_a(session)
           

启用两阶段提交 (Enabling Two-Phase Commit)

对于支持两阶段操作的后端(目前是 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。 这将协调跨数据库事务的提交,以便在所有数据库中提交或回滚事务。 您还可以 Session.prepare() 用于与不受 SQLAlchemy 管理的事务交互的会话。 要使用两阶段事务,请在会话上设置标志 twophase=True:

engine1 = create_engine('postgresql://db1')
engine2 = create_engine('postgresql://db2')

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User:engine1, Account:engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
           

设置事务隔离级别/DBAPI 自动提交 (Setting Transaction Isolation Levels / DBAPI AUTOCOMMIT)

大多数 DBAPI 支持可配置事务隔离级别的概念。这些传统上是四个级别“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常在开始新事务之前应用于 DBAPI 连接,注意大多数 DBAPI 将在首次发出 SQL 语句时隐式地开始此事务。

支持隔离级别的 DBAPI 通常也支持真正的“自动提交”的概念,这意味着 DBAPI 连接本身将被置于非事务性自动提交模式。这通常意味着自动向数据库发出“BEGIN”的典型 DBAPI 行为不再发生,但它也可能包含其他指令。使用此模式时,DBAPI 在任何情况下都不使用事务。 SQLAlchemy 方法,如 .begin()、.commit() 和 .rollback() 静默传递。

SQLAlchemy 的方言支持基于每个引擎或每个连接的可设置隔离模式,使用 create_engine() 级别和 Connection.execution_options() 级别的标志。

使用 ORM Session 时,它充当引擎和连接的外观,但不直接暴露事务隔离。因此,为了影响事务隔离级别,我们需要根据需要对 Engine 或 Connection 采取行动。

  • 设置事务隔离级别,包括 DBAPI 自动提交 - 请务必查看隔离级别在 SQLAlchemy Connection 对象级别上的工作方式。(https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#migrating-from-the-subtransaction-pattern)
为 Sessionmaker/Engine Wide 设置隔离 (Setting Isolation For A Sessionmaker / Engine Wide)

要在全局范围内设置具有特定隔离级别的 Session 或 sessionmaker,第一种技术是在所有情况下都可以针对特定隔离级别构建引擎,然后将其用作 Session 和/或 sessionmaker 的连接源:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql://scott:[email protected]/test",
    isolation_level='REPEATABLE READ'
)

Session = sessionmaker(eng)
           

如果同时有两个具有不同隔离级别的引擎,另一个有用的选项是使用 Engine.execution_options() 方法,该方法将生成原始引擎的浅表副本,该副本与父引擎共享相同的连接池。 当操作将分为“事务”和“自动提交”操作时,这通常是可取的:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql://scott:[email protected]/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)
           

上面,“eng”和“autocommit_engine”都共享相同的方言和连接池。 但是,当从 autocommit_engine 获取连接时,将设置“AUTOCOMMIT”模式。 两个 sessionmaker 对象“transactional_session”和“autocommit_session”在处理数据库连接时继承了这些特性。

“autocommit_session”继续具有事务语义,包括 Session.commit() 和 Session.rollback() 仍然认为自己是“提交”和“回滚”对象,但是事务将默默地消失。 出于这个原因,虽然不是严格要求,但通常以只读方式使用具有 AUTOCOMMIT 隔离的会话,即:

with autocommit_session() as session:
    some_objects = session.execute(<statement>)
    some_other_objects = session.execute(<statement>)

# closes connection
           

为单个会话设置隔离 ( Setting Isolation for Individual Sessions)

当我们创建一个新的 Session 时,无论是直接使用构造函数还是调用 sessionmaker 生成的可调用对象时,我们都可以直接传递 bind 参数,覆盖预先存在的绑定。 例如,我们可以从默认 sessionmaker 创建我们的 Session 并传递一个用于自动提交的引擎集:

plain_engine = create_engine("postgresql://scott:[email protected]/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
           

对于 Session 或 sessionmaker 配置了多个“绑定”的情况,我们可以完全重新指定绑定参数,或者如果我们只想替换特定绑定,我们可以使用 Session.bind_mapper() 或 Session.bind_table() 方法:

with Session() as session:
    session.bind_mapper(User, autocommit_engine)
           
为单个事务设置隔离 (Setting Isolation for Individual Transactions)

关于隔离级别的一个关键警告是,无法在事务已经开始的连接上安全地修改设置。 数据库无法更改正在进行的事务的隔离级别,并且某些 DBAPI 和 SQLAlchemy 方言在这方面的行为不一致。

因此,最好使用预先绑定到具有所需隔离级别的引擎的 Session。 但是,在事务开始时使用 Session.connection() 方法会影响每个连接的隔离级别:

from sqlalchemy.orm import Session

# 假设会话刚刚构建
sess = Session(bind=engine)

# 在进行任何其他操作之前,使用选项调用 connection()。
# 这将从绑定引擎获取一个新连接并开始一个真正的连接
# 数据库事务。
sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.
           

上面,我们首先使用构造函数或 sessionmaker 生成一个 Session。 然后,我们通过调用 Session.connection() 显式设置数据库级事务的开始,它提供了在数据库级事务开始之前将传递给连接的执行选项。 事务以这个选定的隔离级别继续。 当事务完成时,在连接返回到连接池之前,连接上的隔离级别被重置为其默认值。

Session.begin() 方法也可用于开始会话级事务; 在该调用之后调用 Session.connection() 可用于设置每个连接事务的隔离级别:

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.
           

使用事件跟踪事务状态 (Tracking Transaction State with Events)

有关会话事务状态更改的可用事件挂钩的概述,请参阅事务事件部分。

将会话加入外部事务(例如用于测试套件) (Joining a Session into an External Transaction (such as for test suites))

如果正在使用已处于事务状态的连接(即已建立事务),则可以通过将会话绑定到该连接来使会话参与该事务。 这样做的通常理由是一个测试套件,它允许 ORM 代码自由地与 Session 一起工作,包括调用 Session.commit() 的能力,之后整个数据库交互被回滚。

  • 在 1.4 版中更改:本节介绍了新版本的“加入外部事务”配方,该配方同样适用于 2.0 风格和 1.x 风格的引擎和会话。 这里来自以前版本(例如 1.3)的配方也将继续适用于 1.x 引擎和会话。

该配方通过在事务中建立连接和可选的 SAVEPOINT 来工作,然后将其作为“绑定”传递给会话。 Session 检测到给定的 Connection 已经在事务中,如果事务实际上是最外层事务,则不会对其运行 COMMIT。 然后当测试结束时,事务会回滚,以便在整个测试过程中恢复任何数据更改:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine('postgresql://...')

class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()


        # bind an individual Session to the connection
        self.session = Session(bind=self.connection)


        ###    optional     ###

        # if the database supports SAVEPOINT (SQLite needs special
        # config for this to work), starting a savepoint
        # will allow tests to also use rollback within tests

        self.nested = self.connection.begin_nested()

        @event.listens_for(self.session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if not self.nested.is_active:
                self.nested = self.connection.begin_nested()

        ### ^^^ optional ^^^ ###

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        # if the SAVEPOINT steps are taken, then a test can also
        # use session.rollback() and continue working with the database

        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()
           

上面的配方是 SQLAlchemy 自己的 CI 的一部分,以确保它仍然按预期工作。