用于Leader选举,也可以用Shared Reentrant Lock来实现。
如果需要集群中的固定的一台机器去做的事,就可以用此特性来实现,直到这台Leader死去,会产生新的Leader。
还有一种典型的场景,master-slave模式。也可以用Curator Leader Election轻松实现,包括充当maste的机器死掉后,会产生新的master,以及当新加入机器后,该机器会直接参与Leader竞争者。That's Awesome.
1.直接运行LLtest
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientCreate;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientDelete;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientUpdate;
public class LLTest {
public static void main(String[] args) throws Exception {
LeaderListener.main(null);
LeaderListener2.main(null);
LeaderListener3.main(null);
LeaderListener4.main(null);
}
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import java.util.List;
public class LeaderListener {
public static final String C_PATH = "/TestLeader";
public static final String CHARSET = "UTF-8";
public static final String APP_NAME = "app1";
public static void main(String[] args) {
try {
new Thread(new Runnable() {
@Override
public void run() {
try{
String zookeeperConnectionString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
//ensure path of /test
new EnsurePath(C_PATH).ensure(client.getZookeeperClient());
final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
try {
int timeMilliSeconds = 6000;
System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);
//once you take the leader ship
//and you want hold the leader ship during the whole life of APP1
//you should use Thread.sleep(Integer.MAX_VALUE)
//once tha APP1 dead, the other APP (participants) will reElect an new leader
for(int i = 0; i < 6; i++){
System.out.println("===" + APP_NAME + " sleep " + i);
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
});
leaderSelector.start();
Thread.sleep(Integer.MAX_VALUE);
client.close();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}catch (Exception e){
e.printStackTrace();
}
}
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
public class LeaderListener2 {
public static final String C_PATH = "/TestLeader";
public static final String CHARSET = "UTF-8";
public static final String APP_NAME = "app2";
public static void main(String[] args) {
try {
new Thread(new Runnable() {
@Override
public void run() {
try{
String zookeeperConnectionString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
//ensure path of /test
new EnsurePath(C_PATH).ensure(client.getZookeeperClient());
final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
try {
int timeMilliSeconds = 6000;
System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);
for(int i = 0; i < 6; i++){
System.out.println("===" + APP_NAME + " sleep " + i);
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
});
leaderSelector.start();
Thread.sleep(Integer.MAX_VALUE);
client.close();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}catch (Exception e){
e.printStackTrace();
}
}
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
public class LeaderListener3 {
public static final String C_PATH = "/TestLeader";
public static final String CHARSET = "UTF-8";
public static final String APP_NAME = "app3";
public static void main(String[] args) {
try {
new Thread(new Runnable() {
@Override
public void run() {
try{
String zookeeperConnectionString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
//ensure path of /test
new EnsurePath(C_PATH).ensure(client.getZookeeperClient());
final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
try {
int timeMilliSeconds = 6000;
System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);
for(int i = 0; i < 6; i++){
System.out.println("===" + APP_NAME + " sleep " + i);
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
});
leaderSelector.start();
Thread.sleep(Integer.MAX_VALUE);
client.close();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}catch (Exception e){
e.printStackTrace();
}
}
}
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
public class LeaderListener4 {
public static final String C_PATH = "/TestLeader";
public static final String CHARSET = "UTF-8";
public static final String APP_NAME = "app4";
public static void main(String[] args) {
try {
new Thread(new Runnable() {
@Override
public void run() {
try{
String zookeeperConnectionString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
//ensure path of /test
new EnsurePath(C_PATH).ensure(client.getZookeeperClient());
final LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
try {
int timeMilliSeconds = 6000;
System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);
for(int i = 0; i < 6; i++){
System.out.println("===" + APP_NAME + " sleep " + i);
Thread.sleep(1000);
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
});
leaderSelector.start();
Thread.sleep(Integer.MAX_VALUE);
client.close();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}catch (Exception e){
e.printStackTrace();
}
}
}