An earlier post covered how to write log4j logs to a specified HDFS directory; the directory used there was /flume/events.
If we want to analyze the collected logs with Hive, we can load everything under /flume/events into a Hive table.
If you understand how Hive's LOAD DATA works, there is an even simpler approach that skips the LOAD DATA step altogether: point sink1.hdfs.path directly at the Hive table's directory.
The concrete steps are described below.
As before, we start from the requirement. The data collected so far is API access log data in JSON format, like this:
{"requesttime":1405651379758,"requestparams":{"timestamp":1405651377211,"phone":"02038824941","cardname":"測試商家名稱","provincecode":"440000","citycode":"440106"},"requesturl":"/reporter-api/reporter/reporter12/init.do"}
Now there is a requirement: count the total number of calls made to the API.
My first thought: create a table named test in Hive, then point hdfs.path at it: tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test
Then select count(*) from test; and we are done.
This plan is simple and crude, so let's run with it. We immediately hit a problem: my log data is JSON, so Hive has to serialize and deserialize the JSON into the concrete columns of the test table.
That is a bit unfortunate, because Hive itself does not ship with a JSON SerDe, although it does provide functions for parsing JSON strings.
The first is a UDF:
get_json_object(string json_string, string path): extracts the JSON object at the given path from the JSON string and returns it in its JSON string form; returns NULL if the input JSON string is invalid.
The second is a table-generating function (UDTF): json_tuple(string jsonstr, p1, p2, ..., pn). It accepts multiple key names and processes the input JSON string in a single pass. It is similar to get_json_object but more efficient, because one call returns multiple values, for example: select b.* from test_json a lateral view json_tuple(a.id, 'id', 'name') b as f1, f2; where lateral view turns the extracted values into columns.
The ideal solution, though, is a JSON SerDe: once the data is loaded we could simply run select * from test, instead of pulling fields out with get_json_object, where n fields mean the string is parsed n times, which is far too inefficient.
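As a rough illustration of the difference (the staging table raw_log and its single string column line are hypothetical, not from this post), the per-field approach re-parses the JSON string once for every extracted field, whereas a SerDe-backed table deserializes each row once:

-- Per-field extraction: each get_json_object call parses the whole line again.
select
  get_json_object(line, '$.requesttime')         as requesttime,
  get_json_object(line, '$.requestparams.phone') as phone,
  get_json_object(line, '$.requesturl')          as requesturl
from raw_log;

-- With a JSON SerDe the row is parsed once at deserialization time:
select * from test;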
Fortunately, the Cloudera wiki provides a JSON SerDe class (it is not included in the released Hive jars), so I have reproduced it here:
package com.besttone.hive.serde;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.codehaus.jackson.map.ObjectMapper;
/**
 * This SerDe can be used for processing JSON data in Hive. It supports
 * arbitrary JSON data, and can handle all Hive types except for UNION.
 * However, the JSON data is expected to be a series of discrete records,
 * rather than a JSON array of objects.
 *
 * The Hive table is expected to contain columns with names corresponding to
 * fields in the JSON data, but it is not necessary for every JSON field to
 * have a corresponding Hive column. Those JSON fields will be ignored during
 * queries.
 *
 * Example:
 * { "a": 1, "b": [ "str1", "str2" ], "c": { "field1": "val1" } }
 * could correspond to a table:
 * CREATE TABLE foo (a INT, b ARRAY<STRING>, c STRUCT<field1:STRING>);
 *
 * JSON objects can also be interpreted as a Hive MAP type, so long as the
 * keys and values in the JSON object are all of the appropriate types. For
 * example, in the JSON above, another valid table declaration would be:
 * CREATE TABLE foo (a INT, b ARRAY<STRING>, c MAP<STRING,STRING>);
 *
 * Only STRING keys are supported for Hive MAPs.
 */
public class JSONSerDe implements SerDe {

    private StructTypeInfo rowTypeInfo;
    private ObjectInspector rowOI;
    private List<String> colNames;
    private List<Object> row = new ArrayList<Object>();

    // How to handle input lines that are not valid JSON.
    private boolean ignoreInvalidInput;

    /**
     * An initialization function used to gather information about the table.
     * Typically, a SerDe implementation will be interested in the list of
     * column names and their types. That information will be used to help
     * perform actual serialization and deserialization of data.
     */
    @Override
    public void initialize(Configuration conf, Properties tbl)
            throws SerDeException {
        // Whether to ignore lines that cannot be parsed as JSON. Defaults to
        // false (throw an exception); set to true to skip such lines.
        ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
                "input.invalid.ignore", "false"));

        // Get a list of the table's column names.
        String colNamesStr = tbl.getProperty(serdeConstants.LIST_COLUMNS);
        colNames = Arrays.asList(colNamesStr.split(","));

        // Get a list of TypeInfos for the columns. This list lines up with
        // the list of column names.
        String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
        List<TypeInfo> colTypes = TypeInfoUtils
                .getTypeInfosFromTypeString(colTypesStr);

        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(
                colNames, colTypes);
        rowOI = TypeInfoUtils
                .getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    }
    /**
     * This method does the work of deserializing a record into Java objects
     * that Hive can work with via the ObjectInspector interface. For this
     * SerDe, the blob that is passed in is a JSON string, and the Jackson JSON
     * parser is being used to translate the string into Java objects.
     *
     * The JSON deserialization works by taking the column names in the Hive
     * table, and looking up those fields in the parsed JSON object. If the
     * value of the field is not a primitive, the object is parsed further.
     */
    @Override
    public Object deserialize(Writable blob) throws SerDeException {
        Map<?, ?> root = null;
        row.clear();
        try {
            ObjectMapper mapper = new ObjectMapper();
            // This is really a Map<String, Object>. For more information
            // about how Jackson parses JSON in this example, see
            // http://wiki.fasterxml.com/JacksonDataBinding
            root = mapper.readValue(blob.toString(), Map.class);
        } catch (Exception e) {
            // If ignoreInvalidInput is true, skip this row instead of
            // throwing an exception.
            if (!ignoreInvalidInput) {
                throw new SerDeException(e);
            } else {
                return null;
            }
        }

        // Lowercase the keys as expected by Hive.
        Map<String, Object> lowerRoot = new HashMap<String, Object>();
        for (Map.Entry entry : root.entrySet()) {
            lowerRoot.put(((String) entry.getKey()).toLowerCase(),
                    entry.getValue());
        }
        root = lowerRoot;

        Object value = null;
        for (String fieldName : rowTypeInfo.getAllStructFieldNames()) {
            try {
                TypeInfo fieldTypeInfo = rowTypeInfo
                        .getStructFieldTypeInfo(fieldName);
                value = parseField(root.get(fieldName), fieldTypeInfo);
            } catch (Exception e) {
                value = null;
            }
            row.add(value);
        }
        return row;
    }

    /**
     * Parses a JSON object according to the Hive column's type.
     *
     * @param field         - The JSON object to parse
     * @param fieldTypeInfo - Metadata about the Hive column
     * @return - The parsed value of the field
     */
    private Object parseField(Object field, TypeInfo fieldTypeInfo) {
        switch (fieldTypeInfo.getCategory()) {
        case PRIMITIVE:
            // Jackson will return the right thing in this case, so just
            // return the object.
            if (field instanceof String) {
                field = field.toString().replaceAll("\n", "\\\\n");
            }
            return field;
        case LIST:
            return parseList(field, (ListTypeInfo) fieldTypeInfo);
        case MAP:
            return parseMap(field, (MapTypeInfo) fieldTypeInfo);
        case STRUCT:
            return parseStruct(field, (StructTypeInfo) fieldTypeInfo);
        case UNION:
            // Unsupported by JSON.
        default:
            return null;
        }
    }

    /**
     * Parses a JSON object and its fields. The Hive metadata is used to
     * determine how to parse the object fields.
     *
     * @return - A map representing the object and its fields
     */
    private Object parseStruct(Object field, StructTypeInfo fieldTypeInfo) {
        Map<Object, Object> map = (Map<Object, Object>) field;
        ArrayList<TypeInfo> structTypes = fieldTypeInfo
                .getAllStructFieldTypeInfos();
        ArrayList<String> structNames = fieldTypeInfo.getAllStructFieldNames();

        List<Object> structRow = new ArrayList<Object>(structTypes.size());
        for (int i = 0; i < structNames.size(); i++) {
            structRow.add(parseField(map.get(structNames.get(i)),
                    structTypes.get(i)));
        }
        return structRow;
    }

    /**
     * Parse a JSON list and its elements. This uses the Hive metadata for the
     * list elements to determine how to parse the elements.
     *
     * @param field - The JSON list to parse
     * @return - A list of the parsed elements
     */
    private Object parseList(Object field, ListTypeInfo fieldTypeInfo) {
        ArrayList<Object> list = (ArrayList<Object>) field;
        TypeInfo elemTypeInfo = fieldTypeInfo.getListElementTypeInfo();

        for (int i = 0; i < list.size(); i++) {
            list.set(i, parseField(list.get(i), elemTypeInfo));
        }
        return list.toArray();
    }

    /**
     * Parse a JSON object as a map. This uses the Hive metadata for the map
     * values to determine how to parse the values. The map is assumed to have
     * a string for a key.
     *
     * @return - A map of the parsed values
     */
    private Object parseMap(Object field, MapTypeInfo fieldTypeInfo) {
        Map<Object, Object> map = (Map<Object, Object>) field;
        TypeInfo valueTypeInfo = fieldTypeInfo.getMapValueTypeInfo();

        for (Map.Entry<Object, Object> entry : map.entrySet()) {
            map.put(entry.getKey(), parseField(entry.getValue(), valueTypeInfo));
        }
        return map;
    }

    /**
     * Return an ObjectInspector for the row of data.
     */
    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return rowOI;
    }

    /**
     * Unimplemented.
     */
    @Override
    public SerDeStats getSerDeStats() {
        return null;
    }

    /**
     * JSON is just a textual representation, so our serialized class is just
     * Text.
     */
    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    /**
     * This method takes an object representing a row of data from Hive, and
     * uses the ObjectInspector to get the data for each column and serialize
     * it. This implementation deparses the row into an object that Jackson
     * can easily serialize into a JSON blob.
     */
    @Override
    public Writable serialize(Object obj, ObjectInspector oi)
            throws SerDeException {
        Object deparsedObj = deparseRow(obj, oi);
        ObjectMapper mapper = new ObjectMapper();
        try {
            // Let Jackson do the work of serializing the object.
            return new Text(mapper.writeValueAsString(deparsedObj));
        } catch (Exception e) {
            throw new SerDeException(e);
        }
    }

    /**
     * Deparse a Hive object into a Jackson-serializable object. This uses the
     * ObjectInspector to extract the column data.
     *
     * @param obj - Hive object to deparse
     * @param oi  - ObjectInspector for the object
     * @return - A deparsed object
     */
    private Object deparseObject(Object obj, ObjectInspector oi) {
        switch (oi.getCategory()) {
        case LIST:
            return deparseList(obj, (ListObjectInspector) oi);
        case MAP:
            return deparseMap(obj, (MapObjectInspector) oi);
        case PRIMITIVE:
            return deparsePrimitive(obj, (PrimitiveObjectInspector) oi);
        case STRUCT:
            return deparseStruct(obj, (StructObjectInspector) oi, false);
        default:
            return null;
        }
    }

    /**
     * Deparses a row of data. We have to treat this one differently from
     * other structs, because the field names for the root object do not match
     * the column names for the Hive table.
     *
     * @param obj      - Object representing the top-level row
     * @param structOI - ObjectInspector for the row
     * @return - A deparsed row of data
     */
    private Object deparseRow(Object obj, ObjectInspector structOI) {
        return deparseStruct(obj, (StructObjectInspector) structOI, true);
    }

    /**
     * Deparses struct data into a serializable JSON object.
     *
     * @param obj      - Hive struct data
     * @param structOI - ObjectInspector for the struct
     * @param isRow    - Whether or not this struct represents a top-level row
     * @return - A deparsed struct
     */
    private Object deparseStruct(Object obj, StructObjectInspector structOI,
            boolean isRow) {
        Map<Object, Object> struct = new HashMap<Object, Object>();
        List<? extends StructField> fields = structOI.getAllStructFieldRefs();
        for (int i = 0; i < fields.size(); i++) {
            StructField field = fields.get(i);
            // The top-level row object is treated slightly differently from
            // other structs, because the field names for the row do not
            // correctly reflect the Hive column names. For lower-level
            // structs, we can get the field name from the associated
            // StructField object.
            String fieldName = isRow ? colNames.get(i) : field.getFieldName();
            ObjectInspector fieldOI = field.getFieldObjectInspector();
            Object fieldObj = structOI.getStructFieldData(obj, field);
            struct.put(fieldName, deparseObject(fieldObj, fieldOI));
        }
        return struct;
    }

    /**
     * Deparses a primitive type.
     */
    private Object deparsePrimitive(Object obj, PrimitiveObjectInspector primOI) {
        return primOI.getPrimitiveJavaObject(obj);
    }

    /**
     * Deparses a map and its values.
     */
    private Object deparseMap(Object obj, MapObjectInspector mapOI) {
        Map<Object, Object> map = new HashMap<Object, Object>();
        ObjectInspector mapValOI = mapOI.getMapValueObjectInspector();
        Map<?, ?> fields = mapOI.getMap(obj);
        for (Map.Entry<?, ?> field : fields.entrySet()) {
            Object fieldName = field.getKey();
            Object fieldObj = field.getValue();
            map.put(fieldName, deparseObject(fieldObj, mapValOI));
        }
        return map;
    }

    /**
     * Deparses a list and its elements.
     */
    private Object deparseList(Object obj, ListObjectInspector listOI) {
        List<Object> list = new ArrayList<Object>();
        List<?> field = listOI.getList(obj);
        ObjectInspector elemOI = listOI.getListElementObjectInspector();
        for (Object elem : field) {
            list.add(deparseObject(elem, elemOI));
        }
        return list;
    }
}
I made one small modification: I added a parameter, input.invalid.ignore, backed by this field:
    // How to handle input lines that are not valid JSON.
    private boolean ignoreInvalidInput;
In the deserialize method the original code threw a SerDeException whenever the incoming string was not valid JSON; I added a parameter that controls whether the exception is thrown, initialized in the initialize method (default false):
    // Whether to ignore lines that cannot be parsed as JSON. Defaults to
    // false (throw an exception); set to true to skip such lines.
    ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
            "input.invalid.ignore", "false"));
Good. Now package this class into a jar, jsonserde.jar, and drop it into HIVE_HOME's auxlib directory (mine is /etc/hive/auxlib). Then edit hive-env.sh and add HIVE_AUX_JARS_PATH=/etc/hive/auxlib/jsonserde.jar, so the jar is added to the classpath every time the Hive client starts; otherwise Hive will report that the class cannot be found when you specify the SerDe.
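If you would rather not touch hive-env.sh, the jar can also be added per session from the Hive CLI (same path as above; adjust it to your environment):

add jar /etc/hive/auxlib/jsonserde.jar;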
Now let's create a table in Hive to hold the log data:
create table test(
  requesttime bigint,
  requestparams struct<timestamp:bigint,phone:string,cardname:string,provincecode:string,citycode:string>,
  requesturl string)
row format serde "com.besttone.hive.serde.JSONSerDe"
with serdeproperties(
  "input.invalid.ignore"="true",
  "requesttime"="$.requesttime",
  "requestparams.timestamp"="$.requestparams.timestamp",
  "requestparams.phone"="$.requestparams.phone",
  "requestparams.cardname"="$.requestparams.cardname",
  "requestparams.provincecode"="$.requestparams.provincecode",
  "requestparams.citycode"="$.requestparams.citycode",
  "requesturl"="$.requesturl");
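Once rows arrive, the struct column can be addressed with dot notation. A quick sanity-check query might look like this (just a sketch against the table defined above):

select requesttime, requestparams.phone, requestparams.citycode, requesturl
from test
limit 10;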
The table structure is designed around the log format; recall the log record shown earlier.
I used a struct type to hold the requestparams values, and for row format we use the custom JSON SerDe com.besttone.hive.serde.JSONSerDe. In serdeproperties, besides the JSON field mappings, I also set a custom parameter, "input.invalid.ignore"="true", which ignores all non-JSON input lines. Strictly speaking they are not dropped: for an invalid line every output column is simply NULL, so to exclude them from a result set you have to write: select * from test where requesturl is not null;
OK, the table is ready; all that's missing is data. Start flumedemo's WriteLog to push some log records into the test table's directory, then open the Hive client and run select * from test; every column is parsed correctly. Done.
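And the original requirement, the total number of API calls, becomes a one-liner; filtering out the NULL rows left behind by invalid input lines keeps the count honest:

select count(*) from test where requesturl is not null;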
The flume.conf looks like this:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1
tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30
tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60
besttone.db is a database I created in Hive; anyone familiar with Hive should have no trouble with this part.
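For completeness, if the database does not exist yet it can be created first with standard Hive DDL:

create database if not exists besttone;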
OK, with this post the whole pipeline is complete: log4j producing the logs, Flume collecting them, and Hive analyzing them offline.