天天看點

thrift 異步 java_thrift java多線程非阻塞同步/異步調用執行個體 | 學步園

作者:呂桂強

郵箱:[email protected]

首先建立thrift檔案

// Java package for the generated code.
namespace java thrift

// Hello service: a single RPC that takes one string and returns a string.
service Hello {
    string helloString(1:string para)
}

執行thrift -gen java test.thrift會生成一個Hello.java檔案

將Hello.java檔案拷貝至IDE(server端代碼還需要一個實作 Hello.Iface 的 HelloImpl 類別,本文未列出,需自行編寫)

server端代碼:

package com.thrift.test.thrift;

import org.apache.thrift.TProcessorFactory;

import org.apache.thrift.protocol.TCompactProtocol;

import org.apache.thrift.server.THsHaServer;

import org.apache.thrift.server.TServer;

import org.apache.thrift.transport.TFramedTransport;

import org.apache.thrift.transport.TNonblockingServerSocket;

import org.apache.thrift.transport.TTransportException;

/**
 * Demo Thrift server exposing the generated {@code Hello} service on a
 * non-blocking socket, using the compact protocol over a framed transport.
 */
public class Server {

    /** TCP port the server listens on. */
    public final static int PORT = 8989;

    /**
     * Configures a {@link THsHaServer} for the {@code Hello} processor and
     * serves requests on {@link #PORT}. Any failure during setup or serving
     * is printed to standard error.
     */
    // The generated Hello.Processor is used as a raw type here.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void start() {
        try {
            TNonblockingServerSocket socket = new TNonblockingServerSocket(PORT);
            final Hello.Processor processor = new Hello.Processor(new HelloImpl());

            THsHaServer.Args arg = new THsHaServer.Args(socket);
            // Compact, dense binary encoding for the data on the wire.
            arg.protocolFactory(new TCompactProtocol.Factory());
            // Non-blocking, frame-by-frame transfer (similar in spirit to Java NIO).
            arg.transportFactory(new TFramedTransport.Factory());
            arg.processorFactory(new TProcessorFactory(processor));

            TServer server = new THsHaServer(arg);

            // Announce startup BEFORE serve(): serve() runs the serving loop and
            // does not return while the server is up, so a message placed after
            // it would only appear once the server has already stopped.
            System.out.println("#服務啟動-使用:非阻塞&高效二進制編碼");
            server.serve();
        } catch (Exception e) {
            // Covers TTransportException (e.g. the socket could not be created)
            // as well as any other failure; both were handled identically.
            e.printStackTrace();
        }
    }

    /** Entry point: creates a {@code Server} and starts it. */
    public static void main(String args[]) {
        Server srv = new Server();
        srv.start();
    }
}

client端代碼:

package com.thrift.test.Async;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class Client {

public static final String address = "127.0.0.1";

public static final int port = 8989;

public static final int clientTimeout = 30000;

public static void main_syn() {

TTransport transport = new TFramedTransport(new TSocket(address, port, clientTimeout));

TProtocol protocol = new TCompactProtocol(transport);

Hello.Client client = new Hello.Client(protocol);

try {

transport.open();

System.out.println(client.helloString("larry"));

} catch (TApplicationException e) {

System.out.println(e.getMessage() + " " + e.getType());

} catch (TTransportException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

}

transport.close();

}

public static void main_asy() throws Exception {

try {

TAsyncClientManager clientManager = new TAsyncClientManager();

TNonblockingTransport transport = new TNonblockingSocket(address, port, clientTimeout);

TProtocolFactory protocol = new TCompactProtocol.Factory();

Hello.AsyncClient asyncClient = new Hello.AsyncClient(protocol, clientManager, transport);

System.out.println("Client calls .....");

MyCallback callBack = new MyCallback();

asyncClient.helloString("larry", callBack);

while (true) {

Thread.sleep(1);

}

} catch (IOException e) {

e.printStackTrace();

}

}

public static void main(String[] args) throws Exception {

main_asy();

}

}

client使用到的回調函數:

package com.thrift.test.Async;

import org.apache.thrift.TException;

import org.apache.thrift.async.AsyncMethodCallback;

import com.thrift.test.Async.Hello.AsyncClient.helloString_call;

/**
 * Callback used by the client for the asynchronous {@code helloString} call.
 *
 * <p>The type argument is required: with the raw {@code AsyncMethodCallback}
 * interface, {@code onComplete(helloString_call)} does not override the
 * interface method and the class fails to compile.
 */
public class MyCallback implements AsyncMethodCallback<helloString_call> {

    /**
     * Called with the finished call object; prints the returned string.
     *
     * @param response the completed {@code helloString} call
     */
    @Override
    public void onComplete(helloString_call response) {
        System.out.println("onComplete");
        try {
            System.out.println(response.getResult());
        } catch (TException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when the call fails; reports the failure together with its cause.
     *
     * @param exception the error raised for the call
     */
    @Override
    public void onError(Exception exception) {
        System.out.println("onError");
        // Print the cause as well; the bare marker alone hides why the call failed.
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}