天天看点

zookeeper的java客户端

需要jar 包:

安装目录

zookeeper-3.4.9.jar

lib包下jar包

package com.bigdata.zk;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

public class SimpleZkClient {
  private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
  private static final int sessionTimeout = 2000;

  ZooKeeper zkClient = null;

  @Before
  public void init() throws IOException {
    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

      @Override
      public void process(WatchedEvent event) {
        // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
        System.out.println(event.getType() + "---" + event.getPath());

        try {
          zkClient.getChildren("/", true);
        } catch (Exception e) {
        }
      }
    });
  }

  // 获取子节点
  @Test
  public void getChildren() throws Exception {
    List<String> children = zkClient.getChildren("/", true);
    for (String child : children) {
      System.out.println(child);
    }
    Thread.sleep(Long.MAX_VALUE);
  }

  // 创建数据节点到zk中
  @Test
  public void testCreate() throws KeeperException, InterruptedException {
    // 参数1:要创建的节点的路径 参数2:节点的数据 参数3:节点的权限 参数4:节点的类型
    String nodeCreated = zkClient.create("/servers", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);
    // 上传的数据可以是任何类型,但都要转成byte[]
  }

  // 判断节点是否存在
  @Test
  public void testExist() throws KeeperException, InterruptedException {
    Stat stat = zkClient.exists("/java6", false);
    System.out.println(stat == null ? "not exist" : "exist");
  }

  // 获取节点的数据
  @Test
  public void getData() throws KeeperException, InterruptedException {
    byte[] data = zkClient.getData("/java6", false, null);
    System.out.println(new String(data));
  }

  // 删除节点
  @Test
  public void deleteZnode() throws InterruptedException, KeeperException {
    // 参数2:指定要删除的版本,-1表示删除所有版本
    zkClient.delete("/java6", -1);
  }

  // 更新节点数据
  @Test
  public void setData() throws KeeperException, InterruptedException {
    zkClient.setData("/java6", "I Miss you".getBytes(), -1);

    byte[] data = zkClient.getData("/java6", false, null);
    System.out.println(new String(data));
  }

}      

简单的分布式应用系统服务器上下线动态感知程序开发

package com.bigdata.zkdist;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedServer {
  private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
  private static final int sessionTimeout = 2000;
  private static final String parentNode = "/servers";

  ZooKeeper zkClient = null;

  /**
   * 创建到zk的客户端连接
   * 
   * @throws IOException
   */
  public void getConnect() throws IOException {
    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

      @Override
      public void process(WatchedEvent event) {
        // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
        System.out.println(event.getType() + "---" + event.getPath());

        try {
          zkClient.getChildren("/", true);
        } catch (Exception e) {
        }
      }
    });
  }

  /**
   * 向zk集群注册服务器信息
   * 
   * @throws InterruptedException
   * @throws KeeperException
   * 
   */
  public void registerServer(String hostname) throws KeeperException, InterruptedException {
    // 创建临时序号节点
    String create = zkClient.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(hostname + "is online.." + create);
  }

  /**
   * 业务功能
   * 
   * @throws InterruptedException
   */
  public void handleBussiness(String hostname) throws InterruptedException {
    System.out.println(hostname + "start working.....");
    Thread.sleep(Long.MAX_VALUE);
  }

  public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
    // 获取zk连接
    DistributedServer server = new DistributedServer();
    server.getConnect();
    // 利用zk连接注册服务器信息
    server.registerServer(args[0]);
    // 启动业务功能
    server.handleBussiness(args[0]);
  }

}      
package com.bigdata.zkdist;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DistributedClient {

  private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
  private static final int sessionTimeout = 2000;
  private static final String parentNode = "/servers";
  // 注意:加volatile的意义何在?
  private volatile List<String> serverList;
  private ZooKeeper zk = null;

  /**
   * 创建到zk的客户端连接
   * 
   * @throws Exception
   */
  public void getConnect() throws Exception {

    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
        try {
          //重新更新服务器列表,并且注册了监听
          getServerList();

        } catch (Exception e) {
        }
      }
    });

  }

  /**
   * 获取服务器信息列表
   * 
   * @throws Exception
   */
  public void getServerList() throws Exception {

    // 获取服务器子节点信息,并且对父节点进行监听
    List<String> children = zk.getChildren(parentNode, true);

    // 先创建一个局部的list来存服务器信息
    List<String> servers = new ArrayList<String>();
    for (String child : children) {
      // child只是子节点的节点名
      byte[] data = zk.getData(parentNode + "/" + child, false, null);
      servers.add(new String(data));
    }
    // 把servers赋值给成员变量serverList,已提供给各业务线程使用
    serverList = servers;
    
    //打印服务器列表
    System.out.println(serverList);
    
  }

  /**
   * 业务功能
   * 
   * @throws InterruptedException
   */
  public void handleBussiness() throws InterruptedException {
    System.out.println("client start working.....");
    Thread.sleep(Long.MAX_VALUE);
  }
  
  
  
  
  public static void main(String[] args) throws Exception {

    // 获取zk连接
    DistributedClient client = new DistributedClient();
    client.getConnect();
    // 获取servers的子节点信息(并监听),从中获取服务器信息列表
    client.getServerList();

    // 业务线程启动
    client.handleBussiness();
    
  }

}      

分别把客户端和服务端打jar 包

File->Export->Runnable JAR file