天天看點

elastic-job部署以及簡單例子

原文連結:https://www.2cto.com/kf/201612/577514.html

elastic-job部署以及簡單例子:elastic-job是當當開發的基于qutarz以及zookeeper封裝的作業排程工具,主要有兩個大架構,一個是elastic-job lite另外一個是elastic-job cloud,其中qutarz是一個開源的作業排程工具,zookeeper是分布式排程工具,這兩者結合搭建了elastic-job-lite,這是一個無中心節點的排程,而elastic-job-cloud是一個有中心節點的分布式排程開源工具,隻需要設定好機器以及分片,就可以自動的排程到對應的機器上運作。

與lite的不同時cloud采用了mesos來進行分布式資源管理,簡單的來說兩者的不同是:同一個作業在兩台機器上跑,lite需要手動在兩台機器上跑,但是cloud隻需要上傳作業包,就可以自動的在兩台機器上跑,因為lite不支援作業的排程,為無中心的。

二、環境的搭建

由于elastic-job-cloud的環境暫時未搭建出來,是以在此簡單介紹lite的搭建

(1)jdk的安裝

jdk需要1.7以上,因為裡面有spring相關的代碼,具體的安裝請自行百度,或參考連結https://blog.csdn.net/molong1208/article/details/50537898

(2)zookeeper的安裝

具體的安裝過程見連結https://blog.csdn.net/molong1208/article/details/53675063

(3)maven的安裝

官網maven要求3.0.4以及以上,具體的安裝過程與jdk類似,請自行百度

三、elastic-job-lite的優勢及特點

(1)簡單的概念及适用場景

1. 分片概念

任務的分布式執行,需要将一個任務拆分為n個獨立的任務項,然後由分布式的伺服器分别執行某一個或幾個分片項。

例如:有一個周遊資料庫某張表的作業,現有2台伺服器。為了快速的執行作業,那麼每台伺服器應執行作業的50%。 為滿足此需求,可将作業分成2片,每台伺服器執行1片。作業周遊資料的邏輯應為:伺服器A周遊ID以奇數結尾的資料;伺服器B周遊ID以偶數結尾的資料。 如果分成10片,則作業周遊資料的邏輯應為:每片分到的分片項應為ID%10,而伺服器A被配置設定到分片項0,1,2,3,4;伺服器B被配置設定到分片項5,6,7,8,9,直接的結果就是伺服器A周遊ID以0-4結尾的資料;伺服器B周遊ID以5-9結尾的資料。

2. 分片項與業務處了解耦

Elastic-Job并不直接提供資料處理的功能,架構隻會将分片項配置設定至各個運作中的作業伺服器,開發者需要自行處理分片項與真實資料的對應關系。

3. 個性化參數的适用場景

個性化參數即shardingItemParameter,可以和分片項比對對應關系,用于将分片項的數字轉換為更加可讀的業務代碼。

例如:按照地區水準拆分資料庫,資料庫A是北京的資料;資料庫B是上海的資料;資料庫C是廣州的資料。 如果僅按照分片項配置,開發者需要了解0表示北京;1表示上海;2表示廣州。 合理使用個性化參數可以讓代碼更可讀,如果配置為0=北京,1=上海,2=廣州,那麼代碼中直接使用北京,上海,廣州的枚舉值即可完成分片項和業務邏輯的對應關系。

(2)elastic-job-lite優勢及特點

1. 分布式排程

Elastic-Job-Lite并無作業排程中心節點,而是基于部署作業架構的程式在到達相應時間點時各自觸發排程。

注冊中心僅用于作業注冊和監控資訊存儲。而主作業節點僅用于處理分片和清理等功能。

2. 作業高可用

Elastic-Job-Lite提供最安全的方式執行作業。将分片總數設定為1,并使用多于1台的伺服器執行作業,作業将會以1主n從的方式執行。

一旦執行作業的伺服器崩潰,等待執行的伺服器将會在下次作業啟動時替補執行。開啟失效轉移功能效果更好,可以保證在本次作業執行時崩潰,備機立即啟動替補執行。

3. 最大限度利用資源

Elastic-Job-Lite也提供最靈活的方式,最大限度的提高執行作業的吞吐量。将分片項設定為大于伺服器的數量,最好是大于伺服器倍數的數量,作業将會合理的利用分布式資源,動态的配置設定分片項。

例如:3台伺服器,分成10片,則分片項配置設定結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。 如果伺服器C崩潰,則分片項配置設定結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9。在不丢失分片項的情況下,最大限度的利用現有資源提高吞吐量。

三、簡單的例子

elastic-job的作業類型分為三種,一種是簡單的simple的形式,一種是基于流式資料的處理,一種是基于腳本的排程,因為本人所使用的情況是基于流式的處理,那麼就簡單搭了一個基于流式的demo,其他類型的類似

流式作業的方式适合于不間斷的資料處理的類型,例如需要拉取訂單資料,因為訂單是連續不間斷的,是以需要一直拉取。

按照elastic-job官網上介紹,搭建一個基于dataflow(流式處理)的demo,這個demo的功能就是,從一個資料中心裡面取資料,按照資料中心的資料id%分片個數==分片參數進行拉取資料,拉取完成後将對應的資料id置為完成的狀态,具體代碼如下所示:

(1)入口函數main函數以及作業的配置

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35

package

ElasticJobExample.ElasticJobExample;

import

com.dangdang.ddframe.job.config.JobCoreConfiguration;

import

com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;

import

com.dangdang.ddframe.job.lite.api.JobScheduler;

import

com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;

import

com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;

import

com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;

import

com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public

class

App

{

public

static

void

main(String[] args) {

new

JobScheduler(createRegistryCenter(), createJobConfiguration()).init();

}

private

static

CoordinatorRegistryCenter createRegistryCenter() {

CoordinatorRegistryCenter regCenter =

new

ZookeeperRegistryCenter(

new

ZookeeperConfiguration(

"ip:2181"

,

"elastic-job-demo"

));

regCenter.init();

return

regCenter;

}

private

static

LiteJobConfiguration createJobConfiguration() {

// 建立作業配置

JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(

"myDataFlowTest"

,

"0/10 * * * * ?"

,

3

).shardingItemParameters(

"0=0,1=1,2=2"

).build();

DataflowJobConfiguration dataflowJobConfig =

new

DataflowJobConfiguration(coreConfig, JavaDataflowJob.

class

.getCanonicalName(),

true

);

LiteJobConfiguration result = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();

return

result;

}

}

(2)作業的邏輯處理部分

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33

package

ElasticJobExample.ElasticJobExample;

import

java.util.ArrayList;

import

java.util.Date;

import

java.util.List;

import

com.dangdang.ddframe.job.api.ShardingContext;

import

com.dangdang.ddframe.job.api.dataflow.DataflowJob;

import

dataflowjob.entity.Foo;

import

dataflowjob.process.DataProcess;

import

dataflowjob.process.DataProcessFactory;

public

class

JavaDataflowJob

implements

DataflowJob<foo> {

private

DataProcess dataProcess = DataProcessFactory.getDataProcess();

@Override

public

List<foo> fetchData(ShardingContext context) {

List<foo> result =

new

ArrayList<foo>();

result = dataProcess.getData(context.getShardingParameter(), context.getShardingTotalCount());

System.out.println(String.format(

"------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s"

, Thread.currentThread().getId(),

new

Date(), context,

"fetch data"

,result));

return

result;

}

@Override

public

void

processData(ShardingContext shardingContext, List<foo> data) {

System.out.println(String.format(

"------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s"

, Thread.currentThread().getId(),

new

Date(), shardingContext,

"finish data"

,data));

for

(Foo foo:data){

dataProcess.setData(foo.getId());

}

}

}</foo></foo></foo></foo></foo>

(3)具體的處理類

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37

package

dataflowjob.process;

import

java.util.ArrayList;

import

java.util.List;

import

java.util.Map;

import

java.util.concurrent.ConcurrentHashMap;

import

dataflowjob.entity.Foo;

public

class

DataProcess {

private

Map<integer, foo=

""

> data =

new

ConcurrentHashMap<>(

30

,

1

);

public

DataProcess()

{

for

(

int

i=

;i<

30

;i++){

data.put(i,

new

Foo(i,Foo.Status.TODO));

}

}

public

List<foo> getData(String tailId,

int

shardNum)

{

int

intId  = Integer.parseInt(tailId);

List<foo> result =

new

ArrayList<foo>();

for

(Map.Entry<integer, foo=

""

> each : data.entrySet()) {

Foo foo = each.getValue();

int

key = each.getKey();

if

(key % shardNum == intId && foo.getStatus() == Foo.Status.TODO) {

result.add(foo);

}

}

return

result;

}

public

void

setData(

int

i){

data.get(i).setStatus(Foo.Status.DONE);

}

}

</integer,></foo></foo></foo></integer,>

(4)entity類Foo

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

package

dataflowjob.entity;

public

class

Foo {

private

int

id;

private

Status status;

public

Foo(

final

int

id,

final

Status status) {

this

.id = id;

this

.status = status;

}

public

int

getId() {

return

id;

}

public

void

setId(

int

id) {

this

.id = id;

}

public

Status getStatus() {

return

status;

}

public

void

setStatus(Status status) {

this

.status = status;

}

public

enum

Status{

TODO,

DONE

}

}

(5)具體處理工廠類

?

1 2 3 4 5 6 7 8 9 10 11

package

dataflowjob.process;

public

class

DataProcessFactory {

private

static

DataProcess dataProcess =

new

DataProcess();

public

static

DataProcess getDataProcess() {

return

dataProcess;

}

}

繼續閱讀