天天看點

Azkaban使用

https://azkaban.github.io/azkaban/docs/latest/#how-to

建立一個流程:

一個流程是一個依賴其他job的job。其他依賴項經常會運作在這個流程job之前。

1 #this is flow bar.job
2 type=command
3 dependencies=test
4 command=echo bar
           

這個job依賴于之前的test.job

在Azkaban中,type值得是運作的類型,command指的是一條Linux指令,同時Azkaban還支援python,java等直接運作,也就可以是hadoop的shell。

一個流程也可以作為一個節點嵌入到其他job檔案中,形成嵌入流

type=flow
flow.name=bar
           

Azkaban任務失敗重試及重試間隔指令

.job

檔案中,添加如下指令:

retries=12
retry.backoff=300000  #代表重試間隔時間
           

------------------------------------------------------------------------------------------------------

一、概述

原生的 Azkaban 支援的plugin類型有以下這些:

  1. command:Linux shell指令行任務
  2. gobblin:通用資料采集工具
  3. hadoopJava:運作hadoopMR任務
  4. java:原生java任務
  5. hive:支援執行hiveSQL
  6. pig:pig腳本任務
  7. spark:spark任務
  8. hdfsToTeradata:把資料從hdfs導入Teradata
  9. teradataToHdfs:把資料從Teradata導入hdfs

其中最簡單而且最常用的是command類型,我們在上一篇文章中已經描述了如何編寫一個command的job任務。如果使用command類型,效果其實跟在本地執行Linux shell指令一樣,這樣的話,還不如把shell放到crontable 中運作。是以我們把重點放到Azkaban支援的比較常用的四種類型:java、hadoopJava、hive、spark

二、java類型

1、代碼編寫:MyJavaJob.java

package com.dataeye.java;

public class MyJavaJob {

    public static void main(String[] args) {
        System.out.println("#################################");
        System.out.println("####  MyJavaJob class exec... ###");
        System.out.println("#################################");
    }

}
           

2、打包成jar檔案:使用maven或者eclipse導出為jar檔案

3、編寫job檔案:java.job

type=javaprocess

classpath=./lib/*,${azkaban.home}/lib/*

java.class=com.dataeye.java.MyJavaJob
           

4、組成一個完整的運作包

建立一個目錄,在該目錄下建立一個lib檔案夾,把第二步打包的jar檔案放到這裡,把job檔案放到和lib檔案夾同一級的目錄下,如下所示:

Azkaban使用

完整的運作包

5、打包成zip檔案

把lib目錄和job檔案打包成zip檔案,如下的java.zip:

Azkaban使用

zip檔案

6、送出運作,過程跟之前文章介紹的步驟一樣,不再詳述,執行結果如下:

Azkaban使用

執行結果

從輸出日志可以看出,代碼已經正常執行。

以上是java類型的任務編寫和執行的過程。接下來介紹其他任務編寫的時候,隻會介紹代碼的編寫和job的編寫,其他過程與上面的一緻。

三、hadoopJava類型

1、資料準備

以下内容是運作wordcount任務時需要的輸入檔案input.txt:

1   Ross    male    33  3674
2   Julie   male    42  2019
3   Gloria  female  45  3567
4   Carol   female  36  2813
5   Malcolm male    42  2856
6   Joan    female  22  2235
7   Niki    female  27  3682
8   Betty   female  20  3001
9   Linda   male    21  2511
10  Whitney male    35  3075
11  Lily    male    27  3645
12  Fred    female  39  2202
13  Gary    male    28  3925
14  William female  38  2056
15  Charles male    48  2981
16  Michael male    25  2606
17  Karl    female  32  2260
18  Barbara male    39  2743
19  Elizabeth   female  26  2726
20  Helen   female  47  2457
21  Katharine   male    45  3638
22  Lee female  43  3050
23  Ann male    35  2874
24  Diana   male    37  3929
25  Fiona   female  45  2955
26  Bob female  21  3382
27  John    male    48  3677
28  Thomas  female  22  2784
29  Dean    male    38  2266
30  Paul    female  31  2679
           

把input.txt檔案拷貝到hdfs的 /data/yann/input 目錄下

2、代碼準備:

package azkaban.jobtype.examples.java;

import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;

public class WordCount extends AbstractHadoopJob
{
  private static final Logger logger = Logger.getLogger(WordCount.class);
  private final String inputPath;
  private final String outputPath;
  private boolean forceOutputOverrite;

  public WordCount(String name, Props props)
  {
    super(name, props);
    this.inputPath = props.getString("input.path");
    this.outputPath = props.getString("output.path");
    this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false);
  }

  public void run()
    throws Exception
  {
    logger.info(String.format("Starting %s", new Object[] { getClass().getSimpleName() }));

    JobConf jobconf = getJobConf();
    jobconf.setJarByClass(WordCount.class);

    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(IntWritable.class);

    jobconf.setMapperClass(Map.class);
    jobconf.setReducerClass(Reduce.class);

    jobconf.setInputFormat(TextInputFormat.class);
    jobconf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
    FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));

    if (this.forceOutputOverrite)
    {
      FileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
      fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
    }

    super.run();
  }

  public static class Map extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable>
  {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private long numRecords = 0L;

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        this.word.set(tokenizer.nextToken());
        output.collect(this.word, one);
        reporter.incrCounter(Counters.INPUT_WORDS, 1L);
      }

      if (++this.numRecords % 100L == 0L)
        reporter.setStatus("Finished processing " + this.numRecords + " records " + "from the input file");
    }

    static enum Counters
    {
      INPUT_WORDS;
    }
  }

  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable>
  {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException
    {
      int sum = 0;
      while (values.hasNext()) {
        sum += ((IntWritable)values.next()).get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
}
           

3、編寫job檔案

wordcount.job檔案内容如下:

type=hadoopJava

job.extend=false

job.class=azkaban.jobtype.examples.java.WordCount

classpath=./lib/*,${azkaban.home}/lib/*

force.output.overwrite=true

input.path=/data/yann/input

output.path=/data/yann/output
           

這樣hadoopJava類型的任務已經完成,打包送出到Azkaban中執行即可

四、hive類型

1、編寫 hive.sql檔案

use azkaban;

INSERT OVERWRITE TABLE 
 user_table1 PARTITION (day_p='2017-02-08') 
SELECT appid,uid,country,province,city 
 FROM user_table0 where adType=1;
           

以上是标準的hive的sql腳本,首先切換到azkaban資料庫,然後把user_table0 的資料插入到user_table1 表的指定day_p分區。需要先準備好 user_table0 和 user_table1 表結構和資料。

編寫完成後,把檔案放入 res 檔案夾中。

2、編寫hive.job檔案

type=hive

user.to.proxy=azkaban

classpath=./lib/*,${azkaban.home}/lib/*

azk.hive.action=execute.query

hive.script=res/hive.sql
           

關鍵的參數是 hive.script,該參數指定使用的sql腳本在 res目錄下的hive.sql檔案

五、spark類型

spark任務有兩種運作方式,一種是command類型,另一種是spark類型

首先準備好spark任務的代碼

package com.dataeye.template.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext}

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage:WordCount <hdfs_file>")
      System.exit(1)
    }

    System.out.println("get first param ==> " + args(0))
    System.out.println("get second param ==> " + args(1))

    /** spark 2.0的方式
      * val spark = SparkSession.builder().appName("WordCount").getOrCreate()
      */
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
    val spark = new SQLContext(sc)
    val file = spark.sparkContext.textFile(args(0))
    val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // 資料collect 到driver端列印
    wordCounts.collect().foreach(println _)
  }
}

           

然後準備資料,資料就使用前面hadoopJava中的資料即可。

最後打包成jar檔案:spark-template-1.0-SNAPSHOT.jar

1、command類型

command類型的配置方式比較簡單,spark.job檔案如下:

type=command

command=${spark.home}/bin/spark-submit --master yarn-cluster --class com.dataeye.template.spark.WordCount lib/spark-template-1.0-SNAPSHOT.jar   hdfs://de-hdfs/data/yann/info.txt   paramtest
           

2、spark類型

type=spark

master=yarn-cluster
execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
class=com.dataeye.template.spark.WordCount
params=hdfs://de-hdfs/data/yann/info.txt  paramtest
           

以上就是Azkaban支援的幾種常用的任務類型。

------------------------------------------------------------------------------------------------------------------------------------------------------

Azkaban安裝部署

準備工作

本次安裝目錄為/usr/local/azkaban

Azkaban Web伺服器

azkaban-web-server-2.5.0.tar.gz

Azkaban執行伺服器 

azkaban-executor-server-2.5.0.tar.gz

Azkaban 的資料庫檔案

azkaban-sql-script-2.5.0.tar.gz

MySQL

為Azkaban建立一個mysql庫:庫名azkaban,使用者名azkaban,密碼azkaban

1)為Azkaban建立一個資料庫:

# Exampledatabase creation command, although the db name doesn't need to be 'azkaban'

mysql> CREATEDATABASE azkaban;

2) 為Azkaban建立一個資料庫使用者:

# Exampledatabase creation command. The user name doesn't need to be 'azkaban'

mysql> CREATEUSER 'username'@'%' IDENTIFIED BY 'password';

3) 為使用者賦予Azkaban資料庫的增删查改的權限:

# Replace db,username with the ones created by the previous steps.

mysql> GRANT ALLON <database>.* to '<username>'@'%' WITH GRANT OPTION;

安裝

将安裝檔案上傳到叢集,最好上傳到安裝 hive、sqoop的機器上,友善指令的執行

建立azkaban目錄,用于存放azkaban運作程式

azkaban web伺服器安裝

解壓azkaban-web-server-2.5.0.tar.gz

指令: tar –zxvf azkaban-web-server-2.5.0.tar.gz

azkaban 執行服器安裝

解壓azkaban-executor-server-2.5.0.tar.gz

指令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz

azkaban腳本導入

解壓: azkaban-sql-script-2.5.0.tar.gz

指令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz

将解壓後的mysql 腳本,導入到mysql中:

進入mysql

mysql> create database azkaban;

mysql> use azkaban;

Database changed

mysql> source /usr/local/azkaban/azkaban-2.5.0/create-all-sql-2.5.0.sql;

建立SSL配置

參考位址: http://docs.codehaus.org/display/JETTY/How+to+configure+SSL

指令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA

運作此指令後,會提示輸入目前生成 keystor的密碼及相應資訊,輸入的密碼請勞記,資訊如下:

輸入keystore密碼:  本次安裝輸入的密碼為123456

再次輸入新密碼:

您的名字與姓氏是什麼?

  [Unknown]: 

您的組織機關名稱是什麼?

  [Unknown]: 

您的組織名稱是什麼?

  [Unknown]: 

您所在的城市或區域名稱是什麼?

  [Unknown]: 

您所在的州或省份名稱是什麼?

  [Unknown]: 

該機關的兩字母國家代碼是什麼

  [Unknown]:  CN

CN=Unknown, OU=Unknown, O=Unknown,L=Unknown, ST=Unknown, C=CN 正确嗎?

  [否]:  y

輸入<jetty>的主密碼

        (如果和 keystore 密碼相同,按回車): 

再次輸入新密碼:

完成上述工作後,将在目前目錄生成 keystore 證書檔案,将keystore 考貝到 azkaban web伺服器根目錄中.如:cp keystore azkaban-web-2.5.0

配置檔案

注:先配置好伺服器節點上的時區

1、先生成時區配置檔案Asia/Shanghai,用互動式指令 tzselect 即可

2、拷貝該時區檔案,覆寫系統本地時區配置

cp /usr/share/zoneinfo/Asia/Shanghai etc/localtime  /

azkaban web伺服器配置

進入azkaban web伺服器安裝目錄 conf目錄

v  修改azkaban.properties檔案

指令vi azkaban.properties

内容說明如下:

#Azkaban Personalization Settings

azkaban.name=Test                           #伺服器UI名稱,用于伺服器上方顯示的名字

azkaban.label=My Local Azkaban                               #描述

azkaban.color=#FF3601                                                 #UI顔色

azkaban.default.servlet.path=/index                         #

web.resource.dir=web/                                                 #預設根web目錄

default.timezone.id=Asia/Shanghai                           #預設時區,已改為亞洲/上海 預設為美國

#Azkaban UserManager class

user.manager.class=azkaban.user.XmlUserManager   #使用者權限管理預設類

user.manager.xml.file=conf/azkaban-users.xml              #使用者配置,具體配置參加下文

#Loader for projects

executor.global.properties=conf/global.properties    # global配置檔案所在位置

azkaban.project.dir=projects                                                #

database.type=mysql                                                              #資料庫類型

mysql.port=3306                                                                       #端口号

mysql.host=localhost                                                      #資料庫連接配接IP

mysql.database=azkaban                                                       #資料庫執行個體名

mysql.user= azkaban                                                                 #資料庫使用者名

mysql.password= azkaban                                                        #資料庫密碼

mysql.numconnections=100                                                  #最大連接配接數

# Velocity dev mode

velocity.dev.mode=false

# Jetty伺服器屬性.

jetty.maxThreads=25                                                               #最大線程數

jetty.ssl.port=8443                                                                   #Jetty SSL端口

jetty.port=8081                                                                         #Jetty端口

jetty.keystore=keystore                                                            #SSL檔案名

jetty.password=123456                                                             #SSL檔案密碼

jetty.keypassword=123456                                                      #Jetty主密碼 與 keystore檔案相同

jetty.truststore=keystore                                                            #SSL檔案名

jetty.trustpassword=123456                                                   # SSL檔案密碼

# 執行伺服器屬性

executor.port=12321                                                               #執行伺服器端口

# 郵件設定

mail.sender=                                                    #發送郵箱

mail.host=                                                                 #發送郵箱smtp位址

mail.user=                                                #發送郵件時顯示的名稱

mail.password=                                                          #郵箱密碼

job.failure.email=                                               #任務失敗時發送郵件的位址

job.success.email=                                             #任務成功時發送郵件的位址

lockdown.create.projects=false                                           #

cache.directory=cache                                                            #緩存目錄

v  使用者配置

進入azkaban web伺服器conf目錄,修改azkaban-users.xml

vi azkaban-users.xml 增加 管理者使用者

<azkaban-users>

        <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />

        <user username="metrics" password="metrics" roles="metrics"/>

        <user username="admin" password="admin" roles="admin,metrics" />

        <role name="admin" permissions="ADMIN" />

        <role name="metrics" permissions="METRICS"/>

</azkaban-users>

v  azkaban 執行伺服器executor配置

進入執行伺服器安裝目錄conf,修改azkaban.properties

vi azkaban.properties

#Azkaban

default.timezone.id=Asia/Shanghai                                              #時區

# Azkaban JobTypes 插件配置

azkaban.jobtype.plugin.dir=plugins/jobtypes                   #jobtype 插件所在位置

#Loader for projects

executor.global.properties=conf/global.properties

azkaban.project.dir=projects

#資料庫設定

database.type=mysql                                                                       #資料庫類型(目前隻支援mysql)

mysql.port=3306                                                                                #資料庫端口号

mysql.host=localhost                                                                 #資料庫IP位址

mysql.database=azkaban                                                                #資料庫執行個體名

mysql.user=azkaban                                                                       #資料庫使用者名

mysql.password= azkaban                                  #資料庫密碼

mysql.numconnections=100                                                           #最大連接配接數

# 執行伺服器配置

executor.maxThreads=50                                                                #最大線程數

executor.port=12321                                                               #端口号(如修改,請與web服務中一緻)

executor.flow.threads=30                                                                #線程數

啟動

web伺服器

在azkaban web伺服器目錄下執行啟動指令

bin/azkaban-web-start.sh

注:在web伺服器根目錄運作

或者啟動到背景

nohup bin/azkaban-web-start.sh 1>/tmp/azstd.out 2>/tmp/azerr.out &

執行伺服器

在執行伺服器目錄下執行啟動指令

bin/azkaban-executor-start.sh

注:隻能要執行伺服器根目錄運作

啟動完成後,在浏覽器(建議使用谷歌浏覽器)中輸入https://伺服器IP位址:8443 ,即可通路azkaban服務了.在登入中輸入剛才新的戶用名及密碼,點選 login.

Azkaban Demo

Command類型單一job示例

1、建立job描述檔案

vi command.job

#command.job

type=command                                                   

command=echo 'hello'

2、将job資源檔案打包成zip檔案

zip command.job

3、通過azkaban的web管理平台建立project并上傳job壓縮包

首先建立project

Azkaban使用
Azkaban使用

上傳zip包

Azkaban使用

4、啟動執行該job

Azkaban使用

Command類型多job工作流flow

1、建立有依賴關系的多個job描述

第一個job:one.job

# one.job

type=command

command=echo one

第二個job:two.job依賴one.job

# two.job

type=command

dependencies= one

command=echo two

2、将所有job資源檔案打到一個zip包中

Azkaban使用

3、在azkaban的web管理界面建立工程并上傳zip包

4、啟動工作流flow

Azkaban使用

5、

Azkaban使用

-------------------------------------------------------------------------------------------------------------------------------------------------------------

1.配置郵件請在azkaban-web-server中進行配置:如下圖:

     /opt/azkaban/azkaban/azkaban-web-server/build/install/azkaban-web-server/conf

Azkaban使用
Azkaban使用

注意:

          郵件伺服器和接受郵件對像使用了QQ郵箱。azkaban可能不支援QQ郵箱。

          解決辦法:改成其它郵箱,我改成了126.com的郵箱。

2.測試:

     在web UI 頁面執行個job,成功則發郵件,如下:

Azkaban使用
Azkaban使用

看結果,我輸入了三個郵箱,而且此job也執行成功,看看三個郵箱都收到了。

除了在web UI 中直接配置也可以在job裡進行配置。

azkaban配置郵件内容log連結

步驟:

1.打開azkaban server伺服器conf下的azkaban.properties檔案

2.在jetty參數配置處,添加jetty.hostname=localhost

其中:localhost:為azkaban 的server伺服器,目前伺服器的ip位址

3.重新開機azkanba 執行器和server伺服器 驗證郵件發送即可。

繼續閱讀