天天看點

JXTA初探

JXTA初探

最近學習JXTA,寫了一個簡單程式,共兩個類:CreateServer和DiscoveryPeer。CreateServer先建立一個group(MagicNet)和管道廣告,然後利用管道廣告建立JxtaServerSocket監聽客戶端消息並作處理;DiscoveryPeer發現並加入MagicNet,再利用管道廣告建立JxtaSocket向Server發送資料。程式很簡單,時間不多,就不詳解程式了。貼出來希望對初學者有所幫助,程式如下:

//package edu.cqwu.magicqq.net.receive;

import java.util.Enumeration;

import java.io.InterruptedIOException;

import java.io.IOException;

import java.io.InputStream;

import java.net.URL;

import java.net.Socket;

import net.jxta.peergroup.PeerGroupFactory;

import net.jxta.exception.PeerGroupException;

import net.jxta.peergroup.PeerGroup;

import net.jxta.peergroup.PeerGroupID;

import net.jxta.discovery.DiscoveryService;

import net.jxta.pipe.PipeService;

import net.jxta.protocol.PeerGroupAdvertisement;

import net.jxta.protocol.PipeAdvertisement;

import net.jxta.protocol.ModuleImplAdvertisement;

import net.jxta.document.AdvertisementFactory;

import net.jxta.id.IDFactory;

import net.jxta.socket.JxtaServerSocket;

import net.jxta.socket.JxtaSocket;

public class CreateServer{

 private PeerGroup netPeerGroup = null; //the NetPeerGroup

 private PeerGroup MagicNet = null; //the Magic PeerGroup

 private DiscoveryService disco = null; //discovery service

 private PipeService pipes = null; //pipe service

 private PipeAdvertisement pipeAdv = null; //pipe advertisement

 private JxtaServerSocket serverSocket = null;

 private int timeout = 3000;

 private String username = "Magic-idea"; //user name

 private static String groupURL = "jxta:uuid-4d6172676572696e204272756e6f202002";

 public CreateServer(PeerGroup parentgroup){

  //this.netPeerGroup = parentgroup;

  //this.username = "Magic-idea";

 }

 //Start service

 public void startServer(){

  try{

   joinMagicNet(); //join the MagicNet peergroup

  }catch(Exception e){

   System.out.println("Can't join or create MagicNet(local peergroup)");

   System.exit(0);

  }

  if(!createMagicPipe()){ //create the pipe

   System.out.println("ceate MagicPeer pipe failure~~");

  }

  System.out.println("starting ServerSocket");

  createServerSocket();

        while (true) {

            try {

                System.out.println("Calling accept");

                Socket socket = serverSocket.accept();

                //serverSocket.getInetAddress();

                //socket.getLocalSocketAddress();

                // set reliable

                if (socket != null) {

                    System.out.println("socket created");

                    receiveData(socket);

                }

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

 }

 //Discover (or create) and join the MagicNet peergroup

 private void joinMagicNet() throws Exception {

  int count = 3;   // maximun number of attempts to discover

        System.out.println("Attempting to Discover the MagicNet PeerGroup");

        // Get the discovery service from the NetPeergroup

        DiscoveryService hdisco = netPeerGroup.getDiscoveryService();

        Enumeration ae = null;  // Holds the discovered peers

        // Loop until we discover the RestoNet or

        // until we've exhausted the desired number of attempts

        while (count-- > 0) {

            try {

                // search first in the peer local cache to find

                // the MagicNet peergroup advertisement

                ae = hdisco.getLocalAdvertisements(DiscoveryService.GROUP,

                                          "Name", "MagicNet");

                // If we found the MagicNet advertisement we are done

                if ((ae != null) && ae.hasMoreElements())

                    break;

                // If we did not find it, we send a discovery request

                hdisco.getRemoteAdvertisements(null,

                       DiscoveryService.GROUP, "Name", "MagicNet", 1, null);

                // Sleep to allow time for peers to respond to the

                // discovery request

                try {

                    Thread.sleep(timeout);

                } catch (InterruptedException ie) {}

            } catch (IOException e) {

                // Found nothing! Move on.

            }

        }

        PeerGroupAdvertisement MagicNetAdv = null;

        // Check if we found the MagicNet advertisement.

        // If we didn't, then either

        //       we are the first peer to join or

        //       no other MagicNet peers are up.

        // In either case, we must create the MagicNet peergroup

        if (ae == null || !ae.hasMoreElements()) {

            System.out.println(

                 "Could not find the MagicNet peergroup; creating one");

            try {

                // Create a new, all-purpose peergroup.

                ModuleImplAdvertisement implAdv =

                    netPeerGroup.getAllPurposePeerGroupImplAdvertisement();

                MagicNet = netPeerGroup.newGroup(

                                mkGroupID(),      // Assign new group ID

                                implAdv,          // The implem. adv

                                "MagicNet",       // Name of peergroup

                                "MagicNet, Inc.");// Descrīption of peergroup

                // Get the PeerGroup Advertisement

                MagicNetAdv = netPeerGroup.getPeerGroupAdvertisement();

            } catch (Exception e) {

                System.out.println("Error in creating MagicNet Peergroup");

                throw e;

            }

        } else {

            // The MagicNet advertisement was found in the cache;

            // that means we can join the existing MagicNet peergroup

            try {

                MagicNetAdv = (PeerGroupAdvertisement) ae.nextElement();

                MagicNet = netPeerGroup.newGroup(MagicNetAdv);

                 System.out.println(

                     "Found the MagicNet Peergroup advertisement; joined existing group");

            } catch (Exception e) {

                System.out.println("Error in creating MagicNet PeerGroup from existing adv");

                throw e;

            }

        }

        try {

            // Get the discovery and pipe services for the MagicNet Peergroup

            disco = MagicNet.getDiscoveryService();

            pipes = MagicNet.getPipeService();

        } catch (Exception e) {

            System.out.println("Error getting services from MagicNet");

            throw e;

        }

        System.out.println("MagicNet MagicPeer (" + username + ") is on-line");

        return;

 }

 //Create and publish a MagicPeer Pipe to receive

 private boolean createMagicPipe(){

  int count = 3;           // Discovery retry count

        Enumeration ae = null;   // Discovery response enumeration

        try {

            System.out.println(

                "Attempting to Discover the MagicPipe");

            // Check if I have already published myself

            while (count-- > 0) {

                try {

                    // Check first locally if we have the advertisement cached

                    ae = disco.getLocalAdvertisements(DiscoveryService.ADV,

                                      "Name", "MagicNet:MagicPipe:" + username);

                    // If we found our pipe advertisement we are done

                    if (ae != null && ae.hasMoreElements())

                        break;

                    // We did not find the advertisement locally;

                    // send a remote request

                    disco.getRemoteAdvertisements(null,

                          DiscoveryService.ADV, "name",

                          "MagicNet:MagicPipe:" + username, 1, null);

                    // Sleep to allow time for peers to respond to the

                    // discovery request

                    try {

                        Thread.sleep(timeout);

                    } catch (InterruptedException e) {}

                } catch (IOException e) {

                    // Found nothing! Move on

                }

            }

            if (ae == null || !ae.hasMoreElements()) {

                // We did not find the pipe advertisement, so create one

                System.out.println(

                    "Could not find the MagicPeer Pipe Advertisement");

                // Create a pipe advertisement for our RestoPeer

                pipeAdv = (PipeAdvertisement)

                    AdvertisementFactory.newAdvertisement(

                        PipeAdvertisement.getAdvertisementType());

                // Assign a unique ID to the pipe

                pipeAdv.setPipeID(

                        IDFactory.newPipeID(MagicNet.getPeerGroupID() ));

                // The symbolic name of the pipe is built from

                // the brand name of MagicPeer;  each MagicPeer

                // must therefore have a unique name.

                pipeAdv.setName("MagicNet:MagicPipe:" + username);

                // Set the type of the pipe to be unidirectional

                pipeAdv.setType(PipeService.UnicastType);

                disco.publish(pipeAdv,PeerGroup.DEFAULT_LIFETIME,PeerGroup.DEFAULT_EXPIRATION);

                System.out.println(

                    "Created the Restaurant Pipe Advertisement");

            } else {

                // We found an existing pipe advertisement

                pipeAdv = (PipeAdvertisement) ae.nextElement();

                System.out.println("Found MagicPeer Pipe Advertisement");

            }

            // Create my input pipe to listen for hungry peers

            // requests

            // 建立輸入管道,暫時未用

            // pipeIn = pipes.createInputPipe(myAdv);

        } catch (Exception e) {

            System.out.println("Could not initialize the MagicPeer pipe");

            return false;

        }

        return true;

 }

 //Create JxtaServerSocket

 private void createServerSocket(){

  try{

   serverSocket = new JxtaServerSocket(this.netPeerGroup,this.pipeAdv,10);

   serverSocket.setSoTimeout(0);

  }catch(Exception e){

   System.out.println("failure to create the JxtaServerSocket!");

   System.exit(1);

  }

  //using the cycle receive the data

  while (true) {

            try {

                System.out.println("Calling accept");

                Socket socket = serverSocket.accept();

                // set reliable

                if (socket != null) {

                    System.out.println("socket created");

                    receiveData(socket);

                }

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

 }

 //Reads the data sent by a connected client from the accepted socket and prints it

 private void receiveData(Socket socket){

  try {

         System.out.println("以下是位址:");

         System.out.println(socket.getRemoteSocketAddress());

            int size = 4096;

            byte[] buf = new byte[size];

            int read;

            InputStream in = socket.getInputStream();

            // this call should block until bits are avail.

            long total = 0;

            long start = System.currentTimeMillis();

            while (true) {

                read = in.read(buf, 0, 4096);

                if (read < 1) {

                    break;

                }

                total += read;

                //System.out.print(".");

                //System.out.flush();

            }

            System.out.println("哈哈~~~~"); 

            System.out.println(Byte.toString(buf[1]));

            char c[]=new char[1040];

            for(int i=0;i<520;i++){

    c[i]=(char)(buf[i*2]<<8);

    c[i]=(char)((c[i]&0xff00)|(0x00ff&buf[i*2+1]));

   }

            String s=null;

   s=new String(c);

            System.out.println("來自客戶的資料: 哈哈+++ "+s.trim());

            System.out.println("");

            long elapsed = System.currentTimeMillis() - start;

            System.out.println("EOT. Received " + total + " bytes in " + elapsed + " ms. Throughput = " + ((total * 8000) / (1024 * elapsed)) + " Kbit/s.");

            socket.close();

            System.out.println("Closed connection. Ready for next connection.");

        } catch (IOException ie) {

            ie.printStackTrace();

        }

 }

 //create group ID

 private PeerGroupID mkGroupID() throws Exception {

        return (PeerGroupID) IDFactory.fromURL(

             new URL("urn", "", groupURL));

    }

    //test this class

    public static void main(String agrs[]){

        CreateServer cs = new CreateServer(null);

        cs.startjxta();

        cs.startServer();

    }

    public void startjxta(){

     try {

            //Discover and join (or start) the default peergroup

            netPeerGroup = PeerGroupFactory.newNetPeerGroup();

        } catch (PeerGroupException e) {

            //Couldn't initialize; can't continue

            System.out.println("Fatal error : creating the NetPeerGroup");

            System.exit(1);

        }

    }

}

package edu.cqwu.magicqq.net.discovery;

import java.io.IOException;

import java.io.ByteArrayInputStream;

import java.io.OutputStream;

import java.util.Enumeration;

import java.net.Socket;

import net.jxta.discovery.DiscoveryService;

import net.jxta.discovery.DiscoveryListener;

import net.jxta.discovery.DiscoveryEvent;

import net.jxta.protocol.DiscoveryResponseMsg;

import net.jxta.peergroup.PeerGroup;

import net.jxta.peergroup.PeerGroupFactory;

import net.jxta.exception.PeerGroupException;

import net.jxta.protocol.PeerGroupAdvertisement;

import net.jxta.protocol.PipeAdvertisement;

import net.jxta.document.AdvertisementFactory;

import net.jxta.document.MimeMediaType;

import net.jxta.pipe.PipeService;

import net.jxta.socket.JxtaServerSocket;

import net.jxta.socket.JxtaSocket;

public class DiscoveryPeer implements DiscoveryListener{

 private DiscoveryService disco = null;

 private PipeService pipes = null;

 private PipeAdvertisement pipeAdv = null;

 private PeerGroup netPeerGroup = null;

 private PeerGroup MagicNet = null;

 private int timeout = 3000;

 private JxtaSocket socket = null;

 private static int ITERATIONS = 1824;

    // payload size

    private static int payloadSize = 1040;//64 * 1024;

 public DiscoveryPeer(PeerGroup parentgroup){

  //this.netPeerGroup = parentgroup;

 }

 //test

 public static void main(String agrs[]){

  DiscoveryPeer dp = new DiscoveryPeer(null);

  dp.startjxta();

  //dp.run();

 }

 //test

 public void startjxta(){

  try{

   netPeerGroup = PeerGroupFactory.newNetPeerGroup();

  }catch(PeerGroupException e){

   System.out.println("Failure to create peergroup!");

   System.exit(0);

  }

  if(!joinMagicNet()){

   System.out.println("Can't join the Magicnet!");

  }

  System.out.println("Find the pipeadv");

  //disco.addDiscoveryListener(this);

        //disco.getRemoteAdvertisements(null,

        //                     DiscoveryService.ADV,

        //                     "name", "MagicNet:MagicPipe:*", 5, null);                

  try {

              // 為 DiscoveryResponse events 添加 DiscoveryListener

              disco.addDiscoveryListener(this);

              while (true) {

                  System.out.println("Sending a Discovery Message");

                  // 尋找Peers,每個Peer最多回報5個 。

                  disco.getRemoteAdvertisements(null, DiscoveryService.ADV,

                                      "Name","MagicNet:MagicPipe:*",5,null);

                  // 等待一分鐘

                  try {

                      Thread.sleep(2 * 1000);

                  } catch(Exception e) {}

            }

          } catch(Exception e) {

              e.printStackTrace();

          }

 }

 private void run(){

 }

 private boolean joinMagicNet() {

        int count = 3; // maximum number of attempts to discover

        System.out.println("Attempting to discover the MagicNet Peergroup");

        // Get the Discovery service handle from the NetPeerGroup

        DiscoveryService hdisco = netPeerGroup.getDiscoveryService();

        // All discovered RestoNet Peers

        Enumeration ae = null;

        // Loop until we find the "RestoNet" Peergroup advertisement

        // or we've exhausted the desired number of attempts

        while (count-- > 0) {

            try {

                // Check if we have the advertisement in the local

                // peer cache

                ae = hdisco.getLocalAdvertisements(DiscoveryService.GROUP,

                                            "Name", "MagicNet");

                // If we found the RestoNet advertisement, we are done

                if ((ae != null) && ae.hasMoreElements())

                    break;

                // The RestoNet advertisement is not in the local

                // cache . Send a discovery request to search for it.

                hdisco.getRemoteAdvertisements(null,

                           DiscoveryService.GROUP, "Name", "MagicNet", 1, null);

                // Wait to give peers a chance to respond

                try {

                    Thread.sleep(timeout);

                } catch (InterruptedException ie) {}

            } catch (IOException e) {

                // Found nothing! Move on.

            }

        }

        // Check if we found the RestoNet advertisement

        if (ae == null || !ae.hasMoreElements()) {

            return false;

        }

        System.out.println("Found the MagicNet PeerGroup Advertisement");

        // Get the advertisement

        PeerGroupAdvertisement adv =

            (PeerGroupAdvertisement) ae.nextElement();

        try {

            // Call the PeerGroup Factory to instantiate a new

            // peergroup instance

            MagicNet = netPeerGroup.newGroup(adv);

            // Get the Discovery and Pipe services to

            // be used within the RestoNet Peergroup

            disco = MagicNet.getDiscoveryService();

            pipes = MagicNet.getPipeService();

        } catch (Exception e) {

          System.out.println("Could not create RestoPeerGroup");

          return false;

        }

        System.out.println("The MagicPeer joined the MagicNet PeerGroup");

        return true;

    }

 public void discoveryEvent(DiscoveryEvent ev){

  System.out.println("Processing discovery event");

        DiscoveryResponseMsg msg = ev.getResponse();

        // The enumeration contains all the pipe advertisements that

        // were found.

        Enumeration e = msg.getResponses();

        while (e.hasMoreElements()) {

            try {

                String s = (String) e.nextElement();

                PipeAdvertisement adv = (PipeAdvertisement)

                    AdvertisementFactory.newAdvertisement(

                        new MimeMediaType("text/xml"),

                        new ByteArrayInputStream(s.getBytes()));

                //send data

                this.sendIM(adv,"哈哈~~~呵呵...測試一下哈...");

            } catch (Exception ex) {

                System.out.println("Can't connect to peer " + ex);

                continue;

            }

        }

 }

 public void sendIM(PipeAdvertisement pipeAdv,String str) throws IOException {

        int bufsize = 4096;

        System.out.println("Connecting to the server");

        socket = new JxtaSocket(netPeerGroup,

                                //no specific peerid

                                null,

                                pipeAdv,

                                //general TO: 30 seconds

                                30000,

                                // reliable connection

                                true);

        // Set buffer size to payload size

        socket.setOutputStreamBufferSize(65536);

        System.out.println("Reading in data");

        OutputStream ōut = socket.getOutputStream();

        //byte[] payload = new byte[payloadSize];

        byte[] bte = new byte[payloadSize];

        for(int i=0;i<str.length();i++){

         bte[i*2]=(byte)(str.charAt(i)>>8);

         bte[i*2+1]=(byte)str.charAt(i);

        }

        bte[str.length()+1]=(byte)(bte[str.length()+1]&0xff);

        long t0 = System.currentTimeMillis();

        //寫出

        out.write(bte, 0, 4096);

        out.flush();

        // include close in timing since it may need to flush the

        // tail end of the stream.

        socket.close();

        long t1 = System.currentTimeMillis();

        System.out.println("Completed in :" + (t1 - t0) + " msec");

        System.out.println("Data Rate :" + ((long) 64 * ITERATIONS * 8000) / (t1 - t0) + " Kbit/sec");

    }

}