天天看點

JXTA, P2P程式設計技術例程(2)

JXTA, P2P程式設計技術例程(2)
釋出時間:2006.03.10 06:47     來源:p2p unleashed    作者:

建立與釋出通告

就如我們早先說的那樣,JXTA虛拟網絡依靠JXTA ID 去鑒别網絡資源。而這些資源的發現是通過通告。net.jxta.id包 包含了ID類, 同僚包含了建立各種ID的工廠(factory)。

在JXTA中, net.jxta.document.Document是一個資料通用容器。一個在JXTA中的文檔被MIME媒體類型的内容所定義。這樣,文檔就類似于HTTP流(stream。 JXTA并不嘗試去解釋文檔的内容:這個内容是一個應用層協定的一部分)。 一個通告是由一個類似于XML結構化文檔的可嵌套的多層元素組成的StructuredDocument,它可以使一個文檔在它的資料沒有實體表示(physical representation)的時候被操作。

就如和其他任何StructureDocument一樣,一個通告可以被XML或者簡單文本格式表所表示。一個通告包括它所要通告的資源的ID, 通告的類型, 以及過期的時間絕對值。 JXTA API提供一個友善的工廠, AdvertisementFactory ,來建立各種通告類型。 Listing 16.3 說明了一個ModuleClassAdvertisement通過這個工廠的建立。注意ModuleClassID被加入到通告中的方式)

Listing 16.3 Creating and Advertising a Module Class

private void doAdvertise() {

        ModuleClassAdvertisement classAd =

               (ModuleClassAdvertisement)AdvertisementFactory.newAdvertisement(

                       ModuleClassAdvertisement.getAdvertisementType());

        ModuleClassID classID = IDFactory.newModuleClassID();

        classAd.setModuleClassID(classID);

        classAd.setName(ServiceConstants.CLASS_NAME);

        classAd.setDescription("A prime number crunching service.");

        try {

               discoSvc.publish(classAd, DiscoveryService.ADV);

               discoSvc.remotePublish(classAd, DiscoveryService.ADV);

               System.out.println("Published module class adv.");

        } catch (IOException e) {

               System.out.println("Trouble publishing module class adv: " +

                      e.getMessage());

        }

JXTA net.jxta.discovery.DiscoveryService是一個Net Peer Group 提供的組服務(group service)。 它提供釋出發現大學與遠端2種模式。本地模式在Peer本地的緩沖中去發現通告, 本地釋出就是讓通告放入本地的緩沖中。遠端就是在整個peer組中去發現與釋出。是以,請求消息通過JXTA虛拟網絡在我們早先描述的協定的情況下進行傳播,在它們到底的時候就對這些請求的進行回應。 是以,遠端發現是一個異步的過程,找到在網絡上所要的通告類型也需要一定時間。Listing16.3說明ModuleClassAdvertisement的遠端與本地2種釋出方式。

類型前面的過程,我們通過IDFactory類建立一個ModuleSpec ID, 并從AdvertisementFactory獲得與它相應的通告。(見listing 16.4)

Listing 16.4 Creating a New ModuleSpecAdvertisement

        ModuleSpecAdvertisement specAd =

               (ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement(

                       ModuleSpecAdvertisement.getAdvertisementType());

        ModuleSpecID specID = IDFactory.newModuleSpecID(classID);

        specAd.setModuleSpecID(specID);

        specAd.setName(ServiceConstants.SPEC_NAME);

        specAd.setDescription("Specification for a prime number crunching service");

        specAd.setCreator("Sams Publishing");

        specAd.setSpecURI("http://www.samspulishing.com/p2p/primecruncher");

specAd.setVersion("Version 1.0");

我們應該記得ModuleSpecAdvertisement定義了一個電報協定,或者說是一個網絡行為,來通路一個服務,是以我們需要提供一個PipeAdvertisement 作為一個到ModuleSpecAdvertisement的參數。 因為module的通告将被放在網絡上peer的緩沖之中, 那麼我們必須确認每一個ModuleSpecAdvertisement 在同一個PIPE中。 這樣,我們必須将PIPE的通告到永久存儲器并且在建立新管道的時候一直從這個存儲器中讀取資料。(如果這個通告沒有被存儲到磁盤, 那麼重新建立一個新的。)

Listing 16.5 Creating a Pipe Advertisement

        PipeAdvertisement pipeAd = null;

        try {

               FileInputStream is = new FileInputStream(PIPE_ADV_FILE);

               pipeAd = (PipeAdvertisement)AdvertisementFactory. newAdvertisement(

                    new MimeMediaType("text/xml"), is);

               is.close();

        } catch (IOException e) {

               pipeAd = (PipeAdvertisement)AdvertisementFactory. newAdvertisement(

                    PipeAdvertisement.getAdvertisementType());

               PipeID pid = IDFactory.newPipeID(group.getPeerGroupID());

               pipeAd.setPipeID(pid);

               //save pipeAd in file

               Document pipeAdDoc = pipeAd.getDocument(new MimeMediaType ("text/xml"));

               try {

                     FileOutputStream os = new FileOutputStream(PIPE_ADV_FILE);

                     pipeAdDoc.sendToStream(os);

                     os.flush();

                     os.close();

                     System.out.println("Wrote pipe advertisement to disk.");

               } catch (IOException ex) {

                     System.out.println("Can't save pipe advertisement to file " +

                            PIPE_ADV_FILE);

                     System.exit(-1);

               }

         }

下面的代碼段在磁盤上儲存一個管道廣告為XML格式, 例如,運作這個代碼得到後面的XML文檔。

<?xml version="1.0"?>

<!DOCTYPE jxta:PipeAdvertisement>

<jxta:PipeAdvertisement xmlns:jxta="http://jxta.org">

        <Id>

urn:jxta:uuid-59616261646162614E5047205032503382CCB236202640F5A242ACE15A8F9D7C04

        </Id>

        <Type>

              JxtaUnicast

        </Type>

</jxta:PipeAdvertisement>

随後我們将PipeAdvertisement 作為一個參數傳到ModuleSpecAdvertisement,如Listing 16.6

Listing 16.6 Adding the PipeAdvertisement as a Parameter to the ModuleSpecAdvertisement

specAd.setPipeAdvertisement(pipeAdv);

這時,我們已經準備好将ModuleSpecAdvertisement釋出到本地和遠端了。

Listing 16.7 Local and Remote Publishing of a ModuleSpecAdvertisement

        try {

               discoSvc.publish(specAd, DiscoveryService.ADV);

               discoSvc.remotePublish(specAd, DiscoveryService.ADV);

               System.out.println("Published module spec adv");

        } catch (IOException e) {

               System.out.println("Trouble publishing module spec adv: " +

                       e.getMessage());

}

(在listing 16.8中,我們最終在這個管道通告上建立了一個InputPipe。

Listing 16.8 InputPipe Creation from a PipeAdvertisement

        //create an input pipe based on the advertisement

        try {

               inputPipe = pipeSvc.createInputPipe(pipeAd);

               System.out.println("Created input pipe");

        } catch (IOException e) {

               System.out.println("Can't create input pipe. " + e.getMessage());

        }

}

這些是釋出一個新的JXTA服務的需要的所有步驟。 我們知道一個module的類通告說明了module在peer組中的功能:它是一個非常抽象的概念,有點類似于JAVA的定義API的接口,但是不提供實作。 一個module的說明通告在另一方面又闡明了一個電報協定來通路一個服務。在這種情況下,這個電報協定包括了一個能讓其他peer能給他發送消息的InputPipe, 正是這些消息包含了需要的2個邊界數值)

Processing Messages from an InputPipe(從InputPipe裡處理消息)

下一步是實作讓質數查詢peer處理接受到的消息, 我們将操作進來的消息,算出需要的質數序列,并且送出響應。

Listing 16.9 Processing Messages on an InputPipe

private void startService() {

        while (true) {

               Message msg = null;

               try {

                       msg = inputPipe.waitForMessage();

               } catch (InterruptedException ex) {

                       inputPipe.close();

                       return;

               }

               String highInt = msg.getString(ServiceConstants.HIGH_INT);

               String lowInt = msg.getString(ServiceConstants.LOW_INT);

               if (highInt != null || lowInt != null) {

                      processInput(highInt, lowInt);

               }

        }

}

就如以前說的,net.jxta.endpoint.Message對象被EndpointService送到2個peer之間)  (一個消息包括了一套MessageElements, 說明了一個目的地使它的路徑通過JXTA網絡更加友善。一個消息元素可以是任何位元組數組, 消息也包括以字元串方式提取元素的能力。 當一個新的消息元素被指明的時候, 它可以與一個MIME類型相關聯,就如一個作為元素值的字元串。在這個方法的實作中,我們參考ServiceConstants.HIGH_INT 和 ServiceConstants.LOW_INT的鍵值提取消息元素, 如果這2個元素都是有效的字元串,我們将他們傳入一個私有方法中:processInput()

processInput()對執行這個算法起作用, 産生一個包括所有質數的清單(在LOW_INT 和HIGH_INT 之間), 為了節約空間,我們不會将這個部分的代碼寫在這裡。

繼續閱讀