天天看点

Apache Curator Leader Election

用于Leader选举,也可以用Shared Reentrant Lock来实现。

如果需要集群中的固定的一台机器去做的事,就可以用此特性来实现,直到这台Leader死去,会产生新的Leader。

还有一种典型的场景,master-slave模式。也可以用Curator Leader Election轻松实现,包括充当master的机器死掉后,会产生新的master,以及当新机器加入后,该机器会直接参与Leader竞争。That's Awesome.

1.直接运行LLTest

package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientCreate;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientDelete;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientUpdate;

/**
 * Launcher that boots the four demo election participants inside a single JVM.
 */
public class LLTest {
    /**
     * Invokes the entry point of each participant in order. Every participant is
     * called with a {@code null} argument array.
     *
     * @param args unused
     * @throws Exception kept in the signature for callers that already handle it
     */
    public static void main(String[] args) throws Exception {
        final String[] noArgs = null;

        LeaderListener.main(noArgs);
        LeaderListener2.main(noArgs);
        LeaderListener3.main(noArgs);
        LeaderListener4.main(noArgs);
    }
}
           
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

import java.util.List;

/**
 * Demo participant {@code app1} in a Curator leader election held on {@link #C_PATH}.
 *
 * <p>The participant connects to a local ZooKeeper, registers a {@link LeaderSelector}
 * and, whenever it is elected, holds leadership for roughly six seconds before
 * returning from {@code takeLeadership}.
 */
public class LeaderListener {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app1";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; callers may pass {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and then parks the calling thread.
     * The selector and the client are always closed on the way out, including when
     * start-up fails part-way through.
     */
    private static void participate() {
        CuratorFramework client = null;
        boolean interrupted = false;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election path (C_PATH) exists before the selector uses it.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election work happens on Curator's threads.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored after the client has been closed.
            interrupted = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Election callback. Tracks the thread that currently holds leadership so the work
     * can be cancelled as soon as the ZooKeeper connection becomes unreliable.
     */
    private static final class ElectionListener implements LeaderSelectorListener {
        /** Thread currently inside {@link #takeLeadership}, or {@code null} when not leading. */
        private Thread leaderThread;

        /**
         * Runs the leader's task. Per Curator's LeaderSelector contract, returning from
         * this method gives up leadership.
         *
         * @param client the Curator client the selector was created with
         * @throws Exception declared by the interface; not thrown by this implementation
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            synchronized (this) {
                leaderThread = Thread.currentThread();
            }
            try {
                int timeMilliSeconds = 6000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                // To keep leadership for the whole life of this app, block here instead
                // (e.g. Thread.sleep(Integer.MAX_VALUE)); once the app dies, the remaining
                // participants elect a new leader.
                for (int i = 0; i < 6; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership was cancelled: stop working and keep the interrupt visible
                // to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            } finally {
                synchronized (this) {
                    leaderThread = null;
                }
            }
        }

        /**
         * Cancels the leader's work when the connection is SUSPENDED or LOST, because
         * leadership can no longer be assumed in those states.
         *
         * <p>NOTE(review): Curator versions that provide {@code CancelLeadershipException}
         * (or {@code LeaderSelectorListenerAdapter}) can use those instead of the manual
         * interrupt below - confirm against the Curator version in use.
         *
         * @param client   the Curator client reporting the change
         * @param newState the new connection state
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                synchronized (this) {
                    if (leaderThread != null) {
                        leaderThread.interrupt();
                    }
                }
            }
        }
    }
}
           
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

/**
 * Demo participant {@code app2} in a Curator leader election held on {@link #C_PATH}.
 *
 * <p>The participant connects to a local ZooKeeper, registers a {@link LeaderSelector}
 * and, whenever it is elected, holds leadership for roughly six seconds before
 * returning from {@code takeLeadership}.
 */
public class LeaderListener2 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app2";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; callers may pass {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and then parks the calling thread.
     * The selector and the client are always closed on the way out, including when
     * start-up fails part-way through.
     */
    private static void participate() {
        CuratorFramework client = null;
        boolean interrupted = false;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election path (C_PATH) exists before the selector uses it.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election work happens on Curator's threads.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored after the client has been closed.
            interrupted = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Election callback. Tracks the thread that currently holds leadership so the work
     * can be cancelled as soon as the ZooKeeper connection becomes unreliable.
     */
    private static final class ElectionListener implements LeaderSelectorListener {
        /** Thread currently inside {@link #takeLeadership}, or {@code null} when not leading. */
        private Thread leaderThread;

        /**
         * Runs the leader's task. Per Curator's LeaderSelector contract, returning from
         * this method gives up leadership.
         *
         * @param client the Curator client the selector was created with
         * @throws Exception declared by the interface; not thrown by this implementation
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            synchronized (this) {
                leaderThread = Thread.currentThread();
            }
            try {
                int timeMilliSeconds = 6000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                for (int i = 0; i < 6; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership was cancelled: stop working and keep the interrupt visible
                // to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            } finally {
                synchronized (this) {
                    leaderThread = null;
                }
            }
        }

        /**
         * Cancels the leader's work when the connection is SUSPENDED or LOST, because
         * leadership can no longer be assumed in those states.
         *
         * <p>NOTE(review): Curator versions that provide {@code CancelLeadershipException}
         * (or {@code LeaderSelectorListenerAdapter}) can use those instead of the manual
         * interrupt below - confirm against the Curator version in use.
         *
         * @param client   the Curator client reporting the change
         * @param newState the new connection state
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                synchronized (this) {
                    if (leaderThread != null) {
                        leaderThread.interrupt();
                    }
                }
            }
        }
    }
}
           
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

/**
 * Demo participant {@code app3} in a Curator leader election held on {@link #C_PATH}.
 *
 * <p>The participant connects to a local ZooKeeper, registers a {@link LeaderSelector}
 * and, whenever it is elected, holds leadership for roughly six seconds before
 * returning from {@code takeLeadership}.
 */
public class LeaderListener3 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app3";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; callers may pass {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and then parks the calling thread.
     * The selector and the client are always closed on the way out, including when
     * start-up fails part-way through.
     */
    private static void participate() {
        CuratorFramework client = null;
        boolean interrupted = false;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election path (C_PATH) exists before the selector uses it.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election work happens on Curator's threads.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored after the client has been closed.
            interrupted = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Election callback. Tracks the thread that currently holds leadership so the work
     * can be cancelled as soon as the ZooKeeper connection becomes unreliable.
     */
    private static final class ElectionListener implements LeaderSelectorListener {
        /** Thread currently inside {@link #takeLeadership}, or {@code null} when not leading. */
        private Thread leaderThread;

        /**
         * Runs the leader's task. Per Curator's LeaderSelector contract, returning from
         * this method gives up leadership.
         *
         * @param client the Curator client the selector was created with
         * @throws Exception declared by the interface; not thrown by this implementation
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            synchronized (this) {
                leaderThread = Thread.currentThread();
            }
            try {
                int timeMilliSeconds = 6000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                for (int i = 0; i < 6; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership was cancelled: stop working and keep the interrupt visible
                // to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            } finally {
                synchronized (this) {
                    leaderThread = null;
                }
            }
        }

        /**
         * Cancels the leader's work when the connection is SUSPENDED or LOST, because
         * leadership can no longer be assumed in those states.
         *
         * <p>NOTE(review): Curator versions that provide {@code CancelLeadershipException}
         * (or {@code LeaderSelectorListenerAdapter}) can use those instead of the manual
         * interrupt below - confirm against the Curator version in use.
         *
         * @param client   the Curator client reporting the change
         * @param newState the new connection state
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                synchronized (this) {
                    if (leaderThread != null) {
                        leaderThread.interrupt();
                    }
                }
            }
        }
    }
}
           
package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

/**
 * Demo participant {@code app4} in a Curator leader election held on {@link #C_PATH}.
 *
 * <p>The participant connects to a local ZooKeeper, registers a {@link LeaderSelector}
 * and, whenever it is elected, holds leadership for roughly six seconds before
 * returning from {@code takeLeadership}.
 */
public class LeaderListener4 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app4";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; callers may pass {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and then parks the calling thread.
     * The selector and the client are always closed on the way out, including when
     * start-up fails part-way through.
     */
    private static void participate() {
        CuratorFramework client = null;
        boolean interrupted = false;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election path (C_PATH) exists before the selector uses it.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election work happens on Curator's threads.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored after the client has been closed.
            interrupted = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Election callback. Tracks the thread that currently holds leadership so the work
     * can be cancelled as soon as the ZooKeeper connection becomes unreliable.
     */
    private static final class ElectionListener implements LeaderSelectorListener {
        /** Thread currently inside {@link #takeLeadership}, or {@code null} when not leading. */
        private Thread leaderThread;

        /**
         * Runs the leader's task. Per Curator's LeaderSelector contract, returning from
         * this method gives up leadership.
         *
         * @param client the Curator client the selector was created with
         * @throws Exception declared by the interface; not thrown by this implementation
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            synchronized (this) {
                leaderThread = Thread.currentThread();
            }
            try {
                int timeMilliSeconds = 6000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                for (int i = 0; i < 6; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership was cancelled: stop working and keep the interrupt visible
                // to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            } finally {
                synchronized (this) {
                    leaderThread = null;
                }
            }
        }

        /**
         * Cancels the leader's work when the connection is SUSPENDED or LOST, because
         * leadership can no longer be assumed in those states.
         *
         * <p>NOTE(review): Curator versions that provide {@code CancelLeadershipException}
         * (or {@code LeaderSelectorListenerAdapter}) can use those instead of the manual
         * interrupt below - confirm against the Curator version in use.
         *
         * @param client   the Curator client reporting the change
         * @param newState the new connection state
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                synchronized (this) {
                    if (leaderThread != null) {
                        leaderThread.interrupt();
                    }
                }
            }
        }
    }
}
           

继续阅读