要想使用Leader選舉功能,需要添加recipes包,可以在maven中添加如下依賴:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.9.0</version>
</dependency>
當然了,由于recipes需要使用framework,是以你肯定還要添加如下依賴:
<dependency>
<artifactId>curator-framework</artifactId>
最後,為了簡化測試也為了便于學習,可以添加test依賴:
<artifactId>curator-test</artifactId>
LeaderLatch使用流程
recipes包裡面提供了Leader選舉實作,Spark中的master選舉使用的就是reciples包裡面的LeaderLatch,使用他們可以極大的簡化代碼,使你将注意力更多的放在核心業務邏輯上。Leader選舉的實作在org.apache.curator.framework.recipes.leader包中,這個包提供了兩組Leader選舉:
1.LeaderLatch,LeaderLatchListener
2.LeaderSelector,LeaderSelectorListener,LeaderSelectorListenerAdapter
這兩組類都可以實作Leader選舉,spark 使用的是第一種。再這篇文章裡,隻介紹第一種。
第一組使用起來非常簡單,使用思路大緻如下:假設你有3個節點,姑且叫做node0,node1,node2。你需要為每一個node建立一個CuratorFramework,LeaderLatch,LeaderLatchListener,如下:
node0:
1.CuratorFramework client=CuratorFrameworkFactory.newClient(.....);client.start();
2.new LeaderLatch(client,path)->addListener(LeaderLatchListener )->start()
node1:
node2:
你首先要建立CuratorFramework,然後并啟動它,一個CuratorFramework就是一個ZooKeeper用戶端。然後建立LeaderLatch,并制定剛才建立的CuratorFramework和一個leaderPath,leaderPath是一個ZooKeepe路徑,node0,node1,node2中的leaderPath必須一緻。建立好LeaderLatch之後,需要為他注冊一個LeaderLatchListener回掉,如果某個node成為leader,那麼會調用這個node的LeaderLatchListener的isLeader(),是以你可以在這裡寫自己的業務邏輯。最後,調用LeaderLatch的start(),這個LeaderLatch将參加選舉了。
可以參考如下代碼:
import java.util.ArrayList;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class LeaderDemo {
public static void main(String[]args) throws Exception{
List<LeaderLatch>leaders=new ArrayList<LeaderLatch>();
List<CuratorFramework>clients=new ArrayList<CuratorFramework>();
TestingServer server=new TestingServer();
try{
for(int i=0;i<10;i++){
CuratorFramework client=CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(20000,3));
clients.add(client);
LeaderLatch leader=new LeaderLatch(client,"/francis/leader");
leader.addListener(new LeaderLatchListener(){
@Override
public void isLeader() {
// TODO Auto-generated method stub
System.out.println("I am Leader");
}
@Override
public void notLeader() {
// TODO Auto-generated method stub
System.out.println("I am not Leader");
}});
leaders.add(leader);
client.start();
leader.start();
}
Thread.sleep(Integer.MAX_VALUE);
}finally{
for(CuratorFramework client:clients){
CloseableUtils.closeQuietly(client);
}
for(LeaderLatch leader:leaders){
CloseableUtils.closeQuietly(leader);
}
CloseableUtils.closeQuietly(server);
}
Thread.sleep(Integer.MAX_VALUE);
}
}
LeaderLatch和LeaderLatchListener方法介紹
LeaderLatch提供了如下方法:
start()/close():啟動/停止LeaderLatch
addListener(LeaderLatchListener)/removeListener(LeaderLatchListener):添加/移除LeaderLatchListener
hasLeadership():如果LeaderLatch是Leader,那麼傳回true,否則false。
getLeader():
await:等待Leaderlatch成為Leader。
LeaderLatchListener提供了如下方法:
isLeader():當LeaderLatch的hasLeaderShip()從false到true後,就會調用isLeader(),表明這個LeaderLatch成為leader了。
notLeader():當LeaderLatch的hahLeaderShip從true到false後,就會調用notLeader(),表明這個LeaderLatch不再是leader了。
LeaderLatch在Master-Slave中的應用
在一個典型的master-slave場景下。你可以在isLeader中做如下處理:
1.每一個master類都有一個state屬性,初始值為standby.
2.在isLeader中,從持久話引擎中讀取要恢複的資料到一個臨時的記憶體緩存中
3.将這個master的state修改為recovering
4.通知所有worker将其内部的master修改為目前master。
5.将臨時記憶體緩存中的資料恢複到master内部。
6.将master狀态修改為alive,然後這個master就可以對外服務了。
注意第5步,由于将持久話引擎中的資料添加到了master内部的記憶體中,是以需要確定之多恢複一次語義。