天天看點

ZooKeeper--一個具有高可用性的高性能協調服務ZooKeeper是什麼ZooKeeper使用場景安裝和運作ZooKeeperJava開發的znode操作代碼Java開發的配置服務代碼ZooKeeper配置檔案

ZooKeeper是什么

ZooKeeper是一个具有高可用性的高性能协调服务。

ZooKeeper维护着一个树形层次结构,树中的节点被称为znode。znode可以用来存储数据,并且有一个与之相关联的ACL(权限),znode不能大于1M。

ZooKeeper使用场景

ZooKeeper主要用来解决分布式系统中的“部分失败”问题。部分失败是分布式系统的固有的特征,ZooKeeper不能根除部分失败,也不会隐藏部分失败;但是可以提供一组工具,使你在构建分布式应用时对部分失败进行处理。这主要利用的是观察者模式。

Hadoop内置了ZooKeeper,阿里的开源框架Dubbo、Otter等也都是用ZooKeeper作为协调服务。

安装和运行ZooKeeper

安装

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz

tar zxvf zookeeper-3.3.6.tar.gz

cd zookeeper-3.3.6

cp conf/zoo_sample.cfg conf/zoo.cfg

运行

bin/zkServer.sh start 

测试是否正确运行:echo ruok | nc localhost 2181,如果启动成功,则返回imok

zkCli.sh命令行工具

bin/zkCli.sh -server localhost,连接到ZooKeeper,可以执行各种znode操作。

Java开发的znode操作代码

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZkTest implements Watcher
{
	private static final int SESSION_TIMEOUT = 5000;
	private ZooKeeper zk;
	private CountDownLatch connectedSignal = new CountDownLatch(1);

	public void connect(String hosts) throws IOException,InterruptedException
	{
		zk = new ZooKeeper(hosts,SESSION_TIMEOUT,this);
		connectedSignal.await();
	}
	@Override
	public void process(WatchedEvent event)
	{
		if(event.getState() == KeeperState.SyncConnected)
		{
			connectedSignal.countDown();
		}
	}
	//新建表示组的znode
	public void create(String groupName) throws KeeperException,InterruptedException
	{
		String path = "/" + groupName;
		String createdPath = zk.create(path,null/*data*/,Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
		System.out.println("Created " + createdPath);
	}
	//用户将成员加入组中:临时成员
	public void join(String groupName,String memberName) throws KeeperException,InterruptedException
	{
		String path = "/" + groupName + "/" + memberName;
		String createdPath = zk.create(path,null/*data*/,Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
		System.out.println("Joined " + createdPath);
	}
	//列出组成员
	public void list(String groupName) throws KeeperException,InterruptedException
	{
		String path = "/" + groupName;
		try
		{
			List<String> children = zk.getChildren(path,false);
			if(children.isEmpty())
			{
				System.out.printf("No members in group %s\n",groupName);
				System.exit(1);
			}
			for(String child:children)
			{
				System.out.println(child);
			}
		}
		catch (KeeperException.NoNodeException e)
		{
			System.out.printf("No members in group %s\n",groupName);
			System.exit(1);
		}
	}
	//删除一个组及其所有成员
	public void delete(String groupName) throws KeeperException,InterruptedException
	{
		String path = "/" + groupName;
		try
		{
			List<String> children = zk.getChildren(path,false);
			for(String child:children)
			{
				zk.delete(path + "/" + child,-1);
			}
			zk.delete(path,-1);
		}
		catch (KeeperException.NoNodeException e)
		{
			System.out.printf("Group %s does not exist\n",groupName);
			System.exit(1);
		}
	}
	public void close() throws InterruptedException
	{
		zk.close();
	}
	public static void main(String[] args) throws Exception
	{
		ZkTest zkTest = new ZkTest();
		zkTest.connect("192.168.211.230");
		//zkTest.create("longlonggroup");
		//zkTest.join("longlonggroup", "aaa");
		//Thread.sleep(60000); //这里睡60秒是为了让你通过zkCli.sh之类的工具看到/creategroup下的aaa这个临时节点
		//zkTest.list("CMS");
		//zkTest.delete("longlonggroup");
		zkTest.close();
	}
}
           

需要引用jar包,Maven方式如下:

<dependency>
		<groupId>org.apache.zookeeper</groupId>
		<artifactId>zookeeper</artifactId>
		<version>3.4.6</version>
	</dependency> 
           

Java开发的配置服务代码

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class ConnectionWatcher implements Watcher
{
	private static final int SESSION_TIMEOUT = 5000;
	protected ZooKeeper zk;
	private CountDownLatch connectedSignal = new CountDownLatch(1);

	public void connect(String hosts) throws IOException,InterruptedException
	{
		zk = new ZooKeeper(hosts,SESSION_TIMEOUT,this);
		connectedSignal.await();
	}
	@Override
	public void process(WatchedEvent event)
	{
		if(event.getState() == KeeperState.SyncConnected)
		{
			connectedSignal.countDown();
		}
	}
	public void close() throws InterruptedException
	{
		zk.close();
	}
}
class ActiveKeyValueStore extends ConnectionWatcher
{
	private static final Charset CHARSET = Charset.forName("UTF-8");
	public void write(String path,String value) throws InterruptedException,KeeperException
	{
		Stat stat = zk.exists(path,false);
		if(stat == null)
		{
			zk.create(path,value.getBytes(CHARSET),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
		} else {
			zk.setData(path,value.getBytes(CHARSET),-1);
		}
	}
	public String read(String path,Watcher watcher) throws InterruptedException,KeeperException
	{
		byte[] data = zk.getData(path,watcher,null/*stat*/);
		return new String(data,CHARSET);
	}
}
//随机更新ZooKeeper中的属性
class ConfigUpdater
{
	public static final String PATH = "/configtestbypuma";
	private ActiveKeyValueStore store;
	private Random random = new Random();
	public ConfigUpdater() throws IOException,InterruptedException
	{
		store = new ActiveKeyValueStore();
		store.connect("localhost");
	}
	public void run() throws InterruptedException,KeeperException
	{
		String value = random.nextInt(100) + "";
		store.write(PATH, value);
		System.out.printf("Set %s to %s\n",PATH,value);
		Thread.sleep(random.nextInt(10)*1000);
	}
	public static void main(String[] args) throws Exception
	{
		ConfigUpdater configUpdater = new ConfigUpdater();
		configUpdater.run();
	}
}
//观察Zookeeper中属性的更新情况,并打印到控制台(注意,本例只会观察最新的znode情况,对于二次观察之前的更新不关心)
class ConfigWatcher implements Watcher
{
	private ActiveKeyValueStore store;
	public ConfigWatcher() throws IOException,InterruptedException
	{
		store = new ActiveKeyValueStore();
		store.connect("localhost");
	}
	public void displayConfig() throws InterruptedException,KeeperException
	{
		String value = store.read(ConfigUpdater.PATH, this);
		System.out.printf("Read %s as %s\n",ConfigUpdater.PATH,value);
	}
	@Override
	public void process(WatchedEvent event)
	{
		if(event.getType() == EventType.NodeDataChanged)
		{
			try
			{
				displayConfig();
			} catch(InterruptedException e) {
				System.err.println("Interrupted. Exiting.");
				Thread.currentThread().interrupt();
			} catch (KeeperException e)
			{
				System.err.printf("KeeperException: %s. Exiting. \n", e);
			}
		}
	}
	public static void main(String[] args) throws Exception
	{
		ConfigWatcher configWatcher = new ConfigWatcher();
		configWatcher.displayConfig();
		
		Thread.sleep(Long.MAX_VALUE);
	}
}
           

ZooKeeper配置文件

如下是一个包含有3台机器的复制模式下的配置例子:

tickTime=2000

dataDir=/diskl/zookeeper

dataLogDir=/disk2/zookeeper

clientPort=2181

initlimit=5

syncLimit=2

server.l=zookeeperl:2888:3888

server.2=zookeeper2:2888:3888

server.3=zookeeper3:2888:3888

tickTime:以毫秒为单位,用来控制心跳和超时,默认情况超时的时间为两倍的tickTime

监听端口:2181端口用于客户端连接;对于领导者来说,2888端口用于跟随者连接;3888端口用于领导者选举阶段的其他服务器连接。当一个ZooKeeper服务器启动时,它读取myid文件用于确定自己的服务器ID,然后通过读取配置文件来确定应当在哪个端口进行监听,同时确定集合体中的其他服务器的网络地址。

在复制模式下,有两个额外的强制参数:initLimit和syncLimit,两者都是以滴答参数的倍数进行度量。

initlimit 参数:The number of ticks that the initial synchronization phase can take,设定了允许所有跟随者与领导者进行连接并同步的时间。如果在设定的时间段内,半数以上的跟随者未能完成同步,领导者便会宣布放弃领导地位,然后进行另外一次领导者选举。如果这种情况经常发生(可以通过日志中的记录发现这种情况) ,则表明设定的值太小。

syncLimit 参数:The number of ticks that can pass between sending a request and getting an acknowledgement,设定了允许一个跟随者与领导者进行同步的时间。如果在设定的时间段内,一个跟随者未能完成同步,它将会自己重启。所有关联到这个跟随者的客户端将连接到另一个跟随者。

以上是服务器集群所需的最小配置,可以通过“ZooKeeper管理员指南”查看更多配置,进行性能调优。

关于配置的更多参数的详细说明,运维技巧,可以参考一下书籍:《从Paxos到ZooKeeper - 分布式一致性原理与实践》第8章,ZooKeeper运维。

繼續閱讀