https://azkaban.github.io/azkaban/docs/latest/#how-to
建立一個流程:
一個流程是一個依賴其他job的job。其他依賴項經常會運作在這個流程job之前。
1 #this is flow bar.job
2 type=command
3 dependencies=test
4 command=echo bar
這個job依賴于之前的test.job
在Azkaban中,type值得是運作的類型,command指的是一條Linux指令,同時Azkaban還支援python,java等直接運作,也就可以是hadoop的shell。
一個流程也可以作為一個節點嵌入到其他job檔案中,形成嵌入流
type=flow
flow.name=bar
Azkaban任務失敗重試及重試間隔指令
在
.job
檔案中,添加如下指令:
retries=12
retry.backoff=300000 #代表重試間隔時間
------------------------------------------------------------------------------------------------------
一、概述
原生的 Azkaban 支援的plugin類型有以下這些:
- command:Linux shell指令行任務
- gobblin:通用資料采集工具
- hadoopJava:運作hadoopMR任務
- java:原生java任務
- hive:支援執行hiveSQL
- pig:pig腳本任務
- spark:spark任務
- hdfsToTeradata:把資料從hdfs導入Teradata
- teradataToHdfs:把資料從Teradata導入hdfs
其中最簡單而且最常用的是command類型,我們在上一篇文章中已經描述了如何編寫一個command的job任務。如果使用command類型,效果其實跟在本地執行Linux shell指令一樣,這樣的話,還不如把shell放到crontable 中運作。是以我們把重點放到Azkaban支援的比較常用的四種類型:java、hadoopJava、hive、spark
二、java類型
1、代碼編寫:MyJavaJob.java
package com.dataeye.java;
public class MyJavaJob {
public static void main(String[] args) {
System.out.println("#################################");
System.out.println("#### MyJavaJob class exec... ###");
System.out.println("#################################");
}
}
2、打包成jar檔案:使用maven或者eclipse導出為jar檔案
3、編寫job檔案:java.job
type=javaprocess
classpath=./lib/*,${azkaban.home}/lib/*
java.class=com.dataeye.java.MyJavaJob
4、組成一個完整的運作包
建立一個目錄,在該目錄下建立一個lib檔案夾,把第二步打包的jar檔案放到這裡,把job檔案放到和lib檔案夾同一級的目錄下,如下所示:
完整的運作包
5、打包成zip檔案
把lib目錄和job檔案打包成zip檔案,如下的java.zip:
zip檔案
6、送出運作,過程跟之前文章介紹的步驟一樣,不再詳述,執行結果如下:
執行結果
從輸出日志可以看出,代碼已經正常執行。
以上是java類型的任務編寫和執行的過程。接下來介紹其他任務編寫的時候,隻會介紹代碼的編寫和job的編寫,其他過程與上面的一緻。
三、hadoopJava類型
1、資料準備
以下内容是運作wordcount任務時需要的輸入檔案input.txt:
1 Ross male 33 3674
2 Julie male 42 2019
3 Gloria female 45 3567
4 Carol female 36 2813
5 Malcolm male 42 2856
6 Joan female 22 2235
7 Niki female 27 3682
8 Betty female 20 3001
9 Linda male 21 2511
10 Whitney male 35 3075
11 Lily male 27 3645
12 Fred female 39 2202
13 Gary male 28 3925
14 William female 38 2056
15 Charles male 48 2981
16 Michael male 25 2606
17 Karl female 32 2260
18 Barbara male 39 2743
19 Elizabeth female 26 2726
20 Helen female 47 2457
21 Katharine male 45 3638
22 Lee female 43 3050
23 Ann male 35 2874
24 Diana male 37 3929
25 Fiona female 45 2955
26 Bob female 21 3382
27 John male 48 3677
28 Thomas female 22 2784
29 Dean male 38 2266
30 Paul female 31 2679
把input.txt檔案拷貝到hdfs的 /data/yann/input 目錄下
2、代碼準備:
package azkaban.jobtype.examples.java;
import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;
public class WordCount extends AbstractHadoopJob
{
private static final Logger logger = Logger.getLogger(WordCount.class);
private final String inputPath;
private final String outputPath;
private boolean forceOutputOverrite;
public WordCount(String name, Props props)
{
super(name, props);
this.inputPath = props.getString("input.path");
this.outputPath = props.getString("output.path");
this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false);
}
public void run()
throws Exception
{
logger.info(String.format("Starting %s", new Object[] { getClass().getSimpleName() }));
JobConf jobconf = getJobConf();
jobconf.setJarByClass(WordCount.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(IntWritable.class);
jobconf.setMapperClass(Map.class);
jobconf.setReducerClass(Reduce.class);
jobconf.setInputFormat(TextInputFormat.class);
jobconf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));
if (this.forceOutputOverrite)
{
FileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
}
super.run();
}
public static class Map extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable>
{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
private long numRecords = 0L;
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
this.word.set(tokenizer.nextToken());
output.collect(this.word, one);
reporter.incrCounter(Counters.INPUT_WORDS, 1L);
}
if (++this.numRecords % 100L == 0L)
reporter.setStatus("Finished processing " + this.numRecords + " records " + "from the input file");
}
static enum Counters
{
INPUT_WORDS;
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
int sum = 0;
while (values.hasNext()) {
sum += ((IntWritable)values.next()).get();
}
output.collect(key, new IntWritable(sum));
}
}
}
3、編寫job檔案
wordcount.job檔案内容如下:
type=hadoopJava
job.extend=false
job.class=azkaban.jobtype.examples.java.WordCount
classpath=./lib/*,${azkaban.home}/lib/*
force.output.overwrite=true
input.path=/data/yann/input
output.path=/data/yann/output
這樣hadoopJava類型的任務已經完成,打包送出到Azkaban中執行即可
四、hive類型
1、編寫 hive.sql檔案
use azkaban;
INSERT OVERWRITE TABLE
user_table1 PARTITION (day_p='2017-02-08')
SELECT appid,uid,country,province,city
FROM user_table0 where adType=1;
以上是标準的hive的sql腳本,首先切換到azkaban資料庫,然後把user_table0 的資料插入到user_table1 表的指定day_p分區。需要先準備好 user_table0 和 user_table1 表結構和資料。
編寫完成後,把檔案放入 res 檔案夾中。
2、編寫hive.job檔案
type=hive
user.to.proxy=azkaban
classpath=./lib/*,${azkaban.home}/lib/*
azk.hive.action=execute.query
hive.script=res/hive.sql
關鍵的參數是 hive.script,該參數指定使用的sql腳本在 res目錄下的hive.sql檔案
五、spark類型
spark任務有兩種運作方式,一種是command類型,另一種是spark類型
首先準備好spark任務的代碼
package com.dataeye.template.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext}
object WordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage:WordCount <hdfs_file>")
System.exit(1)
}
System.out.println("get first param ==> " + args(0))
System.out.println("get second param ==> " + args(1))
/** spark 2.0的方式
* val spark = SparkSession.builder().appName("WordCount").getOrCreate()
*/
val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
val spark = new SQLContext(sc)
val file = spark.sparkContext.textFile(args(0))
val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// 資料collect 到driver端列印
wordCounts.collect().foreach(println _)
}
}
然後準備資料,資料就使用前面hadoopJava中的資料即可。
最後打包成jar檔案:spark-template-1.0-SNAPSHOT.jar
1、command類型
command類型的配置方式比較簡單,spark.job檔案如下:
type=command
command=${spark.home}/bin/spark-submit --master yarn-cluster --class com.dataeye.template.spark.WordCount lib/spark-template-1.0-SNAPSHOT.jar hdfs://de-hdfs/data/yann/info.txt paramtest
2、spark類型
type=spark
master=yarn-cluster
execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
class=com.dataeye.template.spark.WordCount
params=hdfs://de-hdfs/data/yann/info.txt paramtest
以上就是Azkaban支援的幾種常用的任務類型。
------------------------------------------------------------------------------------------------------------------------------------------------------
Azkaban安裝部署
準備工作
本次安裝目錄為/usr/local/azkaban
Azkaban Web伺服器
azkaban-web-server-2.5.0.tar.gz
Azkaban執行伺服器
azkaban-executor-server-2.5.0.tar.gz
Azkaban 的資料庫檔案
azkaban-sql-script-2.5.0.tar.gz
MySQL
為Azkaban建立一個mysql庫:庫名azkaban,使用者名azkaban,密碼azkaban
1)為Azkaban建立一個資料庫:
# Exampledatabase creation command, although the db name doesn't need to be 'azkaban'
mysql> CREATEDATABASE azkaban;
2) 為Azkaban建立一個資料庫使用者:
# Exampledatabase creation command. The user name doesn't need to be 'azkaban'
mysql> CREATEUSER 'username'@'%' IDENTIFIED BY 'password';
3) 為使用者賦予Azkaban資料庫的增删查改的權限:
# Replace db,username with the ones created by the previous steps.
mysql> GRANT ALLON <database>.* to '<username>'@'%' WITH GRANT OPTION;
安裝
将安裝檔案上傳到叢集,最好上傳到安裝 hive、sqoop的機器上,友善指令的執行
建立azkaban目錄,用于存放azkaban運作程式
azkaban web伺服器安裝
解壓azkaban-web-server-2.5.0.tar.gz
指令: tar –zxvf azkaban-web-server-2.5.0.tar.gz
azkaban 執行服器安裝
解壓azkaban-executor-server-2.5.0.tar.gz
指令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz
azkaban腳本導入
解壓: azkaban-sql-script-2.5.0.tar.gz
指令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz
将解壓後的mysql 腳本,導入到mysql中:
進入mysql
mysql> create database azkaban;
mysql> use azkaban;
Database changed
mysql> source /usr/local/azkaban/azkaban-2.5.0/create-all-sql-2.5.0.sql;
建立SSL配置
參考位址: http://docs.codehaus.org/display/JETTY/How+to+configure+SSL
指令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA
運作此指令後,會提示輸入目前生成 keystor的密碼及相應資訊,輸入的密碼請勞記,資訊如下:
輸入keystore密碼: 本次安裝輸入的密碼為123456
再次輸入新密碼:
您的名字與姓氏是什麼?
[Unknown]:
您的組織機關名稱是什麼?
[Unknown]:
您的組織名稱是什麼?
[Unknown]:
您所在的城市或區域名稱是什麼?
[Unknown]:
您所在的州或省份名稱是什麼?
[Unknown]:
該機關的兩字母國家代碼是什麼
[Unknown]: CN
CN=Unknown, OU=Unknown, O=Unknown,L=Unknown, ST=Unknown, C=CN 正确嗎?
[否]: y
輸入<jetty>的主密碼
(如果和 keystore 密碼相同,按回車):
再次輸入新密碼:
完成上述工作後,将在目前目錄生成 keystore 證書檔案,将keystore 考貝到 azkaban web伺服器根目錄中.如:cp keystore azkaban-web-2.5.0
配置檔案
注:先配置好伺服器節點上的時區
1、先生成時區配置檔案Asia/Shanghai,用互動式指令 tzselect 即可
2、拷貝該時區檔案,覆寫系統本地時區配置
cp /usr/share/zoneinfo/Asia/Shanghai etc/localtime /
azkaban web伺服器配置
進入azkaban web伺服器安裝目錄 conf目錄
v 修改azkaban.properties檔案
指令vi azkaban.properties
内容說明如下:
#Azkaban Personalization Settings
azkaban.name=Test #伺服器UI名稱,用于伺服器上方顯示的名字
azkaban.label=My Local Azkaban #描述
azkaban.color=#FF3601 #UI顔色
azkaban.default.servlet.path=/index #
web.resource.dir=web/ #預設根web目錄
default.timezone.id=Asia/Shanghai #預設時區,已改為亞洲/上海 預設為美國
#Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager #使用者權限管理預設類
user.manager.xml.file=conf/azkaban-users.xml #使用者配置,具體配置參加下文
#Loader for projects
executor.global.properties=conf/global.properties # global配置檔案所在位置
azkaban.project.dir=projects #
database.type=mysql #資料庫類型
mysql.port=3306 #端口号
mysql.host=localhost #資料庫連接配接IP
mysql.database=azkaban #資料庫執行個體名
mysql.user= azkaban #資料庫使用者名
mysql.password= azkaban #資料庫密碼
mysql.numconnections=100 #最大連接配接數
# Velocity dev mode
velocity.dev.mode=false
# Jetty伺服器屬性.
jetty.maxThreads=25 #最大線程數
jetty.ssl.port=8443 #Jetty SSL端口
jetty.port=8081 #Jetty端口
jetty.keystore=keystore #SSL檔案名
jetty.password=123456 #SSL檔案密碼
jetty.keypassword=123456 #Jetty主密碼 與 keystore檔案相同
jetty.truststore=keystore #SSL檔案名
jetty.trustpassword=123456 # SSL檔案密碼
# 執行伺服器屬性
executor.port=12321 #執行伺服器端口
# 郵件設定
mail.sender= #發送郵箱
mail.host= #發送郵箱smtp位址
mail.user= #發送郵件時顯示的名稱
mail.password= #郵箱密碼
job.failure.email= #任務失敗時發送郵件的位址
job.success.email= #任務成功時發送郵件的位址
lockdown.create.projects=false #
cache.directory=cache #緩存目錄
v 使用者配置
進入azkaban web伺服器conf目錄,修改azkaban-users.xml
vi azkaban-users.xml 增加 管理者使用者
<azkaban-users>
<user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />
<user username="metrics" password="metrics" roles="metrics"/>
<user username="admin" password="admin" roles="admin,metrics" />
<role name="admin" permissions="ADMIN" />
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
v azkaban 執行伺服器executor配置
進入執行伺服器安裝目錄conf,修改azkaban.properties
vi azkaban.properties
#Azkaban
default.timezone.id=Asia/Shanghai #時區
# Azkaban JobTypes 插件配置
azkaban.jobtype.plugin.dir=plugins/jobtypes #jobtype 插件所在位置
#Loader for projects
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
#資料庫設定
database.type=mysql #資料庫類型(目前隻支援mysql)
mysql.port=3306 #資料庫端口号
mysql.host=localhost #資料庫IP位址
mysql.database=azkaban #資料庫執行個體名
mysql.user=azkaban #資料庫使用者名
mysql.password= azkaban #資料庫密碼
mysql.numconnections=100 #最大連接配接數
# 執行伺服器配置
executor.maxThreads=50 #最大線程數
executor.port=12321 #端口号(如修改,請與web服務中一緻)
executor.flow.threads=30 #線程數
啟動
web伺服器
在azkaban web伺服器目錄下執行啟動指令
bin/azkaban-web-start.sh
注:在web伺服器根目錄運作
或者啟動到背景
nohup bin/azkaban-web-start.sh 1>/tmp/azstd.out 2>/tmp/azerr.out &
執行伺服器
在執行伺服器目錄下執行啟動指令
bin/azkaban-executor-start.sh
注:隻能要執行伺服器根目錄運作
啟動完成後,在浏覽器(建議使用谷歌浏覽器)中輸入https://伺服器IP位址:8443 ,即可通路azkaban服務了.在登入中輸入剛才新的戶用名及密碼,點選 login.
Azkaban Demo
Command類型單一job示例
1、建立job描述檔案
vi command.job
#command.job
type=command
command=echo 'hello'
2、将job資源檔案打包成zip檔案
zip command.job
3、通過azkaban的web管理平台建立project并上傳job壓縮包
首先建立project
上傳zip包
4、啟動執行該job
Command類型多job工作流flow
1、建立有依賴關系的多個job描述
第一個job:one.job
# one.job
type=command
command=echo one
第二個job:two.job依賴one.job
# two.job
type=command
dependencies= one
command=echo two
2、将所有job資源檔案打到一個zip包中
3、在azkaban的web管理界面建立工程并上傳zip包
4、啟動工作流flow
5、
-------------------------------------------------------------------------------------------------------------------------------------------------------------
1.配置郵件請在azkaban-web-server中進行配置:如下圖:
/opt/azkaban/azkaban/azkaban-web-server/build/install/azkaban-web-server/conf
注意:
郵件伺服器和接受郵件對像使用了QQ郵箱。azkaban可能不支援QQ郵箱。
解決辦法:改成其它郵箱,我改成了126.com的郵箱。
2.測試:
在web UI 頁面執行個job,成功則發郵件,如下:
看結果,我輸入了三個郵箱,而且此job也執行成功,看看三個郵箱都收到了。
除了在web UI 中直接配置也可以在job裡進行配置。
azkaban配置郵件内容log連結
步驟:
1.打開azkaban server伺服器conf下的azkaban.properties檔案
2.在jetty參數配置處,添加jetty.hostname=localhost
其中:localhost:為azkaban 的server伺服器,目前伺服器的ip位址
3.重新開機azkanba 執行器和server伺服器 驗證郵件發送即可。