天天看点

Java并发编程学习-日记11、ZooKeeper实现分布式命名服务器分布式的ID生成器方案ZooKeeper分布式锁目前分布式锁,比较成熟、主流的方案有两种:

  • 命名服务是为系统中的资源提供标识能力。ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。
  • 分布式命名服务器应用场景:
  1. 分布式API目录:为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用ZooKeeper维护的全局服务接口API的地址列表。
  2. 分布式ID生成器:在分布式系统中,为每一个数据资源提供唯一性的ID标识功能。
  3. 分布式节点命名。

分布式的ID生成器方案

  1. Java的UUID。UUID是经由一定的算法机器生成的,为了保证UUID的唯一性,规范定义了包括网卡MAC地址、时间戳、名字空间(Namespace)、随机或伪随机数、时序等元素,以及从这些元素生成UUID的算法。一个UUID是16字节长的数字,一共128位。转成字符串之后,它会变成一个36字节的字符串。
  • UUID的优点是本地生成ID,不需要进行远程调用,时延低,性能高。
  • UUID的缺点是UUID过长,16字节共128位,通常以36字节长的字符串来表示,在很多应用场景不适用,例如,由于UUID没有排序,无法保证趋势递增,因此用于数据库索引字段的效率就很低,添加记录存储入库时性能差。
  1. 分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID。
  2. Twitter的SnowFlake算法。
  3. ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID。
  4. MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID。

ZooKeeper分布式ID生成器实践:ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,会记录每个子节点创建的先后顺序,这个顺序编号是分布式同步的,也是全局唯一的。节点创建完成后,会返回节点的完整路径,生成的序号放置在路径的末尾,一般为10位数字字符。可以通过截取路径尾部数字作为新生成的ID。

public class IDMaker {

    CuratorFramework client = null;

    public void init() {

        //创建客户端

        client = ClientFactory.createSimple(“127.0.0.1:2181”);

        //启动客户端实例,连接服务器

        client.start();

    }

    public void destroy() {

        if (null != client) {

            client.close();

        }

    public static SnowflakeIdGenerator instance = new SnowflakeIdGenerator();

    public synchronized void init(long workerId) {

        if (workerId > MAX_WORKER_ID) {

            // zk分配的workerId过大

            throw new IllegalArgumentException("woker Id wrong: " + workerId);

        }

        instance.workerId = workerId;

    }

    private SnowflakeIdGenerator() {

    }

    private static final long START_TIME = 1483200000000L;

    private static final int WORKER_ID_BITS = 13;

    private final static int SEQUENCE_BITS = 10;

    private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);

    private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);

    private final static long APP_HOST_ID_SHIFT = SEQUENCE_BITS;

    private final static long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + APP_HOST_ID_SHIFT;

    private long workerId;

    private long lastTimestamp = -1L;

    private long sequence = 0L;

    public Long nextId() {

        return generateId();

    }

    private synchronized long generateId() {

        long current = System.currentTimeMillis();

        if (current < lastTimestamp) {

            // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1

            return -1;

        }

        if (current == lastTimestamp) {

            // 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1

            sequence = (sequence + 1) & MAX_SEQUENCE;

            if (sequence == MAX_SEQUENCE) {

                // 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳

                current = this.nextMs(lastTimestamp);

            }

        } else {

            // 当前的时间戳已经是下一个毫秒

            sequence = 0L;

        }

        // 更新上次生成id的时间戳

        lastTimestamp = current;

        // 进行移位操作生成int64的唯一ID

        //时间戳右移动23位

        long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;

        //workerId 右移动10位

        long workerId = this.workerId << APP_HOST_ID_SHIFT;

        return time | workerId | sequence;

    }

    private long nextMs(long timeStamp) {

        long current = System.currentTimeMillis();

        while (current <= timeStamp) {

            current = System.currentTimeMillis();

        }

        return current;

    }

}

@Slf4j

public class SnowflakeIdTest {

    public static void main(String[] args) throws InterruptedException {

        SnowflakeIdGenerator.instance.init(SnowflakeIdWorker.instance.getId());

        ExecutorService es = Executors.newFixedThreadPool(10);

        final HashSet idSet = new HashSet();

        Collections.synchronizedCollection(idSet);

        long start = System.currentTimeMillis();

        log.info(" start generate id *");

        for (int i = 0; i < 10; i++)

            es.execute(() -> {

                for (long j = 0; j < 5000000; j++) {

                    long id = SnowflakeIdGenerator.instance.nextId();

                    synchronized (idSet) {

                        idSet.add(id);

                    }

                }

            });

        es.shutdown();

        es.awaitTermination(10, TimeUnit.SECONDS);

        long end = System.currentTimeMillis();

        log.info(" end generate id ");

        log.info("* cost " + (end - start) + " ms!");

    }

}

SnowFlake算法的优点:
  • 生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性。
  • 容量大,每秒可生成几百万个ID。
  • ID呈趋势递增,后续插入数据库的索引树时,性能较高。
SnowFlake算法的缺点:
  • 依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序。
  • 在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险。

ZooKeeper分布式锁

ZooKeeper的临时顺序节点,可以实现分布式锁的原因:

1) ZooKeeper的每一个节点都是一个天然的顺序发号器。

2) ZooKeeper节点的递增有序性可以确保锁的公平。一个ZooKeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都在这个节点下创建个临时顺序节点。

3) ZooKeeper的节点监听机制可以保障占有锁的传递有序而且高效。ZooKeeper内部优越的机制,能保证由于网络异常或者其他原因造成集群中占用锁的客户端失联时,锁能够被有效释放。

4)ZooKeeper的节点监听机制能避免羊群效应。

  • 实例

public interface Lock {

    boolean lock() throws Exception;

    boolean unlock();

}

@Slf4j

public class ZkLock implements Lock {

    //ZkLock的节点链接

    private static final String ZK_PATH = "/test/lock";

    private static final String LOCK_PREFIX = ZK_PATH + "/";

    private static final long WAIT_TIME = 1000;

    //Zk客户端

    CuratorFramework client = null;

    private String locked_short_path = null;

    private String locked_path = null;

    private String prior_path = null;

    final AtomicInteger lockCount = new AtomicInteger(0);

    private Thread thread;

    public ZkLock() {

        ZKclient.instance.init();

        if (!ZKclient.instance.isNodeExist(ZK_PATH)) {

            ZKclient.instance.createNode(ZK_PATH, null);

        }

        client = ZKclient.instance.getClient();

    }

    @Override

    public boolean lock() {

        synchronized (this) {

            if (lockCount.get() == 0) {

                thread = Thread.currentThread();

                lockCount.incrementAndGet();

            } else {

                if (!thread.equals(Thread.currentThread())) {

                    return false;

                }

                lockCount.incrementAndGet();

                return true;

            }

        }

        try {

            boolean locked = false;

            locked = tryLock();

            if (locked) {

                return true;

            }

            while (!locked) {

                await();

                //获取等待的子节点列表

                List<String> waiters = getWaiters();

                if (checkLocked(waiters)) {

                    locked = true;

                }

            }

            return true;

        } catch (Exception e) {

            e.printStackTrace();

            unlock();

        }

        return false;

    }

    @Override

    public boolean unlock() {

        if (!thread.equals(Thread.currentThread())) {

            return false;

        }

        int newLockCount = lockCount.decrementAndGet();

        if (newLockCount < 0) {

            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path);

        }

        if (newLockCount != 0) {

            return true;

        }

        try {

            if (ZKclient.instance.isNodeExist(locked_path)) {

                client.delete().forPath(locked_path);

            }

        } catch (Exception e) {

            e.printStackTrace();

            return false;

        }

        return true;

    }

    private void await() throws Exception {

        if (null == prior_path) {

            throw new Exception("prior_path error");

        }

        final CountDownLatch latch = new CountDownLatch(1);

        //订阅比自己次小顺序节点的删除事件

        Watcher w = new Watcher() {

            @Override

            public void process(WatchedEvent watchedEvent) {

                System.out.println("监听到的变化 watchedEvent = " + watchedEvent);

                log.info("[WatchedEvent]节点删除");

                latch.countDown();

            }

        };

        client.getData().usingWatcher(w).forPath(prior_path);

        //订阅比自己次小顺序节点的删除事件

        TreeCache treeCache = new TreeCache(client, prior_path);

        TreeCacheListener l = new TreeCacheListener() {

            @Override

            public void childEvent(CuratorFramework client,TreeCacheEvent event) throws Exception {

                ChildData data = event.getData();

                if (data != null) {

                    switch (event.getType()) {

                        case NODE_REMOVED:

                            log.debug("[TreeCache]节点删除, path={}, data={}",

                                    data.getPath(), data.getData());

                            latch.countDown();

                            break;

                        default:

                            break;

                    }

                }

            }

        };

        treeCache.getListenable().addListener(l);

        treeCache.start();

        latch.await(WAIT_TIME, TimeUnit.SECONDS);

    }

    private boolean tryLock() throws Exception {

        //创建临时Znode

        List<String> waiters = getWaiters();

        locked_path = ZKclient.instance

                .createEphemeralSeqNode(LOCK_PREFIX);

        if (null == locked_path) {

            throw new Exception("zk error");

        }

        locked_short_path = getShorPath(locked_path);

        //获取等待的子节点列表,判断自己是否第一个

        if (checkLocked(waiters)) {

            return true;

        }

        // 判断自己排第几个

        int index = Collections.binarySearch(waiters, locked_short_path);

        if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了

            throw new Exception("节点没有找到: " + locked_short_path);

        }

        //如果自己没有获得锁,则要监听前一个节点

        prior_path = ZK_PATH + "/" + waiters.get(index - 1);

        return false;

    }

    private String getShorPath(String locked_path) {

        int index = locked_path.lastIndexOf(ZK_PATH + "/");

        if (index >= 0) {

            index += ZK_PATH.length() + 1;

            return index <= locked_path.length() ? locked_path.substring(index) : "";

        }

        return null;

    }

    private boolean checkLocked(List<String> waiters) {

        //节点按照编号,升序排列

        Collections.sort(waiters);

        // 如果是第一个,代表自己已经获得了锁

        if (locked_short_path.equals(waiters.get(0))) {

            log.info("成功的获取分布式锁,节点为{}", locked_short_path);

            return true;

        }

        return false;

    }

    protected List<String> getWaiters() {

        List<String> children = null;

        try {

            children = client.getChildren().forPath(ZK_PATH);

        } catch (Exception e) {

            e.printStackTrace();

            return null;

        }

        return children;

    }

}

  • ZooKeeper分布式锁优缺点:

(1)优点:ZooKeeper分布式锁(如InterProcessMutex),能有效地解决分布式问题,不可重入问题,使用起来也较为简单。

(2)缺点:ZooKeeper实现的分布式锁,性能并不高。Zk中创建和删除节点只能通过Leader(主)服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower(从)服务器上,这样频繁的网络通信,性能的短板是非常突出。

目前分布式锁,比较成熟、主流的方案有两种:

(1)基于Redis的分布式锁。适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。

(2)基于ZooKeeper的分布式锁。适用于高可靠(高可用),而并发量不是太高的场景。

继续阅读