天天看點

PgSQL · 功能分析 · Listen/Notify 功能

listen 和 notify 是pg很有意思的一個功能,可以用來進行多應用間的通信。它們可以在sql中使用,也可以用c、jdbc裡面的api調用。下面介紹一下其使用方法和核心實作。

用一個簡單的例子,來看一下listen/notify如何使用。假設我們有兩個應用a和b,部署在不同的機器上:a機器處理前端使用者請求,同時需要将一些可以異步執行的任務,配置設定給背景伺服器b。b接收到任務并處理完成後回報給a結果。

利用上面的步驟,a和b兩個機器通過pg完成了通信。在上面的過程中,需要注意的是:

b要想接受到消息,必須在a notify之前運作了listen指令;

a需要使用事務commit操作來觸發消息發送;

消息是異步發送到b的,即無論b的狀态如何,消息都會先到達pg的消息隊列(每個pg執行個體隻有一個唯一的存放所有消息的隊列);b要檢視消息,如果使用的是psql用戶端,則需要先發送帶有事務操作的指令(如begin、commit或rollback)給pg;

a 如果連續發送多個消息,b會一次性收到這些消息;

在c代碼裡面,你可以使用如下的調用來擷取所有已到達的消息,如果沒有消息到達,則進入睡眠狀态。

listen/notify的實作其實比較簡單。主要的資料結構是一個消息隊列(<code>asyncqueuecontrol-&gt;tail</code>和<code>asyncqueuecontrol-&gt;head</code>分别指向隊列尾和隊列頭)和一個程序狀态數組(<code>asyncqueuecontrol-&gt;backend</code>),如下圖所示:

PgSQL · 功能分析 · Listen/Notify 功能

消息隊列裡面存放了所有程序的所有通知消息,而狀态數組存放了所有執行了listen指令、準備接收異步消息的程序的狀态資訊。狀态數組中含有每個程序已經讀取到的消息在隊列裡面的位置指針。如果有了新消息,程序就從此指針往後取,直到讀取全部消息。

當一個連接配接的背景程序接收到listen指令時,先将listen的資訊記錄下來,然後在事務送出時,執行listen操作,即把本程序放入狀态數組(參見<code>exec_listenprecommit</code>函數)。

執行notify指令時,async_notify函數負責把通知放入pendingnotifies連結清單。在事務commit操作前後,執行下面的邏輯:

調用precommit_notify函數,将pendingnotifies連結清單中的消息,放入全局消息隊列;

執行commit操作;

利用調用鍊<code>processcompletednotifies-&gt;signalbackends-&gt;sendprocsignal-&gt;kill</code>,向其他所有狀态數組中的程序,發出通知信号。

另一方面,每個程序在接收到信号後,利用函數<code>handlenotifyinterrupt</code>處理信号。如果目前程序處于事務中,則不立即處理消息,等到事務送出完畢,調用<code>prepare_for_client_read</code>讀取下一個使用者指令時,利用<code>processincomingnotify</code>處理消息;否則,立即調用<code>processincomingnotify</code>處理消息。<code>processincomingnotify</code>最終調用<code>notifymyfrontend</code>發送消息到用戶端:

注意,用戶端收到消息後,并不立即顯示出來,而是需要用api進行擷取。例如,psql就是在執行下一個指令時(如begin、commit),會順便把收到的消息顯示出來的。

listen/notify是一個輕量級的應用間通信機制,有了它,具有通路資料庫能力的應用可以輕易的利用pg實作互操作。當然,由于消息隊列是存放在記憶體裡面的,在發生執行個體當機等問題時,消息将丢失,對可靠性要求高的應用,需要自己進行消息持久化(如利用pg存儲消息,進行持久化)。