天天看点

Zookeeper实战

注册服务跟发现

定义:微服务时代,多个相同的jar包在不同的服务器上开启相同的服务,可以通过nginx在​

​服务端​

​进行负载均衡的配置。也可以通过ZooKeeper在客户端进行负载均衡配置。

  1. 多个服务注册
  2. 客户端获取中间件地址集合
  3. 从集合中随机选一个服务执行任务
Zookeeper实战
Zookeeper实战
Zookeeper实战

服务端代码

用SpringBoot完成一个最简单的web服务,并且连接zk服务器,实现注册功能。

Zookeeper实战

ProductController

@RestController
@RequestMapping("/product")
public class ProductController
{

  @RequestMapping("/getProduct/{id}")
  public Object getProduct(HttpServletRequest request, @PathVariable("id") String id)
  {
    return new Product(id, "name:" + request.getLocalPort());
  }
}      

InitListener

public class InitListener implements ServletContextListener {

    @Value("${server.port}")
    private int port;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        WebApplicationContextUtils.getRequiredWebApplicationContext(sce.getServletContext()).getAutowireCapableBeanFactory().autowireBean(this);
        try {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            ServiceRegister.register(hostAddress,port);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}      

Product

public class Product {
    private  String id;

    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public Product() {
    }
}      

ServiceRegister

public class ServiceRegister
{

  private static final String BASE_SERVICES = "/services";
  private static final String SERVICE_NAME = "/products";

  public static void register(String address, int port)
  {
    try
    {
      ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (watchedEvent) -> {
      });
      Stat exists = zooKeeper.exists(BASE_SERVICES + SERVICE_NAME, false);
      if (exists == null)
      {
        zooKeeper.create(BASE_SERVICES + SERVICE_NAME, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      }
      String server_path = address + ":" + port;
      zooKeeper.create(BASE_SERVICES + SERVICE_NAME + "/child", server_path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    } catch (Exception e)
    {
      e.printStackTrace();
    }
  }

}      

ProductApp

@SpringBootApplication
public class ProductApp
{
  public static void main(String[] args)
  {
    SpringApplication.run(ProductApp.class, args);
  }

  @Bean  // 随着服务自动启动
  public ServletListenerRegistrationBean servletListenerRegistrationBean()
  {
    ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
    servletListenerRegistrationBean.setListener(new InitListener());
    return servletListenerRegistrationBean;
  }
}      

application.properties

server.port=8080      

可以改变server.port端口 比如 ​

​8080​

​​ 跟 ​

​8081​

​ 两个端口 来实现同时启动注册两个服务。

业务端代码

Zookeeper实战

OrderController

@RequestMapping("/order")
@RestController
public class OrderController
{

  @Resource
  private RestTemplate restTemplate;

  private LoadBalance loadBalance = new RamdomLoadBalance();

  @RequestMapping("/getOrder/{id}")
  public Object getOrder(@PathVariable("id") String id)
  {
    System.out.println("sowhat1412:" + loadBalance.choseServiceHost());
    Product product = restTemplate.getForObject("http://" + loadBalance.choseServiceHost() + "/product/getProduct/1", Product.class);
    return new Order(id, "orderName", product);
  }
}      

InitListener

public class InitListener implements ServletContextListener {

    private  static final String BASE_SERVICES = "/services";
    private static final String  SERVICE_NAME="/products";

    private ZooKeeper zooKeeper;
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        try {
             zooKeeper = new ZooKeeper("127.0.0.1:2181",5000,(watchedEvent)->{
                if(watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged  && watchedEvent.getPath().equals(BASE_SERVICES+SERVICE_NAME)) {
                    updateServiceList();
                }
            });

            updateServiceList();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private void updateServiceList() {
       try{
           List<String> children = zooKeeper.getChildren(BASE_SERVICES  + SERVICE_NAME, true);
           List<String> newServerList = new ArrayList<String>();
           for(String subNode:children) {
               byte[] data = zooKeeper.getData(BASE_SERVICES  + SERVICE_NAME + "/" + subNode, false, null);
               String host = new String(data, "utf-8");
               System.out.println("host:"+host);
               newServerList.add(host);
           }
           LoadBalance.SERVICE_LIST = newServerList;
       }catch (Exception e) {
           e.printStackTrace();
       }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}      

Order

public class Order {
    private String id;
    private  String name;
    private Product product;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product getProduct() {
        return product;
    }

    public void setProduct(Product product) {
        this.product = product;
    }

    public Order(String id, String name, Product product) {
        this.id = id;
        this.name = name;
        this.product = product;
    }

    public Order() {
    }
}      

Product

public class Product {
    private  String id;

    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public Product() {
    }
}      

LoadBalance

public abstract class LoadBalance
{
  public volatile static List<String> SERVICE_LIST;
  //  注意此处用 volatile 修饰的 IP池

  public abstract String choseServiceHost();
}      
public class RamdomLoadBalance extends LoadBalance
{
  @Override
  public String choseServiceHost()
  {
    String result = "";
    if (!CollectionUtils.isEmpty(SERVICE_LIST))
    {
      int index = new Random().nextInt(SERVICE_LIST.size());
      result = SERVICE_LIST.get(index);
    }
    return result;
  }
}      

分布式锁

任务通过竞争获取锁才能对该资源进行操作(①竞争锁);当有一个任务在对资源进行更新时(②占有锁),其他任务都不可以对这个资源进行操作(③任务阻塞),直到该任务完成更新(④释放锁);

  1. 多任务环境中才需要
  2. 任务都需要对同一共享资源进行写操作;
  3. 对资源的访问是互斥的

以前的多线程都是一个JVM集群上JUC编程,可以同syn,Lock,AtomicInteger来实现。但是当在集群中的时候就不可用了,因为 JVM 是多个。

老方法

/**
 * @author sowhat
 * @create 2020-06-10 14:37
 */
public class OrderService implements Runnable
{
  private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
  private static CountDownLatch countDownLatch = new CountDownLatch(50);
  private static List<String> result = new Vector<>();


  @Override
  public void run()
  {
    countDownLatch.countDown();
    result.add(orderNumGenerator.getNumber_byLock());
  }

  public static void main(String[] args) throws InterruptedException
  {
    for (int i = 0; i < 50; i++)
    {
      new Thread(new OrderService()).start();
    }
    countDownLatch.await();
    Thread.sleep(1000);
    Collections.sort(result);
    result.forEach(s -> System.out.println(s));
  }
}

class OrderNumGenerator
{
  public static int count = 0;

  public String getNumber()
  {
    SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
    return sim.format(new Date()) + "-" + (++count);
  }

  public static Object obj = new Object();

  public String getNumber_syn()
  {
    synchronized (obj)
    {
      SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
      return sim.format(new Date()) + "-" + (++count);
    }

  }

  private Lock lock = new ReentrantLock();

  public String getNumber_byLock()
  {
    try
    {
      lock.lock();
      SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
      return sim.format(new Date()) + "-" + (++count);
    } finally
    {
      lock.unlock();
    }
  }
}      

zookeeper 基于同名节点的分布式锁

​核心思想​

​:在zk中有一个唯一的临时节点,只有拿到节点的才可以操作数据,没拿到就需要等待。

​缺点​

​:可能引发​

​羊群效应​

​,第一个用完后瞬间有999个同时并发的向zk请求获得锁。

Zookeeper实战
package cn.sowhat;


import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;


class OrderNumGenerator
{
  public static int count = 0;

  public String getNumber()
  {
    SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
    return sim.format(new Date()) + "-" + (++count);
  }

}

public class ZkLock implements Runnable
{
  private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
  private Lock lock = new ZookeeperDistrbuteLock();

  @Override
  public void run()
  {
    getNumber();
  }

  public void getNumber()
  {
    try
    {
      lock.getLock();
      String number = orderNumGenerator.getNumber();
      System.out.println(Thread.currentThread().getName() + " ,生成ID: " + number);

    } catch (Exception e)
    {
      e.printStackTrace();
    } finally
    {
      lock.unLock();
    }
  }

  public static void main(String[] args)
  {
    for (int i = 0; i < 50; i++)
    {
      new Thread(new ZkLock()).start();
    }
  }
}

interface Lock
{
  public void getLock();

  public void unLock();
}

abstract class AbstrackLock implements Lock
{
  @Override
  public void getLock()
  {
    if (tryLock())
    {
      System.out.println("获取lock锁资源");
    } else
    {
      waitLock();
      getLock();
    }
  }

  public abstract boolean tryLock();

  public abstract void waitLock();
}

abstract class ZookeeperAbstractLock extends AbstrackLock
{
  private static final String CONN = "127.0.0.1:2181";
  protected ZkClient zkClient = new ZkClient(CONN);
  protected static final String PATH = "/lock";
  protected static final String PATH2 = "/lock2";
}

class ZookeeperDistrbuteLock extends ZookeeperAbstractLock
{

  private CountDownLatch countDownLatch = null;

  @Override
  public boolean tryLock()
  {
    try
    {
      zkClient.createEphemeral(PATH);
      // 能成功创建则说明获得锁成功
      return true;
    } catch (Exception e)
    {
      return false;
    }
  }

  @Override
  public void waitLock()
  {
    IZkDataListener iZkDataListener = new IZkDataListener()
    {
      @Override
      public void handleDataDeleted(String path) throws Exception
      {// 唤醒被等待的线程
        if (countDownLatch != null)
        {
          countDownLatch.countDown();
        }
      }

      @Override
      public void handleDataChange(String s, Object data) throws Exception
      {
      }
    };
    // 注册事件 监控数据  数据删除了会有反应
    zkClient.subscribeDataChanges(PATH, iZkDataListener);
    if (zkClient.exists(PATH))
    { // 如果已经有人用了 当先线程就等着吧,什么时候 出现了删除就会调用 iZkDataListener
      countDownLatch = new CountDownLatch(1);
      try
      {
        countDownLatch.await();
      } catch (Exception e)
      {
        e.printStackTrace();
      }
    }

    zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
    // 拿到锁需要的了 删除监听

  }

  @Override
  public void getLock()
  {
    super.getLock();
  }

  @Override
  public void unLock()
  {
    if (zkClient != null)
    {
      zkClient.delete(PATH);
      zkClient.close();

    }
  }

}      

高性能分布式锁

思想:很简单,每个要操作的不要乱抢了,在zk中进行​

​排队​

​,轮到了再操作数据。这样可以有效避免并发。

Zookeeper实战
Zookeeper实战
package cn.sowhat;


import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;


class OrderNumGenerator
{
  public static int count = 0;

  public String getNumber()
  {
    return ++count + "";
  }

}

public class ZkLock implements Runnable
{
  private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
  private Lock lock = new ZookeeperDistrbuteLock2();

  @Override
  public void run()
  {
    getNumber();
  }

  public void getNumber()
  {
    try
    {
      lock.getLock();
      String number = orderNumGenerator.getNumber();
      System.out.println(Thread.currentThread().getName() + " ,生成ID: " + number);

    } catch (Exception e)
    {
      e.printStackTrace();
    } finally
    {
      lock.unLock();
    }
  }

  public static void main(String[] args)
  {
    for (int i = 0; i < 50; i++)
    {
      new Thread(new ZkLock()).start();
    }
  }
}

interface Lock
{
  public void getLock();

  public void unLock();
}

abstract class AbstrackLock implements Lock
{
  @Override
  public void getLock()
  {
    if (tryLock())
    {
      System.out.println("获取lock锁资源");
    } else
    {
      waitLock();
      getLock();
    }
  }

  public abstract boolean tryLock();

  public abstract void waitLock();
}

abstract class ZookeeperAbstractLock extends AbstrackLock
{
  private static final String CONN = "127.0.0.1:2181";
  protected ZkClient zkClient = new ZkClient(CONN);
  protected static final String PATH = "/lock";
  protected static final String PATH2 = "/lock2";
}

class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock
{

  private CountDownLatch countDownLatch = null;
  private String beforePath;
  private String currentPath;

  public ZookeeperDistrbuteLock2()
  {
    if (!this.zkClient.exists(PATH2))
    {
      this.zkClient.createPersistent(PATH2);
      // 创建持久性节点
    }
  }

  @Override
  public boolean tryLock()
  {
    // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentpath
    if (currentPath == null || currentPath.length() <= 0)
    {
      // 创建一个临时 顺序节点
      currentPath = this.zkClient.createEphemeralSequential(PATH2 + "/", "lock");
    }
    // 获取所有 临时节点并且排序,临时节点名称为自增的字符串 如: 0000000000400
    List<String> children = this.zkClient.getChildren(PATH2);
    Collections.sort(children);
    if (currentPath.equals(PATH2 + '/' + children.get(0)))
    {
      return true;
      // 如果当前节点 在所有节点中排名第一则获得锁了
    } else
    {
      int wz = Collections.binarySearch(children, currentPath.substring(7));
      beforePath = PATH2 + '/' + children.get(wz - 1);
    }
    return false;
  }

  @Override
  public void waitLock()
  {
    IZkDataListener iZkDataListener = new IZkDataListener()
    {
      @Override
      public void handleDataChange(String s, Object o) throws Exception
      {

      }

      @Override
      public void handleDataDeleted(String s) throws Exception
      {
        if(countDownLatch != null){
          countDownLatch.countDown();
        }
      }
    };
    // 给排在前面的节点增加数据删除的watcher, 本质上是开启另外一个线程 监听前置节点
    this.zkClient.subscribeDataChanges(beforePath,iZkDataListener);
    if(this.zkClient.exists(beforePath)){
      countDownLatch = new CountDownLatch(1);
      try{countDownLatch.await();} catch(Exception e){
        e.printStackTrace();
      }
    }
    this.zkClient.unsubscribeDataChanges(beforePath,iZkDataListener);
  }

  @Override
  public void getLock()
  {
    super.getLock();
  }

  @Override
  public void unLock()
  {
    zkClient.delete(currentPath);
    zkClient.close();
  }

}      

集群选举

Zookeeper实战
Zookeeper实战

思想: 其实跟分布式锁一样,就是分布式锁的简化版本,

package cn.sowhat.order;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletListenerRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Timer;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;



/**
 * @author sowhat
 * @create 2020-06-10 18:55
 */

class IsMaster
{
  public static boolean isSurvival;
}

@RestController
class IndexController
{
  @RequestMapping("/getserverinfo")
  public String getServerInfo()
  {
    return IsMaster.isSurvival ? "is Master" : "is slave";
  }
}

class InitListener implements ServletContextListener
{

  ZkClient zkClient = new ZkClient("127.0.0.1:2181");
  private String path = "/election";

  @Value("${server.port}")
  private String serverPort;

  private void init()
  {
    System.out.println("项目启动完成");
    createEphemeral();
    zkClient.subscribeDataChanges(path, new IZkDataListener()
    {
      @Override
      public void handleDataChange(String s, Object o) throws Exception
      {

      }

      @Override
      public void handleDataDeleted(String s) throws Exception
      {
        System.out.println("主节点挂了,重新选举");
        Thread.sleep(5000);
        createEphemeral();

      }
    });

  }

  private void createEphemeral()
  {
    try
    {
      zkClient.createEphemeral(path, serverPort);
      IsMaster.isSurvival = true;
    } catch (Exception e)
    {
      IsMaster.isSurvival = false;
    }
  }

  @Override
  public void contextInitialized(ServletContextEvent servletContextEvent)
  {
    init();
  }

  @Override
  public void contextDestroyed(ServletContextEvent servletContextEvent)
  {

  }
}

@SpringBootApplication
public class selectMaster
{
  public static void main(String[] args)
  {
    SpringApplication.run(selectMaster.class,args);
  }
  @Bean
  public ServletListenerRegistrationBean servletListenerRegistrationBean(){
    ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
    servletListenerRegistrationBean.setListener(new InitListener());
    return servletListenerRegistrationBean;
  }

}      

配置中心

比如mybatis通过URL访问mysql,我们通过动态的修改URL实现从访问A数据库到访问B数据库的切换。核心就监听 dataChange。

Zookeeper实战

ZK注意事项

  1. Zk数据与日志清理

dataDir目录、dataLogDir两个目录会随着时间推移变得庞大,容易造成硬盘满了,清理办法:

自己编写shell脚本,保留最新的n个文件

使用zk自带的zkClient.sh保留最新的n个文件,zkClient.sh –n 15

配置autopurge.snapRetainCount和autopurge.purgeInterval两个参数配合使用;

  1. Too many connections
默认最大连接数 默认为60,配置maxClientCnxns参数,配置单个客户端机器创建的最大连接数;
  1. 磁盘管理
磁盘的I/O性能直接制约zookeeper更新操作速度,为了提高zk的写性能建议:使用单独的磁盘,Jvm堆内存设置要小心。
  1. 磁盘管理集群数量
集群中机器的数量并不是越多越好,一个写操作需要半数以上的节点ack,所以集群节点数越多,整个集群可以抗挂点的节点数越多(越可靠),但是吞吐量越差。集群的数量必须为奇数;
  1. 磁盘管理集群数量
zk是基于内存进行读写操作的,有时候会进行消息广播,因此不建议在节点存取容量比较大的数据;

参考