天天看點

利用Redis實作叢集或開發環境下SnowFlake自動配置機器号分布式雪花ID不同機器ID自動化配置

利用Redis實作叢集或開發環境下SnowFlake自動配置機器号

前言:

SnowFlake 雪花ID 算法是推特公司推出的著名分布式ID生成算法。利用預先配置設定好的機器ID,工作區ID,機器時間可以生成全局唯一的随時間趨勢遞增的Long類型ID.長度在17-19位。随着時間的增長而遞增,在MySQL資料庫中,InnoDB存儲引擎可以更快的插入遞增的主鍵。而不像UUID那樣因為寫入是亂序的,InnoDB不得不頻繁的做頁分裂操作,耗時且容易産生碎片。

對于SnowFlake 的原理介紹,可以參考該文章:了解分布式id生成算法SnowFlake

了解了雪花的基本原理之後,我們試想:在分布式叢集或者開發環境下,不同服務之間/相同服務的不同機器之間應該如何産生差異呢?有以下幾種方案:

通過在 yml 檔案中配置不同的參數,啟動 spring 容器時通過讀取該參數來實作不同服務與不同機器的workerId不同。但是這裡不友善新增機器/新同僚的自動化配置

向第三方應用如zookeeper、Redis中注冊ID,以獲得唯一的ID。

對于開發環境,可以取機器的IP後三位。因為大家在一個辦公室的話IP後三位肯定是0-255之前不重複。但是這樣機器ID需要8個Bit,留給資料中心的位數就隻有4個了。

本方案結合了以上方案的優點,按照業務的實際情況對雪花中的資料中心和機器ID所占的位數進行調整:資料中心占4Bit,範圍從0-15。機器ID占6Bit,範圍從0-63

。對不同的服務在yml中配置服務名稱,以服務編号作為資料中心ID。如果按照開發+測試+生産環境區分的話,可以部署5個不同的服務。application.yml 中配置如下的參數

分布式雪花ID不同機器ID自動化配置

snowFlake:

dataCenter: 1 # 資料中心的id

appName: test # 業務類型名稱

而機器ID采用以下的政策實作:

擷取目前機器的IP位址 localIp,模32,獲得0-31的整數 machineId

向Redis中注冊,使用 appName + dataCenter + machineId 作為key ,以本機IP localIp 作為 value。

注冊成功後,設定鍵過期時間 24 h,并開啟一個計時器,在 23h 後更新注冊的 key

如果注冊失敗,可能有以下兩個原因:

上次服務異常中斷,沒有來得及删除key。這裡的解決方案是通過key擷取value,如果value和localIp一緻,則仍然視為注冊成功

IP和别人的IP模32的結果一樣,導緻機器ID沖突。這是就周遊 0-31 擷取其中為注冊的數字作為本機的機器号

如果不幸Redis連接配接失敗,系統将從32-63之間随機擷取ID,并使用 log.error() 列印醒目的提示消息這裡建議IDEA + Grep Console 實作不同級别的日志不同前景色顯示,友善及時擷取錯誤資訊

當服務停止前,向Redis發送請求,删除該Key的占用。

具體的代碼如下:

自動配置機器ID,并在容器啟動時放入SnowFlake執行個體對象

package cn.keats.util;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import javax.annotation.PreDestroy;

import javax.annotation.Resource;

import java.net.InetAddress;

import java.net.UnknownHostException;

import java.util.Date;

import java.util.Timer;

import java.util.TimerTask;

@Configuration

@Slf4j

public class MachineIdConfig {

@Resource
private JedisPool jedisPool;

@Value("${snowFlake.dataCenter}")
private Integer dataCenterId;

@Value("${snowFlake.appName}")
private String APP_NAME;

/**
 * 機器id
 */
public static Integer machineId;
/**
 * 本地ip位址
 */
private static String localIp;

/**
 * 擷取ip位址
 *
 * @return
 * @throws UnknownHostException
 */
private String getIPAddress() throws UnknownHostException {
    InetAddress address = InetAddress.getLocalHost();
    return address.getHostAddress();
}

/**
 * hash機器IP初始化一個機器ID
 */
@Bean
public SnowFlake initMachineId() throws Exception {
    localIp = getIPAddress(); // 192.168.0.233

    Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233
    //
    machineId = ip_.hashCode() % 32;// 0-31
    // 建立一個機器ID
    createMachineId();

    log.info("初始化 machine_id :{}", machineId);
    return new SnowFlake(machineId, dataCenterId);
}

/**
 * 容器銷毀前清除注冊記錄
 */
@PreDestroy
public void destroyMachineId() {
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.del(APP_NAME + dataCenterId + machineId);
    }
}
           
/**
 * 主方法:首先擷取機器 IP 并 % 32 得到 0-31
 * 使用 業務名 + 組名 + IP 作為 Redis 的 key,機器IP作為 value,存儲到Redis中
 *
 * @return
 */
public Integer createMachineId() {
    try {
        // 向redis注冊,并設定逾時時間
        log.info("注冊一個機器ID到Redis " + machineId + " IP:" + localIp);
        Boolean flag = registerMachine(machineId, localIp);
        // 注冊成功
        if (flag) {
            // 啟動一個線程更新逾時時間
            updateExpTimeThread();
            // 傳回機器Id
            log.info("Redis中端口沒有沖突 " + machineId + " IP:" + localIp);
            return machineId;
        }
        // 注冊失敗,可能原因 Hash%32 的結果沖突
        if (!checkIfCanRegister()) {
            // 如果 0-31 已經用完,使用 32-64之間随機的ID
            getRandomMachineId();
            createMachineId();
        } else {
            // 如果存在剩餘的ID
            log.warn("Redis中端口沖突了,使用 0-31 之間未占用的Id " + machineId + " IP:" + localIp);
            createMachineId();
        }
    } catch (Exception e) {
        // 擷取 32 - 63 之間的随機Id
        // 傳回機器Id
        log.error("Redis連接配接異常,不能正确注冊雪花機器号 " + machineId + " IP:" + localIp, e);
        log.warn("使用臨時方案,擷取 32 - 63 之間的随機數作為機器号,請及時檢查Redis連接配接");
        getRandomMachineId();
        return machineId;
    }
    return machineId;
}

/**
 * 檢查是否被注冊滿了
 *
 * @return
 */
private Boolean checkIfCanRegister() {
    // 判斷0~31這個區間段的機器IP是否被占滿
    try (Jedis jedis = jedisPool.getResource()) {
        Boolean flag = true;
        for (int i = 0; i < 32; i++) {
            flag = jedis.exists(APP_NAME + dataCenterId + i);
            // 如果不存在。設定機器Id為這個不存在的數字
            if (!flag) {
                machineId = i;
                break;
            }
        }
        return !flag;
    }
}

/**
 * 1.更新超時時間
 * 注意,更新前檢查是否存在機器ip占用情況
 */
private void updateExpTimeThread() {
    // 開啟一個線程執行定時任務:
    // 每23小時更新一次逾時時間
    new Timer(localIp).schedule(new TimerTask() {
        @Override
        public void run() {
            // 檢查緩存中的ip與本機ip是否一緻, 一緻則更新時間,不一緻則重新擷取一個機器id
            Boolean b = checkIsLocalIp(String.valueOf(machineId));
            if (b) {
                log.info("IP一緻,更新逾時時間 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 );
                }
            } else {
                // IP沖突
                log.info("重新生成機器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                // 重新生成機器ID,并且更改雪花中的機器ID
                getRandomMachineId();
                // 重新生成并注冊機器id
                createMachineId();
                // 更改雪花中的機器ID
                SnowFlake.setWorkerId(machineId);
                // 結束目前任務
                log.info("Timer->thread->name:{}", Thread.currentThread().getName());
                this.cancel();
            }
        }
    }, 10 * 1000, 1000 * 60 * 60 * 23);
}

/**
 * 擷取32-63随機數
 */
public void getRandomMachineId() {
    machineId = (int) (Math.random() * 31) + 31;
}
           
/**
 * 檢查Redis中對應Key的Value是否是本機IP
 *
 * @param mechineId
 * @return
 */
private Boolean checkIsLocalIp(String mechineId) {
    try (Jedis jedis = jedisPool.getResource()) {
        String ip = jedis.get(APP_NAME + dataCenterId + mechineId);
        log.info("checkIsLocalIp->ip:{}", ip);
        return localIp.equals(ip);
    }
}

/**
 * 1.注冊機器
 * 2.設定逾時時間
 *
 * @param machineId 取值為0~31
 * @return
 */
private Boolean registerMachine(Integer machineId, String localIp) throws Exception {
    // try with resources 寫法,出異常會釋放括号内的資源 Java7特性
    try (Jedis jedis = jedisPool.getResource()) {
        // key 業務号 + 資料中心ID + 機器ID value 機器IP
        Long result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp);
        if(result == 1){
            // 過期時間 1 天
            jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
            return true;
        } else {
            // 如果Key存在,判斷Value和目前IP是否一緻,一緻則傳回True
            String value = jedis.get(APP_NAME + dataCenterId + machineId);
            if(localIp.equals(value)){
                // IP一緻,注冊機器ID成功
                jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                return true;
            }
            return false;
        }
    }
}           

}

雪花ID:

/**

  • 功能:分布式ID生成工具類

    *

*/

public class SnowFlake {

/**
 * 開始時間截 (2019-09-08) 服務一旦運作過之後不能修改。會導緻ID生成重複
 */
private final long twepoch = 1567872000000L;

/**
 * 機器Id所占的位數 0 - 64
 */
private final long workerIdBits = 6L;

/**
 * 工作組Id所占的位數 0 - 16
 */
private final long dataCenterIdBits = 4L;

/**
 * 支援的最大機器id,結果是63 (這個移位算法可以很快的計算出幾位二進制數所能表示的最大十進制數)
 */
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

/**
 * 支援的最大資料辨別id,結果是15
 */
private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits);

/**
 * 序列在id中占的位數
 */
private final long sequenceBits = 12L;

/**
 * 機器ID向左移12位
 */
private final long workerIdShift = sequenceBits;

/**
 * 資料辨別id向左移17位(12+5)
 */
private final long datacenterIdShift = sequenceBits + workerIdBits;

/**
 * 時間截向左移22位(5+5+12)
 */
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;

/**
 * 生成序列的掩碼,這裡為4095 (0b111111111111=0xfff=4095)
 */
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

/**
 * 工作機器ID(0~63)
 */
private static long workerId;

/**
 * 資料中心ID(0~16)
 */
private long datacenterId;

/**
 * 毫秒内序列(0~4095)
 */
private long sequence = 0L;

/**
 * 上次生成ID的時間截
 */
private long lastTimestamp = -1L;

//==============================Constructors=====================================

/**
 * 構造函數
 *
 * @param workerId     工作ID (0~63)
 * @param datacenterId 資料中心ID (0~15)
 */
public SnowFlake(long workerId, long datacenterId) {
    if (workerId > maxWorkerId || workerId < 0) {
        throw new IllegalArgumentException(String.format("機器ID必須小于 %d 且大于 0", maxWorkerId));
    }
    if (datacenterId > maxDatacenterId || datacenterId < 0) {
        throw new IllegalArgumentException(String.format("工作組ID必須小于 %d 且大于 0", maxDatacenterId));
    }
    this.workerId = workerId;
    this.datacenterId = datacenterId;
}

/**
 * 構造函數
 *
 */
public SnowFlake() {
    this.workerId = 0;
    this.datacenterId = 0;
}

/**
 * 獲得下一個ID (該方法是線程安全的)
 *
 * @return SnowFlakeId
 */
public synchronized long nextId() {
    long timestamp = timeGen();

    //如果目前時間小于上一次ID生成的時間戳,說明系統時鐘回退過這個時候應當抛出異常
    if (timestamp < lastTimestamp) {
        throw new RuntimeException(
                String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
    }

    // 如果是同一時間生成的,則進行毫秒内序列
    if (lastTimestamp == timestamp) {
        sequence = (sequence + 1) & sequenceMask;
        // 毫秒内序列溢出
        if (sequence == 0) {
            // 阻塞到下一個毫秒,獲得新的時間戳
            timestamp = tilNextMillis(lastTimestamp);
        }
    }
    //時間戳改變,毫秒内序列重置
    else {
        sequence = 0L;
    }

    // 上次生成ID的時間截
    lastTimestamp = timestamp;

    // 移位并通過或運算拼到一起組成64位的ID
    return ((timestamp - twepoch) << timestampLeftShift) //
            | (datacenterId << datacenterIdShift) //
            | (workerId << workerIdShift) //
            | sequence;
}

/**
 * 阻塞到下一個毫秒,直到獲得新的時間戳
 *
 * @param lastTimestamp 上次生成ID的時間截
 * @return 目前時間戳
 */
protected long tilNextMillis(long lastTimestamp) {
    long timestamp = timeGen();
    while (timestamp <= lastTimestamp) {
        timestamp = timeGen();
    }
    return timestamp;
}

/**
 * 傳回以毫秒為機關的目前時間
 *
 * @return 目前時間(毫秒)
 */
protected long timeGen() {
    return System.currentTimeMillis();
}

public long getWorkerId() {
    return workerId;
}

public static void setWorkerId(long workerId) {
    SnowFlake.workerId = workerId;
}

public long getDatacenterId() {
    return datacenterId;
}

public void setDatacenterId(long datacenterId) {
    this.datacenterId = datacenterId;
}           

Redis 配置

public class RedisConfig {

@Value("${spring.redis.host}")
private String host;

@Value("${spring.redis.port:6379}")
private Integer port;

@Value("${spring.redis.password:-1}")
private String password;

@Bean
public JedisPool jedisPool() {
    // 1.設定連接配接池的配置對象
    JedisPoolConfig config = new JedisPoolConfig();
    // 設定池中最大連接配接數
    config.setMaxTotal(50);
    // 設定空閑時池中保有的最大連接配接數
    config.setMaxIdle(10);
    config.setMaxWaitMillis(3000L);
    config.setTestOnBorrow(true);
    log.info(password);
    // 2.設定連接配接池對象
    if("-1".equals(password)){
        log.info("Redis不通過密碼連接配接");
        return new JedisPool(config, host, port,0);
    } else {
        log.info("Redis通過密碼連接配接" + password);
        return new JedisPool(config, host, port,0, password);
    }
}           

使用方法

項目中引入 Redis 、 Jedis 依賴

複制上面兩個類到項目until包下

application.yml 配置服務名稱,機器序号,Redis賬号,密碼

配置Jedis,使得項目啟動時池中有Redis連接配接對象

啟動項目

在需要生成ID的類中注入

@Autowired
private SnowFlake snowFlake;
// 生産ID
snowFlake.nextId(); 方法生産ID           

原文位址

https://www.cnblogs.com/keatsCoder/p/12129279.html