概览
本文跟一下leaf的雪花模式的算法
关注点:
- workerid生成
- 时间回拨问题解决
leaf是美团开源的分布式id 项目
源码分析
- 首先从server的Controller出发,看一下雪花算法生成的方法
-
@RequestMapping(value = "/api/snowflake/get/{key}") public String getSnowflakeId(@PathVariable("key") String key) { return get(key, snowflakeService.getId(key)); }
- 进入到
snowflakeService
- 发现核心生成id的类是
SnowflakeIDGenImpl
-
private final long workerIdBits = 10L; private final long sequenceBits = 12L;
- 正常的workerid长度1024,以及序号4096
- 时间起始时间是
//Thu Nov 04 2010 09:42:54 GMT+0800 (中国标准时间) this(zkAddress, port, 1288834974657L);
-
public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) { this.twepoch = twepoch; Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime"); final String ip = Utils.getIp(); SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress);//获得zookeeper的连接其 LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port); boolean initFlag = holder.init();//在这里是生成id if (initFlag) { workerId = holder.getWorkerID();//获得当前生成器机器的id LOGGER.info("START SUCCESS USE ZK WORKERID-{}", workerId); } else { Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok"); } Preconditions.checkArgument(workerId >= 0 && workerId <= maxWorkerId, "workerID must gte 0 and lte 1023"); }
- 来看看
方法SnowflakeZookeeperHolder.init()
-
public boolean init() { try { CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000); curator.start(); Stat stat = curator.checkExists().forPath(PATH_FOREVER); if (stat == null) { //不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据 zk_AddressNode = createNode(curator); //worker id 默认是0 updateLocalWorkerID(workerID); //定时上报本机时间给forever节点 ScheduledUploadData(curator, zk_AddressNode); return true; } else { Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001 Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001) //存在根节点,先检查是否有属于自己的根节点 List<String> keys = curator.getChildren().forPath(PATH_FOREVER); for (String key : keys) { String[] nodeKey = key.split("-"); realNode.put(nodeKey[0], key); nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1])); } Integer workerid = nodeMap.get(listenAddress); if (workerid != null) { //有自己的节点,zk_AddressNode=ip:port zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress); workerID = workerid;//启动worder时使用会使用 if (!checkInitTimeStamp(curator, zk_AddressNode)) { throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time"); } //准备创建临时节点 doService(curator); updateLocalWorkerID(workerID); LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID); } else { //表示新启动的节点,创建持久节点 ,不用check时间 String newNode = createNode(curator); zk_AddressNode = newNode; String[] nodeKey = newNode.split("-"); workerID = Integer.parseInt(nodeKey[1]); doService(curator); updateLocalWorkerID(workerID); LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID); } } } catch (Exception e) { LOGGER.error("Start node ERROR {}", e); try { Properties properties = new Properties(); properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + "")))); workerID = Integer.valueOf(properties.getProperty("workerID")); LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID); } catch (Exception e1) { LOGGER.error("Read file error ", e1); return false; } } return true; }
-
逻辑梳理 连接zookeeper if(业务节点为空) 创建根节点 记录根节点信息 else(业务节点不为空) 获取业务节点下的所有子节点,放到map中//ip:port->00001 检查有没有自己 if 有://这里是防止机器宕机重启,从而可以来获取wokerid workid 则就是zookeeper的序号 else 没有: 创建持久有序节点 获取workerid
- workerid是最关键的,其他都是次要
- 再来看雪花算法如何获取id
- 这里从leafController的
来获取idgetId()
- 一路跟到SnowflakeIDGenImpl
-
@Override public synchronized Result get(String key) { long timestamp = timeGen(); if (timestamp < lastTimestamp) {//时钟回拨发生 这里单位是毫秒 long offset = lastTimestamp - timestamp; if (offset <= 5) { try { wait(offset << 1); // 等待最多10ms timestamp = timeGen(); if (timestamp < lastTimestamp) { //如果还出现问题,则是无法解决,召唤程序员 return new Result(-1, Status.EXCEPTION); } } catch (InterruptedException e) { LOGGER.error("wait interrupted"); return new Result(-2, Status.EXCEPTION); } } else {//超过5ms, 直接召唤程序员 return new Result(-3, Status.EXCEPTION); } } if (lastTimestamp == timestamp) {//时间相同, 则seq累加呗 sequence = (sequence + 1) & sequenceMask; if (sequence == 0) { //seq 为0的时候表示是下一毫秒时间开始对seq做随机 sequence = RANDOM.nextInt(100); timestamp = tilNextMillis(lastTimestamp); } } else { //如果是新的ms开始 sequence = RANDOM.nextInt(100);//这里的随机启动我有点不太明白 } lastTimestamp = timestamp; long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;//移位处理,组成64位的long return new Result(id, Status.SUCCESS); }
- 可以看到, leaf解决时钟回拨的思路就是 延迟等待 , 不行就召唤程序员
- 除了雪花模式,leaf也支持号段模式,因为之前详细分析了tinyid, 所以这里就不再解释了
- 本文的目的也是在关注leaf在雪花模式中如何来生成workerid以及解决时钟回拨问题
总结
- 用zookeeper来生成workerid, 通过 持久有序节点, 保证生成器宕机重启,还能使用之前的workerid
- 延迟等待来解决时间回拨问题
源码地址
https://github.com/Meituan-Dianping/Leaf