服务注册与发现
定义:微服务时代,同一个jar包会部署在多台服务器上提供相同的服务。可以通过nginx在服务端进行负载均衡的配置,也可以通过ZooKeeper在客户端进行负载均衡配置。
基本流程:
- 多个服务注册
- 客户端获取中间件地址集合
- 从集合中随机选一个服务执行任务

服务端代码
用SpringBoot完成一个最简单的web服务,并且连接zk服务器,实现注册功能。
ProductController
/**
 * REST controller of the product service, mapped under {@code /product}.
 */
@RestController
@RequestMapping("/product")
public class ProductController {

    /**
     * Builds a {@link Product} for the requested id.
     *
     * @param request current HTTP request; its local port is embedded in the product name
     * @param id      product id taken from the URL path
     * @return a product whose name is {@code "name:" + <local port>}
     */
    @RequestMapping("/getProduct/{id}")
    public Object getProduct(HttpServletRequest request, @PathVariable("id") String id) {
        // The port in the name shows which service instance answered the call.
        String productName = "name:" + request.getLocalPort();
        return new Product(id, productName);
    }
}
InitListener
/**
 * Servlet context listener that registers this service instance with ZooKeeper
 * (through {@code ServiceRegister}) when the web application starts.
 */
public class InitListener implements ServletContextListener {

    @Value("${server.port}")
    private int port;

    /**
     * Injects the {@code @Value} fields of this listener and then registers the
     * local host address and port.
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // Ask Spring's bean factory to autowire this instance so that `port` gets populated.
        WebApplicationContextUtils
                .getRequiredWebApplicationContext(sce.getServletContext())
                .getAutowireCapableBeanFactory()
                .autowireBean(this);
        registerLocalInstance();
    }

    /** Resolves the local host address and registers it together with the port. */
    private void registerLocalInstance() {
        try {
            InetAddress localHost = InetAddress.getLocalHost();
            ServiceRegister.register(localHost.getHostAddress(), port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Nothing to clean up here. */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}
Product
/**
 * Plain data holder describing a product by id and name.
 */
public class Product {

    private String id;
    private String name;

    /** Creates an empty product; both fields stay {@code null}. */
    public Product() {
    }

    /**
     * Creates a product with the given values.
     *
     * @param id   product id
     * @param name product name
     */
    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the product id */
    public String getId() {
        return this.id;
    }

    /** @param id the new product id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the product name */
    public String getName() {
        return this.name;
    }

    /** @param name the new product name */
    public void setName(String name) {
        this.name = name;
    }
}
ServiceRegister
/**
 * Registers a service instance in ZooKeeper under {@code /services/products}.
 *
 * <p>Each instance is stored as an ephemeral sequential child node whose data is
 * {@code "<address>:<port>"} encoded as UTF-8.
 */
public class ServiceRegister {

    private static final String BASE_SERVICES = "/services";
    private static final String SERVICE_NAME = "/products";

    /**
     * Registers the given address and port as one instance of the product service.
     * Failures are printed and not propagated, as before.
     *
     * @param address host address of this instance
     * @param port    HTTP port of this instance
     */
    public static void register(String address, int port) {
        try {
            // The handle is deliberately left open: the node created below is ephemeral,
            // so it lives only as long as this session does.
            ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (watchedEvent) -> {
            });

            // ZooKeeper does not create missing parents, so create every level in order.
            ensurePersistentNode(zooKeeper, BASE_SERVICES);
            ensurePersistentNode(zooKeeper, BASE_SERVICES + SERVICE_NAME);

            String serverPath = address + ":" + port;
            zooKeeper.create(
                    BASE_SERVICES + SERVICE_NAME + "/child",
                    // Explicit charset: the consumer side decodes the payload as UTF-8.
                    serverPath.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Creates {@code path} as an empty persistent node when it does not exist yet. */
    private static void ensurePersistentNode(ZooKeeper zooKeeper, String path) throws Exception {
        if (zooKeeper.exists(path, false) == null) {
            zooKeeper.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
}
ProductApp
/**
 * Spring Boot entry point of the product service.
 */
@SpringBootApplication
public class ProductApp {

    /** Starts the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(ProductApp.class, args);
    }

    /**
     * Wraps a new {@code InitListener} in a registration bean.
     *
     * @return the registration bean holding the listener
     */
    @Bean // starts automatically together with the service
    public ServletListenerRegistrationBean servletListenerRegistrationBean() {
        ServletListenerRegistrationBean registration = new ServletListenerRegistrationBean();
        InitListener listener = new InitListener();
        registration.setListener(listener);
        return registration;
    }
}
application.properties
server.port=8080
可以修改server.port端口(比如分别使用8080和8081两个端口),从而同时启动并注册两个服务实例。
业务端代码
OrderController
/**
 * REST controller of the order service, mapped under {@code /order}. It picks one
 * product-service instance through the load balancer and calls it over HTTP.
 */
@RequestMapping("/order")
@RestController
public class OrderController {

    @Resource
    private RestTemplate restTemplate;

    private LoadBalance loadBalance = new RamdomLoadBalance();

    /**
     * Builds an order that embeds a product fetched from a product-service instance.
     *
     * @param id order id taken from the URL path
     * @return the assembled {@code Order}
     * @throws IllegalStateException if the load balancer has no host to offer
     */
    @RequestMapping("/getOrder/{id}")
    public Object getOrder(@PathVariable("id") String id) {
        // Choose the host once so that the logged host is the one actually called.
        String host = loadBalance.choseServiceHost();
        System.out.println("sowhat1412:" + host);
        if (host == null || host.isEmpty()) {
            // An empty host would otherwise produce the malformed URL "http:///product/...".
            throw new IllegalStateException("no product service instance available");
        }
        Product product = restTemplate.getForObject(
                "http://" + host + "/product/getProduct/1", Product.class);
        return new Order(id, "orderName", product);
    }
}
InitListener
/**
 * Servlet context listener on the consumer side. It reads the registered product
 * service instances from ZooKeeper, publishes them to {@code LoadBalance.SERVICE_LIST},
 * and refreshes the list whenever the children of the service node change.
 */
public class InitListener implements ServletContextListener {

    private static final String BASE_SERVICES = "/services";
    private static final String SERVICE_NAME = "/products";
    private static final String SERVICE_PATH = BASE_SERVICES + SERVICE_NAME;

    private ZooKeeper zooKeeper;

    /** Connects to ZooKeeper and loads the initial instance list. */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        try {
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (watchedEvent) -> {
                // Constant-first equals keeps the comparison safe for events without a path.
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged
                        && SERVICE_PATH.equals(watchedEvent.getPath())) {
                    updateServiceList();
                }
            });
            updateServiceList();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads every child of the service node and replaces the shared host list with
     * the decoded node data. On any failure the previous list is left untouched.
     */
    private void updateServiceList() {
        try {
            // watch=true re-arms the children watch, which fires only once per registration.
            List<String> children = zooKeeper.getChildren(SERVICE_PATH, true);
            List<String> newServerList = new ArrayList<String>();
            for (String subNode : children) {
                byte[] data = zooKeeper.getData(SERVICE_PATH + "/" + subNode, false, null);
                String host = new String(data, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("host:" + host);
                newServerList.add(host);
            }
            // Publish the fully built list in a single assignment.
            LoadBalance.SERVICE_LIST = newServerList;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Closes the ZooKeeper handle so the session does not outlive the web application. */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        if (zooKeeper == null) {
            return;
        }
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Order
/**
 * Plain data holder describing an order and the product it refers to.
 */
public class Order {

    private String id;
    private String name;
    private Product product;

    /** Creates an empty order; all fields stay {@code null}. */
    public Order() {
    }

    /**
     * Creates an order with the given values.
     *
     * @param id      order id
     * @param name    order name
     * @param product product attached to the order
     */
    public Order(String id, String name, Product product) {
        this.id = id;
        this.name = name;
        this.product = product;
    }

    /** @return the order id */
    public String getId() {
        return this.id;
    }

    /** @param id the new order id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the order name */
    public String getName() {
        return this.name;
    }

    /** @param name the new order name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the attached product */
    public Product getProduct() {
        return this.product;
    }

    /** @param product the new attached product */
    public void setProduct(Product product) {
        this.product = product;
    }
}
Product
/**
 * Consumer-side copy of the product data holder (id and name).
 */
public class Product {

    private String id;
    private String name;

    /** Creates an empty product; both fields stay {@code null}. */
    public Product() {
    }

    /**
     * Creates a product with the given values.
     *
     * @param id   product id
     * @param name product name
     */
    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the product id */
    public String getId() {
        return this.id;
    }

    /** @param id the new product id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the product name */
    public String getName() {
        return this.name;
    }

    /** @param name the new product name */
    public void setName(String name) {
        this.name = name;
    }
}
LoadBalance
/**
 * Base class for client-side load-balancing strategies over a shared list of hosts.
 */
public abstract class LoadBalance {

    // Note: the pool of service addresses is declared volatile.
    public static volatile List<String> SERVICE_LIST;

    /**
     * Picks one host from {@link #SERVICE_LIST}.
     *
     * @return the chosen host
     */
    public abstract String choseServiceHost();
}
/**
 * Load-balancing strategy that picks a host uniformly at random.
 */
public class RamdomLoadBalance extends LoadBalance {

    /**
     * Picks a random host from the current host list.
     *
     * @return a host from {@code SERVICE_LIST}, or the empty string when the list is
     *         {@code null} or empty
     */
    @Override
    public String choseServiceHost() {
        // Read the volatile field once: it can be swapped for a shorter list at any time,
        // which would make a separate size()/get() pair throw IndexOutOfBoundsException.
        java.util.List<String> hosts = SERVICE_LIST;
        if (hosts == null || hosts.isEmpty()) {
            return "";
        }
        int index = java.util.concurrent.ThreadLocalRandom.current().nextInt(hosts.size());
        return hosts.get(index);
    }
}
分布式锁
任务通过竞争获取锁才能对该资源进行操作(①竞争锁);当有一个任务在对资源进行更新时(②占有锁),其他任务都不可以对这个资源进行操作(③任务阻塞),直到该任务完成更新(④释放锁);
使用分布式锁的前提:
- 多任务环境中才需要
- 任务都需要对同一共享资源进行写操作;
- 对资源的访问是互斥的
以前的多线程并发都发生在单个JVM内,可以用synchronized、Lock、AtomicInteger等JUC工具来实现互斥。但是在集群环境中这些手段就不可用了,因为存在多个JVM。
老方法
/**
 * Single-JVM demo: 50 threads each generate one order number through a lock-protected
 * generator; the main thread waits for all of them, then prints the sorted result.
 *
 * @author sowhat
 * @create 2020-06-10 14:37
 */
public class OrderService implements Runnable {

    // Shared by all tasks so that they contend on the same generator and its lock;
    // one generator per task would give every thread its own private lock.
    private static final OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    private static CountDownLatch countDownLatch = new CountDownLatch(50);

    private static List<String> result = new Vector<>();

    /** Generates one order number, stores it, and then signals completion. */
    @Override
    public void run() {
        try {
            result.add(orderNumGenerator.getNumber_byLock());
        } finally {
            // Count down only after the work is done, so await() means "all results are in".
            countDownLatch.countDown();
        }
    }

    /**
     * Starts 50 worker threads, waits for all of them and prints the sorted numbers.
     *
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            new Thread(new OrderService()).start();
        }
        countDownLatch.await();
        Collections.sort(result);
        result.forEach(s -> System.out.println(s));
    }
}
/**
 * Generates order numbers of the form {@code "yyyy-MM-dd HH-mm-ss-<counter>"} from a
 * JVM-wide static counter. It offers three variants: unsynchronized, guarded by
 * {@code synchronized}, and guarded by a {@link Lock}.
 */
class OrderNumGenerator {

    /** Counter shared by all instances. */
    public static int count = 0;

    /** Monitor used by {@link #getNumber_syn()}. */
    public static Object obj = new Object();

    // Static, like the counter it protects: a per-instance lock would not exclude
    // threads that each use their own generator instance.
    private static final Lock lock = new ReentrantLock();

    /**
     * Unsynchronized variant; concurrent callers may observe duplicate or lost counts.
     *
     * @return timestamp plus the incremented counter
     */
    public String getNumber() {
        return nextNumber();
    }

    /**
     * Variant guarded by the shared monitor {@link #obj}.
     *
     * @return timestamp plus the incremented counter
     */
    public String getNumber_syn() {
        synchronized (obj) {
            return nextNumber();
        }
    }

    /**
     * Variant guarded by a {@link ReentrantLock} shared by all instances.
     *
     * @return timestamp plus the incremented counter
     */
    public String getNumber_byLock() {
        // Acquire before the try block so that unlock() only runs when lock() succeeded.
        lock.lock();
        try {
            return nextNumber();
        } finally {
            lock.unlock();
        }
    }

    /** Formats the current time and appends the incremented counter. */
    private String nextNumber() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        return sim.format(new Date()) + "-" + (++count);
    }
}
zookeeper 基于同名节点的分布式锁
核心思想
核心思想:在zk中有一个唯一的临时节点,只有成功创建该节点的任务才可以操作数据,没拿到的就需要等待。
缺点:可能引发羊群效应——例如有1000个任务竞争时,第一个用完释放锁的瞬间,其余999个会同时并发地向zk请求获得锁。
package cn.sowhat;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
/**
 * Generates order numbers of the form {@code "yyyy-MM-dd HH-mm-ss-<counter>"} from a
 * static counter. It does no synchronization of its own.
 */
class OrderNumGenerator {

    /** Counter shared by all instances. */
    public static int count = 0;

    /**
     * @return the formatted current time followed by the incremented counter
     */
    public String getNumber() {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        String timestamp = formatter.format(new Date());
        count = count + 1;
        return timestamp + "-" + count;
    }
}
/**
 * Demo driver: 50 threads each take the ZooKeeper-based lock, generate one order
 * number, print it and release the lock.
 */
public class ZkLock implements Runnable {

    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    private Lock lock = new ZookeeperDistrbuteLock();

    @Override
    public void run() {
        getNumber();
    }

    /**
     * Acquires the lock, generates and prints one number, then releases the lock.
     * Errors are printed rather than propagated.
     */
    public void getNumber() {
        // Acquire outside the guarded block: if acquisition fails, unLock() must not run,
        // because it would delete a lock node that another client holds.
        try {
            lock.getLock();
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
        try {
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + " ,生成ID: " + number);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unLock();
        }
    }

    /** Starts 50 competing threads, each with its own lock instance. */
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(new ZkLock()).start();
        }
    }
}
/**
 * Minimal lock contract used by the distributed-lock demos.
 */
interface Lock {

    /** Acquires the lock. */
    void getLock();

    /** Releases the lock. */
    void unLock();
}
/**
 * Template for a blocking lock: keep trying to acquire, and wait between attempts.
 */
abstract class AbstrackLock implements Lock {

    /**
     * Blocks until {@link #tryLock()} succeeds, calling {@link #waitLock()} after
     * every failed attempt.
     */
    @Override
    public void getLock() {
        // A loop instead of recursion: under long contention the recursive form adds
        // a stack frame per retry and can end in StackOverflowError.
        while (!tryLock()) {
            waitLock();
        }
        System.out.println("获取lock锁资源");
    }

    /**
     * Makes one non-blocking attempt to acquire the lock.
     *
     * @return {@code true} if the lock was acquired
     */
    public abstract boolean tryLock();

    /** Blocks until another acquisition attempt is worthwhile. */
    public abstract void waitLock();
}
/**
 * Common state for the ZooKeeper-backed locks: one client per lock instance and the
 * node paths the concrete locks use.
 */
abstract class ZookeeperAbstractLock extends AbstrackLock {

    /** Node path used by the same-name-node lock. */
    protected static final String PATH = "/lock";

    /** Parent node path used by the sequential-node lock. */
    protected static final String PATH2 = "/lock2";

    /** ZooKeeper connection string. */
    private static final String CONN = "127.0.0.1:2181";

    /** Client created together with each lock instance. */
    protected ZkClient zkClient = new ZkClient(CONN);
}
/**
 * Distributed lock based on a single ephemeral node: whoever manages to create
 * {@code PATH} holds the lock; everyone else waits for that node to be deleted.
 *
 * <p>An instance is single-use, because {@link #unLock()} also closes the client.
 */
class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {

    /**
     * Tries to create the ephemeral lock node.
     *
     * @return {@code true} if the node was created; {@code false} on any failure
     */
    @Override
    public boolean tryLock() {
        try {
            zkClient.createEphemeral(PATH);
            // Creating the node successfully means the lock has been acquired.
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Blocks until the lock node is deleted, or returns at once if it is already gone.
     */
    @Override
    public void waitLock() {
        // The latch exists before the listener is subscribed, so a delete event arriving
        // at any point after subscription is recorded and never lost.
        final CountDownLatch released = new CountDownLatch(1);
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String path) throws Exception {
                // Wake up the waiting thread.
                released.countDown();
            }

            @Override
            public void handleDataChange(String s, Object data) throws Exception {
            }
        };
        // Register the listener; it reacts when the node is deleted.
        zkClient.subscribeDataChanges(PATH, iZkDataListener);
        try {
            if (zkClient.exists(PATH)) {
                // Someone holds the lock: wait until the listener reports the deletion.
                try {
                    released.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // Waiting is over, so the listener is no longer needed.
            zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
        }
    }

    @Override
    public void getLock() {
        super.getLock();
    }

    /** Deletes the lock node and closes the client. */
    @Override
    public void unLock() {
        if (zkClient != null) {
            zkClient.delete(PATH);
            zkClient.close();
        }
    }
}
高性能分布式锁
思想:很简单,每个要操作数据的任务不要乱抢了,而是在zk中进行排队,轮到了再操作数据。这样每个任务只需监听排在自己前面的那个节点,可以有效避免羊群效应。
package cn.sowhat;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * Generates order numbers from a static counter; it does no synchronization of its own.
 */
class OrderNumGenerator {

    /** Counter shared by all instances. */
    public static int count = 0;

    /**
     * @return the incremented counter as a decimal string
     */
    public String getNumber() {
        count = count + 1;
        return String.valueOf(count);
    }
}
/**
 * Demo driver: 50 threads each take the queue-based ZooKeeper lock, generate one
 * order number, print it and release the lock.
 */
public class ZkLock implements Runnable {

    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    private Lock lock = new ZookeeperDistrbuteLock2();

    @Override
    public void run() {
        getNumber();
    }

    /**
     * Acquires the lock, generates and prints one number, then releases the lock.
     * Errors are printed rather than propagated.
     */
    public void getNumber() {
        // Acquire outside the guarded block: unLock() must only run for a lock that
        // was actually acquired.
        try {
            lock.getLock();
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
        try {
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + " ,生成ID: " + number);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unLock();
        }
    }

    /** Starts 50 competing threads, each with its own lock instance. */
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(new ZkLock()).start();
        }
    }
}
/**
 * Minimal lock contract used by the distributed-lock demos.
 */
interface Lock {

    /** Acquires the lock. */
    void getLock();

    /** Releases the lock. */
    void unLock();
}
/**
 * Template for a blocking lock: keep trying to acquire, and wait between attempts.
 */
abstract class AbstrackLock implements Lock {

    /**
     * Blocks until {@link #tryLock()} succeeds, calling {@link #waitLock()} after
     * every failed attempt.
     */
    @Override
    public void getLock() {
        // A loop instead of recursion: under long contention the recursive form adds
        // a stack frame per retry and can end in StackOverflowError.
        while (!tryLock()) {
            waitLock();
        }
        System.out.println("获取lock锁资源");
    }

    /**
     * Makes one non-blocking attempt to acquire the lock.
     *
     * @return {@code true} if the lock was acquired
     */
    public abstract boolean tryLock();

    /** Blocks until another acquisition attempt is worthwhile. */
    public abstract void waitLock();
}
/**
 * Common state for the ZooKeeper-backed locks: one client per lock instance and the
 * node paths the concrete locks use.
 */
abstract class ZookeeperAbstractLock extends AbstrackLock {

    /** Node path used by the same-name-node lock. */
    protected static final String PATH = "/lock";

    /** Parent node path used by the sequential-node lock. */
    protected static final String PATH2 = "/lock2";

    /** ZooKeeper connection string. */
    private static final String CONN = "127.0.0.1:2181";

    /** Client created together with each lock instance. */
    protected ZkClient zkClient = new ZkClient(CONN);
}
/**
 * Queue-based distributed lock: every contender creates an ephemeral sequential node
 * under {@code PATH2}; the lowest node holds the lock and every other contender
 * watches only the node directly in front of it.
 *
 * <p>An instance is single-use, because {@link #unLock()} also closes the client.
 */
class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock {

    /** Full path of the node directly ahead of ours, or {@code null} if there is none to wait for. */
    private String beforePath;

    /** Full path of our own queue node, or {@code null} before the first attempt. */
    private String currentPath;

    /** Makes sure the persistent parent node exists. */
    public ZookeeperDistrbuteLock2() {
        if (!this.zkClient.exists(PATH2)) {
            // createParents=true makes zkclient tolerate the node already existing, so
            // losing a creation race against another client is harmless.
            this.zkClient.createPersistent(PATH2, true);
        }
    }

    /**
     * Joins the queue on the first call, then checks whether our node is first.
     *
     * @return {@code true} if our node is the lowest child of {@code PATH2}
     */
    @Override
    public boolean tryLock() {
        // An empty currentPath means this is the first attempt: create our queue node.
        if (currentPath == null || currentPath.length() <= 0) {
            // Ephemeral sequential node.
            currentPath = this.zkClient.createEphemeralSequential(PATH2 + "/", "lock");
        }
        // Child names are zero-padded increasing sequence strings, so sorting them
        // gives the queue order.
        List<String> children = this.zkClient.getChildren(PATH2);
        Collections.sort(children);

        // Strip the "<PATH2>/" prefix instead of relying on a hard-coded length.
        String currentNode = currentPath.substring(PATH2.length() + 1);
        int position = Collections.binarySearch(children, currentNode);
        if (position < 0) {
            // Our node is no longer listed; forget it so the next attempt re-queues
            // instead of indexing the list with a negative position.
            currentPath = null;
            beforePath = null;
            return false;
        }
        if (position == 0) {
            // First in the queue: the lock is ours.
            return true;
        }
        beforePath = PATH2 + '/' + children.get(position - 1);
        return false;
    }

    /**
     * Blocks until the node directly ahead of ours is deleted, or returns at once if
     * there is nothing to wait for.
     */
    @Override
    public void waitLock() {
        final String watched = beforePath;
        if (watched == null) {
            return;
        }
        // The latch exists before the listener is subscribed, so a delete event arriving
        // at any point after subscription is recorded and never lost.
        final CountDownLatch released = new CountDownLatch(1);
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                released.countDown();
            }
        };
        // Watch only the predecessor node for deletion.
        this.zkClient.subscribeDataChanges(watched, iZkDataListener);
        try {
            if (this.zkClient.exists(watched)) {
                try {
                    released.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } finally {
            this.zkClient.unsubscribeDataChanges(watched, iZkDataListener);
        }
    }

    @Override
    public void getLock() {
        super.getLock();
    }

    /** Deletes our queue node and closes the client, even if the delete fails. */
    @Override
    public void unLock() {
        try {
            if (currentPath != null) {
                zkClient.delete(currentPath);
                currentPath = null;
            }
        } finally {
            zkClient.close();
        }
    }
}
集群选举
思想: 其实跟分布式锁一样,就是分布式锁的简化版本:所有节点启动时都尝试创建同一个临时节点,创建成功的就是主节点,其余节点监听该临时节点;主节点挂掉后临时节点被删除,其余节点再重新竞争创建。
package cn.sowhat.order;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletListenerRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.support.WebApplicationContextUtils;
import java.util.Timer;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
 * Holds the election result of this process.
 *
 * @author sowhat
 * @create 2020-06-10 18:55
 */
class IsMaster {

    /**
     * {@code true} while this process holds the election node. Declared volatile
     * because the ZooKeeper event thread writes it and request threads read it.
     */
    public static volatile boolean isSurvival;
}
/**
 * REST controller that reports whether this process is currently the master.
 */
@RestController
class IndexController {

    /**
     * @return {@code "is Master"} when {@code IsMaster.isSurvival} is set,
     *         otherwise {@code "is slave"}
     */
    @RequestMapping("/getserverinfo")
    public String getServerInfo() {
        if (IsMaster.isSurvival) {
            return "is Master";
        }
        return "is slave";
    }
}
/**
 * Servlet context listener that takes part in master election: it tries to create the
 * ephemeral node {@code /election} and, whenever that node is deleted, tries again.
 */
class InitListener implements ServletContextListener {

    ZkClient zkClient = new ZkClient("127.0.0.1:2181");

    private String path = "/election";

    @Value("${server.port}")
    private String serverPort;

    /** Makes the first election attempt and subscribes to deletion of the election node. */
    private void init() {
        System.out.println("项目启动完成");
        createEphemeral();
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("主节点挂了,重新选举");
                // Pause before the new attempt, as in the original flow.
                Thread.sleep(5000);
                createEphemeral();
            }
        });
    }

    /**
     * Tries to create the election node with this server's port as data and records
     * the outcome in {@code IsMaster.isSurvival}. Any failure counts as a lost election.
     */
    private void createEphemeral() {
        try {
            zkClient.createEphemeral(path, serverPort);
            IsMaster.isSurvival = true;
        } catch (Exception e) {
            IsMaster.isSurvival = false;
        }
    }

    /** Injects {@code serverPort} and then starts the election. */
    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        // This listener is created with `new`, so Spring never processes its @Value
        // field; autowire the instance explicitly, otherwise serverPort stays null.
        WebApplicationContextUtils
                .getRequiredWebApplicationContext(servletContextEvent.getServletContext())
                .getAutowireCapableBeanFactory()
                .autowireBean(this);
        init();
    }

    /** Closes the client so the ephemeral node is dropped without waiting for a session timeout. */
    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
/**
 * Spring Boot entry point of the master-election demo.
 */
@SpringBootApplication
public class selectMaster {

    /** Starts the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(selectMaster.class, args);
    }

    /**
     * Wraps a new {@code InitListener} in a registration bean.
     *
     * @return the registration bean holding the listener
     */
    @Bean
    public ServletListenerRegistrationBean servletListenerRegistrationBean() {
        ServletListenerRegistrationBean registration = new ServletListenerRegistrationBean();
        InitListener listener = new InitListener();
        registration.setListener(listener);
        return registration;
    }
}
配置中心
比如mybatis通过URL访问mysql,我们通过动态的修改URL实现从访问A数据库到访问B数据库的切换。核心就监听 dataChange。
ZK注意事项
- Zk数据与日志清理
dataDir目录、dataLogDir两个目录会随着时间推移变得庞大,容易造成硬盘满了,清理办法:
自己编写shell脚本,保留最新的n个文件
使用zk自带的zkCleanup.sh保留最新的n个文件,例如:zkCleanup.sh -n 15
配置autopurge.snapRetainCount和autopurge.purgeInterval两个参数配合使用;
- Too many connections
单个客户端机器(IP)到一台zk服务器的最大连接数默认为60,超过后会出现Too many connections;可通过maxClientCnxns参数调整单个客户端机器允许创建的最大连接数;
- 磁盘管理
磁盘的I/O性能直接制约zookeeper更新操作速度,为了提高zk的写性能建议:使用单独的磁盘,Jvm堆内存设置要小心。
- 集群数量
集群中机器的数量并不是越多越好:一个写操作需要半数以上的节点ack,所以集群节点数越多,整个集群可以容忍挂掉的节点数越多(越可靠),但是吞吐量越差。集群的节点数量建议为奇数;
- 节点数据大小
zk是基于内存进行读写操作的,有时候会进行消息广播,因此不建议在节点存取容量比较大的数据;