天天看點

JXTA中基于管道監聽方式通信

在上一篇中“基于Pipe管道通信”中很多朋友反應說隻有代碼沒有理論,看起來不知道過程。本文将過程補上,而且提供基于Pipe的另種通信方式-監聽方式的實作。

Pipe是Peer之間通信主要機制之一。JXTA中的Pipe和傳統的管道存在着本質的差別。它不是記憶體中存在的資料,也不是硬碟上的檔案,而隻是一個XML廣告文檔。是以,其是抽象的概念。其建立過程在這裡也稱為綁定過程,可以在不同時刻動态地綁定到不同的“實體管道末端”。其具有兩個明顯的特點:

1、Pipe可以綁定到任何端點

2、Pipe是單向的、不可靠的、異步的

JXTA中的Pipe中存在三種類型:JxtaUnicast,JxtaUnicastSecure,JxtaPropagate。

JxtaUnicast,JxtaUnicastSecure類型管道是單向的,可以在一個輸入管道和一個或多個輸出管道之間建立,而JxtaPropagate類型的管道則支援多個輸入端點。

下面給出其具體通信過程:

1、作為接收方的Peer需要具備一個PipeAdvertisement,然後根據這個PipeAdvertisement建立一個InputPipe,然後等待Messages的到達。

2、發送資料方的Peer需要使用相同的PipeAdvertisement,然後根據這個PipeAdvertisement來建立OutputPipe以發送資料。要建立OutputPipe,它要先發送一個Pipe Binding Query Message給所有它知道的Peers。

3、接收方收到這個Pipe Binding Query Message,看看自己緩存的Pipes中有沒有比對的PipeID。如果有,它就回複一個Pipe Binding Answer Message(裡面包含了自己的PeerAdvertisement)給接收方。

4、發送方接收到Pipe Binding Answer Message後,将PeerAdvertisement從中抽取出來。然後使用PeerAdvertisement中的Endpoint資訊來建立OutputPipe,這樣發送方才可以發送資料。

以上就是JXTA中基于Pipe的通信過程,下面我們基于Pipe,利用資訊監聽接口實作通信。在給出代碼之前,先給出下面兩個類所用到的Pipe中的主要類的描述。

PipeService:管道服務實作的接口,用于建立輸入,輸出管道以及建立管道注冊監聽器

InputPipe:輸入管道的接口,為注冊監聽器的時候也可以等待資訊

OutputPipe:輸出管道的接口,用來發送資訊

PipeMsgListener(之前版本是InputPipeListener):由JXTA程式實作的的接口,處理輸入管道消息達到事件

OutputPipeListener:由需要處理的輸出管道事件的應用程式實作的接口,通常在管道綁定完成的時候發生。

PipeMsgEvent:注冊到輸入管道監聽器的類,表示消息的到達。

OutputPipeEvent:注冊到輸出管道監聽器的類,可以用來綁定或者解析成功後的管道的ID或者輸出管道的實體。

寫了這麼,有些人應該看不下去了,呵呵。好,下面就給出代碼,在這裡我使用的PipeAdvertisement與以前的不同,以前是在程式中生成,在這裡是讀pipe.adv來傳回管道廣告。該檔案直接在工程下面,注意:如果讀不到該檔案,應該是路徑問題。

public class PipeServer implements PipeMsgListener {

    static PeerGroup netPg = null;

    transient NetworkConfigurator config;

    private PipeService pipeService;

    private PipeAdvertisement pipeAdv;

    private InputPipe inputPipe = null;

    public PipeServer() {

     config = null;

        try {

            config = new NetworkConfigurator();

            config.setPrincipal("Pipe1");

            config.setPassword("888888888");

            config.save();

            netPg = new NetPeerGroupFactory().getInterface();

        } catch (Exception e) {

            e.printStackTrace();

            System.exit(-1);

        }

        pipeService = netPg.getPipeService();

        pipeAdv = PipeClient.getPipeAdvertisement();

    }

    public static void main(String args[]) {

        PipeServer server = new PipeServer();

        server.start();

    }

    public void start() {

        try {

            System.out.println("服務端=建立輸入管道");

            // 建立輸入管道并注冊等待資訊的到達

            inputPipe = pipeService.createInputPipe(pipeAdv, this);

        } catch (IOException io) {

            io.printStackTrace();

            return;

        }

        if (inputPipe == null) {

            System.out.println("服務端=不能打開輸入管道");

            System.exit(-1);

        }

        System.out.println("服務端=等待輸出管道中的資訊......");

    }

    public void stop() {

        inputPipe.close();

        System.exit(-1);

    }

    public void pipeMsgEvent(PipeMsgEvent event) {

        Message msg;

        try {

            // PipeMsgEvent事件中包含資訊

            msg = event.getMessage();

            if (msg == null) {

                System.out.println("服務端=空資訊");

                return;

            }

        } catch (Exception e) {

            e.printStackTrace();

            return;

        }

        // 取得所有資訊元素

        Message.ElementIterator en = msg.getMessageElements();

        if (!en.hasNext()) {

            return;

        }

        // 取得content元素下的内容

        MessageElement msgElement = msg.getMessageElement(null, PipeClient.MESSAGE_NAME_SPACE);

        // 接收資訊

        if (msgElement.toString() == null) {

            System.out.println("服務端=接到空的資訊 ");

        } else {

            System.out.println("服務端=接收到的資訊 :" + msgElement.toString());

        }

    }

}

首先運作的是上面這個類,如果運作順序錯了,會連接配接不到,而發送不了資訊。

public class PipeClient implements OutputPipeListener {

    public final static String MESSAGE_NAME_SPACE = "content";

    private PipeService pipeService;

    private PipeAdvertisement pipeAdv;

    private OutputPipe outputPipe;

    private NetworkConfigurator config;

    private PeerGroup netPg = null;

    public PipeClient() {

     config = null;

        try {

            config = new NetworkConfigurator();

            config.setPrincipal("Pipe2");

            config.setPassword("888888888");

            config.save();

            netPg = new NetPeerGroupFactory().getInterface();

        } catch (Exception e) {

            e.printStackTrace();

            System.exit(-1);

        }

        // 擷取管道服務

        pipeService = netPg.getPipeService();

        // 建立管道廣告

        pipeAdv = getPipeAdvertisement();

    }

    public static void main(String args[]) {

        // 連接配接目标Peer

        PipeClient client = new PipeClient();

        client.start();

    }

    public static PipeAdvertisement getPipeAdvertisement() {

     PipeAdvertisement advertisement=null;

        FileInputStream is;

  try {

   is = new FileInputStream("pipe.adv");//通過本地特定管道廣告來建立管道

   advertisement=(PipeAdvertisement)AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, is);

   is.close();

  } catch (Exception e) {

   e.printStackTrace();

  }

        return advertisement;

    }

    public synchronized void start() {

        try {

            // 一旦輸出管道建立完成,outputPipeEvent()就被激活

            pipeService.createOutputPipe(pipeAdv, this);

        } catch (IOException e) {

            System.out.println("用戶端=建立輸出管道失敗");

            e.printStackTrace();

            System.exit(-1);

        }

    }

    public void outputPipeEvent(OutputPipeEvent event) {

        System.out.println("收到輸出管道建立事件");

        // 擷取輸出管道對象

        outputPipe = event.getOutputPipe();

        Message msg;

        try {

            System.out.println("發送資訊中......");

            // 執行個體化要發送的資訊對象

            msg = new Message();

            // 字元串資訊元素中添加要發送的内容“Testing Pipe"

            StringMessageElement sme = new StringMessageElement(MESSAGE_NAME_SPACE, "Testing Pipe", null);

            msg.addMessageElement(null, sme);

            // 發送資訊

            outputPipe.send(msg);

            System.out.println("用戶端:資訊已經發送");

        } catch (IOException e) {

            System.out.println("用戶端:發送資訊失敗");

            e.printStackTrace();

            System.exit(-1);

        }

        stop();

    }

    public void stop() {

        outputPipe.close();

        System.exit(-1);

    }

}

終于完了,這是我寫的最長一篇啦。今天就寫到這了。以後會繼續把我學到的JXTA P2P通信方面的知識共享給大家。

忘記提供pipe.adv,真不意思。pipe.adv檔案内容如下:

<?xml version="1.0" encoding="UTF-8"?>

<jxta:PipeAdvertisement>

<Name>Pipe tutorial</Name>

<Id>urn:jxta:uuid-59616261646162614E50472050325033C0C1DE89719B456691A596B983BA0E1004</Id>

<Type>JxtaUnicast</Type>

</jxta:PipeAdvertisement>

繼續閱讀