轉載請注明原創為Michael Zhang:http://blog.csdn.net/matrixyy/article/details/46688467
一、先說注意事項吧:
1、Coprocessor啟動有三種方式:配置檔案、shell和程式中指定,我使用的是程式指定:
static {
EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
family.setInMemory(true);
family.setMaxVersions(1);
EP_TABLE_DISCRIPTOR.addFamily(family);
try {
EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
} catch (IOException ioe) {
}
上段代碼中的addCoprocessor就是指定該表啟動coprocessor操作。但前提是必須重新啟動HBase才能把jar包載入進來。
2、如果用戶端連接後出現如下問題:No matching handler **** for protocol in *** region,說明jar包還沒有載入到HBase中,確定HBase已經重新啟動,另外檢查代碼中addCoprocessor("ict.wde.test.RowCountServer");的類名“RowCountServer”是否寫正確了
二、說下步驟
2.1編寫服務端代碼:
1)接口類(固定格式)
package ict.wde.test;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import java.io.File;
import java.io.IOException;
/**
* Created by Michael on 2015/6/22.
*/
public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol {

    /**
     * Counts the rows visible to this endpoint, using a {@code FirstKeyOnlyFilter}
     * on the server side so full rows are never materialized.
     *
     * @return the row count
     * @throws IOException on RPC or scan failure
     */
    long getRowCount() throws IOException;

    /**
     * Counts the rows matching the given filter.
     *
     * @param filter server-side filter to apply; may be null for "all rows"
     * @return the count of matching rows
     * @throws IOException on RPC or scan failure
     */
    long getRowCount(Filter filter) throws IOException;

    /**
     * Returns a fixed greeting string — useful as a connectivity smoke test
     * to verify the coprocessor jar is actually loaded.
     *
     * @throws IOException on RPC failure
     */
    String getStr() throws IOException;
}
2)真正起作用的類
package ict.wde.test;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.regionserver.InternalScanner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Created by Michael on 2015/6/27.
*/
/**
 * Server-side implementation of {@link RowCountProtocol}.
 *
 * <p>Runs inside each RegionServer hosting a region of the table. The original
 * version discarded the environment handed to {@link #start} and returned a
 * hard-coded 1 from the counting method (the Scan it built was never used);
 * this version captures the region environment and actually scans the region.
 */
public class RowCountServer implements RowCountProtocol {

    /** Protocol version reported to clients; must match on both sides. */
    private static final long VERSION = 3L;

    // Region environment captured at load time; required to open a scanner
    // on the hosting region.
    private RegionCoprocessorEnvironment env;

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // An endpoint only makes sense at region scope; fail fast otherwise.
        if (!(env instanceof RegionCoprocessorEnvironment)) {
            throw new IOException("RowCountServer must be loaded on a region, got: " + env);
        }
        this.env = (RegionCoprocessorEnvironment) env;
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        this.env = null;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
            long clientVersion, int clientMethodsHash) throws IOException {
        // null method list: clients accept any method set for this version.
        return new ProtocolSignature(VERSION, null);
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return VERSION;
    }

    @Override
    public long getRowCount() throws IOException {
        // FirstKeyOnlyFilter: one KeyValue per row is enough to count rows.
        return getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return getRowCount(filter, false);
    }

    @Override
    public String getStr() throws IOException {
        return "Hello Doctor Michael Zhang, again!";
    }

    /**
     * Scans this region and counts either rows or individual KeyValues.
     *
     * @param filter        optional server-side filter (null = no filter)
     * @param countKeyValue if true, count every KeyValue; otherwise count rows
     * @return the number of matching rows (or KeyValues) in this region
     * @throws IOException if the region scan fails
     */
    public long getRowCount(Filter filter, boolean countKeyValue) throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }
        long count = 0;
        InternalScanner scanner = env.getRegion().getScanner(scan);
        try {
            List<KeyValue> results = new ArrayList<KeyValue>();
            boolean hasMore;
            do {
                // next() fills 'results' with the KeyValues of one row and
                // reports whether more rows remain.
                hasMore = scanner.next(results);
                if (countKeyValue) {
                    count += results.size();
                } else if (!results.isEmpty()) {
                    count++;
                }
                results.clear();
            } while (hasMore);
        } finally {
            scanner.close();
        }
        return count;
    }
}
上述兩個類打包jar後放入hbase的lib目錄下
2.2用戶端代碼
import ict.wde.test.RowCountProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.filter.Filter;
import java.io.IOException;
/**
* Created by Michael on 2015/6/30.
*/
/**
 * Client-side wrapper: creates the demo table (with the coprocessor endpoint
 * attached to its descriptor) if it does not exist, then obtains a proxy to
 * the server-side endpoint and exposes its methods.
 */
public class EndpointTestClient {

    private static final String TABLE_NAME = "epTest";

    private final HTableInterface table;
    private final Configuration conf;
    private final RowCountProtocol server;

    /** Descriptor for the demo table, with the endpoint class registered on it. */
    private static final HTableDescriptor EP_TABLE_DESCRIPTOR;

    static {
        EP_TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
        HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
        family.setInMemory(true);
        family.setMaxVersions(1);
        EP_TABLE_DESCRIPTOR.addFamily(family);
        try {
            // Must be the fully-qualified name of the server-side endpoint
            // class; the jar has to be on the RegionServer classpath and
            // HBase restarted before this registration takes effect.
            EP_TABLE_DESCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
        } catch (IOException ioe) {
            // Was silently swallowed before, which deferred a bad coprocessor
            // registration into a confusing "No matching handler" RPC error.
            throw new ExceptionInInitializerError(ioe);
        }
    }

    /**
     * Connects to HBase, creates the demo table on first use, and binds a
     * coprocessor proxy to the region that hosts row key "0".
     *
     * @param config HBase client configuration (must carry the ZK quorum)
     * @throws IOException if the table cannot be created or opened
     */
    public EndpointTestClient(Configuration config) throws IOException {
        conf = config;
        table = initTidTable();
        server = table.coprocessorProxy(RowCountProtocol.class, "0".getBytes());
    }

    /** Creates the table if missing and returns an open handle to it. */
    private HTableInterface initTidTable() throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists(TABLE_NAME)) {
                admin.createTable(EP_TABLE_DESCRIPTOR);
            }
        } finally {
            // Close even when tableExists/createTable throws (was leaked before).
            admin.close();
        }
        return new HTable(conf, TABLE_NAME);
    }

    /** Invokes the server-side getStr() endpoint and returns its result. */
    public String getStr() throws IOException {
        return server.getStr();
    }
}
啟動類:
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
/**
* Created by Michael on 2015/6/22.
*/
/**
 * Entry point for the coprocessor-endpoint demo: connects to the HBase
 * cluster via its ZooKeeper quorum, invokes the server-side getStr()
 * endpoint through {@link EndpointTestClient}, and prints the result.
 *
 * <p>(The large block of commented-out setup code that used to live here
 * duplicated EndpointTestClient and has been removed.)
 */
public class EndpointExample {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // ZooKeeper quorum of the target HBase cluster.
        conf.set("hbase.zookeeper.quorum", "ccf04:2181");
        EndpointTestClient client = new EndpointTestClient(conf);
        String name = client.getStr();
        System.out.println(name);
    }
}