JXTA初探
最近學習JXTA,寫了一個簡單程式,包含兩個類:CreateServer和DiscoveryPeer。CreateServer先建立一個group(MagicNet)和管道廣告,然後利用管道廣告建立JxtaServerSocket,監聽客戶端消息並作處理;DiscoveryPeer發現並加入MagicNet,再利用管道廣告建立JxtaSocket向Server發送資料。程式很簡單,時間不多,就不詳解程式了。貼出來希望對初學者有所幫助,程式如下:
//package edu.cqwu.magicqq.net.receive;
import java.util.Enumeration;
import java.io.InterruptedIOException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.Socket;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.exception.PeerGroupException;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupID;
import net.jxta.discovery.DiscoveryService;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PeerGroupAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.ModuleImplAdvertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.id.IDFactory;
import net.jxta.socket.JxtaServerSocket;
import net.jxta.socket.JxtaSocket;
/**
 * Minimal JXTA server peer.
 *
 * <p>Life cycle, as driven by {@link #main(String[])}:
 * <ol>
 *   <li>{@link #startjxta()} boots the NetPeerGroup;</li>
 *   <li>{@link #startServer()} discovers (or creates) the "MagicNet" peer
 *       group, discovers (or creates and publishes) a pipe advertisement named
 *       "MagicNet:MagicPipe:" + user name, opens a {@link JxtaServerSocket} on
 *       that advertisement and then serves connections one at a time,
 *       forever.</li>
 * </ol>
 *
 * <p>Each connection is read until end of stream and its bytes are decoded as
 * big-endian 16-bit characters (high byte first).
 */
public class CreateServer{
    /** Number of remote discovery queries sent before giving up on a lookup. */
    private static final int DISCOVERY_ATTEMPTS = 3;
    /** Name under which the peer group advertisement is looked up and created. */
    private static final String GROUP_NAME = "MagicNet";
    /** Prefix of the pipe advertisement name; the user name is appended. */
    private static final String PIPE_NAME_PREFIX = "MagicNet:MagicPipe:";
    /** Fixed ID (without the "urn:" scheme) given to the MagicNet group when this peer creates it. */
    private static String groupURL = "jxta:uuid-4d6172676572696e204272756e6f202002";

    private PeerGroup netPeerGroup = null;         // the NetPeerGroup
    private PeerGroup magicNet = null;             // the MagicNet peer group
    private DiscoveryService disco = null;         // discovery service of MagicNet
    private PipeService pipes = null;              // pipe service of MagicNet (fetched, currently unused)
    private PipeAdvertisement pipeAdv = null;      // advertisement the server socket is bound to
    private JxtaServerSocket serverSocket = null;
    private int timeout = 3000;                    // ms to wait after each remote discovery query
    private String username = "Magic-idea";        // user name, part of the pipe name

    /**
     * Creates a server peer.
     *
     * @param parentgroup group to use as the NetPeerGroup; may be {@code null},
     *                    in which case {@link #startjxta()} must be called
     *                    before {@link #startServer()}. Note that
     *                    {@link #startjxta()} always replaces this value.
     */
    public CreateServer(PeerGroup parentgroup){
        this.netPeerGroup = parentgroup;
    }

    /**
     * Joins MagicNet, prepares the pipe advertisement, opens the server socket
     * and then accepts and handles connections sequentially. Never returns
     * normally; terminates the JVM with status 1 if any set-up step fails.
     */
    public void startServer(){
        try{
            joinMagicNet(); // join (or create) the MagicNet peer group
        }catch(Exception e){
            System.out.println("Can't join or create MagicNet(local peergroup)");
            e.printStackTrace();
            System.exit(1);
        }
        if(!createMagicPipe()){
            // Without a pipe advertisement the server socket cannot be bound.
            System.out.println("ceate MagicPeer pipe failure~~");
            System.exit(1);
        }
        System.out.println("starting ServerSocket");
        createServerSocket();
        // Single accept loop: one connection is fully handled before the next accept.
        while (true) {
            try {
                System.out.println("Calling accept");
                Socket socket = serverSocket.accept();
                if (socket != null) {
                    System.out.println("socket created");
                    receiveData(socket);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Looks for advertisements whose "Name" attribute equals {@code value}.
     * The local cache is checked first; on a miss a remote query is sent and
     * the method sleeps {@code timeout} ms before checking the cache again.
     * After the last remote query the cache is checked one final time.
     *
     * @param service discovery service to query
     * @param type    advertisement type constant ({@code DiscoveryService.GROUP}, {@code ADV}, ...)
     * @param value   value of the "Name" attribute to match
     * @return the last enumeration obtained from the local cache; it may be
     *         {@code null} or empty when nothing was found
     */
    private Enumeration discover(DiscoveryService service, int type, String value) {
        Enumeration found = null;
        for (int attempt = 0; attempt <= DISCOVERY_ATTEMPTS; attempt++) {
            try {
                found = service.getLocalAdvertisements(type, "Name", value);
                if (found != null && found.hasMoreElements()) {
                    return found;
                }
                if (attempt == DISCOVERY_ATTEMPTS) {
                    break; // final cache check done; no further query to wait for
                }
                service.getRemoteAdvertisements(null, type, "Name", value, 1, null);
            } catch (IOException e) {
                // Best effort: a failed lookup counts as "found nothing"; try again.
                continue;
            }
            try {
                // Give remote peers time to answer the query.
                Thread.sleep(timeout);
            } catch (InterruptedException ie) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return found;
    }

    /**
     * Discovers the MagicNet peer group and instantiates it, or creates it
     * under the NetPeerGroup when no advertisement is found. On success the
     * group's discovery and pipe services are stored in {@code disco} and
     * {@code pipes}.
     *
     * @throws Exception if the group cannot be created/instantiated or its
     *                   services cannot be obtained (also a
     *                   {@code NullPointerException} when no NetPeerGroup is set)
     */
    private void joinMagicNet() throws Exception {
        System.out.println("Attempting to Discover the MagicNet PeerGroup");
        // Group advertisements are searched through the NetPeerGroup's discovery service.
        DiscoveryService hdisco = netPeerGroup.getDiscoveryService();
        Enumeration ae = discover(hdisco, DiscoveryService.GROUP, GROUP_NAME);

        if (ae == null || !ae.hasMoreElements()) {
            // Nothing found: either we are the first peer or no other
            // MagicNet peer is up. Create the group ourselves.
            System.out.println(
                "Could not find the MagicNet peergroup; creating one");
            try {
                // Create a new, all-purpose peer group.
                ModuleImplAdvertisement implAdv =
                    netPeerGroup.getAllPurposePeerGroupImplAdvertisement();
                magicNet = netPeerGroup.newGroup(
                    mkGroupID(),        // fixed group ID
                    implAdv,            // implementation advertisement
                    GROUP_NAME,         // name of the peer group
                    "MagicNet, Inc.");  // description of the peer group
            } catch (Exception e) {
                System.out.println("Error in creating MagicNet Peergroup");
                throw e;
            }
        } else {
            // An advertisement is available: instantiate the existing group from it.
            try {
                PeerGroupAdvertisement magicNetAdv = (PeerGroupAdvertisement) ae.nextElement();
                magicNet = netPeerGroup.newGroup(magicNetAdv);
                System.out.println(
                    "Found the MagicNet Peergroup advertisement; joined existing group");
            } catch (Exception e) {
                System.out.println("Error in creating MagicNet PeerGroup from existing adv");
                throw e;
            }
        }

        try {
            // Services scoped to the MagicNet group.
            disco = magicNet.getDiscoveryService();
            pipes = magicNet.getPipeService();
        } catch (Exception e) {
            System.out.println("Error getting services from MagicNet");
            throw e;
        }
        System.out.println("MagicNet MagicPeer (" + username + ") is on-line");
    }

    /**
     * Finds this peer's pipe advertisement in MagicNet, or creates a new
     * unicast pipe advertisement and publishes it through the MagicNet
     * discovery service. The result is stored in {@code pipeAdv}.
     *
     * @return {@code true} when {@code pipeAdv} has been set, {@code false}
     *         if any step failed
     */
    private boolean createMagicPipe(){
        final String pipeName = PIPE_NAME_PREFIX + username;
        try {
            System.out.println(
                "Attempting to Discover the MagicPipe");
            // Check whether this peer's advertisement already exists.
            Enumeration ae = discover(disco, DiscoveryService.ADV, pipeName);

            if (ae == null || !ae.hasMoreElements()) {
                System.out.println(
                    "Could not find the MagicPeer Pipe Advertisement");
                // Build a fresh pipe advertisement.
                pipeAdv = (PipeAdvertisement)
                    AdvertisementFactory.newAdvertisement(
                        PipeAdvertisement.getAdvertisementType());
                // The pipe ID is generated within the MagicNet group.
                pipeAdv.setPipeID(
                    IDFactory.newPipeID(magicNet.getPeerGroupID()));
                // The pipe name embeds the user name, so user names must be
                // unique among MagicNet peers.
                pipeAdv.setName(pipeName);
                pipeAdv.setType(PipeService.UnicastType);
                disco.publish(pipeAdv,PeerGroup.DEFAULT_LIFETIME,PeerGroup.DEFAULT_EXPIRATION);
                System.out.println(
                    "Created the Restaurant Pipe Advertisement");
            } else {
                // Reuse the advertisement that was found.
                pipeAdv = (PipeAdvertisement) ae.nextElement();
                System.out.println("Found MagicPeer Pipe Advertisement");
            }
        } catch (Exception e) {
            System.out.println("Could not initialize the MagicPeer pipe");
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Opens the {@link JxtaServerSocket} on {@code pipeAdv} (third constructor
     * argument 10, as in the original) and sets its accept timeout to 0.
     * Terminates the JVM with status 1 on failure. Accepting connections is
     * done by {@link #startServer()}.
     */
    private void createServerSocket(){
        try{
            // NOTE(review): the socket is bound through the NetPeerGroup although
            // the pipe ID was generated for MagicNet. The client in this post
            // connects through its NetPeerGroup as well, so both sides must be
            // changed together if this is switched to the MagicNet group.
            serverSocket = new JxtaServerSocket(this.netPeerGroup,this.pipeAdv,10);
            serverSocket.setSoTimeout(0);
        }catch(Exception e){
            System.out.println("failure to create the JxtaServerSocket!");
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Reads {@code in} until end of stream, decoding every pair of bytes as one
     * big-endian 16-bit character. A pair split across two reads is handled; a
     * trailing unpaired byte is ignored.
     *
     * @param in   stream to drain
     * @param sink receives the decoded characters
     * @return total number of bytes read
     * @throws IOException if reading fails
     */
    private static long drainUtf16(InputStream in, StringBuffer sink) throws IOException {
        byte[] buf = new byte[4096];
        long total = 0;
        int pendingHigh = -1; // high byte waiting for its low byte, or -1
        while (true) {
            int read = in.read(buf, 0, buf.length);
            if (read < 1) {
                break;
            }
            total += read;
            for (int i = 0; i < read; i++) {
                int value = buf[i] & 0xff;
                if (pendingHigh < 0) {
                    pendingHigh = value;
                } else {
                    sink.append((char) ((pendingHigh << 8) | value));
                    pendingHigh = -1;
                }
            }
        }
        return total;
    }

    /**
     * Receives one message from {@code socket}, prints it together with simple
     * transfer statistics, and always closes the socket.
     *
     * @param socket accepted connection; closed before this method returns
     */
    private void receiveData(Socket socket){
        try {
            System.out.println("以下是位址:");
            System.out.println(socket.getRemoteSocketAddress());
            InputStream in = socket.getInputStream();
            long start = System.currentTimeMillis();
            StringBuffer text = new StringBuffer();
            // Blocks until the sender closes its side of the stream.
            long total = drainUtf16(in, text);
            System.out.println("哈哈~~~~");
            // trim() also strips the NUL padding a sender may append.
            System.out.println("來自客戶的資料: 哈哈+++ "+text.toString().trim());
            System.out.println("");
            long elapsed = System.currentTimeMillis() - start;
            // A transfer can finish within the same millisecond; avoid dividing by zero.
            long divisor = Math.max(elapsed, 1L);
            System.out.println("EOT. Received " + total + " bytes in " + elapsed + " ms. Throughput = " + ((total * 8000) / (1024 * divisor)) + " Kbit/s.");
        } catch (IOException ie) {
            ie.printStackTrace();
        } finally {
            try {
                socket.close();
                System.out.println("Closed connection. Ready for next connection.");
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }

    /**
     * Builds the fixed MagicNet group ID from {@code groupURL}.
     *
     * @return the peer group ID
     * @throws Exception if the URL cannot be built or converted into an ID
     */
    private PeerGroupID mkGroupID() throws Exception {
        return (PeerGroupID) IDFactory.fromURL(
            new URL("urn", "", groupURL));
    }

    /** Stand-alone entry point: boots JXTA and runs the server. */
    public static void main(String agrs[]){
        CreateServer cs = new CreateServer(null);
        cs.startjxta();
        cs.startServer();
    }

    /**
     * Creates the NetPeerGroup via {@link PeerGroupFactory#newNetPeerGroup()}
     * and stores it, replacing any group passed to the constructor.
     * Terminates the JVM with status 1 on failure.
     */
    public void startjxta(){
        try {
            netPeerGroup = PeerGroupFactory.newNetPeerGroup();
        } catch (PeerGroupException e) {
            // Couldn't initialize; can't continue.
            System.out.println("Fatal error : creating the NetPeerGroup");
            System.exit(1);
        }
    }
}
package edu.cqwu.magicqq.net.discovery;
import java.io.IOException;
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.Enumeration;
import java.net.Socket;
import net.jxta.discovery.DiscoveryService;
import net.jxta.discovery.DiscoveryListener;
import net.jxta.discovery.DiscoveryEvent;
import net.jxta.protocol.DiscoveryResponseMsg;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.exception.PeerGroupException;
import net.jxta.protocol.PeerGroupAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.MimeMediaType;
import net.jxta.pipe.PipeService;
import net.jxta.socket.JxtaServerSocket;
import net.jxta.socket.JxtaSocket;
/**
 * Minimal JXTA client peer.
 *
 * <p>{@link #startjxta()} boots the NetPeerGroup, discovers and joins the
 * "MagicNet" peer group, registers this object as a discovery listener and
 * then keeps sending remote discovery queries for advertisements named
 * "MagicNet:MagicPipe:*". For every advertisement delivered to
 * {@link #discoveryEvent(DiscoveryEvent)} a {@link JxtaSocket} is opened and a
 * fixed test message is sent through {@link #sendIM(PipeAdvertisement, String)}.
 */
public class DiscoveryPeer implements DiscoveryListener{
    private DiscoveryService disco = null;   // discovery service of MagicNet
    private PipeService pipes = null;        // pipe service of MagicNet (fetched, currently unused)
    private PipeAdvertisement pipeAdv = null;
    private PeerGroup netPeerGroup = null;   // the NetPeerGroup
    private PeerGroup MagicNet = null;       // the MagicNet peer group
    private int timeout = 3000;              // ms to wait after each remote group query
    // Minimum payload size in bytes; shorter messages are zero-padded to this length.
    private static int payloadSize = 1040;//64 * 1024;

    /**
     * Creates a client peer.
     *
     * @param parentgroup group to use as the NetPeerGroup; may be {@code null}.
     *                    Note that {@link #startjxta()} always replaces it.
     */
    public DiscoveryPeer(PeerGroup parentgroup){
        this.netPeerGroup = parentgroup;
    }

    /** Stand-alone entry point: boots JXTA and starts discovering servers. */
    public static void main(String agrs[]){
        DiscoveryPeer dp = new DiscoveryPeer(null);
        dp.startjxta();
        //dp.run();
    }

    /**
     * Boots the NetPeerGroup, joins MagicNet and then sends a discovery query
     * for server pipe advertisements every two seconds until the thread is
     * interrupted. Returns early if MagicNet cannot be joined; terminates the
     * JVM with status 1 if the NetPeerGroup cannot be created.
     */
    public void startjxta(){
        try{
            netPeerGroup = PeerGroupFactory.newNetPeerGroup();
        }catch(PeerGroupException e){
            System.out.println("Failure to create peergroup!");
            System.exit(1);
        }
        if(!joinMagicNet()){
            // Without the group there is no discovery service to query.
            System.out.println("Can't join the Magicnet!");
            return;
        }
        System.out.println("Find the pipeadv");
        try {
            // Receive discovery responses through discoveryEvent().
            disco.addDiscoveryListener(this);
            while (true) {
                System.out.println("Sending a Discovery Message");
                // Look for server pipes; the threshold argument is 5.
                disco.getRemoteAdvertisements(null, DiscoveryService.ADV,
                    "Name","MagicNet:MagicPipe:*",5,null);
                // Wait two seconds before asking again.
                try {
                    Thread.sleep(2 * 1000);
                } catch(InterruptedException ie) {
                    // Preserve the interrupt and stop polling instead of spinning.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    /** Placeholder; intentionally empty and not called anywhere in this class. */
    private void run(){
    }

    /**
     * Discovers the MagicNet peer group advertisement (local cache first, then
     * up to three remote queries with a {@code timeout} ms wait after each) and
     * instantiates the group. On success {@code disco} and {@code pipes} hold
     * the group's services.
     *
     * @return {@code true} if the group was joined, {@code false} if no
     *         advertisement was found or the group could not be instantiated
     */
    private boolean joinMagicNet() {
        final int attempts = 3; // maximum number of remote queries
        System.out.println("Attempting to discover the MagicNet Peergroup");
        // Group advertisements are searched through the NetPeerGroup.
        DiscoveryService hdisco = netPeerGroup.getDiscoveryService();
        Enumeration ae = null;
        for (int attempt = 0; attempt <= attempts; attempt++) {
            try {
                // Check the local peer cache first.
                ae = hdisco.getLocalAdvertisements(DiscoveryService.GROUP,
                    "Name", "MagicNet");
                if ((ae != null) && ae.hasMoreElements()) {
                    break;
                }
                if (attempt == attempts) {
                    break; // final cache check done; nothing more to wait for
                }
                // Not cached: ask other peers.
                hdisco.getRemoteAdvertisements(null,
                    DiscoveryService.GROUP, "Name", "MagicNet", 1, null);
            } catch (IOException e) {
                // Best effort: a failed lookup counts as "found nothing"; try again.
                continue;
            }
            try {
                // Give peers a chance to respond.
                Thread.sleep(timeout);
            } catch (InterruptedException ie) {
                // Preserve the interrupt and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (ae == null || !ae.hasMoreElements()) {
            return false;
        }
        System.out.println("Found the MagicNet PeerGroup Advertisement");
        PeerGroupAdvertisement adv =
            (PeerGroupAdvertisement) ae.nextElement();
        try {
            // Instantiate the group from its advertisement and fetch its services.
            MagicNet = netPeerGroup.newGroup(adv);
            disco = MagicNet.getDiscoveryService();
            pipes = MagicNet.getPipeService();
        } catch (Exception e) {
            System.out.println("Could not create RestoPeerGroup");
            return false;
        }
        System.out.println("The MagicPeer joined the MagicNet PeerGroup");
        return true;
    }

    /**
     * Discovery callback. Every response element is parsed as an XML pipe
     * advertisement and the test message is sent to it. A failure on one
     * element is reported and does not stop the remaining elements.
     *
     * @param ev the discovery event carrying the response message
     */
    public void discoveryEvent(DiscoveryEvent ev){
        System.out.println("Processing discovery event");
        DiscoveryResponseMsg msg = ev.getResponse();
        // The enumeration holds the advertisements that were found.
        Enumeration e = msg.getResponses();
        while (e.hasMoreElements()) {
            try {
                String s = (String) e.nextElement();
                // NOTE(review): getBytes() uses the platform default charset;
                // confirm it matches the encoding of the advertisement XML.
                PipeAdvertisement adv = (PipeAdvertisement)
                    AdvertisementFactory.newAdvertisement(
                        new MimeMediaType("text/xml"),
                        new ByteArrayInputStream(s.getBytes()));
                // Send the test message to the discovered server.
                this.sendIM(adv,"哈哈~~~呵呵...測試一下哈...");
            } catch (Exception ex) {
                System.out.println("Can't connect to peer " + ex);
            }
        }
    }

    /**
     * Encodes {@code text} as big-endian 16-bit characters (high byte first).
     * The result is zero-padded up to {@code payloadSize} bytes and grows
     * beyond that when the text needs more room.
     *
     * @param text message to encode
     * @return the encoded, possibly padded, payload
     */
    private static byte[] encodeUtf16BigEndian(String text) {
        int needed = text.length() * 2;
        byte[] payload = new byte[Math.max(payloadSize, needed)];
        for (int i = 0; i < text.length(); i++) {
            char ch = text.charAt(i);
            payload[i * 2] = (byte) (ch >> 8);
            payload[i * 2 + 1] = (byte) ch;
        }
        return payload;
    }

    /**
     * Opens a reliable {@link JxtaSocket} to the pipe described by
     * {@code pipeAdv} (30 s connect timeout), sends {@code str} encoded as
     * big-endian 16-bit characters, closes the socket and prints timing
     * figures. The socket is closed even when sending fails.
     *
     * @param pipeAdv advertisement of the server pipe to connect to
     * @param str     message to send
     * @throws IOException if connecting, writing or closing fails
     */
    public void sendIM(PipeAdvertisement pipeAdv,String str) throws IOException {
        System.out.println("Connecting to the server");
        // A local socket per call: the listener may be invoked again while a
        // previous send is still in progress.
        // NOTE(review): the connection goes through the NetPeerGroup, matching
        // the server in this post; change both sides together if switching to
        // the MagicNet group.
        JxtaSocket connection = new JxtaSocket(netPeerGroup,
            null,       // no specific peer ID
            pipeAdv,
            30000,      // general timeout: 30 seconds
            true);      // reliable connection
        boolean closed = false;
        try {
            connection.setOutputStreamBufferSize(65536);
            System.out.println("Reading in data");
            OutputStream out = connection.getOutputStream();
            byte[] payload = encodeUtf16BigEndian(str);
            long t0 = System.currentTimeMillis();
            out.write(payload, 0, payload.length);
            out.flush();
            // Close is part of the timing since it may flush the tail of the stream.
            connection.close();
            closed = true;
            long t1 = System.currentTimeMillis();
            long elapsed = t1 - t0;
            // A send can finish within the same millisecond; avoid dividing by zero.
            long divisor = Math.max(elapsed, 1L);
            System.out.println("Completed in :" + elapsed + " msec");
            System.out.println("Data Rate :" + (payload.length * 8000L) / (1024 * divisor) + " Kbit/sec");
        } finally {
            if (!closed) {
                try {
                    connection.close();
                } catch (IOException ignored) {
                    // The failure that brought us here is the one worth reporting.
                }
            }
        }
    }
}