在前面的兩篇文章中[1,2],我們描述了Oozie工作流伺服器,并且展示了幾個工作流的示例。我們還描述了針對Oozie的工作流的部署和配置,以及用來啟動、停止和監控Oozie工作流的工具。
在本文中,我們會向你展示Oozie的可擴充性,并說明它是如何支援我們實作自定義的、協同工作的語言擴充。
為什麼需要自定義節點(Custom Node)?
正如我們在文章[1]中所說明的,Oozie之是以與衆不同,是因為它提供了一種“最小化”的工作流語言,其中隻包含少數幾種控制和動作節點。盡管 其中的一種動作節點是java動作節點,它讓我們可以從Oozie工作流調用任意一個帶有main方法的java類,但這種方法并非總是最佳的。原因之一 就在于,java動作是在Hadoop簇集中作為map-reduce作業執行的,并且隻帶有唯一的Mapper任務。一方面,這帶來了很多好處:
- 它擁有内建的可伸縮性以及對map/reduce架構的災難恢複支援,這樣我們就不必這些特性建構到Oozie中。
- 外部執行機制,這讓Oozie引擎變得更輕量級,進而可以支援更多并發運作的過程。
另一方面,這種方法也有一些缺點:
- 它把每個java節點都作為mapper任務啟動,這會導緻在Hadoop簇集中啟動新的JVM而産生額外的開銷。
- 在外部執行java類導緻了額外的網絡傳輸,用于與Oozie伺服器同步這些執行結果。
- 從java節點傳遞參數成了非常耗費資源的操作。
盡管如此,在運作時間相對較長(幾分鐘甚至幾小時)的map/reduce或者Pig作業中,好處會大大超過負載的缺點,但是,在簡單的java節 點中(參見[2]),我們就需要注意外部執行所導緻的開銷了。是以,使用自定義動作節點的原因之一,就是為了支援在Oozie的執行上下文中直接執行輕量 級的java類[1]。
使用自定義動作的另一個原因是為了提高工作流的語義和可讀性。由于Oozie是一種支援基于Hadoop處理元件的工作流引擎,是以它的文法完全是 以Hadoop執行為中心的——Hadoop檔案系統、map/reduce、Pig等等。這種文法能夠很好地符合Hadoop開發者的習慣,但是并沒有 涉及到太多關于給定動作的功能資訊。我們可以為動作本身制定與業務相關的命名轉換規則,但這隻是特别用來解決問題的——對動作的命名隻反映了給定過程的語 法,既不是總體上的主題領域,也不能解決動作參數的問題,那些問題仍然隻能由開發者來解決。
幸運的是,Oozie支援非常棒的擴充機制——自定義動作節點[3],它讓我們可以很容易地解決這兩個問題。自定義的動作節點讓我們可以使用附加的動作(動詞)來擴充Oozie的語言。Oozie的動作節點可以是同步的,也可以是異步的。
- 同步節點——它在Oozie内部執行,在繼續執行之前會等待這些節點完成動作。這些節點是為輕量級的任務所用的,像自定義計算,檔案系統的移動、建立目錄、删除等等。
- 異步節點——它是由Oozie啟動的,但是在Oozie引擎的外部執行,它會監控正在執行的動作,直到完成。這是通過動作的回調或者Oozie針對動作狀态的polling操作完成的。
實作Oozie自定義動作處理程式
在這個例子中,我們會對獨立的郵件程式進行轉換,并展現到自定義的email動作中[2](代碼1)。
package com.navteq.assetmgmt.oozie.custom; import java.util.Properties; import java.util.StringTokenizer; import javax.mail.Message; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import org.apache.oozie.ErrorCode; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.action.ActionExecutorException.ErrorType; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.Namespace; public class EmailActionExecutor extends ActionExecutor { private static final String NODENAME = "eMail"; private static final String SUCCEEDED = "OK"; private static final String FAILED = "FAIL"; private static final String KILLED = "KILLED"; private static final String DEFAULMAILSERVER = "imailchi.navtech.com"; private static final String EMAILSERVER = "emailServer"; private static final String SUBJECT = "emailSubject"; private static final String MESSAGE = "emailBody"; private static final String FROM = "emailFrom"; private static final String TO = "emailTo"; public EmailActionExecutor() { super(NODENAME); } @Override public void check(Context context, WorkflowAction action) throws ActionExecutorException { // Should not be called for synch operation throw new UnsupportedOperationException(); } @Override public void end(Context context, WorkflowAction action)throws ActionExecutorException { String externalStatus = action.getExternalStatus(); WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK : WorkflowAction.Status.ERROR; context.setEndData(status, getActionSignal(status)); } @Override public boolean isCompleted(String arg0) { return true; } @Override public void kill(Context context, WorkflowAction action) throws ActionExecutorException { context.setExternalStatus(KILLED); context.setExecutionData(KILLED, null); } @Override public void start(Context context, WorkflowAction action) throws ActionExecutorException { // Get parameters from Node configuration try{ Element actionXml = XmlUtils.parseXml(action.getConf()); Namespace ns = Namespace.getNamespace("uri:custom:email-action:0.1"); String server = actionXml.getChildTextTrim(EMAILSERVER, ns); String subject = actionXml.getChildTextTrim(SUBJECT, ns); String message = actionXml.getChildTextTrim(MESSAGE, ns); String from = actionXml.getChildTextTrim(FROM, ns); String to = actionXml.getChildTextTrim(TO, ns); // Check if all parameters are there if(server == null) server = DEFAULMAILSERVER; if((message == null) || (from == null) || (to == null)) throw new ActionExecutorException(ErrorType.FAILED, ErrorCode.E0000.toString(), "Not all parameters are defined"); // Execute action synchronously SendMail(server, subject, message, from, to); context.setExecutionData(SUCCEEDED, null); } catch(Exception e){ context.setExecutionData(FAILED, null); throw new ActionExecutorException(ErrorType.FAILED, ErrorCode.E0000.toString(), e.getMessage()); } } // Sending an email public void SendMail(String server, String subject, String message, String from, String to) throws Exception { // create some properties and get the default Session Properties props = new Properties(); props.setProperty("mail.smtp.host", server); Session session = Session.getDefaultInstance(props, null); // create a message Message msg = new MimeMessage(session); // set the from and to address InternetAddress addressFrom = new InternetAddress(from); msg.setFrom(addressFrom); // To is a comma separated list StringTokenizer st = new StringTokenizer(to, ","); String [] recipients = new String[st.countTokens()]; int rc = 0; while(st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++){ addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // Setting the Subject and Content Type msg.setSubject(subject); msg.setContent(message, "text/plain"); Transport.send(msg); } }
代碼1: Email自定義動作
這個實作對ActionExecutor[2]類(由Oozie提供)進行了擴充,并重寫了一些必要的方法。因為郵件的發送過程是一種非常快速的操作,是以我們決定将其實作為同步的動作處理程式,那意味着它會在Oozie的執行上下文中執行。
我們的實作(代碼1)遵循了Oozie文檔并實作了所有必需的方法:
- 對于所有自定義動作處理程式都需要沒有參數的構造函數。這個構造函數會注冊動作處理程式的名稱(使用動作名稱來調用父類),我們會在工作流XML中使用它。
- 我們可以使用InitActionType[3]方法來注冊執行動作時可能發生的異常,以及它們的類型和錯誤資訊,并為執行程式本身執行初始化操作。
- Start方法是用來啟動動作的執行操作的。因為我們實作的是同步動作,是以整個動作都會在此執行。這個方法是由Oozie調用的,它有 兩個參數:Context和WorkflowAction。Context參數提供了對Oozie工作流執行上下文的通路,其中主要包含了工作流的變量, 并提供了對其進行操作的非常簡單的API(set、get)[4]。而WorkflowAction則提供了Oozie對目前動作的定義。
- 還有Check方法,Oozie會使用它來檢查動作的狀态。它不能作為同步動作來調用。
- Kill方法,可以用來中斷運作的作業或者動作。
- End方法,可以用于所有清理動作,或者用于可能在完成動作之後所要做的處理。它還需要設定執行的結果。
部署并使用Oozie自定義動作處理程式
實作了自定義的動作執行方式之後,我們需要為新的email動作定義XML模式[5](代碼2)。
<?xml version="1.0" encoding="UTF-8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:email="uri:custom:email-action:0.1" elementFormDefault="qualified" targetNamespace="uri:custom:email-action:0.1"> <xs:complexType name="EMAIL"> <xs:sequence> <xs:element name="emailServer" type="xs:string" minOccurs="0" maxOccurs="1" /> <xs:element name="emailSubject" type="xs:string" /> <xs:element name="emailFrom" type="xs:string" /> <xs:element name="emailTo" type="xs:string" /> <xs:element name="emailBody" type="xs:string" /> </xs:sequence> </xs:complexType>
<xs:element name="eMail" type="email:EMAIL"></xs:element>
</xs:schema>
代碼2: 為email元件所用的XML schema
自定義動作節點和XML schema檔案都需要打包在單獨的jar檔案中,比方說emailAction.Jar。我們可以使用Oozie的oozie-setup.sh腳本執行下面的指令,進而把這個(以及其他所有)jar檔案添加到Oozie的war檔案中。
$ bin/oozie-setup.sh -jars emailAction.jar:mail.jar (See Adding Jars to Oozie)
代碼3: 部署指令
向Oozie添加Jar檔案 你要知道,Cloudera推薦的oozie-setup.sh指令行會重新建構你的war檔案,并且, 如果你使用網頁來監控作業,那麼就會丢失java的腳本擴充。在測試方面,我們難以同時包含-extjs和-jars選項。作為權宜之計,我們會把jar 檔案複制到${CATALINA_BASE}/webapps/oozie/WEB-INF/lib中,其中${CATALINA_BASE}代表的是 /var/lib/oozie/oozie-server。請注意,這裡存在一定的風險,如果其他人重新建構了war檔案,那麼你就會丢失這些擴充,并且 他們以後需要手動添加。對于測試來說,我們建議複制jar檔案,然而,對于生産環境,我們建議把jar添加到war檔案中。 |
現在我們需要把關于自定義執行器的資訊注冊到Oozie的運作時中。這是通過擴充oozie-site.xml完成的[6]。我們可以通過在Oozie配置檔案oozie-site.xml中添加或者修改“oozie.service.ActionService.executor.ext.classes”[7]來注冊自定義動作本身(代碼4)。
…………………………………… <property> <name>oozie.service.ActionService.executor.ext.classes</name> <value>com.navteq.assetmgmt.oozie.custom. EmailActionExecutor </value> </property> ……………………………………
代碼4: 自定義執行配置
為新動作(代碼2)所用的XML schema應該添加到oozie-site.xml中,位于屬性“oozie.service.WorkflowSchemaService.ext.schemas”[8]之下(代碼5)。
……………………………………… <property> <name>oozie.service.SchemaService.wf.ext.schemas</name> <value> emailAction.xsd</value> </property> …………………………………
代碼5: 自定義模式配置
最後,Tomcat啟動之後,我們就可以在工作流中使用自定義的動作節點了。
為了測試我們的實作,我們建立了簡單的工作流(代碼6),它會使用我們的執行器來發送email。
<!-- Copyright (c) 2011 NAVTEQ! Inc. All rights reserved. Test email Oozie Script --> <workflow-app xmlns='uri:oozie:workflow:0.1' name='emailTester'> <start to='simpleEmail'/> <action name='simpleEmail'> <eMail xlmns=“ uri:custom:email-action:0.1”> <emailSubject>test</emailSubject> <emailFrom>mike.segel@<mycompany>.com</emailFrom> <emailTo>boris.lublinsky@<mycompany>.com</emailTo> <emailMessage>This is a test message, if you can see this, Mikey did something right! :)</emailMessage> </eMail> <error to="fail"/> <ok to="end"/> </action> <kill name="fail"> <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name='end'/> </workflow-app>
代碼6: 使用email自定義動作執行器的簡單工作流
結論
在這篇文章中,我們展示了如何通過建立自定義的動作執行器來擴充Oozie。這樣做讓我們可以定義和實作部門或者企業專用的Oozie語言(領域專用語言),其中具有部門或者企業的功能。這樣的領域專用語言能夠簡化特定部門或者企業的建構過程,并提高代碼的可讀性。
盡管Oozie相對還不夠成熟,但是它已經為處理包含多個map/reduce作業、并且能夠向總體的業務過程添加非map-reduce作業的過 程提供了基本的架構。随着越來越多的使用者使用Oozie并提供回報,我們相信它有足夠的潛力,可以成為Hadoop環境中強大的內建部分。
對于Oozie來說,還有很多我們沒有在這三篇文章中包含的内容。我們隻是期望它們可以引起你對Oozie工作流引擎的足夠興趣,并且能夠成為進一步研究Oozie的不錯的起點。
參考資訊
- Boris Lublinsky, Mike Segel. Introduction to Oozie.
- Boris Lublinsky, Mike Segel. Oozie by Example
- Oozie custom Action Nodes
- Oozie source code
關于作者
Boris Lublinsky是NAVTEQ公司的首席架構師,在這家公司中他的工作是為大型資料管理和處理、SOA以及實作各種 NAVTEQ的項目定義架構的願景。他還是InfoQ的SOA編輯,以及OASIS的SOA RA工作組的參與者。Boris是一位作者,還經常發表演講,他最新的一本書是《Applied SOA》。
Michael Segel在過去二十多年間一直與客戶寫作,識别并解決他們的業務問題。Michael已經作為多種角色、在多個行業中工作過。他是一位獨立顧問,總是期望能夠解決所有有挑戰的問題。Michael擁有俄亥俄州立大學的軟體工程學位。
[1] 這種類的例子可能是各種計數器操作、簡單的計算等等。
[2] 所有Oozie動作執行器都是Oozie分發程式的一部分,它們都是通過擴充這個類實作的。
[3] 在我們的實作中,我們使用的是預設的實作,這樣就是為什麼沒有在代碼中展現的原因。你可以檢視一下Oozie的源代碼[4],就可以知道在現存的Oozie動作處理程式中是如何實作這個方法的。
[4] 配置自定義執行器有兩種方式——工作流變量和/或動作配置。在我們的例子中展示的是後者,但是在實際情況中,實際上總是二者的組合。
[5] 確定不僅要定義複雜的類型,還要對元素進行定義。那是Oozie所期望的。
[6] 通常在Oozie的分發包中叫做oozie-default.xml。
chaunceyhao