在上一篇中“基于Pipe管道通信”中很多朋友反應說隻有代碼沒有理論,看起來不知道過程。本文将過程補上,而且提供基于Pipe的另種通信方式-監聽方式的實作。
Pipe是Peer之間通信主要機制之一。JXTA中的Pipe和傳統的管道存在着本質的差別。它不是記憶體中存在的資料,也不是硬碟上的檔案,而隻是一個XML廣告文檔。是以,其是抽象的概念。其建立過程在這裡也稱為綁定過程,可以在不同時刻動态地綁定到不同的“實體管道末端”。其具有兩個明顯的特點:
1、Pipe可以綁定到任何端點
2、Pipe是單向的、不可靠的、異步的
JXTA中的Pipe中存在三種類型:JxtaUnicast,JxtaUnicastSecure,JxtaPropagate。
JxtaUnicast,JxtaUnicastSecure類型管道是單向的,可以在一個輸入管道和一個或多個輸出管道之間建立,而JxtaPropagate類型的管道則支援多個輸入端點。
下面給出其具體通信過程:
1、作為接收方的Peer需要具備一個PipeAdvertisement,然後根據這個PipeAdvertisement建立一個InputPipe,然後等待Messages的到達。
2、發送資料方的Peer需要使用相同的PipeAdvertisement,然後根據這個PipeAdvertisement來建立OutputPipe以發送資料。要建立OutputPipe,它要先發送一個Pipe Binding Query Message給所有它知道的Peers。
3、接收方收到這個Pipe Binding Query Message,看看自己緩存的Pipes中有沒有比對的PipeID。如果有,它就回複一個Pipe Binding Answer Message(裡面包含了自己的PeerAdvertisement)給接收方。
4、發送方接收到Pipe Binding Answer Message後,将PeerAdvertisement從中抽取出來。然後使用PeerAdvertisement中的Endpoint資訊來建立OutputPipe,這樣發送方才可以發送資料。
以上就是JXTA中基于Pipe的通信過程,下面我們基于Pipe,利用資訊監聽接口實作通信。在給出代碼之前,先給出下面兩個類所用到的Pipe中的主要類的描述。
PipeService:管道服務實作的接口,用于建立輸入,輸出管道以及建立管道注冊監聽器
InputPipe:輸入管道的接口,為注冊監聽器的時候也可以等待資訊
OutputPipe:輸出管道的接口,用來發送資訊
PipeMsgListener(之前版本是InputPipeListener):由JXTA程式實作的的接口,處理輸入管道消息達到事件
OutputPipeListener:由需要處理的輸出管道事件的應用程式實作的接口,通常在管道綁定完成的時候發生。
PipeMsgEvent:注冊到輸入管道監聽器的類,表示消息的到達。
OutputPipeEvent:注冊到輸出管道監聽器的類,可以用來綁定或者解析成功後的管道的ID或者輸出管道的實體。
寫了這麼,有些人應該看不下去了,呵呵。好,下面就給出代碼,在這裡我使用的PipeAdvertisement與以前的不同,以前是在程式中生成,在這裡是讀pipe.adv來傳回管道廣告。該檔案直接在工程下面,注意:如果讀不到該檔案,應該是路徑問題。
public class PipeServer implements PipeMsgListener {
static PeerGroup netPg = null;
transient NetworkConfigurator config;
private PipeService pipeService;
private PipeAdvertisement pipeAdv;
private InputPipe inputPipe = null;
public PipeServer() {
config = null;
try {
config = new NetworkConfigurator();
config.setPrincipal("Pipe1");
config.setPassword("888888888");
config.save();
netPg = new NetPeerGroupFactory().getInterface();
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
pipeService = netPg.getPipeService();
pipeAdv = PipeClient.getPipeAdvertisement();
}
public static void main(String args[]) {
PipeServer server = new PipeServer();
server.start();
}
public void start() {
try {
System.out.println("服務端=建立輸入管道");
// 建立輸入管道并注冊等待資訊的到達
inputPipe = pipeService.createInputPipe(pipeAdv, this);
} catch (IOException io) {
io.printStackTrace();
return;
}
if (inputPipe == null) {
System.out.println("服務端=不能打開輸入管道");
System.exit(-1);
}
System.out.println("服務端=等待輸出管道中的資訊......");
}
public void stop() {
inputPipe.close();
System.exit(-1);
}
public void pipeMsgEvent(PipeMsgEvent event) {
Message msg;
try {
// PipeMsgEvent事件中包含資訊
msg = event.getMessage();
if (msg == null) {
System.out.println("服務端=空資訊");
return;
}
} catch (Exception e) {
e.printStackTrace();
return;
}
// 取得所有資訊元素
Message.ElementIterator en = msg.getMessageElements();
if (!en.hasNext()) {
return;
}
// 取得content元素下的内容
MessageElement msgElement = msg.getMessageElement(null, PipeClient.MESSAGE_NAME_SPACE);
// 接收資訊
if (msgElement.toString() == null) {
System.out.println("服務端=接到空的資訊 ");
} else {
System.out.println("服務端=接收到的資訊 :" + msgElement.toString());
}
}
}
首先運作的是上面這個類,如果運作順序錯了,會連接配接不到,而發送不了資訊。
public class PipeClient implements OutputPipeListener {
public final static String MESSAGE_NAME_SPACE = "content";
private PipeService pipeService;
private PipeAdvertisement pipeAdv;
private OutputPipe outputPipe;
private NetworkConfigurator config;
private PeerGroup netPg = null;
public PipeClient() {
config = null;
try {
config = new NetworkConfigurator();
config.setPrincipal("Pipe2");
config.setPassword("888888888");
config.save();
netPg = new NetPeerGroupFactory().getInterface();
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
// 擷取管道服務
pipeService = netPg.getPipeService();
// 建立管道廣告
pipeAdv = getPipeAdvertisement();
}
public static void main(String args[]) {
// 連接配接目标Peer
PipeClient client = new PipeClient();
client.start();
}
public static PipeAdvertisement getPipeAdvertisement() {
PipeAdvertisement advertisement=null;
FileInputStream is;
try {
is = new FileInputStream("pipe.adv");//通過本地特定管道廣告來建立管道
advertisement=(PipeAdvertisement)AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, is);
is.close();
} catch (Exception e) {
e.printStackTrace();
}
return advertisement;
}
public synchronized void start() {
try {
// 一旦輸出管道建立完成,outputPipeEvent()就被激活
pipeService.createOutputPipe(pipeAdv, this);
} catch (IOException e) {
System.out.println("用戶端=建立輸出管道失敗");
e.printStackTrace();
System.exit(-1);
}
}
public void outputPipeEvent(OutputPipeEvent event) {
System.out.println("收到輸出管道建立事件");
// 擷取輸出管道對象
outputPipe = event.getOutputPipe();
Message msg;
try {
System.out.println("發送資訊中......");
// 執行個體化要發送的資訊對象
msg = new Message();
// 字元串資訊元素中添加要發送的内容“Testing Pipe"
StringMessageElement sme = new StringMessageElement(MESSAGE_NAME_SPACE, "Testing Pipe", null);
msg.addMessageElement(null, sme);
// 發送資訊
outputPipe.send(msg);
System.out.println("用戶端:資訊已經發送");
} catch (IOException e) {
System.out.println("用戶端:發送資訊失敗");
e.printStackTrace();
System.exit(-1);
}
stop();
}
public void stop() {
outputPipe.close();
System.exit(-1);
}
}
終于完了,這是我寫的最長一篇啦。今天就寫到這了。以後會繼續把我學到的JXTA P2P通信方面的知識共享給大家。
忘記提供pipe.adv,真不意思。pipe.adv檔案内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<jxta:PipeAdvertisement>
<Name>Pipe tutorial</Name>
<Id>urn:jxta:uuid-59616261646162614E50472050325033C0C1DE89719B456691A596B983BA0E1004</Id>
<Type>JxtaUnicast</Type>
</jxta:PipeAdvertisement>