上一篇簡單介紹了akka的Actor建立與消息的發送、接收,但都僅限於本地的消息傳輸。接下來嘗試實現akka的遠端存取。
AkkaService.java
package com.yonder.akka.test.remote;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
/**
 * Wrapper around an Akka {@link ActorSystem} configured for remote access over netty TCP.
 *
 * <p>An instance binds to a local IPv4 address (see {@code getAddress()}) and the given port,
 * registers one {@code ReceiveActor} under {@code actorName} when {@link #init()} is called, and
 * can query actors hosted by other actor systems through
 * {@link #visitService(String, String, int, String, String)}.
 */
public class AkkaService {
    private static final Logger logger = LoggerFactory.getLogger(AkkaService.class);

    /** Reply returned by {@link #visitService} when the remote call fails for any reason. */
    private static final String ERROR_REPLY = "出錯啦";

    private ActorSystem actorSystem;
    private String serverName;
    private String actorName;
    private String host;
    private int port;

    private AkkaService(int port, String serverName, String actorName) {
        this.host = getAddress();
        this.port = port;
        this.serverName = serverName;
        this.actorName = actorName;
    }

    public String getServerName() {
        return serverName;
    }

    public void setServerName(String serverName) {
        this.serverName = serverName;
    }

    public String getActorName() {
        return actorName;
    }

    public void setActorName(String actorName) {
        this.actorName = actorName;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Creates a new, not yet started, Akka service object. Call {@link #init()} to start it.
     *
     * @param port       port to bind the remoting transport to
     * @param serverName name of the actor system
     * @param actorName  name of the actor that receives messages
     * @return a new {@code AkkaService}; every call returns a distinct instance
     */
    public static AkkaService getInstance(int port, String serverName, String actorName) {
        return new AkkaService(port, serverName, actorName);
    }

    /**
     * Starts the actor system, registers the receiving actor under {@code actorName} and sends it
     * an initial message (with no sender) announcing that it is listening.
     */
    public void init() {
        logger.info("Start ActorSystem...");
        actorSystem = ActorSystem.create(serverName, createConfig());
        logger.info("Start ActorSystem...OK");
        ActorRef act = actorSystem.actorOf(Props.create(ReceiveActor.class), actorName);
        act.tell(actorName + "已監聽成功.", ActorRef.noSender());
    }

    /**
     * Builds the Akka configuration programmatically: remoting over netty TCP bound to
     * {@code host}:{@code port}, a 100 MiB maximum frame size and ERROR log levels.
     *
     * @return the configuration passed to {@link ActorSystem#create(String, Config)}
     */
    private Config createConfig() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("akka.loglevel", "ERROR");
        map.put("akka.stdout-loglevel", "ERROR");
        // Enable Akka remoting.
        map.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
        List<String> remoteTransports = new ArrayList<String>();
        remoteTransports.add("akka.remote.netty.tcp");
        map.put("akka.remote.enabled-transports", remoteTransports);
        map.put("akka.remote.netty.tcp.hostname", host);
        map.put("akka.remote.netty.tcp.port", port);
        map.put("akka.remote.netty.tcp.maximum-frame-size", 100 * 1024 * 1024);
        // Sizing of the default dispatcher's fork-join pool: Akka derives the thread count from
        // (available processors * parallelism-factor), bounded by parallelism-min/parallelism-max.
        map.put("akka.actor.default-dispatcher.fork-join-executor.parallelism-factor", "50");
        map.put("akka.actor.default-dispatcher.fork-join-executor.parallelism-max", "50");
        logger.info("akka.remote.netty.tcp.hostname={}", map.get("akka.remote.netty.tcp.hostname"));
        logger.info("akka.remote.netty.tcp.port={}", map.get("akka.remote.netty.tcp.port"));
        return ConfigFactory.parseMap(map);
    }

    /**
     * Looks up an IPv4 address of this machine to bind the remoting transport to.
     *
     * <p>Loopback, virtual and down interfaces are skipped. The first address that is site-local
     * (10/8, 172.16/12, 192.168/16) or link-local (169.254/16) is returned.
     *
     * @return the textual IPv4 address, or {@code null} if none was found or the lookup failed
     */
    private String getAddress() {
        try {
            Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
            // Some platforms report "no interfaces" as null rather than an empty enumeration.
            if (allNetInterfaces == null) {
                logger.warn("No network interfaces found");
                return null;
            }
            while (allNetInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = allNetInterfaces.nextElement();
                if (netInterface.isLoopback() || netInterface.isVirtual() || !netInterface.isUp()) {
                    continue;
                }
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress ip = addresses.nextElement();
                    // Check the address range itself instead of string prefixes: a "10" prefix
                    // would also match 100.x-109.x, and "172"/"192" would match public ranges.
                    if (ip instanceof Inet4Address
                            && (ip.isSiteLocalAddress() || ip.isLinkLocalAddress())) {
                        return ip.getHostAddress();
                    }
                }
            }
            logger.warn("No private IPv4 address found on any active network interface");
            return null;
        } catch (SocketException e) {
            // Pass the exception itself so the message and stack trace are actually logged.
            logger.error("Error when getting host ip address", e);
            return null;
        }
    }

    /**
     * Shuts the actor system down and waits up to 60 seconds for it to terminate.
     * Does nothing if {@link #init()} was never called.
     */
    public void dispose() {
        if (actorSystem == null) {
            return;
        }
        logger.info("Shutdown ActorSystem...");
        actorSystem.shutdown();
        actorSystem.awaitTermination(Duration.apply(60, TimeUnit.SECONDS));
        logger.info("Shutdown ActorSystem...OK");
    }

    public ActorSystem getActorSystem() {
        return actorSystem;
    }

    /**
     * Sends {@code msg} to a remote actor and blocks for up to 45 seconds waiting for its reply.
     *
     * @param serverName name of the remote actor system
     * @param host       host the remote actor system is bound to
     * @param port       port the remote actor system is bound to
     * @param actorName  name of the target actor under {@code /user}
     * @param msg        message to send
     * @return the reply converted with {@code toString()}, or {@code "出錯啦"} if the call fails
     *         (timeout, unreachable peer, service not initialised, ...); the failure is logged
     */
    public String visitService(String serverName, String host, int port, String actorName, String msg) {
        String url = toAkkaUrl(serverName, host, port, actorName);
        try {
            ActorSelection selection = actorSystem.actorSelection(url);
            Timeout timeout = new Timeout(Duration.create(45, "seconds"));
            Future<Object> future = Patterns.ask(selection, msg, timeout);
            Object result = Await.result(future, timeout.duration());
            return result.toString();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for reply from {}", url, e);
            return ERROR_REPLY;
        } catch (Exception e) {
            logger.error("Failed to get reply from {}", url, e);
            return ERROR_REPLY;
        }
    }

    /**
     * Builds the address of a user actor reachable over the {@code akka.tcp} transport.
     *
     * @param serviceName name of the actor system hosting the actor
     * @param host        host of that actor system
     * @param port        port of that actor system
     * @param actorName   name of the actor under {@code /user}
     * @return a string of the form {@code akka.tcp://serviceName@host:port/user/actorName}
     */
    public String toAkkaUrl(String serviceName, String host, int port, String actorName) {
        return "akka.tcp://" + serviceName + "@" + host + ":" + port + "/user/"
                + actorName;
    }
}
ReceiveActor.java
package com.yonder.akka.test.remote;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.UntypedActor;
/**
* akka消息接收類
* @author cyd
* 2016年1月11日
*
*/
public class ReceiveActor extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger(ReceiveActor.class);
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof String) {
try {
logger.info("收到消息 msg:" + msg.toString());
this.getSender().tell("Hello I'm " + this.getSelf().path().name(), getSelf());
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
this.getSender().tell("Error!", getSelf());
}
} else {
logger.info(msg.toString());
}
}
}
RemoteAkkaServer.java
package com.yonder.akka.test.remote;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Example program entry point: the remote service.
 *
 * <p>Starts an {@code AkkaService} named {@code remoteServer} on port 10002 with a receiving
 * actor named {@code remoteActor}.
 *
 * @author cyd
 * 2016-01-11
 */
public class RemoteAkkaServer {
    // Use this class for the logger so log lines are attributed to RemoteAkkaServer,
    // not to AkkaService.
    private static final Logger logger = LoggerFactory.getLogger(RemoteAkkaServer.class);

    public static void main(String[] args) {
        AkkaService remoteService = AkkaService.getInstance(10002, "remoteServer", "remoteActor");
        remoteService.init();
        logger.info("remoteServer啟動成功");
    }
}
LocalAkkaServer.java
package com.yonder.akka.test.remote;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Example program entry point: the local service.
 *
 * <p>Starts an {@code AkkaService} named {@code localServer} on port 10001, then sends one
 * message to {@code remoteActor} on {@code remoteServer} (port 10002) and logs the reply.
 *
 * @author cyd
 * 2016-01-11
 */
public class LocalAkkaServer {
    // Use this class for the logger so log lines are attributed to LocalAkkaServer,
    // not to AkkaService.
    private static final Logger logger = LoggerFactory.getLogger(LocalAkkaServer.class);

    public static void main(String[] args) {
        AkkaService localService = AkkaService.getInstance(10001, "localServer", "localActor");
        localService.init();
        logger.info("localServer啟動成功");
        // Both services run on the same machine in this test, so the local service's IP is
        // used as the remote host.
        String str = localService.visitService(
                "remoteServer", localService.getHost(), 10002, "remoteActor", "Hello I'm local!");
        logger.info("reply:{}", str);
    }
}
注:本範例須先啟動RemoteAkkaServer,啟動成功後再啟動LocalAkkaServer。
執行結果如下:
RemoteAkkaServer:
2016-01-11 14:42:36,836 [main] INFO [com.yonder.akka.test.remote.AkkaService.init:88] - Start ActorSystem...
2016-01-11 14:42:36,839 [main] INFO [com.yonder.akka.test.remote.AkkaService.createConfig:116] - akka.remote.netty.tcp.hostname=192.168.30.45
2016-01-11 14:42:36,839 [main] INFO [com.yonder.akka.test.remote.AkkaService.createConfig:117] - akka.remote.netty.tcp.port=10002
2016-01-11 14:42:37,687 [main] INFO [com.yonder.akka.test.remote.AkkaService.init:90] - Start ActorSystem...OK
2016-01-11 14:42:37,689 [main] INFO [com.yonder.akka.test.remote.AkkaService.main:20] - remoteServer啟動成功
2016-01-11 14:42:37,689 [remoteServer-akka.actor.default-dispatcher-3] INFO [com.yonder.akka.test.remote.ReceiveActor.onReceive:22] - 收到消息 msg:remoteActor已監聽成功.
2016-01-11 14:42:44,355 [remoteServer-akka.actor.default-dispatcher-4] INFO [com.yonder.akka.test.remote.ReceiveActor.onReceive:22] - 收到消息 msg:Hello I'm local!
LocalAkkaServer:
2016-01-11 14:42:43,330 [main] INFO [com.yonder.akka.test.remote.AkkaService.init:88] - Start ActorSystem...
2016-01-11 14:42:43,333 [main] INFO [com.yonder.akka.test.remote.AkkaService.createConfig:116] - akka.remote.netty.tcp.hostname=192.168.30.45
2016-01-11 14:42:43,333 [main] INFO [com.yonder.akka.test.remote.AkkaService.createConfig:117] - akka.remote.netty.tcp.port=10001
2016-01-11 14:42:44,180 [main] INFO [com.yonder.akka.test.remote.AkkaService.init:90] - Start ActorSystem...OK
2016-01-11 14:42:44,182 [main] INFO [com.yonder.akka.test.remote.AkkaService.main:21] - localServer啟動成功
2016-01-11 14:42:44,182 [localServer-akka.actor.default-dispatcher-3] INFO [com.yonder.akka.test.remote.ReceiveActor.onReceive:22] - 收到消息 msg:localActor已監聽成功.
2016-01-11 14:42:44,373 [main] INFO [com.yonder.akka.test.remote.AkkaService.main:24] - reply:Hello I'm remoteActor
這樣就實現了一個簡單的akka遠端存取。
範例中遠端Actor傳輸的是字串,遠端Actor還可以傳輸物件,這讓一些業務邏輯的實現變得更為簡單方便。
akka的並發程式設計後續再進行探討。