天天看點

zookeeper的java用戶端

需要jar 包:

安裝目錄

zookeeper-3.4.9.jar

lib包下jar包

package com.bigdata.zk;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

public class SimpleZkClient {
  private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
  private static final int sessionTimeout = 2000;

  ZooKeeper zkClient = null;

  @Before
  public void init() throws IOException {
    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

      @Override
      public void process(WatchedEvent event) {
        // 收到事件通知後的回調函數(應該是我們自己的事件處理邏輯)
        System.out.println(event.getType() + "---" + event.getPath());

        try {
          zkClient.getChildren("/", true);
        } catch (Exception e) {
        }
      }
    });
  }

  // 擷取子節點
  @Test
  public void getChildren() throws Exception {
    List<String> children = zkClient.getChildren("/", true);
    for (String child : children) {
      System.out.println(child);
    }
    Thread.sleep(Long.MAX_VALUE);
  }

  // 建立資料節點到zk中
  @Test
  public void testCreate() throws KeeperException, InterruptedException {
    // 參數1:要建立的節點的路徑 參數2:節點的資料 參數3:節點的權限 參數4:節點的類型
    String nodeCreated = zkClient.create("/servers", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);
    // 上傳的資料可以是任何類型,但都要轉成byte[]
  }

  // 判斷節點是否存在
  @Test
  public void testExist() throws KeeperException, InterruptedException {
    Stat stat = zkClient.exists("/java6", false);
    System.out.println(stat == null ? "not exist" : "exist");
  }

  // 擷取節點的資料
  @Test
  public void getData() throws KeeperException, InterruptedException {
    byte[] data = zkClient.getData("/java6", false, null);
    System.out.println(new String(data));
  }

  // 删除節點
  @Test
  public void deleteZnode() throws InterruptedException, KeeperException {
    // 參數2:指定要删除的版本,-1表示删除所有版本
    zkClient.delete("/java6", -1);
  }

  // 更新節點資料
  @Test
  public void setData() throws KeeperException, InterruptedException {
    zkClient.setData("/java6", "I Miss you".getBytes(), -1);

    byte[] data = zkClient.getData("/java6", false, null);
    System.out.println(new String(data));
  }

}      

簡單的分布式應用系統伺服器上下線動态感覺程式開發

package com.bigdata.zkdist;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedServer {
  private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
  private static final int sessionTimeout = 2000;
  private static final String parentNode = "/servers";

  ZooKeeper zkClient = null;

  /**
   * 建立到zk的用戶端連接配接
   * 
   * @throws IOException
   */
  public void getConnect() throws IOException {
    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

      @Override
      public void process(WatchedEvent event) {
        // 收到事件通知後的回調函數(應該是我們自己的事件處理邏輯)
        System.out.println(event.getType() + "---" + event.getPath());

        try {
          zkClient.getChildren("/", true);
        } catch (Exception e) {
        }
      }
    });
  }

  /**
   * 向zk叢集注冊伺服器資訊
   * 
   * @throws InterruptedException
   * @throws KeeperException
   * 
   */
  public void registerServer(String hostname) throws KeeperException, InterruptedException {
    // 建立臨時序号節點
    String create = zkClient.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(hostname + "is online.." + create);
  }

  /**
   * 業務功能
   * 
   * @throws InterruptedException
   */
  public void handleBussiness(String hostname) throws InterruptedException {
    System.out.println(hostname + "start working.....");
    Thread.sleep(Long.MAX_VALUE);
  }

  public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
    // 擷取zk連接配接
    DistributedServer server = new DistributedServer();
    server.getConnect();
    // 利用zk連接配接注冊伺服器資訊
    server.registerServer(args[0]);
    // 啟動業務功能
    server.handleBussiness(args[0]);
  }

}      
package com.bigdata.zkdist;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DistributedClient {

  private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
  private static final int sessionTimeout = 2000;
  private static final String parentNode = "/servers";
  // 注意:加volatile的意義何在?
  private volatile List<String> serverList;
  private ZooKeeper zk = null;

  /**
   * 建立到zk的用戶端連接配接
   * 
   * @throws Exception
   */
  public void getConnect() throws Exception {

    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        // 收到事件通知後的回調函數(應該是我們自己的事件處理邏輯)
        try {
          //重新更新伺服器清單,并且注冊了監聽
          getServerList();

        } catch (Exception e) {
        }
      }
    });

  }

  /**
   * 擷取伺服器資訊清單
   * 
   * @throws Exception
   */
  public void getServerList() throws Exception {

    // 擷取伺服器子節點資訊,并且對父節點進行監聽
    List<String> children = zk.getChildren(parentNode, true);

    // 先建立一個局部的list來存伺服器資訊
    List<String> servers = new ArrayList<String>();
    for (String child : children) {
      // child隻是子節點的節點名
      byte[] data = zk.getData(parentNode + "/" + child, false, null);
      servers.add(new String(data));
    }
    // 把servers指派給成員變量serverList,已提供給各業務線程使用
    serverList = servers;
    
    //列印伺服器清單
    System.out.println(serverList);
    
  }

  /**
   * 業務功能
   * 
   * @throws InterruptedException
   */
  public void handleBussiness() throws InterruptedException {
    System.out.println("client start working.....");
    Thread.sleep(Long.MAX_VALUE);
  }
  
  
  
  
  public static void main(String[] args) throws Exception {

    // 擷取zk連接配接
    DistributedClient client = new DistributedClient();
    client.getConnect();
    // 擷取servers的子節點資訊(并監聽),從中擷取伺服器資訊清單
    client.getServerList();

    // 業務線程啟動
    client.handleBussiness();
    
  }

}      

分别把用戶端和服務端打jar 包

File->Export->Runnable JAR file