天天看點

SQLAlchemy---事務和連接配接管理

管理事務 (Managing Transactions)

在 1.4 版更改: 會話事務管理已修改為更清晰和更易于使用。 特别是,它現在具有“自動開始”操作,這意味着可以控制事務開始的點,而無需使用傳統的“自動送出”模式。。

Session 使用名為 SessionTransaction 的對象一次跟蹤單個“虛拟”事務的狀态。 然後,該對象利用 Session 對象綁定到的一個或多個底層引擎,以便根據需要使用 Connection 對象啟動真正的連接配接級事務。

這個“虛拟”事務會在需要時自動建立,或者可以使用 Session.begin() 方法啟動。 在建立 Session 對象的級别以及維護 SessionTransaction 的範圍方面,都盡可能地支援 Python 上下文管理器的使用。

下面,假設我們從一個 Session 開始:

from sqlalchemy.orm import Session
session = Session(engine)
           

現在,我們可以使用上下文管理器在劃分的事務中運作操作:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# 最後送出事務,如果有則復原
# 是引發的異常
           

在上述上下文結束時,假設沒有引發異常,任何挂起的對象都将重新整理到資料庫并送出資料庫事務。 如果在上述塊中引發異常,則事務将復原。 在這兩種情況下,退出區塊後的上述 Session 已準備好用于後續事務。

Session.begin() 方法是可選的,Session 也可以用于邊執行邊送出的方法,在這種方法中,它将根據需要自動開始事務;隻需送出或復原以下内容:

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute(< some select statement >)
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()   # rolls back still_another_object
           

Session本身具有一個 Session.close() 方法。如果 Session 在尚未送出或復原的事務中開始,此方法将取消(即復原)該事務,并删除會話對象狀态中包含的所有對象。如果會話的使用方式是調用 Session.commit()或 Session.rollback()(例如,不在上下文管理器或類似的環境中),可以使用close方法來確定釋放所有資源:\

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()
           

最後,會話構造/關閉過程本身可以通過上下文管理器運作。這是確定 Session 對象的使用範圍在固定塊内的最佳方法。首先通過會話構造函數進行說明:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute(<some SELECT statement>)

# remaining transactional state from the .execute() call is
# discarded
           

類似地,sessionmaker 也可以同樣的方式使用:

Session = sesssionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session
           

sessionmaker 本身包括一個 sessionmaker.begin() 方法, 允許同時執行這兩個操作:

with Session.begin() as session:
    session.add(some_object):
           

使用儲存點 (Using SAVEPOINT)

儲存點(savepoint)是事務過程中的一個邏輯點,用于取消部分事務,當結束事務時,會自動的删除該事務中所定義的所有儲存點。當執行rollback時,通過指定儲存點可以回退到指定的點。

如果使用基礎引擎來實作 儲存點事務,可以使用 Session.begin_nested():

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested() # 建立儲存點  establish a savepoint
    session.add(u3)
    nested.rollback()  # 復原 u3, u1 and u2  不變

# 送出 u1 、u2
           

每次調用 Session.begin_nested() 時,都會向資料庫發出一個帶有唯一辨別符的新“BEGIN SAVEPOINT”指令。 當調用 SessionTransaction.commit() 時,會在資料庫上發出“RELEASE SAVEPOINT”,如果改為調用 SessionTransaction.rollback(),則會發出“ROLLBACK TO SAVEPOINT”。

Session.begin_nested() 也可以使用上下文:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()
           

當 Session.begin_nested() 被調用時,會無條件地發出 Session.flush() (不管 autoflush 設定如何)。 這樣一來,當此嵌套事務發生復原時,會話的完整狀态将過期,進而導緻所有後續屬性/執行個體通路在調用 Session.begin_nested() 之前引用 Session 的完整狀态。

  • NestedTransaction - NestedTransaction 類是核心級别的構造,Session 内部使用它來生成 SAVEPOINT 塊。

會話級與引擎級事務控制 (Session-level vs. Engine level transaction control)

從 SQLAlchemy 1.4 開始, sessionmaker 和 Core Engine 對象都支援 2.0 樣式操作,通過使用 Session.future 标志和 create_engine.future 标志,使這兩個對象采用 2.0 樣式語義。

當使用未來模式時,兩個包之間應該有等效的語義,在 sessionmaker 與 Engine 的級别,以及 Session 與 Connection 的級别。 以下部分基于以下方案詳細介紹了這些場景:

ORM (using future Session)                    Core (using future engine)
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:
           
随手送出 (Commit as you go)

Session 和 Connection 都具有 Connection.commit() 和 Connection.rollback() 方法。 使用 SQLAlchemy 2.0 風格的操作,這些方法在所有情況下都會影響最外層的事務。

Engine:

engine = create_engine("postgresql://user:[email protected]/dbname", future=True)

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
    conn.commit()
           

Session:

Session = sessionmaker(engine, future=True)

with Session() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
    session.commit()
           
開始一次 (Begin Once)

sessionmaker 和 Engine 都有 Engine.begin() 方法,該方法都将擷取一個新對象,用于執行 SQL 語句(分别為 Session 和 Connection),然後傳回一個上下文管理器,該上下文管理器将維護該對象的 begin/commit/rollback 上下文。

Engine:

engine = create_engine("postgresql://user:[email protected]/dbname", future=True)

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
# 送出并自動關閉
           

Session:

Session = sessionmaker(engine, future=True)

with Session.begin() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
# 送出并自動關閉
           
嵌套事務 (Nested Transaction)

通過 Session.begin_nested() 或 Connection.begin_nested() 方法使用 SAVEPOINT 時,必須使用傳回的事務對象來送出或復原 SAVEPOINT。調用 Session.commit() 或 Connection.commit() 方法将始終送出最外層的事務;這是 SQLAlchemy 2.0 特定行為,與 1.x 系列相反。

Engine:

engine = create_engine("postgresql://user:[email protected]/dbname", future=True)

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"}
        ]
    )
# 送出并自動關閉
           

Session:

Session = sessionmaker(engine, future=True)

with Session.begin() as session:
    session.add_all([
        SomeClass(data="some data one"),
        SomeClass(data="some data two"),
        SomeClass(data="some data three")
    ])
# 送出并自動關閉
           

事務顯式開始 (Explicit Begin)

版本 1.4 中更改:SQLAlchemy 1.4 棄用了“自動送出模式”,該模式在曆史上是使用 Session.autocommit 标志啟用的。展望未來,允許使用 Session.begin() 方法的一種新方法是新的“自動開始”行為,以便現在可以在首次構造 Session 時調用該方法,或者在上一個事務結束之後和開始新事務之前調用該方法。

有關從依賴于 begin() /commit() 對嵌套的架構的“子事務”模式遷移的背景,請參閱下一節從“子事務”模式遷移。

Session 具有“自動開始”行為,這意味着一旦操作開始發生,它會确儲存在 SessionTransaction 以跟蹤正在進行的操作。 此事務在調用 Session.commit() 時完成。

通常需要控制“開始”操作發生的點,特别是在架構內建中。 為了适應這一點,Session 使用“autobegin”政策,這樣可以直接為尚未開始事務的 Session 調用 Session.begin() 方法:

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'
    session.commit()
except:
    session.rollback()
    raise
           

從“子事務”模式遷移 (Migrating from the “subtransaction” pattern)

  • 自版本 1.4 起已棄用:已棄用 Session.begin.subtransactions 标志。雖然會話在内部仍使用“子事務”模式,但它不适合最終使用者使用,因為它會導緻混淆,此外,一旦删除了“自動送出”模式,它可能會從版本 2.0 中的會話本身中删除。

經常與自動送出模式一起使用的“子事務”模式在 1.4 中也已棄用。 這種模式允許在事務已經開始時使用 Session.begin() 方法,進而産生一個稱為“子事務”的構造,它本質上是一個阻止 Session.commit() 方法實際送出的塊。

這種模式在現實世界的應用程式中已被證明是令人困惑的,并且對于應用程式來說,最好確定使用單個開始/送出對執行最頂層的資料庫操作。

為了為使用此模式的應用程式提供向後相容性,可以使用以下上下文管理器或基于裝飾器的類似實作:

import contextlib

@contextlib.contextmanager
def transaction(session):
    if not session.in_transaction():
        with session.begin():
            yield
    else:
        yield
           

上述上下文管理器的使用方式與“子事務”标志的工作方式相同,例如在以下示例中:

# method_a starts a transaction and calls method_b
def method_a(session):
    with transaction(session):
        method_b(session)

# method_b also starts a transaction, but when
# called from method_a participates in the ongoing
# transaction.
def method_b(session):
    with transaction(session):
        session.add(SomeObject('bat', 'lala'))

Session = sessionmaker(engine)

# create a Session and call method_a
with Session() as session:
    method_a(session)
           

為了與首選的慣用模式進行比較,開始塊應位于最外層。 這消除了對單個函數或方法關注事務劃分細節的需要:

def method_a(session):
    method_b(session)

def method_b(session):
    session.add(SomeObject('bat', 'lala'))

Session = sessionmaker(engine)

# create a Session and call method_a
with Session() as session:
    with session.begin():
        method_a(session)
           

啟用兩階段送出 (Enabling Two-Phase Commit)

對于支援兩階段操作的後端(目前是 MySQL 和 PostgreSQL),可以訓示會話使用兩階段送出語義。 這将協調跨資料庫事務的送出,以便在所有資料庫中送出或復原事務。 您還可以 Session.prepare() 用于與不受 SQLAlchemy 管理的事務互動的會話。 要使用兩階段事務,請在會話上設定标志 twophase=True:

engine1 = create_engine('postgresql://db1')
engine2 = create_engine('postgresql://db2')

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User:engine1, Account:engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
           

設定事務隔離級别/DBAPI 自動送出 (Setting Transaction Isolation Levels / DBAPI AUTOCOMMIT)

大多數 DBAPI 支援可配置事務隔離級别的概念。這些傳統上是四個級别“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。這些通常在開始新事務之前應用于 DBAPI 連接配接,注意大多數 DBAPI 将在首次發出 SQL 語句時隐式地開始此事務。

支援隔離級别的 DBAPI 通常也支援真正的“自動送出”的概念,這意味着 DBAPI 連接配接本身将被置于非事務性自動送出模式。這通常意味着自動向資料庫發出“BEGIN”的典型 DBAPI 行為不再發生,但它也可能包含其他指令。使用此模式時,DBAPI 在任何情況下都不使用事務。 SQLAlchemy 方法,如 .begin()、.commit() 和 .rollback() 靜默傳遞。

SQLAlchemy 的方言支援基于每個引擎或每個連接配接的可設定隔離模式,使用 create_engine() 級别和 Connection.execution_options() 級别的标志。

使用 ORM Session 時,它充當引擎和連接配接的外觀,但不直接暴露事務隔離。是以,為了影響事務隔離級别,我們需要根據需要對 Engine 或 Connection 采取行動。

  • 設定事務隔離級别,包括 DBAPI 自動送出 - 請務必檢視隔離級别在 SQLAlchemy Connection 對象級别上的工作方式。(https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#migrating-from-the-subtransaction-pattern)
為 Sessionmaker/Engine Wide 設定隔離 (Setting Isolation For A Sessionmaker / Engine Wide)

要在全局範圍内設定具有特定隔離級别的 Session 或 sessionmaker,第一種技術是在所有情況下都可以針對特定隔離級别建構引擎,然後将其用作 Session 和/或 sessionmaker 的連接配接源:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql://scott:[email protected]/test",
    isolation_level='REPEATABLE READ'
)

Session = sessionmaker(eng)
           

如果同時有兩個具有不同隔離級别的引擎,另一個有用的選項是使用 Engine.execution_options() 方法,該方法将生成原始引擎的淺表副本,該副本與父引擎共享相同的連接配接池。 當操作将分為“事務”和“自動送出”操作時,這通常是可取的:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql://scott:[email protected]/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)
           

上面,“eng”和“autocommit_engine”都共享相同的方言和連接配接池。 但是,當從 autocommit_engine 擷取連接配接時,将設定“AUTOCOMMIT”模式。 兩個 sessionmaker 對象“transactional_session”和“autocommit_session”在處理資料庫連接配接時繼承了這些特性。

“autocommit_session”繼續具有事務語義,包括 Session.commit() 和 Session.rollback() 仍然認為自己是“送出”和“復原”對象,但是事務将默默地消失。 出于這個原因,雖然不是嚴格要求,但通常以隻讀方式使用具有 AUTOCOMMIT 隔離的會話,即:

with autocommit_session() as session:
    some_objects = session.execute(<statement>)
    some_other_objects = session.execute(<statement>)

# closes connection
           

為單個會話設定隔離 ( Setting Isolation for Individual Sessions)

當我們建立一個新的 Session 時,無論是直接使用構造函數還是調用 sessionmaker 生成的可調用對象時,我們都可以直接傳遞 bind 參數,覆寫預先存在的綁定。 例如,我們可以從預設 sessionmaker 建立我們的 Session 并傳遞一個用于自動送出的引擎集:

plain_engine = create_engine("postgresql://scott:[email protected]/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
           

對于 Session 或 sessionmaker 配置了多個“綁定”的情況,我們可以完全重新指定綁定參數,或者如果我們隻想替換特定綁定,我們可以使用 Session.bind_mapper() 或 Session.bind_table() 方法:

with Session() as session:
    session.bind_mapper(User, autocommit_engine)
           
為單個事務設定隔離 (Setting Isolation for Individual Transactions)

關于隔離級别的一個關鍵警告是,無法在事務已經開始的連接配接上安全地修改設定。 資料庫無法更改正在進行的事務的隔離級别,并且某些 DBAPI 和 SQLAlchemy 方言在這方面的行為不一緻。

是以,最好使用預先綁定到具有所需隔離級别的引擎的 Session。 但是,在事務開始時使用 Session.connection() 方法會影響每個連接配接的隔離級别:

from sqlalchemy.orm import Session

# 假設會話剛剛建構
sess = Session(bind=engine)

# 在進行任何其他操作之前,使用選項調用 connection()。
# 這将從綁定引擎擷取一個新連接配接并開始一個真正的連接配接
# 資料庫事務。
sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.
           

上面,我們首先使用構造函數或 sessionmaker 生成一個 Session。 然後,我們通過調用 Session.connection() 顯式設定資料庫級事務的開始,它提供了在資料庫級事務開始之前将傳遞給連接配接的執行選項。 事務以這個標明的隔離級别繼續。 當事務完成時,在連接配接傳回到連接配接池之前,連接配接上的隔離級别被重置為其預設值。

Session.begin() 方法也可用于開始會話級事務; 在該調用之後調用 Session.connection() 可用于設定每個連接配接事務的隔離級别:

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.
           

使用事件跟蹤事務狀态 (Tracking Transaction State with Events)

有關會話事務狀态更改的可用事件挂鈎的概述,請參閱事務事件部分。

将會話加入外部事務(例如用于測試套件) (Joining a Session into an External Transaction (such as for test suites))

如果正在使用已處于事務狀态的連接配接(即已建立事務),則可以通過将會話綁定到該連接配接來使會話參與該事務。 這樣做的通常理由是一個測試套件,它允許 ORM 代碼自由地與 Session 一起工作,包括調用 Session.commit() 的能力,之後整個資料庫互動被復原。

  • 在 1.4 版中更改:本節介紹了新版本的“加入外部事務”配方,該配方同樣适用于 2.0 風格和 1.x 風格的引擎和會話。 這裡來自以前版本(例如 1.3)的配方也将繼續适用于 1.x 引擎和會話。

該配方通過在事務中建立連接配接和可選的 SAVEPOINT 來工作,然後将其作為“綁定”傳遞給會話。 Session 檢測到給定的 Connection 已經在事務中,如果事務實際上是最外層事務,則不會對其運作 COMMIT。 然後當測試結束時,事務會復原,以便在整個測試過程中恢複任何資料更改:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine('postgresql://...')

class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()


        # bind an individual Session to the connection
        self.session = Session(bind=self.connection)


        ###    optional     ###

        # if the database supports SAVEPOINT (SQLite needs special
        # config for this to work), starting a savepoint
        # will allow tests to also use rollback within tests

        self.nested = self.connection.begin_nested()

        @event.listens_for(self.session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if not self.nested.is_active:
                self.nested = self.connection.begin_nested()

        ### ^^^ optional ^^^ ###

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        # if the SAVEPOINT steps are taken, then a test can also
        # use session.rollback() and continue working with the database

        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()
           

上面的配方是 SQLAlchemy 自己的 CI 的一部分,以確定它仍然按預期工作。