資料釋出訂閱/配置中心
實作配置資訊的集中式管理和資料的動态更新,實作配置中心有兩種模式:push(推送)、pull(主動拉取,長輪詢)。
zookeeper采用的是推拉相結合的方式,用戶端向伺服器端注冊自己需要關注的節點。一旦節點資料發生變化,那麼伺服器端就會向用戶端發送watcher事件通知。用戶端收到通知後,主動到伺服器端擷取更新後的資料。其特點如下:
- 資料量比較小
- 資料内容在運作時會發生動态變更
- 叢集中的各個機器共享配置
Zookeeper配置管理
程式總是需要配置的,如果程式分散部署在多台伺服器上,要逐個改變配置就變得很麻煩。而如果把這些配置全部放到zookeeper上去,儲存在zookeeper的某個目錄節點中,然後所有相關應用程式對這個目錄節點進行監聽,一旦配置資訊發生變化,每個應用程式就會收到zookeeper的通知,然後從zookeeper擷取新的配置資訊應用到系統中。

實作分布式鎖的幾種常用方式
1. 資料庫實作思路
在資料庫中建立一個表,表中包含方法名等字段,并在方法名字段上建立唯一索引,想要執行某個方法,就使用這個方法名向表中插入資料,成功插入則擷取鎖,執行完成後删除對應的行資料釋放鎖。
2. Redis實作思路
主要用到的指令如下:
(1)SETNX key val:當且僅當key不存在時,set一個key為val的字元串,傳回1;若key存在,則什麼都不做,傳回0。
(2)expire key timeout:為key設定一個逾時時間,機關為second,超過這個時間鎖會自動釋放,避免死鎖。
(3)delete key:删除key
實作思想:
(1)擷取鎖的時候,使用setnx加鎖,并使用expire指令為鎖添加一個逾時時間,超過該時間則自動釋放鎖,鎖的value值為一個随機生成的UUID,通過此在釋放鎖的時候進行判斷。
(2)擷取鎖的時候還設定一個擷取的逾時時間,若超過這個時間則放棄擷取鎖。
(3)釋放鎖的時候,通過UUID判斷是不是該鎖,若是該鎖,則執行delete進行鎖釋放。
3. zookeeper實作思路及代碼
ZooKeeper的内部是一個分層的檔案系統目錄樹結構,規定同一個目錄下隻能有一個唯一檔案名。基于ZooKeeper實作分布式鎖的步驟如下:
(1)建立一個目錄mylock;
(2)線程A想擷取鎖就在mylock目錄下建立臨時順序節點;
(3)擷取mylock目錄下所有的子節點,然後擷取比自己小的兄弟節點,如果不存在,則說明目前線程順序号最小,獲得鎖;
(4)線程B擷取所有節點,判斷自己不是最小節點,設定監聽比自己次小的節點;
(5)線程A處理完,删除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。
Tips:MySQL中的共享鎖與排他鎖
共享鎖又稱為讀鎖,簡稱S鎖,顧名思義,共享鎖就是多個事務對于同一資料可以共享一把鎖,都能通路到資料,但是隻能讀不能修改。
排他鎖又稱為寫鎖,簡稱X鎖,顧名思義,排他鎖就是不能與其他所并存,如一個事務擷取了一個資料行的排他鎖,其他事務就不能再擷取該行的其他鎖,包括共享鎖和排他鎖,但是擷取排他鎖的事務是可以對資料就行讀取和修改。
對于共享鎖大家可能很好了解,就是多個事務隻能讀資料不能改資料,對于排他鎖大家的了解可能就有些差别,我當初就犯了一個錯誤,以為排他鎖鎖住一行資料後,其他事務就不能讀取和修改該行資料,其實不是這樣的。排他鎖指的是一個事務在一行資料加上排他鎖後,其他事務不能再在其上加其他的鎖。mysql InnoDB引擎預設的修改資料語句,update,delete,insert都會自動給涉及到的資料加上排他鎖,select語句預設不會加任何鎖類型,如果加排他鎖可以使用select …for update語句,加共享鎖可以使用select … lock in share mode語句。是以加過排他鎖的資料行在其他事務種是不能修改資料的,也不能通過for update和lock in share mode鎖的方式查詢資料,但可以直接通過select …from…查詢資料,因為普通查詢沒有任何鎖機制。
用zookeeper的Java API實作分布式共享鎖:
public class DistributeLock {
private static final String ROOT_LOCKS = "/LOCKS";//根節點
private ZooKeeper zooKeeper;
private int sessionTimeout; //會話逾時時間
private String lockID; //記錄鎖節點id
private final static byte[] data = {1, 2}; //節點的資料
private CountDownLatch countDownLatch = new CountDownLatch(1);
public DistributeLock() throws IOException, InterruptedException {
this.zooKeeper = ZookeeperClient.getInstance();
this.sessionTimeout = ZookeeperClient.getSessionTimeout();
}
//擷取鎖的方法
public boolean lock(){
try {
//LOCKS/00000001
lockID = zooKeeper.create(ROOT_LOCKS+"/", data, ZooDefs.Ids.
OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "->成功建立了lock節點[" + lockID + "],開始去競争鎖");
//擷取根節點下的所有子節點
List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);
//排序,從小到大
SortedSet<String> sortedSet = new TreeSet<String>();
for(String children : childrenNodes){
sortedSet.add(ROOT_LOCKS + "/" + children);
}
String first = sortedSet.first(); //拿到最小的節點
if(lockID.equals(first)){
//表示目前就是最小的節點
System.out.println(Thread.currentThread().getName() + "->成功獲得鎖,lock節點為:[" + lockID + "]");
return true;
}
//擷取所有比目前lockID小的節點集合
SortedSet<String> lessThanLockId = sortedSet.headSet(lockID);
if(!lessThanLockId.isEmpty()){
//拿到比目前lockID小的上一節點
String prevLockID = lessThanLockId.last();
//監聽前一個節點是否被釋放,然後設定監聽的過期時間,如果監聽逾時則放棄擷取鎖,若節點被釋放則立刻去擷取鎖
zooKeeper.exists(prevLockID, new LockWatcher(countDownLatch));
boolean flag = countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
if(!flag)
return false;
System.out.println(Thread.currentThread().getName() + " 成功擷取鎖:[" + lockID + "]");
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public boolean unlock(){
System.out.println(Thread.currentThread().getName() + "->開始釋放鎖:[" + lockID + "]");
try {
zooKeeper.delete(lockID, -1);
System.out.println("節點[" + lockID + "]成功被删除");
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(10);
Random random = new Random();
for(int i=0; i<10; i++){
new Thread(()->{
DistributeLock lock = null;
try {
lock = new DistributeLock();
//使用閉鎖,當10個線程都countDown()後,釋放閉鎖,使10個線程同時去争奪鎖
latch.countDown();
latch.await();
lock.lock();
Thread.sleep(random.nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(lock != null){
lock.unlock();
}
}
}).start();
}
}
}
/**
* @author 全恒
* 建立會話
*/
public class ZookeeperClient {
private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
"192.168.123.45:2181,192.168.123.174:2181";
private static int sessionTimeout = 5000;
//擷取連接配接
public static ZooKeeper getInstance() throws IOException, InterruptedException {
final CountDownLatch conectStatus = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected){
conectStatus.countDown();
}
}
});
conectStatus.await();
return zooKeeper;
}
public static int getSessionTimeout() {
return sessionTimeout;
}
}
public class LockWatcher implements Watcher{
private CountDownLatch latch;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
public void process(WatchedEvent event) {
//如果節點被删除,則釋放 閉鎖
if(event.getType() == Event.EventType.NodeDeleted){
latch.countDown();
}
}
}
Zookeeper實作master選舉
随着分布式系統的流行,現在許多服務都需要24小時工作,如果機器挂了的話,我們希望能有其他機器頂替他繼續工作。這種問題通常采用master-slave模式,即是正常情況下主機提供服務,備用機器監聽主機狀态,當主機發生異常時,會選取一個備用機器代替主機器提供服務,這個選舉過程即是master選舉。
由于zookeeper能保證同一時刻隻能具有一個主節點,即使有網絡問題,zookeeper的叢集也完全可以解決,而且zookeeper的通知機制,用戶端可以監聽節點的變化,是以可以考慮采用zookeeper實作master選舉。其實作原理圖如下:
上圖中,左側是zookeeper節點,master節點儲存目前master的機器,server節點儲存可用的備用機器,每個伺服器在建立的時候會進行選舉,即每個伺服器同時去建立master節點,隻有一個機器能建立成功,建立成功的機器則被選為master,其他機器會同時監聽master節點的變化,當主機器挂了時,master節點會被删除,其他機器會重新進行master選舉。
用zkclient實作master選舉
/**
* @author 全恒
* 使用者中心,用來模拟 争搶master節點的機器
*/
public class UserCenter implements Serializable{
private static final long serialVersionUID = -1776114173857775665L;
private int mc_id; //機器資訊
private String mc_name;//機器名稱
public int getMc_id() {
return mc_id;
}
public void setMc_id(int mc_id) {
this.mc_id = mc_id;
}
public String getMc_name() {
return mc_name;
}
public void setMc_name(String mc_name) {
this.mc_name = mc_name;
}
@Override
public String toString() {
return "UserCenter{" + "mc_id=" + mc_id + ", mc_name='" + mc_name + '\'' + '}';
}
}
/**
* @author 全恒
* 提供master選舉的服務
*/
public class MasterSelector {
private ZkClient zkClient;
private final static String MASTER_PATH = "/master"; //需要争搶的master節點
private IZkDataListener dataListener; //注冊節點内容變化
private UserCenter server; //其他伺服器
private UserCenter master; //master節點
private boolean isRunning = false;
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public MasterSelector(UserCenter server, ZkClient zkClient) {
System.out.println("[" + server + "] 去争搶master權限");
this.server = server;
this.zkClient = zkClient;
this.dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
//節點如果被删除, 重新進行master選舉
chooseMaster();
}
};
}
public void start(){
//開始選舉,如果目前機器已經啟動,則不進行任何處理
if(!isRunning){
isRunning = true;
//注冊節點事件,使目前用戶端機器監聽master節點的删除動作
zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
chooseMaster();
}
}
public void stop(){
//停止,如果目前機器已經停止,則不進行任何處理
if(isRunning){
isRunning = false;
//關閉定時器
scheduledExecutorService.shutdown();
//取消訂閱
zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
releaseMaster();
}
}
//具體選master的實作邏輯
private void chooseMaster(){
if(!isRunning){
System.out.println("目前服務沒有啟動");
return ;
}
try {
zkClient.createEphemeral(MASTER_PATH, server);
master=server; //把server節點指派給master
System.out.println(master + "->我現在已經是master,你們要聽我的");
//使用定時器模拟 主伺服器出現故障,每2秒釋放一次
scheduledExecutorService.schedule(()->{
releaseMaster();//釋放鎖(模拟故障的發生)
}, 2, TimeUnit.SECONDS);
} catch (ZkNodeExistsException e){
//表示master已經存在
UserCenter userCenter = zkClient.readData(MASTER_PATH, true);
if(userCenter == null) {
System.out.println("啟動操作:");
chooseMaster(); //再次擷取master
} else {
master = userCenter;
}
}
}
//釋放鎖(模拟故障的發生)
private void releaseMaster(){
//判斷目前是不是master,隻有master才需要釋放
if(checkMaster()){
zkClient.delete(MASTER_PATH); //删除
}
}
//判斷目前的server是不是master
private boolean checkMaster(){
UserCenter userCenter = zkClient.readData(MASTER_PATH);
if(userCenter.getMc_name().equals(server.getMc_name())){
master = userCenter;
return true;
}
return false;
}
}
/**
* @author 全恒
* 模拟多個機器搶奪master角色
*/
public class MasterChooseTest {
private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
"192.168.123.45:2181,192.168.123.174:2181";
public static void main(String[] args) throws IOException {
List<MasterSelector> selectorLists = new ArrayList<>();
try {
for(int i=0; i<10; i++) {
ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000,
5000, new SerializableSerializer());
UserCenter userCenter = new UserCenter();
userCenter.setMc_id(i);
userCenter.setMc_name("用戶端:" + i);
MasterSelector selector = new MasterSelector(userCenter, zkClient);
selectorLists.add(selector);
selector.start();//觸發選舉操作
TimeUnit.SECONDS.sleep(1);//睡眠1秒
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
for(MasterSelector selector:selectorLists){
selector.stop();
}
}
}
}
用curator實作master選舉
public class MasterSelector {
private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
"192.168.123.45:2181,192.168.123.174:2181";
private final static String MASTER_PATH = "/curator_master_path";
public static void main(String[] args) {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString(CONNECTSTRING)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
@SuppressWarnings("resource")
LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("獲得leader成功");
TimeUnit.SECONDS.sleep(2);
}
});
leaderSelector.autoRequeue();//自動争搶leader
leaderSelector.start();//開始選舉
}
}