天天看點

分布式協調服務zookeeper的應用場景(分布式筆記)

資料釋出訂閱/配置中心

  實作配置資訊的集中式管理和資料的動态更新,實作配置中心有兩種模式:push(推送)、pull(主動拉取,長輪詢)。

  zookeeper采用的是推拉相結合的方式,用戶端向伺服器端注冊自己需要關注的節點。一旦節點資料發生變化,那麼伺服器端就會向用戶端發送watcher事件通知。用戶端收到通知後,主動到伺服器端擷取更新後的資料。其特點如下:

  1. 資料量比較小
  2. 資料内容在運作時會發生動态變更
  3. 叢集中的各個機器共享配置

Zookeeper配置管理

  程式總是需要配置的,如果程式分散部署在多台伺服器上,要逐個改變配置就變得很麻煩。而如果把這些配置全部放到zookeeper上去,儲存在zookeeper的某個目錄節點中,然後所有相關應用程式對這個目錄節點進行監聽,一旦配置資訊發生變化,每個應用程式就會收到zookeeper的通知,然後從zookeeper擷取新的配置資訊應用到系統中。

分布式協調服務zookeeper的應用場景(分布式筆記)

實作分布式鎖的幾種常用方式

1. 資料庫實作思路

  在資料庫中建立一個表,表中包含方法名等字段,并在方法名字段上建立唯一索引,想要執行某個方法,就使用這個方法名向表中插入資料,成功插入則擷取鎖,執行完成後删除對應的行資料釋放鎖。

2. Redis實作思路

主要用到的指令如下:

(1)SETNX key val:當且僅當key不存在時,set一個key為val的字元串,傳回1;若key存在,則什麼都不做,傳回0。

(2)expire key timeout:為key設定一個逾時時間,機關為second,超過這個時間鎖會自動釋放,避免死鎖。

(3)delete key:删除key

實作思想:

(1)擷取鎖的時候,使用setnx加鎖,并使用expire指令為鎖添加一個逾時時間,超過該時間則自動釋放鎖,鎖的value值為一個随機生成的UUID,通過此在釋放鎖的時候進行判斷。

(2)擷取鎖的時候還設定一個擷取的逾時時間,若超過這個時間則放棄擷取鎖。

(3)釋放鎖的時候,通過UUID判斷是不是該鎖,若是該鎖,則執行delete進行鎖釋放。

3. zookeeper實作思路及代碼

  ZooKeeper的内部是一個分層的檔案系統目錄樹結構,規定同一個目錄下隻能有一個唯一檔案名。基于ZooKeeper實作分布式鎖的步驟如下:

(1)建立一個目錄mylock;

(2)線程A想擷取鎖就在mylock目錄下建立臨時順序節點;

(3)擷取mylock目錄下所有的子節點,然後擷取比自己小的兄弟節點,如果不存在,則說明目前線程順序号最小,獲得鎖;

(4)線程B擷取所有節點,判斷自己不是最小節點,設定監聽比自己次小的節點;

(5)線程A處理完,删除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。

Tips:MySQL中的共享鎖與排他鎖

  共享鎖又稱為讀鎖,簡稱S鎖,顧名思義,共享鎖就是多個事務對于同一資料可以共享一把鎖,都能通路到資料,但是隻能讀不能修改。

  排他鎖又稱為寫鎖,簡稱X鎖,顧名思義,排他鎖就是不能與其他所并存,如一個事務擷取了一個資料行的排他鎖,其他事務就不能再擷取該行的其他鎖,包括共享鎖和排他鎖,但是擷取排他鎖的事務是可以對資料就行讀取和修改。

  對于共享鎖大家可能很好了解,就是多個事務隻能讀資料不能改資料,對于排他鎖大家的了解可能就有些差别,我當初就犯了一個錯誤,以為排他鎖鎖住一行資料後,其他事務就不能讀取和修改該行資料,其實不是這樣的。排他鎖指的是一個事務在一行資料加上排他鎖後,其他事務不能再在其上加其他的鎖。mysql InnoDB引擎預設的修改資料語句,update,delete,insert都會自動給涉及到的資料加上排他鎖,select語句預設不會加任何鎖類型,如果加排他鎖可以使用select …for update語句,加共享鎖可以使用select … lock in share mode語句。是以加過排他鎖的資料行在其他事務種是不能修改資料的,也不能通過for update和lock in share mode鎖的方式查詢資料,但可以直接通過select …from…查詢資料,因為普通查詢沒有任何鎖機制。

用zookeeper的Java API實作分布式共享鎖:

public class DistributeLock {

    private static final String ROOT_LOCKS = "/LOCKS";//根節點

    private ZooKeeper zooKeeper;

    private int sessionTimeout; //會話逾時時間

    private String lockID; //記錄鎖節點id

    private final static byte[] data = {1, 2}; //節點的資料

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public DistributeLock() throws IOException, InterruptedException {
        this.zooKeeper = ZookeeperClient.getInstance();
        this.sessionTimeout = ZookeeperClient.getSessionTimeout();
    }

    //擷取鎖的方法
    public boolean lock(){
        try {
            //LOCKS/00000001
            lockID = zooKeeper.create(ROOT_LOCKS+"/", data, ZooDefs.Ids.
                    OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(Thread.currentThread().getName() + "->成功建立了lock節點[" + lockID + "],開始去競争鎖");

            //擷取根節點下的所有子節點
            List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);
            //排序,從小到大
            SortedSet<String> sortedSet = new TreeSet<String>();
            for(String children : childrenNodes){
                sortedSet.add(ROOT_LOCKS + "/" + children);
            }
            String first = sortedSet.first(); //拿到最小的節點
            if(lockID.equals(first)){
                //表示目前就是最小的節點
                System.out.println(Thread.currentThread().getName() + "->成功獲得鎖,lock節點為:[" + lockID + "]");
                return true;
            }
            //擷取所有比目前lockID小的節點集合
            SortedSet<String> lessThanLockId = sortedSet.headSet(lockID);
            if(!lessThanLockId.isEmpty()){
            	//拿到比目前lockID小的上一節點
                String prevLockID = lessThanLockId.last();
                //監聽前一個節點是否被釋放,然後設定監聽的過期時間,如果監聽逾時則放棄擷取鎖,若節點被釋放則立刻去擷取鎖
                zooKeeper.exists(prevLockID, new LockWatcher(countDownLatch));
                boolean flag = countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
                if(!flag)
                	return false;
                System.out.println(Thread.currentThread().getName() + " 成功擷取鎖:[" + lockID + "]");
            }
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean unlock(){
        System.out.println(Thread.currentThread().getName() + "->開始釋放鎖:[" + lockID + "]");
        try {
            zooKeeper.delete(lockID, -1);
            System.out.println("節點[" + lockID + "]成功被删除");
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }

    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(10);
        Random random = new Random();
        for(int i=0; i<10; i++){
            new Thread(()->{
                DistributeLock lock = null;
                try {
                    lock = new DistributeLock();
                    //使用閉鎖,當10個線程都countDown()後,釋放閉鎖,使10個線程同時去争奪鎖
                    latch.countDown();
                    latch.await();
                    lock.lock();
                    Thread.sleep(random.nextInt(500));
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    if(lock != null){
                        lock.unlock();
                    }
                }
            }).start();
        }
    }
}
           
/**
* @author 全恒
* 建立會話
*/
public class ZookeeperClient {

	private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
            "192.168.123.45:2181,192.168.123.174:2181";

    private static int sessionTimeout = 5000;

    //擷取連接配接
    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        final CountDownLatch conectStatus = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, sessionTimeout, new Watcher() {
            public void process(WatchedEvent event) {
                if(event.getState() == Event.KeeperState.SyncConnected){
                    conectStatus.countDown();
                }
            }
        });
        conectStatus.await();
        return zooKeeper;
    }

    public static int getSessionTimeout() {
        return sessionTimeout;
    }
}
           
public class LockWatcher implements Watcher{

    private CountDownLatch latch;

    public LockWatcher(CountDownLatch latch) {
        this.latch = latch;
    }

    public void process(WatchedEvent event) {
    	//如果節點被删除,則釋放 閉鎖
        if(event.getType() == Event.EventType.NodeDeleted){
            latch.countDown();
        }
    }
}
           

Zookeeper實作master選舉

  随着分布式系統的流行,現在許多服務都需要24小時工作,如果機器挂了的話,我們希望能有其他機器頂替他繼續工作。這種問題通常采用master-slave模式,即是正常情況下主機提供服務,備用機器監聽主機狀态,當主機發生異常時,會選取一個備用機器代替主機器提供服務,這個選舉過程即是master選舉。

  由于zookeeper能保證同一時刻隻能具有一個主節點,即使有網絡問題,zookeeper的叢集也完全可以解決,而且zookeeper的通知機制,用戶端可以監聽節點的變化,是以可以考慮采用zookeeper實作master選舉。其實作原理圖如下:

分布式協調服務zookeeper的應用場景(分布式筆記)

  上圖中,左側是zookeeper節點,master節點儲存目前master的機器,server節點儲存可用的備用機器,每個伺服器在建立的時候會進行選舉,即每個伺服器同時去建立master節點,隻有一個機器能建立成功,建立成功的機器則被選為master,其他機器會同時監聽master節點的變化,當主機器挂了時,master節點會被删除,其他機器會重新進行master選舉。

用zkclient實作master選舉

/**
* @author 全恒
* 使用者中心,用來模拟 争搶master節點的機器
*/
public class UserCenter implements Serializable{

    private static final long serialVersionUID = -1776114173857775665L;
    
    private int mc_id; //機器資訊

    private String mc_name;//機器名稱

    public int getMc_id() {
        return mc_id;
    }

    public void setMc_id(int mc_id) {
        this.mc_id = mc_id;
    }

    public String getMc_name() {
        return mc_name;
    }

    public void setMc_name(String mc_name) {
        this.mc_name = mc_name;
    }

    @Override
    public String toString() {
        return "UserCenter{" + "mc_id=" + mc_id + ", mc_name='" + mc_name + '\'' + '}';
    }
}
           
/**
* @author 全恒
* 提供master選舉的服務
*/
public class MasterSelector {

    private ZkClient zkClient;

    private final static String MASTER_PATH = "/master"; //需要争搶的master節點

    private IZkDataListener dataListener; //注冊節點内容變化

    private UserCenter server;  //其他伺服器

    private UserCenter master;  //master節點

    private boolean isRunning = false;

    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    public MasterSelector(UserCenter server, ZkClient zkClient) {
        System.out.println("[" + server + "] 去争搶master權限");
        this.server = server;
        this.zkClient = zkClient;

        this.dataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                //節點如果被删除, 重新進行master選舉
                chooseMaster();
            }
        };
    }

    public void start(){
        //開始選舉,如果目前機器已經啟動,則不進行任何處理
        if(!isRunning){
            isRunning = true;
            //注冊節點事件,使目前用戶端機器監聽master節點的删除動作
            zkClient.subscribeDataChanges(MASTER_PATH, dataListener); 
            chooseMaster();
        }
    }

    public void stop(){
        //停止,如果目前機器已經停止,則不進行任何處理
        if(isRunning){
            isRunning = false;
            //關閉定時器
            scheduledExecutorService.shutdown();
            //取消訂閱
            zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
            releaseMaster();
        }
    }

    //具體選master的實作邏輯
    private void chooseMaster(){
        if(!isRunning){
            System.out.println("目前服務沒有啟動");
            return ;
        }
        try {
            zkClient.createEphemeral(MASTER_PATH, server);
            master=server; //把server節點指派給master
            System.out.println(master + "->我現在已經是master,你們要聽我的");

            //使用定時器模拟 主伺服器出現故障,每2秒釋放一次
            scheduledExecutorService.schedule(()->{
                releaseMaster();//釋放鎖(模拟故障的發生)
            }, 2, TimeUnit.SECONDS);
        } catch (ZkNodeExistsException e){
            //表示master已經存在
            UserCenter userCenter = zkClient.readData(MASTER_PATH, true);
            if(userCenter == null) {
                System.out.println("啟動操作:");
                chooseMaster(); //再次擷取master
            } else {
                master = userCenter;
            }
        }
    }

    //釋放鎖(模拟故障的發生)
    private void releaseMaster(){
        //判斷目前是不是master,隻有master才需要釋放
        if(checkMaster()){
            zkClient.delete(MASTER_PATH); //删除
        }
    }

    //判斷目前的server是不是master
    private boolean checkMaster(){
        UserCenter userCenter = zkClient.readData(MASTER_PATH);
        if(userCenter.getMc_name().equals(server.getMc_name())){
            master = userCenter;
            return true;
        }
        return false;
    }
}
           
/**
* @author 全恒
* 模拟多個機器搶奪master角色
*/
public class MasterChooseTest {

	private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
            "192.168.123.45:2181,192.168.123.174:2181";

    public static void main(String[] args) throws IOException {
        List<MasterSelector> selectorLists = new ArrayList<>();
        try {
            for(int i=0; i<10; i++) {
                ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000,
                        5000, new SerializableSerializer());
                UserCenter userCenter = new UserCenter();
                userCenter.setMc_id(i);
                userCenter.setMc_name("用戶端:" + i);

                MasterSelector selector = new MasterSelector(userCenter, zkClient);
                selectorLists.add(selector);
                selector.start();//觸發選舉操作
                TimeUnit.SECONDS.sleep(1);//睡眠1秒
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            for(MasterSelector selector:selectorLists){
                selector.stop();
            }
        }
    }
}
           

用curator實作master選舉

public class MasterSelector {

	private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
            "192.168.123.45:2181,192.168.123.174:2181";

    private final static String MASTER_PATH = "/curator_master_path";
    
    public static void main(String[] args) {

        CuratorFramework curatorFramework = CuratorFrameworkFactory
        		.builder()
        		.connectString(CONNECTSTRING)
        		.retryPolicy(new ExponentialBackoffRetry(1000, 3))
        		.build();

        @SuppressWarnings("resource")
		LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("獲得leader成功");
                TimeUnit.SECONDS.sleep(2);
            }
        });
        leaderSelector.autoRequeue();//自動争搶leader
        leaderSelector.start();//開始選舉
    }
}
           

繼續閱讀