天天看點

TimesTen JMS/XLA 實作 Trigger 功能

TimesTen

支援存儲過程、函數等功能,但是不支援觸發器。相應的,你可以使用

TimesTen JMS/XLA API

來監聽本地資料庫中指定表的更改,并收到這些更改的實時通知。JMS/XLA的首要目的是作為觸發器的高性能、異步的替代方式。

JMS/XLA

實作了

JMS

接口,讓

TimesTen

的事務處理日志(

Transaction Log API

,即

XLA

)的功能可以使用在

Java

應用上。

JMS/XLA 基本概念

Java

應用可以使用

JMS/XLA

來接受

TimesTen

的事件通知,

JMS/XLA

使用

JMS

的釋出訂閱接口來提供對

XLA

更新事件的通路。

通過建立一個

JMS

會話(

Session

)執行個體來訂閱更新,該執行個體提供與

XLA

的連接配接,并用于建立持久訂閱者(

TopicSubscriber

)。你可以通過這個訂閱者同步地接受和處理消息,也可以實作一個監聽器(

MessageListener

)異步地處理更新。

JMS/XLA

被設計為用來監聽本地資料庫,

TimesTen

和和接收通知的應用程式必須在同一個系統上。

XLA 如何讀取事務日志

當應用修改資料庫之後,

TimesTen

會産生一條事務日志記錄來描述更改。新的日志記錄總是添加到緩存區的末尾,并周期性從記憶體同步到磁盤檔案上。應用程式可以使用

XLA

讀取日志,并進行過濾得到感興趣的更改記錄。

XLA 與物化視圖

XLA

可以用來跟蹤表和物化視圖的改變,但是不能跟蹤普通視圖的改變。物化視圖提供單個來源,讓你可以從多個細節表中跟蹤所選行和列的更改。沒有物化視圖,應用程式必須監聽和過濾來自所有細節表的更新記錄,包括應用并不關心的行和列。

一般來說,用于跟蹤表的變化或物化視圖的

XLA

機制之間沒有操作上的差異。但是對于異步物化視圖,要意識到對于異步視圖,

XLA

通知的順序不必和相關聯的細節表一緻,也不必和同步視圖一緻。例如,往一張細節表插入兩條資料,在異步物化視圖中可能是以相反的順序插入的。此外,更新操作可能被

XLA

報告為一個删除操作後跟一個插入操作,多個操作可能被合并為一個單一的操作。依賴精确操作順序的應用程式不應該使用異步物化視圖。

XLA 書簽

XLA

書簽标記着

XLA

訂閱者應用程式從事物日志中讀取的位置,書簽支援持久訂閱,允許應用程式從一個主題斷開連接配接,然後重新連接配接來繼續從它離開的地方接收日志更新。

你可以使用一個持久的主題訂閱者(

TopicSubscriber

)來建立一個XLA消息的消費者,建立訂閱者時用的訂閱辨別符被用做

XLA

書簽名。使用

ttXlaSubscribe

ttXlaUnsubscribe

内置的存儲過程啟動和通知對一個表的

XLA

訂閱時,顯示的指定了使用的書簽名。無論何時收到一個确認消息,書簽重置到上一次的讀取位置。

通過調用會話(

Session

)對象的

unsubscribe()

方法可以移除一個持久訂閱,同時也會删除相關聯的

XLA

書簽,重新連接配接時必須建立一個新的訂閱。當訂閱的書簽在使用時不能被修改,要想修改一個訂閱,必須先關閉消息消費者,然後取消訂閱、重新訂閱,最後再打開消息消費者。

注意:

Session

對象的

unsubscribe()

方法會會删除書簽,但是内置的

ttXlaUnsubscribe

存儲過程不會删除書簽。此外,可以通過内置的

ttXlaBookmarkCreate

ttXlaBookmarkDelete

來建立和删除書簽。

unsubscribe()

方法相當于調用

ttXlaUnsubscribe

ttXlaBookmarkDelete

兩個存儲過程。

XLA

在使用時,

TimesTen

事務日志檔案會持有這個書簽,防止事務日志檔案被清除,直到

XLA

确認不再需要它們。

JMS/XLA 配置檔案和主題

要連接配接到

XLA

,需要建立與特定資料庫對應的

JMS

主題對象的連接配接,

JMS/XLA

配置檔案提供了主題名稱與資料庫之間的映射關系。

JMS/XLA

預設配置檔案為

jmsxla.xml

,位于目前工作目錄下。如果需要使用其它的名字或位置,需要指定環境變量并配置

InitialContext

類以及類路徑,當按照配置找到多個檔案時,采用第一個的配置。

一個主題定義包括:主題名,連接配接字元串,預取值(指定一次檢索多少條更新)。下面的配置檔案将主題

XlaDemo

映射到資料庫

TestDB

,如下:

<xlaconfig>
  <topics>
    <topic name="XlaDemo"
      connectionString="DSN=TestDB"
      xlaPrefetch="100" />
  </topics>
</xlaconfig>
           

XLA 更新

應用程式接收

XLA

更新作為

JMS MapMessage

對象,一個

MapMessage

對象包含了一組類型名/值對,對應于

XLA

更新頭的字段。通過

getter

方法擷取消息字段,可以跳過名字檢索獨立的字段值。所有保留的字段名一雙下劃線開頭,如

__TYPE

所有的更新都有一個

__TYPE

字段,用來訓示消息包含的更新類型。類型值為為一個

Integer

,可以使用

com.timesten.dataserver.jmsxla.XlaConstants

中定義的常量來進行比較。有如下支援的類型:

Type Description
INSERT A row has been added
UPDATE A row has been modified
DELETE A row has been removed
COMMIT_ONLY A transaction has been committed
CREATE_TABLE A table has been created
DROP_TABLE A table has been dropped
CREATE_INDEX An index has been created
DROP_INDEX An index has been dropped
ADD_COLUMNS New columns have been added to the table
DROP_COLUMNS Columns have been removed from the table
CREATE_VIEW A materialized view has been created
DROP_VIEW A materialized view has been dropped
CREATE_SEQ A sequence has been created
DROP_SEQ A sequence has been dropped
CREATE_SYNONYM A synonym has been created
DROP_SYNONYM A synonym has been dropped
TRUNCATE All rows in the table have been delete

XLA 确認模式

XLA

确認機制被設計為用來確定應用程式不僅接收到消息,且正确地處理消息。确認一條更新永遠将應用的

XLA

書簽重置為上一條記錄的讀取位置。避免了以前的消息被重複讀取,確定當應用程式重新連接配接到

XLA

,重用書簽時隻接收新的更新記錄。

JMS/XLA

可以自動确認

XLA

更新消息,應用程式也可以顯示地選擇消息确認模式。建立會話時,可以指定更新如何被确認。

JMS/XLA

支援三種确認模式:

  • AUTO_ACKNOWLEDGE

    :該模式下,當接收到更新時會自動确認。每一條消息隻會被投遞一次,重複的消息不會被發送,是以,當程式失敗的時候,消息可能會丢失。消息的投遞與确認總是互相獨立的,是以

    JMS/XLA

    不會預讀取多條記錄,

    xlaprefetch

    屬性将被忽略。
  • DUPS_OK_ACKNOWLEDGE

    :該模式下,更新會被自動确認,但是當程式失敗的時候,重複的消息會被投遞。當上一個預讀取塊的最後一條記錄被讀取時,

    JMS/XLA

    根據

    xlaprefetch

    屬性值來預讀取記錄并發送确認。如果程式在讀完所有預取記錄之前失敗,那麼程式重新啟動之後,預讀取塊的所有記錄都會存在。
  • CLIENT_ACKNOWLEDGE

    :該模式下,程式負責通過調用

    MapMessage

    執行個體的

    acknowledge()

    方法來确認收到更新消息。

    JMS/XLA

    通過

    xlaprefetch

    屬性值來預讀取記錄。

下例展示了如何設定确認模式:

預讀取更新

一次預讀取多條更新記錄比每次單獨地從

XLA

擷取記錄更加高效。當使用

AUTO_ACKNOWLEDGE

時,更新不會被預讀取,比其他模式更慢。可能的話,應該将程式設計為可以容忍重複的更新,以便可以使用

DUPS_OK_ACKNOWLEDGE

,或者顯示地确認更新。顯示确認更新通常會産生最佳性能,隻要可以避免單獨确認每條消息。

确認更新

在更新消息上調用

acknowledge()

可以明确地确認一個

XLA

更新。确認一條消息會隐式地确認所有以前的消息。通常,你會在确認之間接收并處理多條更新消息。如果使用的是

CLIENT_ACKNOWLEDGE

模式并且想在将來重用持久的訂閱,在退出之前,應該調用

acknowledge()

将書簽重置到上一次讀取的位置。

使用 JMS/XLA 模拟觸發器功能

TimesTen 端配置

1、授權

Command> grant create session, create table, XLA to parkinglotuser;
           

2、建立 Xla 測試表

Command> create table xlatest (id number primary key, name varchar2());
           

3、建立 Xla 書簽

4、訂閱

Command> call ttXlaSubscribe('xlatest', 'bookmark');
           

Java 端程式

5、配置 jmsxla.xml

<xlaconfig>
  <topics>
    <!-- topic for xla demo -->
    <topic name="XlaDemo"
           connectionString="DSN=TestDB"
           xlaPrefetch="100" />
  </topics>
</xlaconfig>
           

6、連接配接 XLA 、接收并處理表的更新

import com.timesten.dataserver.jmsxla.XlaConstants;
import org.junit.Test;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Enumeration;

/**
 * Demo which shows how to use JMS/XLA to process updates.
 */
public class AsyncJMS {

    @Test
    public void run() throws JMSException, NamingException {

        String topicName = "XlaDemo";
        String bookmark = "bookmark";

        Context context = new InitialContext();
        TopicConnectionFactory connFactory = (TopicConnectionFactory) context.lookup("TopicConnectionFactory");
        TopicConnection conn = connFactory.createTopicConnection();
        TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, bookmark);
        MyListener listener = new MyListener();

        subscriber.setMessageListener(listener);
        conn.start();

        System.out.println("Starting JMS XLA subscriber");
        System.out.println();
        System.out.println("Waiting for XLA updates on table xlatest ... ");
        System.out.println("- Please use ttIsql or Level1 to modify the xlatest table");

        while (true) {
            try {
                Thread.sleep();
            } catch (InterruptedException e) {
                // unsubscribe (which will delete the XLA bookmark)
                session.unsubscribe(bookmark);
                // close the connection
                conn.close();
                // ignore
                e.printStackTrace();
            }
        }
    }

    /**
     * Class to receive update messages.
     */
    class MyListener implements MessageListener {

        MyListener() {
        }

        /**
         * This method is called when an XLA event occurs.
         *
         * @param msg a MapMessage describing the XLA event.
         */
        public void onMessage(Message msg) {

            MapMessage message = (MapMessage) msg;

            if (msg == null) {
                System.err.println("MyListener: update message is null");
                return;
            }

            try {
                System.out.println();
                System.out.println("onMessage: got a " + message.getJMSType() + " message");

                // Get the type of event (insert, update, delete, drop table, etc.).
                int type = message.getInt(XlaConstants.TYPE_FIELD);
                if (type == XlaConstants.INSERT) {
                    System.out.println("A row was inserted.");
                } else if (type == XlaConstants.UPDATE) {
                    System.out.println("A row was updated.");
                } else if (type == XlaConstants.DELETE) {
                    System.out.println("A row was deleted.");
                } else {
                    // Messages are also received for DDL events such as CREATE TABLE.
                    // This program processes INSERT, UPDATE, and DELETE events,
                    // and ignores the DDL events.
                    return;
                }

                Enumeration enumeration = message.getMapNames();
                while (enumeration.hasMoreElements()) {
                    String name = enumeration.nextElement().toString();
                    System.out.println("Name: " + name + ", Value: " + message.getObject(name));
                }
                System.out.println();

                /*
                 * 1.type == INSERT || type == DELETE
                 * MapMessage的字段包括監聽表的字段、及類本身的字段
                 * 使用 message.getXXX(field) 讀取插入或删除的字段值
                 * 2.type == UPDATE
                 * MapMessage的字段包括監聽表的字段(分為原始值和更新值)、及類本身的字段
                 * 使用 message.getXXX(field) 讀取更新值
                 * 使用 message.getXXX(_field) 讀取原始值
                 * 
                 * 其中field為監聽表的字段名
                 */

                // todo 處理更新邏輯

            } catch (JMSException e) {
                System.err.println("JMSException in " + this.getClass().getName());
                e.printStackTrace();
            } catch (Exception e) {
                System.err.println("Exception in " + this.getClass().getName());
                e.printStackTrace();
            }
        }
    }
}
           

參考文獻

  • Using JMS/XLA for Event Management
  • Triggers in TimesTen (XLA Application)