前提摘要
- 服務端與用戶端通信的host與port是預先就确定好的,每一個用戶端都必須知道對應服務的ip與端口号, 并且如果服務挂了或者換位址了,就很麻煩。擴充性也不強, 是以我們需要引入注冊中心
- 在RPC架構中,一般一個服務會有多個提供者支援,如何分散服務提供者的壓力?引入負載均衡 RPC三大要素用戶端和服務端我們都已經有了,現在就隻剩下注冊中心這最後一塊拼圖了!Let's go!
- Zookeeper
注冊中心的位址是固定的(為了高可用一般是叢集,我們看做黑盒即可),服務端上線時,在注冊中心注冊自己的服務與對應的位址,而用戶端調用服務時,去注冊中心根據服務名找到對應的服務端位址。
- Curator
Curator是一套Zookeeper用戶端架構,解決了很多 Zookeeper 用戶端非常底層的細節開發工作,這次項目的更新便基于此。
Zookeeper注冊中心
首先在本機安裝Zookeeper,預設端口為2181。
定義服務注冊接口
// 服務注冊接口,兩大基本功能,注冊:儲存服務與位址。 查詢:根據服務名查找位址
public interface ServiceRegister {
void register(String serviceName, InetSocketAddress serverAddress);
InetSocketAddress serviceDiscovery(String serviceName);
}
複制代碼
服務注冊接口的實作類為
public class ZkServiceRegister implements ServiceRegister {
// curator 提供的zookeeper用戶端
private CuratorFramework client;
// zookeeper根路徑節點
private static final String ROOT_PATH = "MyRPC";
// 這裡負責zookeeper用戶端的初始化,并與zookeeper服務端建立連接配接
public ZkServiceRegister(){
// 指數時間重試
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的位址固定,不管是服務提供者還是,消費者都要與之建立連接配接
// sessionTimeoutMs 與 zoo.cfg中的tickTime 有關系,
// zk還會根據minSessionTimeout與maxSessionTimeout兩個參數重新調整最後的逾時值。預設分别為tickTime 的2倍和20倍
// 使用心跳監聽狀态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
System.out.println("zookeeper 連接配接成功");
}
@Override
public void register(String serviceName, InetSocketAddress serverAddress){
try {
// serviceName建立成永久節點,服務提供者下線時,不删服務名,隻删位址
if(client.checkExists().forPath("/" + serviceName) == null){
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/" + serviceName);
}
// 路徑位址,一個/代表一個節點
String path = "/" + serviceName +"/"+ getServiceAddress(serverAddress);
// 臨時節點,伺服器下線就删除節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (Exception e) {
System.out.println("此服務已存在");
}
}
// 根據服務名傳回位址
@Override
public InetSocketAddress serviceDiscovery(String serviceName) {
try {
List<String> strings = client.getChildren().forPath("/" + serviceName);
// 這裡預設用的第一個,後面加負載均衡
String string = strings.get(0);
return parseAddress(string);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// 位址 -> XXX.XXX.XXX.XXX:port 字元串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() +
":" +
serverAddress.getPort();
}
// 字元串解析為位址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}
複制代碼
用戶端更新
執行個體用戶端的時候不再需要傳入host,post的值了,執行個體在發送請求即sendRequest的過程中會自動通過ZkServiceRegister擷取到zookeeper注冊中心擷取到目前服務需要發送到位址(服務提供端的所在地。
\\不再需要傳入host,post
RPCClient rpcClient = new NettyRPCClient();
複制代碼
\\擷取注冊中心存儲的資訊
InetSocketAddress address = serviceRegister.serviceDiscovery(request.getInterfaceName());
host = address.getHostName();
port = address.getPort();
複制代碼
服務端更新
用戶端不需要再寫入host,post了,相應的服務端就需要給注冊中心傳它能提供的服務以及它的host,port。
ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 8899);
複制代碼
在服務提供類加入注冊的功能
public class ServiceProvider {
private Map<String, Object> interfaceProvider;
private String host;
private int port;
private ServiceRegister serviceRegister;
public ServiceProvider(String host, int port){
this.host = host;
this.port = port;
this.serviceRegister = new ZkServiceRegister();
this.interfaceProvider = new HashMap<>();
}
public void provideServiceInterface(Object service){
Class<?>[] interfaces = service.getClass().getInterfaces();
for(Class clazz : interfaces){
interfaceProvider.put(clazz.getName(),service);
// 在注冊中心注冊服務
serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port));
}
}
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}
複制代碼
負載均衡
public InetSocketAddress serviceDiscovery(String serviceName) {
try {
List<String> strings = client.getChildren().forPath("/" + serviceName);
// 這裡預設用的第一個,後面加負載均衡
String string = strings.get(0);
return parseAddress(string);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
複制代碼
上面的代碼中其實還存在一個小問題,每次我們用戶端問注冊中心要服務對應的位址的時候,永遠是給第一個服務者的位址,這顯然是不合理的,我們需要分散服務提供者的壓力。
- 負載均衡接口
// 給伺服器位址清單,根據不同的負載均衡政策選擇一個
public interface LoadBalance {
String balance(List<String> addressList);
}
/**
* 随機負載均衡
*/
public class RandomLoadBalance implements LoadBalance{
@Override
public String balance(List<String> addressList) {
Random random = new Random();
int choose = random.nextInt(addressList.size());
System.out.println("負載均衡選擇了" + choose + "伺服器");
return addressList.get(choose);
}
}
/**
* 輪詢負載均衡
*/
public class RoundLoadBalance implements LoadBalance{
private int choose = -1;
@Override
public String balance(List<String> addressList) {
choose++;
choose = choose%addressList.size();
return addressList.get(choose);
}
}