天天看點

擴充Oozie

在前面的兩篇文章中[1,2],我們描述了Oozie工作流伺服器,并且展示了幾個工作流的示例。我們還描述了針對Oozie的工作流的部署和配置,以及用來啟動、停止和監控Oozie工作流的工具。

在本文中,我們會向你展示Oozie的可擴充性,并說明它是如何支援我們實作自定義的、協同工作的語言擴充。

為什麼需要自定義節點(Custom Node)?

正如我們在文章[1]中所說明的,Oozie之是以與衆不同,是因為它提供了一種“最小化”的工作流語言,其中隻包含少數幾種控制和動作節點。盡管 其中的一種動作節點是java動作節點,它讓我們可以從Oozie工作流調用任意一個帶有main方法的java類,但這種方法并非總是最佳的。原因之一 就在于,java動作是在Hadoop簇集中作為map-reduce作業執行的,并且隻帶有唯一的Mapper任務。一方面,這帶來了很多好處:

  • 它擁有内建的可伸縮性以及對map/reduce架構的災難恢複支援,這樣我們就不必這些特性建構到Oozie中。
  • 外部執行機制,這讓Oozie引擎變得更輕量級,進而可以支援更多并發運作的過程。

另一方面,這種方法也有一些缺點:

  • 它把每個java節點都作為mapper任務啟動,這會導緻在Hadoop簇集中啟動新的JVM而産生額外的開銷。
  • 在外部執行java類導緻了額外的網絡傳輸,用于與Oozie伺服器同步這些執行結果。
  • 從java節點傳遞參數成了非常耗費資源的操作。

盡管如此,在運作時間相對較長(幾分鐘甚至幾小時)的map/reduce或者Pig作業中,好處會大大超過負載的缺點,但是,在簡單的java節 點中(參見[2]),我們就需要注意外部執行所導緻的開銷了。是以,使用自定義動作節點的原因之一,就是為了支援在Oozie的執行上下文中直接執行輕量 級的java類[1]。

使用自定義動作的另一個原因是為了提高工作流的語義和可讀性。由于Oozie是一種支援基于Hadoop處理元件的工作流引擎,是以它的文法完全是 以Hadoop執行為中心的——Hadoop檔案系統、map/reduce、Pig等等。這種文法能夠很好地符合Hadoop開發者的習慣,但是并沒有 涉及到太多關于給定動作的功能資訊。我們可以為動作本身制定與業務相關的命名轉換規則,但這隻是特别用來解決問題的——對動作的命名隻反映了給定過程的語 法,既不是總體上的主題領域,也不能解決動作參數的問題,那些問題仍然隻能由開發者來解決。

幸運的是,Oozie支援非常棒的擴充機制——自定義動作節點[3],它讓我們可以很容易地解決這兩個問題。自定義的動作節點讓我們可以使用附加的動作(動詞)來擴充Oozie的語言。Oozie的動作節點可以是同步的,也可以是異步的。

  • 同步節點——它在Oozie内部執行,在繼續執行之前會等待這些節點完成動作。這些節點是為輕量級的任務所用的,像自定義計算,檔案系統的移動、建立目錄、删除等等。
  • 異步節點——它是由Oozie啟動的,但是在Oozie引擎的外部執行,它會監控正在執行的動作,直到完成。這是通過動作的回調或者Oozie針對動作狀态的polling操作完成的。

實作Oozie自定義動作處理程式

在這個例子中,我們會對獨立的郵件程式進行轉換,并展現到自定義的email動作中[2](代碼1)。

package com.navteq.assetmgmt.oozie.custom;  import java.util.Properties; import java.util.StringTokenizer;  import javax.mail.Message; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage;  import org.apache.oozie.ErrorCode; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.action.ActionExecutorException.ErrorType; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.Namespace;  public class EmailActionExecutor extends ActionExecutor {           private static final String NODENAME = "eMail";           private static final String SUCCEEDED = "OK";   private static final String FAILED = "FAIL";   private static final String KILLED = "KILLED";           private static final String DEFAULMAILSERVER = "imailchi.navtech.com";          private static final String EMAILSERVER = "emailServer";          private static final String SUBJECT = "emailSubject";          private static final String MESSAGE = "emailBody";          private static final String FROM = "emailFrom";          private static final String TO = "emailTo";    public EmailActionExecutor() {     super(NODENAME); }           @Override          public void check(Context context, WorkflowAction action) throws ActionExecutorException {                    // Should not be called for synch operation                   throw new UnsupportedOperationException();          }           @Override          public void end(Context context, WorkflowAction action)throws ActionExecutorException {                    String externalStatus = action.getExternalStatus();                   WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ?                          WorkflowAction.Status.OK : WorkflowAction.Status.ERROR;                   context.setEndData(status, getActionSignal(status));          }           @Override          public boolean isCompleted(String arg0) {                    return true;          }           @Override          public void kill(Context context, WorkflowAction action) throws ActionExecutorException {  context.setExternalStatus(KILLED); context.setExecutionData(KILLED, null);          }           @Override          public void start(Context context, WorkflowAction action) throws ActionExecutorException {                    // Get parameters from Node configuration                   try{                            Element actionXml = XmlUtils.parseXml(action.getConf());                            Namespace ns = Namespace.getNamespace("uri:custom:email-action:0.1");                             String server = actionXml.getChildTextTrim(EMAILSERVER, ns);                            String subject = actionXml.getChildTextTrim(SUBJECT, ns);                            String message = actionXml.getChildTextTrim(MESSAGE, ns);                            String from = actionXml.getChildTextTrim(FROM, ns);                            String to = actionXml.getChildTextTrim(TO, ns);                             // Check if all parameters are there                            if(server == null)                                     server = DEFAULMAILSERVER;                            if((message == null) || (from == null) || (to == null))                                     throw new ActionExecutorException(ErrorType.FAILED,    ErrorCode.E0000.toString(), "Not all parameters are defined");                             // Execute action synchronously                            SendMail(server, subject, message, from, to);                            context.setExecutionData(SUCCEEDED, null);                 }                 catch(Exception e){      context.setExecutionData(FAILED, null);                        throw new ActionExecutorException(ErrorType.FAILED,    ErrorCode.E0000.toString(), e.getMessage());                 }      }       // Sending an email      public void SendMail(String server, String subject, String message,                            String from, String to) throws Exception {                  // create some properties and get the default Session                 Properties props = new Properties();                 props.setProperty("mail.smtp.host", server);                 Session session = Session.getDefaultInstance(props, null);                  // create a message                 Message msg = new MimeMessage(session);                  // set the from and to address                 InternetAddress addressFrom = new InternetAddress(from);                 msg.setFrom(addressFrom);                  // To is a comma separated list                 StringTokenizer st = new StringTokenizer(to, ",");                 String [] recipients = new String[st.countTokens()];                 int rc = 0;                 while(st.hasMoreTokens())                         recipients[rc++] = st.nextToken();                 InternetAddress[] addressTo = new InternetAddress[recipients.length];                 for (int i = 0; i < recipients.length; i++){                         addressTo[i] = new InternetAddress(recipients[i]);                 }                 msg.setRecipients(Message.RecipientType.TO, addressTo);                  // Setting the Subject and Content Type                 msg.setSubject(subject);                 msg.setContent(message, "text/plain");                 Transport.send(msg);      } }       

代碼1: Email自定義動作

這個實作對ActionExecutor[2]類(由Oozie提供)進行了擴充,并重寫了一些必要的方法。因為郵件的發送過程是一種非常快速的操作,是以我們決定将其實作為同步的動作處理程式,那意味着它會在Oozie的執行上下文中執行。

我們的實作(代碼1)遵循了Oozie文檔并實作了所有必需的方法:

  • 對于所有自定義動作處理程式都需要沒有參數的構造函數。這個構造函數會注冊動作處理程式的名稱(使用動作名稱來調用父類),我們會在工作流XML中使用它。
  • 我們可以使用InitActionType[3]方法來注冊執行動作時可能發生的異常,以及它們的類型和錯誤資訊,并為執行程式本身執行初始化操作。
  • Start方法是用來啟動動作的執行操作的。因為我們實作的是同步動作,是以整個動作都會在此執行。這個方法是由Oozie調用的,它有 兩個參數:Context和WorkflowAction。Context參數提供了對Oozie工作流執行上下文的通路,其中主要包含了工作流的變量, 并提供了對其進行操作的非常簡單的API(set、get)[4]。而WorkflowAction則提供了Oozie對目前動作的定義。
  • 還有Check方法,Oozie會使用它來檢查動作的狀态。它不能作為同步動作來調用。
  • Kill方法,可以用來中斷運作的作業或者動作。
  • End方法,可以用于所有清理動作,或者用于可能在完成動作之後所要做的處理。它還需要設定執行的結果。

部署并使用Oozie自定義動作處理程式

實作了自定義的動作執行方式之後,我們需要為新的email動作定義XML模式[5](代碼2)。

<?xml version="1.0" encoding="UTF-8"?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"                    xmlns:email="uri:custom:email-action:0.1"      elementFormDefault="qualified"      targetNamespace="uri:custom:email-action:0.1"> <xs:complexType name="EMAIL">         <xs:sequence>                  <xs:element name="emailServer" type="xs:string" minOccurs="0" maxOccurs="1" />                   <xs:element name="emailSubject" type="xs:string" />                  <xs:element name="emailFrom" type="xs:string" />                   <xs:element name="emailTo" type="xs:string" />                  <xs:element name="emailBody" type="xs:string" />         </xs:sequence> </xs:complexType>       

<xs:element name="eMail" type="email:EMAIL"></xs:element>

</xs:schema>

代碼2: 為email元件所用的XML schema

自定義動作節點和XML schema檔案都需要打包在單獨的jar檔案中,比方說emailAction.Jar。我們可以使用Oozie的oozie-setup.sh腳本執行下面的指令,進而把這個(以及其他所有)jar檔案添加到Oozie的war檔案中。

$ bin/oozie-setup.sh -jars emailAction.jar:mail.jar (See Adding Jars to Oozie)       

代碼3: 部署指令

向Oozie添加Jar檔案

你要知道,Cloudera推薦的oozie-setup.sh指令行會重新建構你的war檔案,并且, 如果你使用網頁來監控作業,那麼就會丢失java的腳本擴充。在測試方面,我們難以同時包含-extjs和-jars選項。作為權宜之計,我們會把jar 檔案複制到${CATALINA_BASE}/webapps/oozie/WEB-INF/lib中,其中${CATALINA_BASE}代表的是 /var/lib/oozie/oozie-server。請注意,這裡存在一定的風險,如果其他人重新建構了war檔案,那麼你就會丢失這些擴充,并且 他們以後需要手動添加。對于測試來說,我們建議複制jar檔案,然而,對于生産環境,我們建議把jar添加到war檔案中。

現在我們需要把關于自定義執行器的資訊注冊到Oozie的運作時中。這是通過擴充oozie-site.xml完成的[6]。我們可以通過在Oozie配置檔案oozie-site.xml中添加或者修改“oozie.service.ActionService.executor.ext.classes”[7]來注冊自定義動作本身(代碼4)。

…………………………………… <property>        <name>oozie.service.ActionService.executor.ext.classes</name>        <value>com.navteq.assetmgmt.oozie.custom. EmailActionExecutor </value> </property> ……………………………………      

代碼4: 自定義執行配置

為新動作(代碼2)所用的XML schema應該添加到oozie-site.xml中,位于屬性“oozie.service.WorkflowSchemaService.ext.schemas”[8]之下(代碼5)。

……………………………………… <property>        <name>oozie.service.SchemaService.wf.ext.schemas</name>        <value> emailAction.xsd</value> </property> …………………………………      

代碼5: 自定義模式配置

最後,Tomcat啟動之後,我們就可以在工作流中使用自定義的動作節點了。

為了測試我們的實作,我們建立了簡單的工作流(代碼6),它會使用我們的執行器來發送email。

<!-- Copyright (c) 2011 NAVTEQ! Inc. All rights reserved. Test email Oozie Script --> <workflow-app xmlns='uri:oozie:workflow:0.1' name='emailTester'>  <start to='simpleEmail'/>   <action name='simpleEmail'>    <eMail xlmns=“ uri:custom:email-action:0.1”>      <emailSubject>test</emailSubject>      <emailFrom>mike.segel@<mycompany>.com</emailFrom>       <emailTo>boris.lublinsky@<mycompany>.com</emailTo>      <emailMessage>This is a test message, if you can see this, Mikey did something right! :)</emailMessage>    </eMail>   <error to="fail"/>    <ok to="end"/>  </action>  <kill name="fail">    <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>   </kill>  <end name='end'/> </workflow-app>      

代碼6: 使用email自定義動作執行器的簡單工作流

結論

在這篇文章中,我們展示了如何通過建立自定義的動作執行器來擴充Oozie。這樣做讓我們可以定義和實作部門或者企業專用的Oozie語言(領域專用語言),其中具有部門或者企業的功能。這樣的領域專用語言能夠簡化特定部門或者企業的建構過程,并提高代碼的可讀性。

盡管Oozie相對還不夠成熟,但是它已經為處理包含多個map/reduce作業、并且能夠向總體的業務過程添加非map-reduce作業的過 程提供了基本的架構。随着越來越多的使用者使用Oozie并提供回報,我們相信它有足夠的潛力,可以成為Hadoop環境中強大的內建部分。

對于Oozie來說,還有很多我們沒有在這三篇文章中包含的内容。我們隻是期望它們可以引起你對Oozie工作流引擎的足夠興趣,并且能夠成為進一步研究Oozie的不錯的起點。

參考資訊

  1. Boris Lublinsky, Mike Segel. Introduction to Oozie.
  2. Boris Lublinsky, Mike Segel. Oozie by Example
  3. Oozie custom Action Nodes 
  4. Oozie source code 

關于作者

Boris Lublinsky是NAVTEQ公司的首席架構師,在這家公司中他的工作是為大型資料管理和處理、SOA以及實作各種 NAVTEQ的項目定義架構的願景。他還是InfoQ的SOA編輯,以及OASIS的SOA RA工作組的參與者。Boris是一位作者,還經常發表演講,他最新的一本書是《Applied SOA》。

Michael Segel在過去二十多年間一直與客戶寫作,識别并解決他們的業務問題。Michael已經作為多種角色、在多個行業中工作過。他是一位獨立顧問,總是期望能夠解決所有有挑戰的問題。Michael擁有俄亥俄州立大學的軟體工程學位。

[1] 這種類的例子可能是各種計數器操作、簡單的計算等等。

[2] 所有Oozie動作執行器都是Oozie分發程式的一部分,它們都是通過擴充這個類實作的。

[3] 在我們的實作中,我們使用的是預設的實作,這樣就是為什麼沒有在代碼中展現的原因。你可以檢視一下Oozie的源代碼[4],就可以知道在現存的Oozie動作處理程式中是如何實作這個方法的。

[4] 配置自定義執行器有兩種方式——工作流變量和/或動作配置。在我們的例子中展示的是後者,但是在實際情況中,實際上總是二者的組合。

[5] 確定不僅要定義複雜的類型,還要對元素進行定義。那是Oozie所期望的。

[6] 通常在Oozie的分發包中叫做oozie-default.xml。

chaunceyhao