天天看點

JXTA, P2P程式設計技術例程(1)

在這個例子中,我們會設計和編寫一個分布式JXTA應用,可以解決并行計算問題。我們将用一個反複使用的形式建立這個應用,在每步中擴大它的能力和 使用到的API集。這些原代碼隻能覆寫最重要的部分,需要全部代碼可以到網站上查詢

一些大型計算問題的子集可以用并行的方法解決。一個工作的并行執行意味着你可以将一個問題分解為幾個小的子問題, 這些子問題又可以被同時的被執行。當一個子問題結束後,它将傳回自己的結果到主程式,主程式将這些傳回的小結果組合為更大的答案。

舉個例子,考慮将任何2個整數間的質數列出來的工作。質數就是那些隻能被它自己和1整除的自然數。 那些可以被1和其他比自己小的數整除的叫合數。是以,最簡單的方法産生一個質數清單就是在自然數清單中消除所有的合數,剩餘的就是質數了。

這個方法反複消除一系列自然數中的合數。它将清單中的每個數都用2到它的平方根去除。如果任何一個數被這樣除了之後沒有餘數,那麼這個數就是合數,并且對它作上标記。 當這些反複(工作)結束了之後,所有的被标記的數将被消除,剩下的就是質數了。

但是如果是一個很長的清單,有數百萬的長度,那麼我們将它分為幾個小一點的清單,然後對每個清單執行上述方法。每個子計算被分發到網絡上不同的機器去執行,充分利用分布的計算資源。質數的查詢是這些可以被并行分解的大型問題集中的一個,現在流行的P2P風格的軟體有不少,比如[email protected]工程,它的目标是對來自外太空的信号解碼,尋找外星智能生物,類似的工程還有讓使用者共享出他們空閑CPU的資源去模拟蛋白質的折疊或者解碼DNA串。

在這個應用中,主程式将向使用者請求2個數,然後産生一個在2數之間包含是以質數的數列。這個主程式将首先在JXTA網絡上嘗試找到提供質數查詢服務的的其他peer,然後将清單的片段分發給它們去執行。當一個peer完成了它的那一部分,它将傳回這個片段中質數的數組。

為了這樣的配置設定能夠進行,我們要讓所有peer能夠在網絡上通告(advertise)它具備質數查詢的能力以讓其他的peer可以找到并連接配接它。

JXTA Application Design

可能這個應用最不平常的一個方面就是,每個peer既扮演主程式(master process)的角色和一個從屬程式的角色(slave),也有可能一個從屬程式還要決定是否進一步的分解這個問題到的子任務。這種服務模式/客戶模式操作是P2P程式設計的實質。我們将它定義為SM/CM操作

消息定義

當我們設計一個JXTA程式的時候,我們必須忍受JXTA是一個基于消息的系統:2個peer中的主要協定(contract)主要是通過消息。這樣,設計程式的第一個工作就是定義消息傳遞。在這個質數查詢應用中,一個peer傳遞一條包含2個邊界數的消息給另外一個peer,接受的一方計算在這2個邊界數中的質數,傳回到以前的peer

net.jxta.endpoint.Message 類 概括出一個消息的感念,它允許賦予任何一個消息集一個鍵(KEY)。我們将在下表中用鍵-值 來表示這個類的執行個體

Table 16.1. Request Message
Key Value
ServiceConstants.LOW_INT Lower boundary of the (sub)list
ServiceConstants.HIGH_INT Upper boundary of the (sub)list
Table 16.2. Response Message
Key Value
ServiceConstants.LOW_INT Lower boundary of the (sub)list
ServiceConstants.HIGH_INT Upper boundary of the (sub)list
ServiceConstants.PRIMELIST A string containing all primes between the bounds of the list. The primes are separated by ; characters.
服務的定義與發現

下一步,我們必須定義一個讓主程式找到從屬程式的方法,換句話說,我們比如讓peer預先了解到提供這個服務的其他peer。

就象早先提及的那樣,一個JXTA服務是用它的module類和specification定義的。是以, 我們将為這個質數查詢服務module和specification定義一個通告. 然後讓一個peer提供一個讓這些通告在JXTA網絡中傳播的服務。這個質數查詢module類将采用JXTACLASS:com.sams.p2p.primecruncher這個名字,module的spec将采用the name JXTASPEC:com.sams.p2p.primecruncher這個名字

主程式将用這個名字去發現通告module的說明。

是以, 除了消息的定義, 服務名字字元串也是peer在設計階段應該擷取的資訊。所有 peer互動的資訊将在運作的時候被發現。

服務實作

當一個質數查詢服務開始的時候,它将初始化JXTA平台以得到去World and Net Peer Group的通道。當初始化隻後,peer将創造并釋出它的通告,包括它的module類和module說明通告。

模闆說明通告将包含一個管道通告。 那些發現有此服務的一個模闆說明通告的用戶端必須得到這個管道通告,并且通過這個管道連接配接到那個服務。

釋出了通告之後,我們的服務打開一個輸入通道并且對進來的消息進行監聽。當一個消息到達之後,這個服務嘗試從這個消息中獲得high與low這2個邊界數字,然後将它們傳遞到一個僅産生質數連結清單的元件中。 當那個元件傳回了結果(一個含質數的數組),這個質數查訊服務将産生一個包含結果的消息并将此消息送回用戶端。在最先的反複中,服務将簡單地列印出它接受到的消息。接下來的提煉中,它将打開一個管道将結果送回用戶端。用戶端将把這些從各個peer中得到的結果組合起來并将最後的數列存入檔案中。.

Listing 16.2 Outline of PrimePeer and Initialization of a JXTA Peer

package primecruncher;

import net.jxta.peergroup.PeerGroup;

import net.jxta.peergroup.PeerGroupFactory;

import net.jxta.peergroup.PeerGroupID;

import net.jxta.discovery.DiscoveryService;

import net.jxta.pipe.PipeService;

import net.jxta.pipe.InputPipe;

import net.jxta.pipe.PipeID;

import net.jxta.exception.PeerGroupException;

import net.jxta.protocol.ModuleClassAdvertisement;

import net.jxta.protocol.ModuleSpecAdvertisement;

import net.jxta.protocol.PipeAdvertisement;

import net.jxta.document.*;

import net.jxta.platform.ModuleClassID;

import net.jxta.platform.ModuleSpecID;

import net.jxta.id.IDFactory;

import net.jxta.endpoint.Message;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.FileOutputStream;

import java.io.StringWriter;

public class PrimePeer {

        private static PeerGroup group;

        private static DiscoveryService discoSvc;

        private static PipeService pipeSvc;

        private InputPipe inputPipe;

        private static final String PIPE_ADV_FILE = "primeserver_pipe.adv";

        public static void main(String[] argv) {

               PrimePeer pp = new PrimePeer();

               pp.startJxta();

               pp.doAdvertise();

               pp.startService();

        }

        public PrimePeer() {

        }

        private void startJxta() {

               try {

                       group = PeerGroupFactory.newNetPeerGroup();

                       discoSvc = group.getDiscoveryService();

                       pipeSvc = group.getPipeService();

               } catch (PeerGroupException e) {

                       System.out.println("Cannot create Net Peer Group: " + e.getMessage(

));

                       System.exit(-1);

               }

        }

        private void doAdvertise() {

           ...

        }

        private void startService() {

           ...

        }

        private void processInput(String high, String low) {

           ...

        }

}

在這個startJxta()服務初始化方法中,我們首先獲得一個通往World Peer Group的引用(reference):這是通過一個靜态PeerGroupFacrory實作的。調用這個方法将為JXTA的運作作好準備。下一步中,我們将獲得一個到2個Net Peer Group提供的peer group服務的引用:DiscoveryService和the PipeService。我們将在建立服務通告的時候用到它們2個。