天天看點

架構設計:系統間通信(14)——RPC執行個體Apache Thrift 下篇(2)3、正式開始編碼3-1、編寫服務端主程式3-2、編寫服務具體實作3-3、編寫用戶端實作3-4、工程結構說明

(接上篇《架構設計:系統間通信(13)——RPC執行個體Apache Thrift 下篇(1)》)

3、正式開始編碼

我已經在CSDN的資源區上傳了這個示例工程的所有代碼(http://download.csdn.net/detail/yinwenjie/9289999)。讀者可以直接到資源下載下傳站進行下載下傳(不收積分哦~~^_^)。這篇文章将緊接上文,主要介紹這個工程幾個主要的類代碼。

3-1、編寫服務端主程式

服務端主程式的類名:processor.MainProcessor,它負責在服務端啟動Apache Thrift并且在服務監聽啟動成功後,連接配接到zookeeper,注冊這個服務的基本資訊。

這裡要注意一下,Apache Thrift的服務監聽是阻塞式的,是以processor.MainProcessor的Apache Thrift操作應該另起線程進行(processor.MainProcessor.StartServerThread),并且通過線程間的鎖定操作,保證zookeeper的連接配接一定是在Apache Thrift成功啟動後才進行。

package processor;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Executors;

import net.sf.json.JSONObject;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import business.BusinessServicesMapping;
import thrift.iface.DIYFrameworkService;
import thrift.iface.DIYFrameworkService.Iface;

public class MainProcessor {
    static {
        BasicConfigurator.configure();
    }

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(MainProcessor.class);

    private static final Integer SERVER_PORT = 8090;

    /**
     * 專門用于鎖定以保證這個主線程不退出的一個object對象
     */
    private static final Object WAIT_OBJECT = new Object();

    /**
     * 标記apache thrift是否啟動成功了
     * 隻有apache thrift啟動成功了,才需要連接配接到zk
     */
    private boolean isthriftStart = false;

    public static void main(String[] args) {
        /*
         * 主程式要做的事情:
         * 
         * 1、啟動thrift服務。并且服務調用者的請求
         * 2、連接配接到zk,并向zk注冊自己提供的服務名稱,告知zk真實的通路位址、通路端口
         * (向zk注冊的服務,存儲在BusinessServicesMapping這個類的K-V常量中)
         * */

        //1、========啟動thrift服務
        MainProcessor mainProcessor = new MainProcessor();
        mainProcessor.startServer();

        // 一直等待,apache thrift啟動完成
        synchronized (mainProcessor) {
            try {
                while(!mainProcessor.isthriftStart) {
                    mainProcessor.wait();
                }
            } catch (InterruptedException e) {
                MainProcessor.LOGGER.error(e);
                System.exit(-1);
            }
        }

        //2、========連接配接到zk
        try {
            mainProcessor.connectZk();
        } catch (IOException | KeeperException | InterruptedException e) {
            MainProcessor.LOGGER.error(e);
            System.exit(-1);
        }

        // 這個wait在業務層面,沒有任何意義。隻是為了保證這個守護線程不會退出
        synchronized (MainProcessor.WAIT_OBJECT) {
            try {
                MainProcessor.WAIT_OBJECT.wait();
            } catch (InterruptedException e) {
                MainProcessor.LOGGER.error(e);
                System.exit(-1);
            }
        }
    }

    /**
     * 這個私有方法用于連接配接到zk上,并且注冊相關服務
     * @throws IOException 
     * @throws InterruptedException 
     * @throws KeeperException 
     */
    private void connectZk() throws IOException, KeeperException, InterruptedException {
        // 讀取這個服務提供者,需要在zk上注冊的服務
        Set<String> serviceNames = BusinessServicesMapping.SERVICES_MAPPING.keySet();
        // 如果沒有任何服務需要注冊到zk,那麼這個服務提供者就沒有繼續注冊的必要了
        if(serviceNames == null || serviceNames.isEmpty()) {
            return;
        }

        // 預設的監聽器
        MyDefaultWatcher defaultWatcher = new MyDefaultWatcher();
        // 連接配接到zk伺服器叢集,添加預設的watcher監聽
        ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher);

        //建立一個父級節點Service
        Stat pathStat = null;
        try {
            pathStat = zk.exists("/Service", defaultWatcher);
            //如果條件成立,說明節點不存在(隻需要判斷一個節點的存在性即可)
            //建立的這個節點是一個“永久狀态”的節點
            if(pathStat == null) {
                zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch(Exception e) {
            System.exit(-1);
        }

        // 開始添加子級節點,每一個子級節點都表示一個這個服務提供者提供的業務服務
        for (String serviceName : serviceNames) {
            JSONObject nodeData = new JSONObject();
            nodeData.put("ip", "127.0.0.1");
            nodeData.put("port", MainProcessor.SERVER_PORT);
            zk.create("/Service/" + serviceName, nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }

        //執行到這裡,說明所有的service都啟動完成了
        MainProcessor.LOGGER.info("===================所有service都啟動完成了,主線程開始啟動===================");
    }

    /**
     * 這個私有方法用于開啟Apache thrift服務端,并進行持續監聽
     * @throws TTransportException
     */
    private void startServer() {
        Thread startServerThread = new Thread(new StartServerThread());
        startServerThread.start();
    }

    private class StartServerThread implements Runnable {
        @Override
        public void run() {
            MainProcessor.LOGGER.info("看到這句就說明thrift服務端準備工作 ....");

            // 服務執行控制器(隻要是排程服務的具體實作該如何運作)
            TProcessor tprocessor = new DIYFrameworkService.Processor<Iface>(new DIYFrameworkServiceImpl());

            // 基于阻塞式同步IO模型的Thrift服務,正式生産環境不建議用這個
            TServerSocket serverTransport = null;
            try {
                serverTransport = new TServerSocket(MainProcessor.SERVER_PORT);
            } catch (TTransportException e) {
                MainProcessor.LOGGER.error(e);
                System.exit(-1);
            }

            // 為這個伺服器設定對應的IO網絡模型、設定使用的消息格式封裝、設定線程池參數
            Args tArgs = new Args(serverTransport);
            tArgs.processor(tprocessor);
            tArgs.protocolFactory(new TBinaryProtocol.Factory());
            tArgs.executorService(Executors.newFixedThreadPool(100));

            // 啟動這個thrift服務
            TThreadPoolServer server = new TThreadPoolServer(tArgs);
            server.setServerEventHandler(new StartServerEventHandler());
            server.serve();
        }
    }

    /**
     * 為這個TThreadPoolServer對象,設定是一個事件處理器。
     * 以便在TThreadPoolServer正式開始監聽服務請求前,通知mainProcessor:
     * “Apache Thrift已經成功啟動了”
     * @author yinwenjie
     *
     */
    private class StartServerEventHandler implements TServerEventHandler {
        @Override
        public void preServe() {
            /*
             * 需要實作這個方法,以便在服務啟動成功後,
             * 通知mainProcessor: “Apache Thrift已經成功啟動了”
             * */
            MainProcessor.this.isthriftStart = true;
            synchronized (MainProcessor.this) {
                MainProcessor.this.notify();
            }
        }

        /* (non-Javadoc)
         * @see org.apache.thrift.server.TServerEventHandler#createContext(org.apache.thrift.protocol.TProtocol, org.apache.thrift.protocol.TProtocol)
         */
        @Override
        public ServerContext createContext(TProtocol input, TProtocol output) {
            /*
             * 無需實作
             * */
            return null;
        }

        @Override
        public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
            /*
             * 無需實作
             * */
        }

        @Override
        public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
            /*
             * 無需實作
             * */
        }
    }

    /**
     * 這是預設的watcher,什麼也沒有,也不需要有什麼<br>
     * 因為按照功能需求,伺服器端并不需要監控zk上的任何目錄變化事件
     * @author yinwenjie
     */
    private class MyDefaultWatcher implements Watcher {
        public void process(WatchedEvent event) {

        }
    }
}
           

3-2、編寫服務具體實作

服務端具體實作的代碼很簡單,就是在IDL腳本生成了java代碼後,對DIYFrameworkService接口進行的實作。

package processor;

import java.nio.ByteBuffer;

import net.sf.json.JSONObject;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TException;

import business.BusinessService;
import business.BusinessServicesMapping;
import business.exception.BizException;
import business.exception.ResponseCode;
import business.pojo.AbstractPojo;
import business.pojo.BusinessResponsePojo;
import business.pojo.DescPojo;
import thrift.iface.DIYFrameworkService.Iface;
import thrift.iface.EXCCODE;
import thrift.iface.RESCODE;
import thrift.iface.Reponse;
import thrift.iface.Request;
import thrift.iface.ServiceException;
import utils.JSONUtils;

/**
 * IDL檔案中,我們定義的唯一服務接口DIYFrameworkService.Iface的唯一實作
 * @author yinwenjie
 *
 */
public class DIYFrameworkServiceImpl implements Iface {

    /**
     * 日志
     */
    public static final Log LOGGER = LogFactory.getLog(DIYFrameworkServiceImpl.class);

    /* (non-Javadoc)
     * @see thrift.iface.DIYFrameworkService.Iface#send(thrift.iface.Request)
     */
    @SuppressWarnings("unchecked")
    @Override
    public Reponse send(Request request) throws ServiceException, TException {
        /*
         * 由于MainProcessor中,在Apache Thrift 服務端啟動時已經加入了線程池,是以這裡就不需要再使用線程池了
         * 這個服務方法的實作,需要做以下事情:
         * 
         * 1、根據request中,描述的具體服務名稱,在配置資訊中查找具體的服務類
         * 2、使用java的反射機制,調用具體的服務類(BusinessService接口的實作類)。
         * 3、根據具體的業務處理結構,構造Reponse對象,并進行傳回
         * */

        //1、===================
        String serviceName = request.getServiceName();
        String className = BusinessServicesMapping.SERVICES_MAPPING.get(serviceName);
        //未發現服務
        if(StringUtils.isEmpty(className)) {
            return this.buildErrorReponse("無效的服務" , null);
        }

        //2、===================
        // 首先得到以json為描述格式的請求參數資訊
        JSONObject paramJSON = null;
        try {
            byte [] paramJSON_bytes = request.getParamJSON();
            if(paramJSON_bytes != null && paramJSON_bytes.length > 0) {
                String paramJSON_string = new String(paramJSON_bytes);
                paramJSON = JSONObject.fromObject(paramJSON_string);
            }
        } catch(Exception e) {
            DIYFrameworkServiceImpl.LOGGER.error(e);
            // 向調用者抛出異常
            throw new ServiceException(EXCCODE.PARAMNOTFOUND, e.getMessage());
        }

        // 試圖進行反射
        BusinessService<AbstractPojo> businessServiceInstance = null;
        try {
            businessServiceInstance = (BusinessService<AbstractPojo>)Class.forName(className).newInstance();
        } catch (Exception e) {
            DIYFrameworkServiceImpl.LOGGER.error(e);
            // 向調用者抛出異常
            throw new ServiceException(EXCCODE.SERVICENOTFOUND, e.getMessage());
        }

        // 進行調用
        AbstractPojo returnPojo = null;
        try {
            returnPojo = businessServiceInstance.handle(paramJSON);
        } catch (BizException e) {
            DIYFrameworkServiceImpl.LOGGER.error(e);
            return this.buildErrorReponse(e.getMessage() , e.getResponseCode());
        }

        // 構造處理成功情況下的傳回資訊
        BusinessResponsePojo responsePojo = new BusinessResponsePojo();
        responsePojo.setData(returnPojo);
        DescPojo descPojo = new DescPojo("", ResponseCode._200);
        responsePojo.setDesc(descPojo);

        // 生成json
        String returnString = JSONUtils.toString(responsePojo);
        byte[] returnBytes = returnString.getBytes();
        ByteBuffer returnByteBuffer = ByteBuffer.allocate(returnBytes.length);
        returnByteBuffer.put(returnBytes);
        returnByteBuffer.flip();

        // 構造response
        Reponse reponse = new Reponse(RESCODE._200, returnByteBuffer);
        return reponse;
    }

    /**
     * 這個私有方法,用于構造“Thrift中錯誤的傳回資訊”
     * @param erroe_mess
     * @return
     */
    private Reponse buildErrorReponse(String erroe_mess , ResponseCode responseCode) {
        // 構造傳回資訊
        BusinessResponsePojo responsePojo = new BusinessResponsePojo();
        responsePojo.setData(null);
        DescPojo descPojo = new DescPojo(erroe_mess, responseCode == null?ResponseCode._504:responseCode);
        responsePojo.setDesc(descPojo);

        // 存儲byteBuffer;
        String responseJSON = JSONUtils.toString(responsePojo);
        byte[] responseJSON_bytes = responseJSON.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(responseJSON_bytes.length);
        byteBuffer.put(byteBuffer);
        byteBuffer.flip();

        Reponse reponse = new Reponse(RESCODE._500, byteBuffer);
        return reponse;
    }
}           

3-3、編寫用戶端實作

在上文中已經介紹過了,用戶端有兩件事情需要做:連接配接到zookeeper查詢注冊的服務該如何通路;然後向真實的服務提供者發起請求。代碼如下:

package client;

import java.nio.ByteBuffer;
import java.util.List;

import net.sf.json.JSONObject;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import thrift.iface.DIYFrameworkService.Client;
import thrift.iface.Reponse;
import thrift.iface.Request;
import utils.JSONUtils;

public class ThriftClient {
    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(ThriftClient.class);

    private static final String SERVCENAME = "queryUserDetailService";

    static {
        BasicConfigurator.configure();
    }

    public static final void main(String[] main) throws Exception {
        /*
         * 服務治理架構的用戶端示例,要做以下事情:
         * 
         * 1、連接配接到zk,查詢目前zk下提供的服務清單中是否有自己需要的服務名稱(queryUserDetailService)
         * 2、如果沒有找到需要的服務名稱,則用戶端終止工作
         * 3、如果找到了服務,則通過服務給出的ip,port,基于Thrift進行正式請求
         * (這時,和zookeeper是否斷開,關系就不大了)
         * */
        // 1、===========================
        // 預設的監聽器
        ClientDefaultWatcher defaultWatcher = new ClientDefaultWatcher();
        // 連接配接到zk伺服器叢集,添加預設的watcher監聽
        ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher);

        /*
         * 為什麼用戶端連接配接上來以後,也可能建立一個Service根目錄呢?
         * 因為正式的環境下,不能保證用戶端一點就在伺服器端全部準備好的情況下,再來做調用請求
         * */
        Stat pathStat = null;
        try {
            pathStat = zk.exists("/Service", defaultWatcher);
            //如果條件成立,說明節點不存在(隻需要判斷一個節點的存在性即可)
            //建立的這個節點是一個“永久狀态”的節點
            if(pathStat == null) {
                zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch(Exception e) {
            System.exit(-1);
        }

        // 2、===========================
        //擷取服務清單(不需要做任何的事件監聽,是以第二個參數可以為false)
        List<String> serviceList = zk.getChildren("/Service", false);
        if(serviceList == null || serviceList.isEmpty()) {
            ThriftClient.LOGGER.info("未發現相關服務,用戶端退出");
            return;
        }

        //然後檢視要找尋的服務是否在存在
        boolean isFound = false;
        byte[] data;
        for (String serviceName : serviceList) {
            if(StringUtils.equals(serviceName, ThriftClient.SERVCENAME)) {
                isFound = true;
                break;
            }
        }
        if(!isFound) {
            ThriftClient.LOGGER.info("未發現相關服務,用戶端退出");
            return;
        } else {
            data = zk.getData("/Service/" + ThriftClient.SERVCENAME, false, null);
        }

        /*
         * 執行到這裡,zk的工作就完成了,接下來zk是否斷開,就不重要了
         * */
        zk.close();
        if(data == null || data.length == 0) {
            ThriftClient.LOGGER.info("未發現有效的服務端位址,用戶端退出");
            return;
        }
        // 得到伺服器地值說明
        JSONObject serverTargetJSON = null;
        String serverIp;
        String serverPort;
        try {
            serverTargetJSON = JSONObject.fromObject(new String(data));
            serverIp = serverTargetJSON.getString("ip");
            serverPort = serverTargetJSON.getString("port");
        } catch(Exception e) {
            ThriftClient.LOGGER.error(e);
            return;
        }

        //3、===========================
        TSocket transport = new TSocket(serverIp, Integer.parseInt(serverPort));
        TProtocol protocol = new TBinaryProtocol(transport);
        // 準備調用參數
        JSONObject jsonParam = new JSONObject();
        jsonParam.put("username", "yinwenjie");
        byte[] params = jsonParam.toString().getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(params.length);
        buffer.put(params);
        buffer.flip();
        Request request = new Request(buffer, ThriftClient.SERVCENAME);

        // 開始調用
        Client client = new Client(protocol);
        // 準備傳輸
        transport.open();
        // 正式調用接口
        Reponse reponse = client.send(request);
        byte[] responseBytes = reponse.getResponseJSON();
        // 一定要記住關閉
        transport.close();

        // 将傳回資訊顯示出來
        ThriftClient.LOGGER.info("respinse value = " + new String(responseBytes));
    }
}

/**
 * 這是預設的watcher,什麼也沒有,也不需要有什麼<br>
 * 因為按照功能需求,用戶端并不需要監控zk上的任何目錄變化事件
 * @author yinwenjie
 */
class ClientDefaultWatcher implements Watcher {
    public void process(WatchedEvent event) {

    }
}           

3-4、工程結構說明

以上代碼是伺服器端、用戶端的主要代碼。整個工程還有其他的輔助代碼,為了讓各位讀者能夠看得清楚直接,我們将整個工程結構進行一下說明,下載下傳後導入的工程結構如下圖所示:

架構設計:系統間通信(14)——RPC執行個體Apache Thrift 下篇(2)3、正式開始編碼3-1、編寫服務端主程式3-2、編寫服務具體實作3-3、編寫用戶端實作3-4、工程結構說明
  1. 這是一個典型的JAVA工程。請使用 JDK 1.6+ 版本。我們将講解整個工程結構。首先來看看這個工程中主要的package和它們的作用。
  2. business:具體的業務層邏輯都在這個包裡面,其中exception包含了一個業務層異常的定義BizException,還有錯誤代碼ResponseCode;impl包中放置具體的業務層實作,它們都必須實作BusinessService接口;Pojo是業務層對象模型。client:為了簡單起見,我将服務端的實作和用戶端的實作放置在一個工程中,client這個包就是用戶端的實作代碼了;utils包放置了兩個工具類,用來進行日期格式化的DataUtils和用來進行json轉換的JSONUtils。
  3. 定義的apache thrift IDL檔案放置在thrift檔案夾下面,名字叫做:demoHello.thrift;您可以使用它生成各種語言的代碼;
  4. 工程需要maven的支援。
  5. 2016年08月08日,由網友OneZhous發現了一個程式的bug,這是由于Apache Thrift内部并不會在進行org.apache.thrift.TBaseHelper.copyBinary執行時,将java.nio.ByteBuffer自動進行flip()。是以在完成request和response對象設定後,需要開發人員自行進行flip()。感謝OneZhous對文章中的問題進行糾正,但是CSDN由于無法修改已上傳的資源,是以還請各位讀者在下載下傳運作時注意這個問題:
......
ByteBuffer buffer = ByteBuffer.allocate(params.length);
buffer.put(params);
buffer.flip();

// 以及位置
ByteBuffer byteBuffer = ByteBuffer.allocate(responseJSON_bytes.length);
byteBuffer.put(byteBuffer);
byteBuffer.flip();
......           

繼續閱讀