天天看点

zookeeper来完成分布式锁的功能

什么时候使用Shell

因为Shell似乎是各UNIX系统之间通用的功能,并且经过了POSIX的标准化。因此,Shell脚本只要“用心写”一次,即可应用到很多系统上。因此,之所以要使用Shell脚本是基于:

•简单性:Shell是一个高级语言;通过它,你可以简洁地表达复杂的操作。

•可移植性:使用POSIX所定义的功能,可以做到脚本无须修改就可在不同的系统上执行。

•开发容易:可以在短时间内完成一个功能强大又妤用的脚本。

但是,考虑到Shell脚本的命令限制和效率问题,下列情况一般不使用Shell:

1.资源密集型的任务,尤其在需要考虑效率时(比如,排序,hash等等)。

2.需要处理大任务的数学操作,尤其是浮点运算,精确运算,或者复杂的算术运算(这种情况一般使用C++或FORTRAN 来处理)。

3.有跨平台(操作系统)移植需求(一般使用C 或Java)。

4.复杂的应用,在必须使用结构化编程的时候(需要变量的类型检查,函数原型,等等)。

5.对于影响系统全局性的关键任务应用。

6.对于安全有很高要求的任务,比如你需要一个健壮的系统来防止入侵、破解、恶意破坏等等。

7.项目由连串的依赖的各个部分组成。

8.需要大规模的文件操作。

9.需要多维数组的支持。

10.需要数据结构的支持,比如链表或数等数据结构。

11.需要产生或操作图形化界面 GUI。

12.需要直接操作系统硬件。

13.需要 I/O 或socket 接口。

14.需要使用库或者遗留下来的老代码的接口。

15.私人的、闭源的应用(shell 脚本把代码就放在文本文件中,全世界都能看到)。

如果你的应用符合上边的任意一条,那么就考虑一下更强大的语言吧——或许是Perl、Tcl、Python、Ruby——或者是更高层次的编译语言比如C/C++,或者是Java。即使如此,你会发现,使用shell来原型开发你的应用,在开发步骤中也是非常有用的。

第一个Shell脚本

打开文本编辑器,新建一个文件,扩展名为sh(sh代表shell),扩展名并不影响脚本执行,见名知意就好,如果你用php写shell 脚本,扩展名就用php好了。

输入一些代码:

复制代码 代码如下:

#!/bin/bash

echo "Hello World !"

 

“#!” 是一个约定的标记,它告诉系统这个脚本需要什么解释器来执行,即使用哪一种Shell。echo命令用于向窗口输出文本。

运行Shell脚本有两种方法。

作为可执行程序

将上面的代码保存为test.sh,并cd到相应目录:

复制代码 代码如下:

chmod +x ./test.sh  #使脚本具有执行权限

./test.sh  #执行脚本

 

注意,一定要写成./test.sh,而不是test.sh。运行其它二进制的程序也一样,直接写test.sh,linux系统会去PATH里寻找有没有叫test.sh的,而只有/bin, /sbin, /usr/bin,/usr/sbin等在PATH里,你的当前目录通常不在PATH里,所以写成test.sh是会找不到命令的,要用./test.sh告诉系统说,就在当前目录找。

通过这种方式运行bash脚本,第一行一定要写对,好让系统查找到正确的解释器。

这里的"系统",其实就是shell这个应用程序(想象一下Windows Explorer),但我故意写成系统,是方便理解,既然这个系统就是指shell,那么一个使用/bin/sh作为解释器的脚本是不是可以省去第一行呢?是的。

作为解释器参数

这种运行方式是,直接运行解释器,其参数就是shell脚本的文件名,如:

复制代码 代码如下:

/bin/sh test.sh

/bin/php test.php

 

这种方式运行的脚本,不需要在第一行指定解释器信息,写了也没用。

 

zookeeper来完成分布式锁的功能,其实本质上是与主从切换的实现代码是非常类似的,但是功能上强调的重点不一样。

至于,为什么需要分布式锁(公平锁)?为什么不使用JAVA 自带的锁的应用?

1,为什么需要分布式锁? 因为在分布式环境下,可能会出现一些事务,这时候我们除了可以在存储层的数据库进行控制,也可以在应用层控制,举个例子来讲,中国的飞机路线,我们都知道任何时候,都只能由一架飞机通过,而这个控制这个由谁通过,什么时候通过,是由一个信号控制台来决定的,分布式的环境下由于节点分散在各个地方,各个区域,所以控制起来比较麻烦,这时候我们就可以使用zookeeper来轻松的完成,分布式锁的功能。

2,为什么不使用JAVA自带的锁?JAVA JDK提供了公平锁,与非公平锁,但这种实现是基于同一个JVM来说的,如果同一台机器上,不同的JVM,则可以使用文件锁,来实现,但是这些并不是分布式的模式,虽然可以通过RMI的方式来实现,但比较繁琐。

 

 

使用zookeeper来完成分布式锁的步骤如下:

序号 内容 1 创建一个持久znode 2 多个程序并发的去zk服务上,创建一个短暂有时序性的节点路径。 3 各个节点监听,比它小的里面,最大的节点的动态。 4 如果发现,比它小的里面,最大的节点发生锁释放或退出,就自动接替为独占锁 5 没发生改变的节点,继续重复步骤,2,3,4

拓扑图如下所示:

 

 

zookeeper来完成分布式锁的功能

注意上图中的master指的就是,获取锁的实例,这其实跟集群环境里只能有一个master的道理一样。

代码如下:

Java代码

zookeeper来完成分布式锁的功能

 

zookeeper来完成分布式锁的功能
zookeeper来完成分布式锁的功能
  1. package com.test;  
  2.   
  3. import java.nio.charset.Charset;  
  4. import java.nio.charset.StandardCharsets;  
  5. import java.text.SimpleDateFormat;  
  6. import java.util.Collections;  
  7. import java.util.Date;  
  8. import java.util.List;  
  9. import java.util.concurrent.CountDownLatch;  
  10.   
  11. import org.apache.zookeeper.CreateMode;  
  12. import org.apache.zookeeper.WatchedEvent;  
  13. import org.apache.zookeeper.Watcher;  
  14. import org.apache.zookeeper.ZooDefs.Ids;  
  15. import org.apache.zookeeper.ZooKeeper;  
  16. import org.apache.zookeeper.Watcher.Event.KeeperState;  
  17. import org.apache.zookeeper.data.Stat;  
  18.   
  19. public class Lock1  implements Watcher {  
  20.       
  21.       
  22.       
  23.     private ZooKeeper zk;  
  24.       
  25.       
  26.     private CountDownLatch down=new CountDownLatch(1);  
  27.       
  28.     public Lock1() {  
  29.         // TODO Auto-generated constructor stub  
  30.     }  
  31.       
  32.       
  33.       
  34.     public Lock1(String host)throws Exception {  
  35.          this.zk=new ZooKeeper(host, 5000   , new Watcher() {  
  36.               
  37.             @Override  
  38.             public void process(WatchedEvent event) {  
  39.                 // TODO Auto-generated method stub  
  40.                   
  41.                 if(event.getState()==KeeperState.SyncConnected){  
  42.                     down.countDown();  
  43.                 }  
  44.                   
  45.             }  
  46.         });  
  47.     }  
  48.       
  49.      private static final Charset CHARSET=StandardCharsets.UTF_8;  
  50.       
  51.         public void write(String path,String value)throws Exception{  
  52.               
  53.             Stat stat=zk.exists(path, false);  
  54.             if(stat==null){  
  55.                 zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  56.             }else{  
  57.                 zk.setData(path, value.getBytes(CHARSET), -1);  
  58.             }  
  59.               
  60.         }  
  61.           
  62.           
  63.           
  64.          public void check()throws Exception{  
  65.              List<String> list=zk.getChildren("/a", null);  
  66.               Collections.sort(list);//排序使得节点有次序  
  67.               if(list.isEmpty()){  
  68.                   System.out.println("此父路径下面没有节点,分布式锁任务完成或还没启动!");  
  69.               }else{  
  70.                     
  71.                   String start=list.get(0);//获取第一个节点  
  72.                     
  73.                   String data=new String(zk.getData("/a/"+start, false,null));  
  74.                   if(data.equals("a")){//等于本身就启动作为Master  
  75.                         
  76.                       if(list.size()==1){  
  77.                           startMaster();//作为Master启动  
  78.                           }else{  
  79.                               automicSwitch();//对于非第一个启动的节点,会调用此方法,因为他的第一个挂了  
  80.                               //或释放锁了,所以它是抢占的  
  81.                           }  
  82.                   }else{  
  83.                       //非当前节点,就打印当前节点,监控的节点  
  84.                       for(int i=0;i<list.size();i++){  
  85.                           //获取那个节点存的此客户端的模拟IP  
  86.                           String temp=new String(zk.getData("/a/"+list.get(i), false, null));  
  87.                           if(temp.equals("a")){  
  88.                               //因为前面作为首位判断,所以这个出现的位置不可能是首位  
  89.                               //需要监听小节点里面的最大的一个节点  
  90.                               String watchPath=list.get(i-1);  
  91.                               System.out.println("Lock1监听的是:  "+watchPath);  
  92.                                 
  93.                               zk.exists("/a/"+watchPath, this);//监听此节点的详细情况,如果发生节点注销事件  
  94.                               //则会触发自身的process方法  
  95.                               break;//结束循环  
  96.                           }  
  97.                             
  98.                       }  
  99.                         
  100.                   }  
  101.                     
  102.                     
  103.               }  
  104.                
  105.          }  
  106.           
  107.           
  108.   
  109.     @Override  
  110.     public void process(WatchedEvent event) {  
  111.         // TODO Auto-generated method stub  
  112.            
  113.         if(event.getType()==Event.EventType.NodeDeleted){  
  114.               
  115.             //如果发现,监听的节点,挂掉了,那么就重新,进行监听   
  116.             try{  
  117.            System.out.println("注意有锁退出或释放,公平锁开始抢占........");  
  118.             check();  
  119.             }catch(Exception e){  
  120.                 e.printStackTrace();  
  121.                   
  122.             }  
  123.         }  
  124.     }  
  125.       
  126.      public String read(String path,Watcher watch)throws Exception{  
  127.            
  128.          byte[] data=zk.getData(path, watch, null);  
  129.            
  130.            
  131.          return new String(data,CHARSET);  
  132.      }  
  133.        
  134.      SimpleDateFormat f=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
  135.   
  136.        
  137.         
  138.      public void close()throws Exception{  
  139.          zk.close();  
  140.      }  
  141.        
  142.        
  143.        
  144.        
  145.      public void automicSwitch()throws Exception{  
  146.            
  147.         // System.out.println("有节点释放锁,Lock1锁占入.......,  时间  "+f.format(new Date()));  
  148.          System.out.println("Lock1的上级锁节点退出或释放锁了,Lock1锁占入.......,  时间  "+f.format(new Date()));  
  149.      }  
  150.        
  151.        
  152.      public void createPersist()throws Exception{  
  153.            
  154.          zk.create("/a", "主节点".getBytes(), Ids.OPEN_ACL_UNSAFE  , CreateMode.PERSISTENT);  
  155.            
  156.          System.out.println("创建主节点成功........");  
  157.            
  158.            
  159.      }  
  160.        
  161.        
  162.      public void createTemp()throws Exception{  
  163.            
  164.          zk.create("/a/b", "a".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  
  165.            
  166.       System.out.println("Lock1注册锁成功,进入公平队列...........");      
  167.            
  168.      }  
  169.        
  170.     public static void main(String[] args)throws Exception {  
  171.               
  172.             //Slave s=new Slave("192.168.120.128:2181");  
  173.             Lock1 lock=new Lock1("192.168.120.128:2181");  
  174.             //  lock.createPersist();//创建主节点  
  175.              lock.createTemp();//注册临时有序节点  
  176.              lock.check();  
  177.               Thread.sleep(Long.MAX_VALUE);  
  178.              //lock.close();  
  179.               
  180.     }  
  181.        
  182.        
  183.      public void startMaster(){  
  184.            
  185.          System.out.println("Lock1节点获取锁了,其他节点等待........");  
  186.      }  
  187.        
  188. }  
package com.test;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/***
 * 基于zookeeper实现的
 * 分布式公平锁
 * 
 * @author qin dong liang 
 * QQ技术群交流:324714439
 * 
 * */
public class Lock1  implements Watcher {
	
	
	/**
	 * ZK实例
	 * */
	private ZooKeeper zk;
	
	/**原子计数锁,防止在zk没有连上前,执行CURD操作*/
	private CountDownLatch down=new CountDownLatch(1);
	
	public Lock1() {
		// TODO Auto-generated constructor stub
	}
	
	
	
	public Lock1(String host)throws Exception {
		 this.zk=new ZooKeeper(host, 5000	, new Watcher() {
			
			@Override
			public void process(WatchedEvent event) {
				// TODO Auto-generated method stub
				/**链接上zk服务,岂可取消阻塞计数**/
				if(event.getState()==KeeperState.SyncConnected){
					down.countDown();
				}
				
			}
		});
	}
	/**
	 * 字符编码
	 * 
	 * **/
	 private static final Charset CHARSET=StandardCharsets.UTF_8;
	/***
	 * 
	 * 此方法是写入数据
	 * 如果不存在此节点
	 * 就会新建,已存在就是
	 * 更新
	 * 
	 * **/
		public void write(String path,String value)throws Exception{
			
			Stat stat=zk.exists(path, false);
			if(stat==null){
				zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}else{
				zk.setData(path, value.getBytes(CHARSET), -1);
			}
			
		}
		
		
		/**
		 * 
		 * 切换锁
		 * 
		 * **/
		 public void check()throws Exception{
			 List<String> list=zk.getChildren("/a", null);
			  Collections.sort(list);//排序使得节点有次序
			  if(list.isEmpty()){
				  System.out.println("此父路径下面没有节点,分布式锁任务完成或还没启动!");
			  }else{
				  
				  String start=list.get(0);//获取第一个节点
				  
				  String data=new String(zk.getData("/a/"+start, false,null));
				  if(data.equals("a")){//等于本身就启动作为Master
					  
					  if(list.size()==1){
						  startMaster();//作为Master启动
						  }else{
							  automicSwitch();//对于非第一个启动的节点,会调用此方法,因为他的第一个挂了
							  //或释放锁了,所以它是抢占的
						  }
				  }else{
					  //非当前节点,就打印当前节点,监控的节点
					  for(int i=0;i<list.size();i++){
						  //获取那个节点存的此客户端的模拟IP
						  String temp=new String(zk.getData("/a/"+list.get(i), false, null));
						  if(temp.equals("a")){
							  //因为前面作为首位判断,所以这个出现的位置不可能是首位
							  //需要监听小节点里面的最大的一个节点
							  String watchPath=list.get(i-1);
							  System.out.println("Lock1监听的是:  "+watchPath);
							  
							  zk.exists("/a/"+watchPath, this);//监听此节点的详细情况,如果发生节点注销事件
							  //则会触发自身的process方法
							  break;//结束循环
						  }
						  
					  }
					  
				  }
				  
				  
			  }
			 
		 }
		
		

	@Override
	public void process(WatchedEvent event) {
		// TODO Auto-generated method stub
		 
		if(event.getType()==Event.EventType.NodeDeleted){
			
			//如果发现,监听的节点,挂掉了,那么就重新,进行监听 
			try{
		   System.out.println("注意有锁退出或释放,公平锁开始抢占........");
			check();
			}catch(Exception e){
				e.printStackTrace();
				
			}
		}
	}
	/**
	 * 
	 * 读取数据,给定一个路径和
	 * 监听事件
	 * 
	 * ***/
	 public String read(String path,Watcher watch)throws Exception{
		 
		 byte[] data=zk.getData(path, watch, null);
		 
		 
		 return new String(data,CHARSET);
	 }
	 
	 SimpleDateFormat f=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	 
	  /**
	   * 关闭zk连接
	   * 
	   * **/
	 public void close()throws Exception{
		 zk.close();
	 }
	 
	 
	 
	 /**
	  * 释放锁
	  * @throws Exception
	  */
	 public void automicSwitch()throws Exception{
		 
		// System.out.println("有节点释放锁,Lock1锁占入.......,  时间  "+f.format(new Date()));
		 System.out.println("Lock1的上级锁节点退出或释放锁了,Lock1锁占入.......,  时间  "+f.format(new Date()));
	 }
	 
	 /**
	  * 创建一个持久node,
	  * 
	  * **/
	 public void createPersist()throws Exception{
		 
		 zk.create("/a", "主节点".getBytes(), Ids.OPEN_ACL_UNSAFE	, CreateMode.PERSISTENT);
		 
		 System.out.println("创建主节点成功........");
		 
		 
	 }
	 
	 /***
	  * 创建锁node,注意是抢占 的
	  * 
	  * 
	  * */
	 public void createTemp()throws Exception{
		 
		 zk.create("/a/b", "a".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		 
	  System.out.println("Lock1注册锁成功,进入公平队列...........");	 
		 
	 }
	 
	public static void main(String[] args)throws Exception {
			
			//Slave s=new Slave("192.168.120.128:2181");
			Lock1 lock=new Lock1("192.168.120.128:2181");
		    //  lock.createPersist();//创建主节点
			 lock.createTemp();//注册临时有序节点
			 lock.check();
			  Thread.sleep(Long.MAX_VALUE);
			 //lock.close();
			
	}
	 
	 /***
	  * 获取锁成功
	  * 
	  * */
	 public void startMaster(){
		 
		 System.out.println("Lock1节点获取锁了,其他节点等待........");
	 }
	 
}
           

代码如上,所示,测试的时候,需要搭建一个3个节点的zookeeper集群,关于怎么搭建zookeeper集群,散仙前面的文章里有介绍,需要注意的是myid文件不要漏掉。

上面这个类,需要拷贝多份,并改变里面的节点的值,放在不同的eclipse中,进行模拟测试。