天天看點

分布式改造劇集2---DIY分布式鎖

前言:

​ 好了,終于又開始播放分布式改造劇集了。前面一集中(

http://www.cnblogs.com/Kidezyq/p/8748961.html

)我們DIY了一個Hessian轉發實作,最後我們也留下了一個展望方向:可以實作一個管理界面管理節點,實作簡單的服務治理的功能。這一集我們接着繼續DIY分布式鎖。

第二集:分布式鎖DIY

探索之路

​ 由于業務互斥的需要,目前項目中實作了一個記憶體鎖。鎖的大緻模型是分為鎖類型和鎖鍵值,隻有當鎖類型和鍵值都相同的時候,整個業務才互斥。但是必須提供一個方法,來判斷某種類型的鎖是否存在。大緻代碼如下:

/**
 * 記憶體鎖
 *
 */
public class MemoryLock {
    /**
     * 同步鎖
     */
    private final Object lock = new Object();
    
    /**
     * 記憶體鎖模型
     */
    private ConcurrentHashMap<String, ConcurrentHashMap<String, String>> lockMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, String>>();

    /**
     * 嘗試擷取到鎖
     * @param lockType 鎖類型
     * @param key       鎖鍵值
     * @return 如果目前擷取到鎖,則傳回true。否則,傳回false。
     */
    private boolean tryLock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentHashMap<String, String> map = this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, String>();
                this.lockMap.put(lockType, map);
            }
            return (map.putIfAbsent(key, key) == null);
        }
    }
    
    /**
     * 判斷某種類型的鎖是不是空的
     * @param lockType  鎖類型
     * @return true,不存在某種類型的鎖;false,存在某種類型的鎖。
     */
    public boolean isLockTypeEmpty(String lockType) {
        if (null != this.lockMap.get(lockType)) {
            return this.lockMap.get(lockType).size() == 0;
        }
        return true;
    }

    /**
     * 擷取鎖
     * @param lockType  鎖類型
     * @param key       鎖鍵值
     * @param timeout   逾時時間(毫秒)
     * @throws TimeoutException 如果逾時之後還沒有獲得到鎖,則抛出逾時異常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否沒有逾時設定,當傳入的逾時時間為負數或者為0時,表示沒有逾時時間
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));
        
        throw new TimeoutException();
    }

    /**
     * 釋放鎖
     * @param lockType 鎖類型
     * @param key      鎖鍵值
     */
    public void unlock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentMap<String, String> map = this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
    }
}           

​ 可以看到,單機模式下的互斥鎖是直接在記憶體中儲存一個

ConcurrentHashMap

,然後利用

putIfAbsent

的原子特性。該鎖的使用方式如下:

try {
    memoryLock.lock(lockType, lockKey, 0l);
} catch(TimeOutException e) {
    // TODO: Exception caught  
} finally {
    memoryLock.unlock(lockType, lockKey);
}
           

​ 當應用部署在分布式環境中的時候。顯然,原來的記憶體鎖已經不适用。那麼在分布式情況下,如何實作鎖服務呢?網上給出的分布式鎖的實作方案一般有三種:

  1. 利用資料庫的for update行鎖
  2. 利用Redis的setnx
  3. 利用zookeeper的分布式一緻性算法

​ 考慮到盡量不增加新的應用部署,那麼先排除2、3,隻剩下資料庫的行級鎖。但其實資料庫的行級鎖在并發量特别大的時候會對資料庫性能造成較大影響,而且估計我想使用DBA都不會允許.....

​ 那麼,有沒有什麼其他更好的辦法呢?這次我們利用

曲線救國

的方式來實作,将分布式轉變成非分布式。

實作Demo

​ 在分布式改造劇集第一集中,我們的實作方式中有一個主節點,主節點為配置檔案中預設配置的Hessian服務的位址。隻有加上了

Distribute

注解的服務,才會在用戶端進行Hessian調用的時候進行路由,否則最終調用的Hessian服務位址即為配置檔案中配置的主節點。依賴于這個特性,我們可以不給鎖服務添加

Distribute

注解,使得所有分機部署的服務請求都落到主節點上。具體實作步驟如下:

​ 定義一個記憶體鎖Hessian服務

​ 其實簡單來說我們直接将原來的

MemoryLock

釋出成Hessian服務,并且不使用

Distribute

注解就可以實作将分布式鎖轉換成單機鎖。但是還有以下兩點需要特殊考慮:

  1. 分布式服務的多機特性: 記憶體鎖的釋放必須顯示釋放,如果一個服務調用

    unlock

    方法之前就挂掉,就可能導緻某一個鎖永遠被鎖住。是以我們還需要一個類似于Redis分布式鎖實作中的鎖逾時移除機制。
  2. 遠端RPC調用的可能逾時: 最終鎖的服務調用是需要通過Hessian來實作的,考慮到Hessian調用存在逾時時間,如果将前面

    MemoryLock

    lock

    方法等待實作在Hessian服務中,那麼等待時間超長的話會直接導緻Hessian服務調用逾時。是以改造後的

    MemoryLock

    不實作

    lock

    方法,隻實作

    tryLock

    方法,調用該方法時立即傳回目前是否可以獲得到鎖。
  3. 本地服務實作鎖等待以及減少Hessian調用: 如第2點所說,我們的鎖等待特性不能在記憶體鎖的Hessian服務中實作,隻能通過本地服務中實作。另外頻繁的Hessian調用會影響應用程式的性能,也需要一個本地的鎖服務來巧妙地減少遠端服務調用

​ 改造後的

MemoryLock

代碼如下:

@Service("moemoryLockServiceFacade")
public class MemoryLockServiceImpl implements MemoryLockService {
    
    /**
     * 自動逾時時間:目前設定為10分鐘 機關為納秒
     */
    private final static long AUTO_EXPIRE_TIME = 1000000000l * 60 * 10;
    
    /**
     * 鎖
     */
    private Object semaphore = new Object();

    /**
     * 記憶體鎖結構,雙層Map 首層Map的Key存鎖類型,value為内層Map。内層Map額key為鎖鍵值,value為鎖的加入時間
     */
    private ConcurrentMap<String, ConcurrentMap<Object, Long>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<Object, Long>>();
    
    /**
     * 守護線程: 用來清理過期記憶體緩存(如果加鎖的用戶端由于各種原因沒有顯示解鎖,則可能出現其他服務無法擷取鎖的情況)
     */
    private Thread daemonThread;
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLockServiceImpl.class);
    
    /**
     * 是否終止守護線程的辨別
     */
    private volatile boolean stop = false;
    
    /**
     * 清理失效鎖的線程
     *
     */
    private class ClearExpireLockThread extends Thread {
        
        @Override
        public void run() {
            Iterator<Entry<String, ConcurrentMap<Object, Long>>> outerIterator = null;
            Iterator<Entry<Object, Long>> innerIterator = null;
            
            // 清理超過逾時時間的鎖
            while (!stop) {
                synchronized (semaphore) {
                    long expireNanoTimes = System.nanoTime() - AUTO_EXPIRE_TIME;    // 算出逾時時間,小于該時間的緩存都應該被移除
                    outerIterator = lockMap.entrySet().iterator();
                    while (outerIterator.hasNext()) {
                        Entry<String, ConcurrentMap<Object, Long>> entrySet = outerIterator.next();
                        innerIterator = entrySet.getValue().entrySet().iterator();
                        boolean allDeleted = true;  // 是否全部删除的辨別,預設設為true
                        while (innerIterator.hasNext()) {
                            Entry<Object, Long> innerEntry = innerIterator.next();
                            if (expireNanoTimes > innerEntry.getValue()) {
                                innerIterator.remove();
                                LOGGER.info("守護線程移除類型為【{}】鍵值為【{}】的鎖......", entrySet.getKey(), innerEntry.getKey());
                            } else {
                                allDeleted = false;
                            }
                        }
                        
                        // 如果所類型下的所有鎖都被清除,則鎖類型也該被移除
                        if (allDeleted) {
                            outerIterator.remove();
                            LOGGER.info("守護線程移除類型為【{}】的鎖......", entrySet.getKey());
                        }
                    }
                }
                
                try {
                    // 如果逾時時間為1秒,則等待千分之一秒
                    Thread.sleep(AUTO_EXPIRE_TIME / 1000000000l);
                } catch (InterruptedException e) {
                }
            }
        }
    }
    
    /**
     * 終止守護線程
     */
    @PreDestroy
    public void stopDeamonThread() {
        this.stop = true;
        this.daemonThread.interrupt();
    }
    
    /**
     * 初始化守護線程,用來掃描移除逾時的記憶體鎖
     */
    @PostConstruct
    public void initDeamonThread() {
        daemonThread = new ClearExpireLockThread();
        daemonThread.setDaemon(true);
        daemonThread.start();
    }

    @Override
    public boolean tryLock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<Object, Long>();
                this.lockMap.put(lockType, map);
            }
            
            // 這裡的value值設定為加鎖的初始時間
            return (map.putIfAbsent(key, System.nanoTime()) == null);
        }
    }

    @Override
    public boolean isLockTypeEmpty(String lockType){
        return MapUtils.isEmpty(this.lockMap.get(lockType));
    }
    
    @Override
    public void unlock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map != null) {
                map.remove(key);
                LOGGER.info("手工釋放類型為【{}】鍵值為【{}】的鎖......", lockType, key);
            }
        }
    }
}           

​ 定義一個分布式鎖管理服務實作

​ 定義一個

DistributeLock

服務,該服務作為本地服務,用來實作鎖等待以及減少Hessian鎖請求調用。在本地鎖服務中注入原來的記憶體鎖Hessian服務實作。具體代碼如下:

/**
 * 分布式鎖管理類
 *
 */
@Service
public class DistributeLock {
    /**
     * 注入hessian接口的實作類
     */
    @Resource(name="moemoryLockServiceFacade")
    private MemoryLockService memoryLockService;
    
    private Object semaphore = new Object(); 

    /**
     * 記憶體鎖結構,雙層Map 首層Map的Key存鎖類型,value為内層Map。内層Map額key為鎖鍵值,value為鎖住的嘗試遠端hessian調用擷取鎖的線程
     */
    private ConcurrentMap<String, ConcurrentMap<String, Thread>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<String, Thread>>();

    /**
     * 判斷是否能夠獲得鎖,不阻塞立即傳回
     * @param lockType 鎖類型
     * @param key  鎖的鍵值
     * @return  true,能夠獲得鎖.false,不能獲得鎖.
     */
    private boolean tryLock(String lockType, String key) {
        // 提升效率,先内部map判斷是否存在鎖,如果存在,則直接等待
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, Thread>();
                this.lockMap.put(lockType, map);
            }
            Thread t = map.putIfAbsent(key, Thread.currentThread());
            
             // 單個服務隻有首先獲得本機記憶體鎖的線程才有機會去遠端調用hessian服務判斷是否有鎖
            if (t != null && Thread.currentThread() != t) {
                return false;
            }
        }
        
        return memoryLockService.tryLock(lockType, key);
    }
    
    /**
     * 獲得鎖,在獲得鎖之前阻塞
     * @param lockType  鎖類型
     * @param key   鎖鍵值
     * @param timeout 逾時時間
     * @throws TimeoutException 逾時抛出逾時異常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否沒有逾時設定,當傳入的逾時時間為負數或者為0時,表示沒有逾時時間
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));

        synchronized(this.semaphore) {
            // 需釋放目前線程占用的本地記憶體鎖
            this.lockMap.get(lockType).remove(key, Thread.currentThread());
        }
        
        throw new TimeoutException();
    }

    /**
     * 是否指定的鎖類型,目前鎖的數量為空
     * @param lockType 鎖類型
     * @return true,目前鎖類型的鎖的數量為空;false,目前鎖類型的鎖鎖的數量不為空
     */
    public boolean isLockTypeEmpty(String lockType){
        // 直接内部判斷
        if (MapUtils.isNotEmpty(lockMap.get(lockType))) {
            return false;
        }
        
        // 内部判斷成功還需遠端調用判斷
        return memoryLockService.isLockTypeEmpty(lockType);
    }
    
    /**
     * 釋放鎖
     * @param lockType 鎖類型
     * @param key       鎖的鍵值
     */
    public void unlock(String lockType, String key) {
        // 移除本機記憶體鎖模型
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
        
        // 遠端調用釋放鎖
        memoryLockService.unlock(lockType, key);
    }
}
           

​ 好了,分布式鎖的Demo順利完成。使用的時候隻要将原來的

MemoryLock

替換成

DistributeLock

即可。

展望

​ 分布式鎖的實作就到這裡,其實作的本質在于将分布式轉變成非分布式。這裡也可以說我是鑽了"分布式"的空子

​ 那麼既然分布式鎖的最終實作也是通過記憶體鎖實作的,且利用了主節點的特性。那麼其實我們在實作分布式鎖之後,還有下面兩個方向可以優化:

  1. 鎖管理: 可以增加一個鎖管理頁面,來展示目前記憶體中存在的鎖,以及移除需要馬上移除的鎖
  2. 主節點替換: 目前的分布式鎖的實作還是依賴于主節點。考慮到主節點可能也挂掉,需要增加主節點可以動态切換的功能。嚴格上來講這個是分布式改造劇集1應該實作的功能

後續

​ 好了,分布式鎖的改造暫且到此。可以看到其實分布式其實并沒有我們想象的這麼複雜,分布式技術也沒有特别地遙不可及。面對不斷革新的技術,我們應該除了拿來主義之外,多思考,真正了解技術背後的實作原理。就像我一直認為的:相比于用輪子造輪子的能力要重要的多!

黎明前最黑暗,成功前最絕望!