天天看點

RocketMQ源碼分析之NameServer核心元件KVConfigManager源碼分析

作者:程式員阿龍

一、前言

前文我們介紹了NameServer有那些核心元件,我們再對這些核心元件一一進行分析,本文主要分析KVConfigManager元件,用來管理namespace命名空間的多個kv配置資料;

二、源碼導讀

KVConfigManager元件主要是用來管理namespace命名空間的配置資料,我們就看幾個核心方法,其餘的方法都差不多;

  • load:加載磁盤資料至記憶體;
  • putKVConfig:添加資料;
  • persist:持久化資料;
  • getKVConfig:根據namespace和key擷取資料;

注:加載資料之前有講解過,本文本進行分析;

public class KVConfigManager {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvController namesrvController;

    // jdk提供的讀寫鎖
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // 基于記憶體中的hashmap資料結構來進行kv配置的存放
    // 每個namespace命名空間都會有多個kv配置,是這樣的一個資料結構
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

		// 加載磁盤資料至記憶體
    public void load() {
        String content = null;
        try {
            // kv config path檔案裡存放的是json格式的一個大字元串,包含了所有的kv配置
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
            log.warn("Load KV config table exception", e);
        }
        if (content != null) {
            // 他會基于json序列化格式進行反序列化,從json格式轉換為hashmap<string, hashmap<string, string>>
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            if (null != kvConfigSerializeWrapper) {
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }

		// 添加資料
    public void putKVConfig(final String namespace, final String key, final String value) {
        try {
            // 對源碼做一些分析,這個搞法不太好,預設用的是非線程安全的hashmap
            // 對hashmap進行讀寫做線程安全控制,讀寫鎖
            // 鎖粒度太粗了,一個大map有很多namespace的配置資料在裡面,就一把讀寫鎖
            // 如果你是對不同的namespace的配置資料進行讀寫操作,加的鎖都是一把鎖,粒度太粗了,線程并發性能會不好
            // 比較正确的一個做法應該是,使用細粒度的鎖,每個namespace對應一把鎖,每次加鎖就針對這個namespace加鎖就可以了

            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null == kvTable) {
                    kvTable = new HashMap<String, String>();
                    this.configTable.put(namespace, kvTable);
                    log.info("putKVConfig create new Namespace {}", namespace);
                }

                final String prev = kvTable.put(key, value);
                if (null != prev) {
                    log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                } else {
                    log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putKVConfig InterruptedException", e);
        }
				// 持久化資料
        this.persist();
    }

		// 持久化資料
    public void persist() {
        try {
            // 為什麼說在進行持久化的時候加的是讀鎖呢?
            // 持久化的動作本身就是一個讀的動作在裡面,讀取記憶體裡的資料寫入到磁盤裡去
            // 我在進行持久化的動作過程中,我希望的是你的資料是不能變化的
            this.lock.readLock().lockInterruptibly();
            try {
                KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
                kvConfigSerializeWrapper.setConfigTable(this.configTable);

                String content = kvConfigSerializeWrapper.toJson();

                if (null != content) {
                    MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
                }
            } catch (IOException e) {
                log.error("persist kvconfig Exception, "
                    + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("persist InterruptedException", e);
        }

    }

		// 根據namespace和key擷取資料
    public String getKVConfig(final String namespace, final String key) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    return kvTable.get(key);
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVConfig InterruptedException", e);
        }

        return null;
    }
}           

三、持久化資料

添加和查詢資料沒有什麼額外的代碼需要分析的,是以這裡不進行再次分析了;

這裡主要需要看一下的是NameServer是如何對檔案進行持久化的,他這裡采用了臨時檔案的概念;

public void persist() {
    try {
        // 為什麼說在進行持久化的時候加的是讀鎖呢?
        // 持久化的動作本身就是一個讀的動作在裡面,讀取記憶體裡的資料寫入到磁盤裡去
        // 我在進行持久化的動作過程中,我希望的是你的資料是不能變化的
        this.lock.readLock().lockInterruptibly();
        try {
            KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
            kvConfigSerializeWrapper.setConfigTable(this.configTable);

            String content = kvConfigSerializeWrapper.toJson();

            if (null != content) {
                MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
            }
        } catch (IOException e) {
            log.error("persist kvconfig Exception, "
                + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("persist InterruptedException", e);
    }

}           
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";           
public static void string2File(final String str, final String fileName) throws IOException {
    
    // 磁盤持久化是怎麼來做的,這塊也是比較巧妙一些的,先往kv config path臨時檔案裡寫入
    String tmpFile = fileName + ".tmp";
    string2FileNotSafe(str, tmpFile);

    // 舊内容寫入到.bak備份檔案裡去
    String bakFile = fileName + ".bak"; 
    
    // 把老檔案的内容讀取出來,寫入到.bak備份檔案裡去
    String prevContent = file2String(fileName);
    if (prevContent != null) {
        string2FileNotSafe(prevContent, bakFile);
    }

    // 删除老檔案
    File file = new File(fileName);
    file.delete();

    // 檔案重命名
    file = new File(tmpFile);
    file.renameTo(new File(fileName));
}           

三、RocketMQ中KVConfigManager元件存在的問題

  1. RocketMQ作者使用讀寫鎖的想法是好的,但是沒有做鎖粒度的一個細化,導緻鎖的資料比較多,降低了線程并發性能;
  2. 持久化資料加的讀鎖,本意是好的,因為持久化資料是比較耗時的操作,是以RocketMQ作者想加讀鎖,這樣最起碼資料是可讀的。但是會存在一定的問題,如果2個寫操作同時完成,同時加了寫鎖,那麼持久化資料的時候不是寫操作1覆寫寫操作2,就是寫操作2覆寫寫操作1的資料,肯定會少資料,這時機器重新開機,從磁盤加載到記憶體中的資料就會少資料;

四、總結

對于開源項目的源碼我們需要帶着疑問去看,作者為什麼要這麼去寫,如:讀寫鎖,作者就是想讀讀是不互斥的,可以增加線程的并發能力,但是作者可能疏忽了,使鎖的粒度比較粗;

持久化加讀鎖我們可以猜測作者可能是想因為磁盤持久化畢竟耗時,在加上之前已經加了寫鎖,是以這裡就加了讀鎖,隻不過在極端情況下還是會有問題,如果真的要持久化的話,可以日志先行再每隔一段時間批量刷盤,因為順序寫速度比這樣每次随機寫肯定會好一點;這也是一個方案吧,如果有好的想法可以在下方評論。

繼續閱讀