天天看點

Mybatis源碼閱讀(四):核心接口4.2——Executor(上)

*************************************優雅的分割線 **********************************

分享一波:程式員賺外快-必看的巅峰幹貨

如果以上内容對你覺得有用,并想擷取更多的賺錢方式和免費的技術教程

請關注微信公衆号:HB荷包

Mybatis源碼閱讀(四):核心接口4.2——Executor(上)

一個能讓你學習技術和賺錢方法的公衆号,持續更新

*************************************優雅的分割線 **********************************

Executor

Executor是Mybatis的核心接口之一,其中定義了資料庫操作的基本方法。在實際應用中涉及的SqlSession的操作都是基于Executor實作的。Executor代碼如下。

public interface Executor {

ResultHandler NO_RESULT_HANDLER = null;

int update(MappedStatement ms, Object parameter) throws SQLException;

List query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;

List query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;

Cursor queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;

List flushStatements() throws SQLException;

void commit(boolean required) throws SQLException;

void rollback(boolean required) throws SQLException;

CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);

boolean isCached(MappedStatement ms, CacheKey key);

void clearLocalCache();

void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);

Transaction getTransaction();

void close(boolean forceRollback);

boolean isClosed();

void setExecutorWrapper(Executor executor);

}

[點選并拖拽以移動]

Executor接口的實作中使用到了裝飾器模式和模闆方法模式,關于設計模式的内容可以檢視我之前的文章,這裡就不貼出文章連結了。Executor的實作如圖所示。

BaseExecutor

BaseExecutor是個抽象類,實作了Executor大部分的方法。BaseExecutor中主要提供了緩存管理和事務管理的基本功能,繼承BaseExecutor的子類隻需要實作四個基本的方法來完成資料庫的相關操作即可,分别是doUpdate、doQuery、doQueryCursor、doFlushStatement。其餘的方法在BaseExecutor中都有了實作。BaseExecutor的字段如下

/**
 * 事務對象
 */
protected Transaction transaction;

/**
 * 封裝的Executor對象
 */
protected Executor wrapper;

/**
 * 延遲加載隊列
 */
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;

/**
 * 一級緩存,用于緩存該Executor對象查詢結果集映射得到的結果對象
 */
protected PerpetualCache localCache;

/**
 * 一級緩存,用來緩存輸出類型的參數
 */
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;

/**
 * 記錄嵌套查詢的層數
 */
protected int queryStack;
/**
 * 辨別Executor是否關閉
 */
private boolean closed;
           

一級緩存

常見的系統中,資料庫資源是比較珍貴的,在web系統中的性能瓶頸主要也就是資料庫。在設計系統時,會使用多種優化手段去減少資料庫的直接通路,比如使用緩存。使用緩存可以減少系統與資料庫的網絡互動、減少資料庫通路次數、降低資料庫負擔、降低重複建立和銷毀對象等一系列的開銷,進而提升系統的性能。同時,當資料庫意外當機時,緩存中儲存的資料可以繼續支援系統部分功能的正常展示,提高系統的可用性。Mybatis提供了一級緩存和二級緩存,我們這裡先讨論一級緩存。

一級緩存是會話級别的緩存,在Mybatis中每建立一個SqlSession對象,就表示開啟一次資料庫會話。在一次會話中,系統可能回反複的執行相同的查詢語句,如果不對資料庫進行緩存,那麼短時間内執行多次完全相同的SQL語句,查詢到的結果集也可能完全相同,就造成了資料庫資源的浪費。

為了避免這種問題,Executor對象中會建立一個簡單的緩存,也就是一級緩存。它會将每次查詢結果緩存起來,再執行查詢操作時,會先查詢一級緩存,如果存在完全一樣的查詢語句,則直接從一級緩存中取出相應的結果對象傳回給使用者,進而減少資料庫壓力。

一級緩存的生命周期與SqlSession相同,也就與SqlSession封裝的Executor對象的生命周期相同,當調用了Executor的close方法時,該Executor中的一級緩存将會不可用。同時,一級緩存中對象的存活時間也會受其他因素影響,比如在執行update方法時,也會先清空一級緩存。

query

BaseExecutor方法會首先建立CacheKey對象,并根據CacheKey對象查找一級緩存,如果緩存命中則直接傳回緩存中記錄的結果對象。如果沒有命中則查詢資料庫得到結果集,之後将結果集映射成對象儲存到一級緩存中,同時傳回結果對象。query方法如下所示。

@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    // 擷取BoundSql對象
    BoundSql boundSql = ms.getBoundSql(parameter);
    // 建立CacheKey對象
    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
           

在query方法中會先擷取到boundSql對象,并且去建立CacheKey對象,再調用query的一個重載方法。

這裡的CacheKey由MappedStatement的id、對應的offset和limit、包含問号的sql語句、使用者傳遞的實參、Environment的id五部分構成,代碼如下。

/**
 * 建立CacheKey對象
 * CacheKey由Sql節點的id、offset、limit、sql、實參、環境組成
 *
 * @param ms
 * @param parameterObject
 * @param rowBounds
 * @param boundSql
 * @return
 */
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    CacheKey cacheKey = new CacheKey();
    // 将sql節點的id添加到CacheKey
    cacheKey.update(ms.getId());
    // 将offset添加到CacheKey
    cacheKey.update(rowBounds.getOffset());
    // 将limit添加到CacheKey
    cacheKey.update(rowBounds.getLimit());
    // 将SQL添加到CacheKey(包含?的sql)
    cacheKey.update(boundSql.getSql());
    // 擷取參數映射
    List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
    // 擷取類型處理器
    TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
    // 周遊參數映射
    for (ParameterMapping parameterMapping : parameterMappings) {
        // 輸出類型參數不要
        if (parameterMapping.getMode() != ParameterMode.OUT) {
            Object value;
            // 擷取屬性名稱
            String propertyName = parameterMapping.getProperty();
            // 擷取參數值
            if (boundSql.hasAdditionalParameter(propertyName)) {
                value = boundSql.getAdditionalParameter(propertyName);
            } else if (parameterObject == null) {
                value = null;
            } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
                value = parameterObject;
            } else {
                MetaObject metaObject = configuration.newMetaObject(parameterObject);
                value = metaObject.getValue(propertyName);
            }
            // 将實參參數值添加到CacheKey
            cacheKey.update(value);
        }
    }
    // 環境不為空
    if (configuration.getEnvironment() != null) {
        // 将目前環境添加到CacheKey
        cacheKey.update(configuration.getEnvironment().getId());
    }
    return cacheKey;
}
           

而query的重載方法會根據建立的CacheKey對象查詢一級緩存。如果緩存命中則将緩存中記錄的結果對象傳回,如果未命中,則調用doQuery方法查詢資料庫,并存到一級緩存。代碼如下。

public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    // 存入到錯誤上下文中,便于後面操作異常
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    // 非嵌套查詢并且目前select節點配置了flushCache
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
        // 先清空緩存
        clearLocalCache();
    }
    List<E> list;
    try {
        // 查詢層數+1
        queryStack++;
        // 先查詢 一級緩存
        list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
        if (list != null) {
            // 針對存儲過程調用的處理。在一級緩存 命中時,擷取緩存中儲存的輸出類型參數,設定到使用者傳入的實參中
            handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
        } else {
            // 資料庫查詢,并得到映射後的結果對象
            list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
        }
    } finally {
        // 目前查詢完成,查詢層數減少
        queryStack--;
    }
    // 延遲加載相關
    if (queryStack == 0) {
        // 觸發DeferredLoad加載一級緩存中記錄的嵌套查詢的結果對象
        for (DeferredLoad deferredLoad : deferredLoads) {
            deferredLoad.load();
        }
        // 加載完成後清除deferredLoads
        deferredLoads.clear();
        if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
            // 根據localCacheScope配置決定是否清空一級緩存
            clearLocalCache();
        }
    }
    return list;
}
           

BaseExecutor中緩存除了緩存結果集以外,在分析嵌套查詢時,如果一級緩存中緩存了嵌套查詢的結果對象,則可以從一級緩存中直接加載該結果對象。如果一級緩存中記錄的嵌套查詢的結果對象并未完全加載,則可以通過DeferredLoad實作類實作延遲加載的功能。與這個流程相關的方法有兩個,isCached方法負責檢測是否緩存了指定查詢的結果對象,deferLoad方法負責建立DeferredLoad對象并添加到deferredLoad集合中。代碼如下。

/**
 * 檢測是否緩存了指定查詢的結果對象
 *
 * @param ms
 * @param key
 * @return
 */
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
    // 檢測緩存中是否花奴才能了CacheKey對象
    return localCache.getObject(key) != null;
}

/**
 * 負責建立DeferredLoad對象并将其添加到deferredLoads集合中
 *
 * @param ms
 * @param resultObject
 * @param property
 * @param key
 * @param targetType
 */
@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType);
    if (deferredLoad.canLoad()) {
        // 一級緩存中已經記錄了指定查詢結果的對象,直接從緩存中加載對象,并設定到外層對象
        deferredLoad.load();
    } else {
        // 将deferredLoad對象添加到deferredLoads隊列中,待整個外層查詢結束後再加載結果對象
        deferredLoads.add(new DeferredLoad(resultObject, property, key, localCache, configuration, targetType));
    }
}
           

DeferredLoad是定義在BaseExecutor中的内部類,它負責從loadCache緩存中延遲加載結果對象,含義如下。

/**
     * 外層對象對應的MetaObject
     */
    private final MetaObject resultObject;
    /**
     * 延遲加載的屬性名稱
     */
    private final String property;
    /**
     * 延遲加載的屬性類型
     */
    private final Class<?> targetType;
    /**
     * 延遲加載的結果對象在一級緩存中的CacheKey
     */
    private final CacheKey key;
    /**
     * 一級緩存
     */
    private final PerpetualCache localCache;
    private final ObjectFactory objectFactory;
    /**
     * 負責結果對象的類型轉換
     */
    private final ResultExtractor resultExtractor;
           

DeferredLoad的canLoad方法負責檢測緩存項是否已經完全加載到緩存中。BaseExecutor的queryFromDatabase方法中,開始調用doQuery查詢資料庫之前,會先在localCache中放一個占位符,待查詢完畢後會将key替換成真實的資料,此時緩存就完全加載了。queryFromDatabase方法的實作如下。

/**
 * 從資料庫中查詢
 *
 * @param ms
 * @param parameter
 * @param rowBounds
 * @param resultHandler
 * @param key
 * @param boundSql
 * @param <E>
 * @return
 * @throws SQLException
 */
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    // 先添加一個占位符,查詢完畢後才将真正的結果對象放入緩存,此時算完全家在
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
        list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
        // 删除占位符
        localCache.removeObject(key);
    }
    // 将真正的結果對象添加到一級緩存中
    localCache.putObject(key, list);
    // 如果是存儲過程
    if (ms.getStatementType() == StatementType.CALLABLE) {
        // 緩存輸出類型的參數
        localOutputParameterCache.putObject(key, parameter);
    }
    return list;
}
           

canLoad和load方法實作如下。

/**
     * 判斷是否是完全加載
     *
     * @return
     */
    public boolean canLoad() {
        return localCache.getObject(key) != null && localCache.getObject(key) != EXECUTION_PLACEHOLDER;
    }

    /**
     * 負責從緩存中加載結果對象,設定到外層對象 的屬性中
     */
    @SuppressWarnings("unchecked")
    public void load() {
        // 從緩存中查詢指定的結果對象
        List<Object> list = (List<Object>) localCache.getObject(key);
        // 将緩存的結果對象轉換成指定的類型
        Object value = resultExtractor.extractObjectFromList(list, targetType);
        // 設定到外層對象的對應屬性
        resultObject.setValue(property, value);
    }
           

clearLocalCache方法用于清空緩存。query方法會根據flushCache屬性和localCacheScope配置決定是否清空一級緩存。update方法在執行insert、update、delete三類SQL語句之前,會清空緩存。代碼比較簡單這裡就不貼了。

事務操作

在BatchExecutor中可以緩存多條SQL,等待合适的時機将緩存的多條SQL一起發送給資料庫執行。Executor.flushStatements方法主要是針對批處理多條SQL語句的,會調用doFlushStatements方法處理Executor中緩存的多條SQL語句,在BaseExecutor的commit、rollback方法中會首先調用flushStatement方法,再執行相關事務操作,方法具體的實作如下。

public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    return doFlushStatements(isRollBack);
}
           

BaseExecutor.commit方法首先會清空一級緩存,調用flushStatements,最後才根據參數決定是否真正送出事務。代碼如下,

/**
 * 送出事務
 * @param required
 * @throws SQLException
 */
@Override
public void commit(boolean required) throws SQLException {
    if (closed) {
        throw new ExecutorException("Cannot commit, transaction is already closed");
    }
    // 清除緩存
    clearLocalCache();
    // 處理緩存的SQL
    flushStatements();
    if (required) {
        // 送出事務
        transaction.commit();
    }
}
           

*************************************優雅的分割線 **********************************

分享一波:程式員賺外快-必看的巅峰幹貨

如果以上内容對你覺得有用,并想擷取更多的賺錢方式和免費的技術教程

請關注微信公衆号:HB荷包

Mybatis源碼閱讀(四):核心接口4.2——Executor(上)

一個能讓你學習技術和賺錢方法的公衆号,持續更新

*************************************優雅的分割線 **********************************