天天看点

JXTA中基于管道监听方式通信

在上一篇中“基于Pipe管道通信”中很多朋友反应说只有代码没有理论,看起来不知道过程。本文将过程补上,而且提供基于Pipe的另种通信方式-监听方式的实现。

Pipe是Peer之间通信主要机制之一。JXTA中的Pipe和传统的管道存在着本质的区别。它不是内存中存在的数据,也不是硬盘上的文件,而只是一个XML广告文档。因此,其是抽象的概念。其创建过程在这里也称为绑定过程,可以在不同时刻动态地绑定到不同的“物理管道末端”。其具有两个明显的特点:

1、Pipe可以绑定到任何端点

2、Pipe是单向的、不可靠的、异步的

JXTA中的Pipe中存在三种类型:JxtaUnicast,JxtaUnicastSecure,JxtaPropagate。

JxtaUnicast,JxtaUnicastSecure类型管道是单向的,可以在一个输入管道和一个或多个输出管道之间创建,而JxtaPropagate类型的管道则支持多个输入端点。

下面给出其具体通信过程:

1、作为接收方的Peer需要具备一个PipeAdvertisement,然后根据这个PipeAdvertisement创建一个InputPipe,然后等待Messages的到达。

2、发送数据方的Peer需要使用相同的PipeAdvertisement,然后根据这个PipeAdvertisement来创建OutputPipe以发送数据。要创建OutputPipe,它要先发送一个Pipe Binding Query Message给所有它知道的Peers。

3、接收方收到这个Pipe Binding Query Message,看看自己缓存的Pipes中有没有匹配的PipeID。如果有,它就回复一个Pipe Binding Answer Message(里面包含了自己的PeerAdvertisement)给接收方。

4、发送方接收到Pipe Binding Answer Message后,将PeerAdvertisement从中抽取出来。然后使用PeerAdvertisement中的Endpoint信息来创建OutputPipe,这样发送方才可以发送数据。

以上就是JXTA中基于Pipe的通信过程,下面我们基于Pipe,利用信息监听接口实现通信。在给出代码之前,先给出下面两个类所用到的Pipe中的主要类的描述。

PipeService:管道服务实现的接口,用于创建输入,输出管道以及创建管道注册监听器

InputPipe:输入管道的接口,为注册监听器的时候也可以等待信息

OutputPipe:输出管道的接口,用来发送信息

PipeMsgListener(之前版本是InputPipeListener):由JXTA程序实现的的接口,处理输入管道消息达到事件

OutputPipeListener:由需要处理的输出管道事件的应用程序实现的接口,通常在管道绑定完成的时候发生。

PipeMsgEvent:注册到输入管道监听器的类,表示消息的到达。

OutputPipeEvent:注册到输出管道监听器的类,可以用来绑定或者解析成功后的管道的ID或者输出管道的实体。

写了这么,有些人应该看不下去了,呵呵。好,下面就给出代码,在这里我使用的PipeAdvertisement与以前的不同,以前是在程序中生成,在这里是读pipe.adv来返回管道广告。该文件直接在工程下面,注意:如果读不到该文件,应该是路径问题。

public class PipeServer implements PipeMsgListener {

    static PeerGroup netPg = null;

    transient NetworkConfigurator config;

    private PipeService pipeService;

    private PipeAdvertisement pipeAdv;

    private InputPipe inputPipe = null;

    public PipeServer() {

     config = null;

        try {

            config = new NetworkConfigurator();

            config.setPrincipal("Pipe1");

            config.setPassword("888888888");

            config.save();

            netPg = new NetPeerGroupFactory().getInterface();

        } catch (Exception e) {

            e.printStackTrace();

            System.exit(-1);

        }

        pipeService = netPg.getPipeService();

        pipeAdv = PipeClient.getPipeAdvertisement();

    }

    public static void main(String args[]) {

        PipeServer server = new PipeServer();

        server.start();

    }

    public void start() {

        try {

            System.out.println("服务端=创建输入管道");

            // 创建输入管道并注册等待信息的到达

            inputPipe = pipeService.createInputPipe(pipeAdv, this);

        } catch (IOException io) {

            io.printStackTrace();

            return;

        }

        if (inputPipe == null) {

            System.out.println("服务端=不能打开输入管道");

            System.exit(-1);

        }

        System.out.println("服务端=等待输出管道中的信息......");

    }

    public void stop() {

        inputPipe.close();

        System.exit(-1);

    }

    public void pipeMsgEvent(PipeMsgEvent event) {

        Message msg;

        try {

            // PipeMsgEvent事件中包含信息

            msg = event.getMessage();

            if (msg == null) {

                System.out.println("服务端=空信息");

                return;

            }

        } catch (Exception e) {

            e.printStackTrace();

            return;

        }

        // 取得所有信息元素

        Message.ElementIterator en = msg.getMessageElements();

        if (!en.hasNext()) {

            return;

        }

        // 取得content元素下的内容

        MessageElement msgElement = msg.getMessageElement(null, PipeClient.MESSAGE_NAME_SPACE);

        // 接收信息

        if (msgElement.toString() == null) {

            System.out.println("服务端=接到空的信息 ");

        } else {

            System.out.println("服务端=接收到的信息 :" + msgElement.toString());

        }

    }

}

首先运行的是上面这个类,如果运行顺序错了,会连接不到,而发送不了信息。

public class PipeClient implements OutputPipeListener {

    public final static String MESSAGE_NAME_SPACE = "content";

    private PipeService pipeService;

    private PipeAdvertisement pipeAdv;

    private OutputPipe outputPipe;

    private NetworkConfigurator config;

    private PeerGroup netPg = null;

    public PipeClient() {

     config = null;

        try {

            config = new NetworkConfigurator();

            config.setPrincipal("Pipe2");

            config.setPassword("888888888");

            config.save();

            netPg = new NetPeerGroupFactory().getInterface();

        } catch (Exception e) {

            e.printStackTrace();

            System.exit(-1);

        }

        // 获取管道服务

        pipeService = netPg.getPipeService();

        // 创建管道广告

        pipeAdv = getPipeAdvertisement();

    }

    public static void main(String args[]) {

        // 连接目标Peer

        PipeClient client = new PipeClient();

        client.start();

    }

    public static PipeAdvertisement getPipeAdvertisement() {

     PipeAdvertisement advertisement=null;

        FileInputStream is;

  try {

   is = new FileInputStream("pipe.adv");//通过本地特定管道广告来创建管道

   advertisement=(PipeAdvertisement)AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, is);

   is.close();

  } catch (Exception e) {

   e.printStackTrace();

  }

        return advertisement;

    }

    public synchronized void start() {

        try {

            // 一旦输出管道创建完成,outputPipeEvent()就被激活

            pipeService.createOutputPipe(pipeAdv, this);

        } catch (IOException e) {

            System.out.println("客户端=创建输出管道失败");

            e.printStackTrace();

            System.exit(-1);

        }

    }

    public void outputPipeEvent(OutputPipeEvent event) {

        System.out.println("收到输出管道创建事件");

        // 获取输出管道对象

        outputPipe = event.getOutputPipe();

        Message msg;

        try {

            System.out.println("发送信息中......");

            // 实例化要发送的信息对象

            msg = new Message();

            // 字符串信息元素中添加要发送的内容“Testing Pipe"

            StringMessageElement sme = new StringMessageElement(MESSAGE_NAME_SPACE, "Testing Pipe", null);

            msg.addMessageElement(null, sme);

            // 发送信息

            outputPipe.send(msg);

            System.out.println("客户端:信息已经发送");

        } catch (IOException e) {

            System.out.println("客户端:发送信息失败");

            e.printStackTrace();

            System.exit(-1);

        }

        stop();

    }

    public void stop() {

        outputPipe.close();

        System.exit(-1);

    }

}

终于完了,这是我写的最长一篇啦。今天就写到这了。以后会继续把我学到的JXTA P2P通信方面的知识共享给大家。

忘记提供pipe.adv,真不意思。pipe.adv文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>

<jxta:PipeAdvertisement>

<Name>Pipe tutorial</Name>

<Id>urn:jxta:uuid-59616261646162614E50472050325033C0C1DE89719B456691A596B983BA0E1004</Id>

<Type>JxtaUnicast</Type>

</jxta:PipeAdvertisement>

继续阅读