天天看點

flume學習(六):使用hive來分析flume收集的日志資料

前面已經講過如何将log4j的日志輸出到指定的hdfs目錄,我們前面的指定目錄為/flume/events。

如果想用hive來分析采集來的日志,我們可以将/flume/events下面的日志資料都load到hive中的表當中去。

如果了解hive的load data原理的話,還有一種更簡便的方式,可以省去load data這一步,就是直接将sink1.hdfs.path指定為hive表的目錄。

下面我将較長的描述具體的操作步驟。

我們還是從需求驅動來講解,前面我們采集的資料,都是接口的通路日志資料,資料格式是json格式如下:

{“requesttime”:1405651379758,”requestparams”:{“timestamp”:1405651377211,”phone”:”02038824941″,”cardname”:”測試商家名稱”,”provincecode”:”440000″,”citycode”:”440106″},”requesturl”:”/reporter-api/reporter/reporter12/init.do”}

現在有一個需求,我們要統計接口的總調用量。

我第一想法就是,hive中建一張表:test             然後将hdfs.path指定為tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test

然後select  count(*) from test;   完事。

這個方案簡單,粗暴,先這麼幹着。于是會遇到一個問題,我的日志資料時json格式的,需要hive來序列化和反序列化json格式的資料到test表的具體字段當中去。

這有點糟糕,因為hive本身沒有提供json的serde,但是有提供函數來解析json字元串,

第一個是(udf):

get_json_object(string json_string,string path) 從給定路徑上的json字元串中抽取出json對象,并傳回這個對象的json字元串形式,如果輸入的json字元串是非法的,則傳回null。

第二個是表生成函數(udtf):json_tuple(string jsonstr,p1,p2,…,pn) 本函數可以接受多個标簽名稱,對輸入的json字元串進行處理,這個和get_json_object這個udf類似,不過更高效,其通過一次調用就可以獲得多個鍵值,例:select b.* from test_json a lateral view json_tuple(a.id,’id’,’name’) b as f1,f2;通過lateral view行轉列。

最理想的方式就是能有一種json serde,隻要我們load完資料,就直接可以select * from test,而不是select get_json_object這種方式來擷取,n個字段就要解析n次,效率太低了。

好在cloudrea wiki裡提供了一個json serde類(這個類沒有在發行的hive的jar包中),于是我把它搬來了,如下:

package com.besttone.hive.serde;  

import java.util.arraylist;  

import java.util.arrays;  

import java.util.hashmap;  

import java.util.list;  

import java.util.map;  

import java.util.properties;  

import org.apache.hadoop.conf.configuration;  

import org.apache.hadoop.hive.serde.serdeconstants;  

import org.apache.hadoop.hive.serde2.serde;  

import org.apache.hadoop.hive.serde2.serdeexception;  

import org.apache.hadoop.hive.serde2.serdestats;  

import org.apache.hadoop.hive.serde2.objectinspector.listobjectinspector;  

import org.apache.hadoop.hive.serde2.objectinspector.mapobjectinspector;  

import org.apache.hadoop.hive.serde2.objectinspector.objectinspector;  

import org.apache.hadoop.hive.serde2.objectinspector.primitiveobjectinspector;  

import org.apache.hadoop.hive.serde2.objectinspector.structfield;  

import org.apache.hadoop.hive.serde2.objectinspector.structobjectinspector;  

import org.apache.hadoop.hive.serde2.typeinfo.listtypeinfo;  

import org.apache.hadoop.hive.serde2.typeinfo.maptypeinfo;  

import org.apache.hadoop.hive.serde2.typeinfo.structtypeinfo;  

import org.apache.hadoop.hive.serde2.typeinfo.typeinfo;  

import org.apache.hadoop.hive.serde2.typeinfo.typeinfofactory;  

import org.apache.hadoop.hive.serde2.typeinfo.typeinfoutils;  

import org.apache.hadoop.io.text;  

import org.apache.hadoop.io.writable;  

import org.codehaus.jackson.map.objectmapper;  

/** 

 * this serde can be used for processing json data in hive. it supports 

 * arbitrary json data, and can handle all hive types except for union. however, 

 * the json data is expected to be a series of discrete records, rather than a 

 * json array of objects. 

 *  

 * the hive table is expected to contain columns with names corresponding to 

 * fields in the json data, but it is not necessary for every json field to have 

 * a corresponding hive column. those json fields will be ignored during 

 * queries. 

 * example: 

 * { “a”: 1, “b”: [ "str1", "str2" ], “c”: { “field1″: “val1″ } } 

 * could correspond to a table: 

 * create table foo (a int, b array<string>, c struct<field1:string>); 

 * json objects can also interpreted as a hive map type, so long as the keys and 

 * values in the json object are all of the appropriate types. for example, in 

 * the json above, another valid table declaraction would be: 

 * create table foo (a int, b array<string>, c map<string,string>); 

 * only string keys are supported for hive maps. 

 */  

public class jsonserde implements serde {  

    private structtypeinfo rowtypeinfo;  

    private objectinspector rowoi;  

    private list<string> colnames;  

    private list<object> row = new arraylist<object>();  

    //遇到非json格式輸入的時候的處理。  

    private boolean ignoreinvalidinput;  

    /** 

     * an initialization function used to gather information about the table. 

     * typically, a serde implementation will be interested in the list of 

     * column names and their types. that information will be used to help 

     * perform actual serialization and deserialization of data. 

     */  

    @override  

    public void initialize(configuration conf, properties tbl)  

            throws serdeexception {  

        // 遇到無法轉換成json對象的字元串時,是否忽略,預設不忽略,抛出異常,設定為true将跳過異常。  

        ignoreinvalidinput = boolean.valueof(tbl.getproperty(  

                “input.invalid.ignore”, “false”));  

        // get a list of the table’s column names.  

        string colnamesstr = tbl.getproperty(serdeconstants.list_columns);  

        colnames = arrays.aslist(colnamesstr.split(“,”));  

        // get a list of typeinfos for the columns. this list lines up with  

        // the list of column names.  

        string coltypesstr = tbl.getproperty(serdeconstants.list_column_types);  

        list<typeinfo> coltypes = typeinfoutils  

                .gettypeinfosfromtypestring(coltypesstr);  

        rowtypeinfo = (structtypeinfo) typeinfofactory.getstructtypeinfo(  

                colnames, coltypes);  

        rowoi = typeinfoutils  

                .getstandardjavaobjectinspectorfromtypeinfo(rowtypeinfo);  

    }  

     * this method does the work of deserializing a record into java objects 

     * that hive can work with via the objectinspector interface. for this 

     * serde, the blob that is passed in is a json string, and the jackson json 

     * parser is being used to translate the string into java objects. 

     *  

     * the json deserialization works by taking the column names in the hive 

     * table, and looking up those fields in the parsed json object. if the 

     * value of the field is not a primitive, the object is parsed further. 

    public object deserialize(writable blob) throws serdeexception {  

        map<?, ?> root = null;  

        row.clear();  

        try {  

            objectmapper mapper = new objectmapper();  

            // this is really a map<string, object>. for more information about  

            // how  

            // jackson parses json in this example, see  

            // http://wiki.fasterxml.com/jacksondatabinding  

            root = mapper.readvalue(blob.tostring(), map.class);  

        } catch (exception e) {  

            // 如果為true,不抛出異常,忽略該行資料  

            if (!ignoreinvalidinput)  

                throw new serdeexception(e);  

            else {  

                return null;  

            }  

        }  

        // lowercase the keys as expected by hive  

        map<string, object> lowerroot = new hashmap();  

        for (map.entry entry : root.entryset()) {  

            lowerroot.put(((string) entry.getkey()).tolowercase(),  

                    entry.getvalue());  

        root = lowerroot;  

        object value = null;  

        for (string fieldname : rowtypeinfo.getallstructfieldnames()) {  

            try {  

                typeinfo fieldtypeinfo = rowtypeinfo  

                        .getstructfieldtypeinfo(fieldname);  

                value = parsefield(root.get(fieldname), fieldtypeinfo);  

            } catch (exception e) {  

                value = null;  

            row.add(value);  

        return row;  

     * parses a json object according to the hive column’s type. 

     * @param field 

     *            – the json object to parse 

     * @param fieldtypeinfo 

     *            – metadata about the hive column 

     * @return – the parsed value of the field 

    private object parsefield(object field, typeinfo fieldtypeinfo) {  

        switch (fieldtypeinfo.getcategory()) {  

        case primitive:  

            // jackson will return the right thing in this case, so just return  

            // the object  

            if (field instanceof string) {  

                field = field.tostring().replaceall(“\n”, “\\\\n”);  

            return field;  

        case list:  

            return parselist(field, (listtypeinfo) fieldtypeinfo);  

        case map:  

            return parsemap(field, (maptypeinfo) fieldtypeinfo);  

        case struct:  

            return parsestruct(field, (structtypeinfo) fieldtypeinfo);  

        case union:  

            // unsupported by json  

        default:  

            return null;  

     * parses a json object and its fields. the hive metadata is used to 

     * determine how to parse the object fields. 

     * @return – a map representing the object and its fields 

    private object parsestruct(object field, structtypeinfo fieldtypeinfo) {  

        map<object, object> map = (map<object, object>) field;  

        arraylist<typeinfo> structtypes = fieldtypeinfo  

                .getallstructfieldtypeinfos();  

        arraylist<string> structnames = fieldtypeinfo.getallstructfieldnames();  

        list<object> structrow = new arraylist<object>(structtypes.size());  

        for (int i = 0; i < structnames.size(); i++) {  

            structrow.add(parsefield(map.get(structnames.get(i)),  

                    structtypes.get(i)));  

        return structrow;  

     * parse a json list and its elements. this uses the hive metadata for the 

     * list elements to determine how to parse the elements. 

     *            – the json list to parse 

     * @return – a list of the parsed elements 

    private object parselist(object field, listtypeinfo fieldtypeinfo) {  

        arraylist<object> list = (arraylist<object>) field;  

        typeinfo elemtypeinfo = fieldtypeinfo.getlistelementtypeinfo();  

        for (int i = 0; i < list.size(); i++) {  

            list.set(i, parsefield(list.get(i), elemtypeinfo));  

        return list.toarray();  

     * parse a json object as a map. this uses the hive metadata for the map 

     * values to determine how to parse the values. the map is assumed to have a 

     * string for a key. 

     * @return 

    private object parsemap(object field, maptypeinfo fieldtypeinfo) {  

        typeinfo valuetypeinfo = fieldtypeinfo.getmapvaluetypeinfo();  

        for (map.entry<object, object> entry : map.entryset()) {  

            map.put(entry.getkey(), parsefield(entry.getvalue(), valuetypeinfo));  

        return map;  

     * return an objectinspector for the row of data 

    public objectinspector getobjectinspector() throws serdeexception {  

        return rowoi;  

     * unimplemented 

    public serdestats getserdestats() {  

        return null;  

     * json is just a textual representation, so our serialized class is just 

     * text. 

    public class<? extends writable> getserializedclass() {  

        return text.class;  

     * this method takes an object representing a row of data from hive, and 

     * uses the objectinspector to get the data for each column and serialize 

     * it. this implementation deparses the row into an object that jackson can 

     * easily serialize into a json blob. 

    public writable serialize(object obj, objectinspector oi)  

        object deparsedobj = deparserow(obj, oi);  

        objectmapper mapper = new objectmapper();  

            // let jackson do the work of serializing the object  

            return new text(mapper.writevalueasstring(deparsedobj));  

            throw new serdeexception(e);  

     * deparse a hive object into a jackson-serializable object. this uses the 

     * objectinspector to extract the column data. 

     * @param obj 

     *            – hive object to deparse 

     * @param oi 

     *            – objectinspector for the object 

     * @return – a deparsed object 

    private object deparseobject(object obj, objectinspector oi) {  

        switch (oi.getcategory()) {  

            return deparselist(obj, (listobjectinspector) oi);  

            return deparsemap(obj, (mapobjectinspector) oi);  

            return deparseprimitive(obj, (primitiveobjectinspector) oi);  

            return deparsestruct(obj, (structobjectinspector) oi, false);  

     * deparses a row of data. we have to treat this one differently from other 

     * structs, because the field names for the root object do not match the 

     * column names for the hive table. 

     *            – object representing the top-level row 

     * @param structoi 

     *            – objectinspector for the row 

     * @return – a deparsed row of data 

    private object deparserow(object obj, objectinspector structoi) {  

        return deparsestruct(obj, (structobjectinspector) structoi, true);  

     * deparses struct data into a serializable json object. 

     *            – hive struct data 

     *            – objectinspector for the struct 

     * @param isrow 

     *            – whether or not this struct represents a top-level row 

     * @return – a deparsed struct 

    private object deparsestruct(object obj, structobjectinspector structoi,  

            boolean isrow) {  

        map<object, object> struct = new hashmap<object, object>();  

        list<? extends structfield> fields = structoi.getallstructfieldrefs();  

        for (int i = 0; i < fields.size(); i++) {  

            structfield field = fields.get(i);  

            // the top-level row object is treated slightly differently from  

            // other  

            // structs, because the field names for the row do not correctly  

            // reflect  

            // the hive column names. for lower-level structs, we can get the  

            // field  

            // name from the associated structfield object.  

            string fieldname = isrow ? colnames.get(i) : field.getfieldname();  

            objectinspector fieldoi = field.getfieldobjectinspector();  

            object fieldobj = structoi.getstructfielddata(obj, field);  

            struct.put(fieldname, deparseobject(fieldobj, fieldoi));  

        return struct;  

     * deparses a primitive type. 

    private object deparseprimitive(object obj, primitiveobjectinspector primoi) {  

        return primoi.getprimitivejavaobject(obj);  

    private object deparsemap(object obj, mapobjectinspector mapoi) {  

        map<object, object> map = new hashmap<object, object>();  

        objectinspector mapvaloi = mapoi.getmapvalueobjectinspector();  

        map<?, ?> fields = mapoi.getmap(obj);  

        for (map.entry<?, ?> field : fields.entryset()) {  

            object fieldname = field.getkey();  

            object fieldobj = field.getvalue();  

            map.put(fieldname, deparseobject(fieldobj, mapvaloi));  

     * deparses a list and its elements. 

    private object deparselist(object obj, listobjectinspector listoi) {  

        list<object> list = new arraylist<object>();  

        list<?> field = listoi.getlist(obj);  

        objectinspector elemoi = listoi.getlistelementobjectinspector();  

        for (object elem : field) {  

            list.add(deparseobject(elem, elemoi));  

        return list;  

}  

我稍微修改了一點東西,多加了一個參數input.invalid.ignore,對應的變量為:

//遇到非json格式輸入的時候的處理。

private boolean ignoreinvalidinput;

在deserialize方法中原來是如果傳入的是非json格式字元串的話,直接抛出了serdeexception,我加了一個參數來控制它是否抛出異常,在initialize方法中初始化這個變量(預設為false):

// 遇到無法轉換成json對象的字元串時,是否忽略,預設不忽略,抛出異常,設定為true将跳過異常。

ignoreinvalidinput = boolean.valueof(tbl.getproperty(

“input.invalid.ignore”, “false”));

好的,現在将這個類打成jar包: jsonserde.jar,放在hive_home的auxlib目錄下(我的是/etc/hive/auxlib),然後修改hive-env.sh,添加hive_aux_jars_path=/etc/hive/auxlib/jsonserde.jar,這樣每次運作hive用戶端的時候都會将這個jar包添加到classpath,否則在設定serde的時候會報找不到類。

現在我們在hive中建立一張表用來存放日志資料:

create table test(  

requesttime bigint,  

requestparams struct<timestamp:bigint,phone:string,cardname:string,provincecode:string,citycode:string>,    

requesturl string)  

 row format serde “com.besttone.hive.serde.jsonserde”   

 with serdeproperties(  

 “input.invalid.ignore”=”true”,  

 “requesttime”=”$.requesttime”,  

 “requestparams.timestamp”=”$.requestparams.timestamp”,  

 “requestparams.phone”=”$.requestparams.phone”,  

 “requestparams.cardname”=”$.requestparams.cardname”,  

 “requestparams.provincecode”=”$.requestparams.provincecode”,  

 “requestparams.citycode”=”$.requestparams.citycode”,  

 “requesturl”=”$.requesturl”);  

這個表結構就是按照日志格式設計的,還記得前面說過的日志資料如下:

我使用了一個struct類型來儲存requestparams的值,row format我們用的是自定義的json serde:com.besttone.hive.serde.jsonserde,serdeproperties中,除了設定json對象的映射關系外,我還設定了一個自定義的參數:”input.invalid.ignore”=”true”,忽略掉所有非json格式的輸入行。這裡不是真正意義的忽略,隻是非法行的每個輸出字段都為null了,要在結果集上忽略,必須這樣寫:select * from test where requesturl is not null;

ok表建好了,現在就差資料了,我們啟動flumedemo的writelog,往hive表test目錄下面輸出一些日志資料,然後在進入hive用戶端,select * from test;是以字段都正确的解析,大功告成。

flume.conf如下:

tier1.sources=source1  

tier1.channels=channel1  

tier1.sinks=sink1  

tier1.sources.source1.type=avro  

tier1.sources.source1.bind=0.0.0.0  

tier1.sources.source1.port=44444  

tier1.sources.source1.channels=channel1  

tier1.sources.source1.interceptors=i1 i2  

tier1.sources.source1.interceptors.i1.type=regex_filter  

tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  

tier1.sources.source1.interceptors.i2.type=timestamp  

tier1.channels.channel1.type=memory  

tier1.channels.channel1.capacity=10000  

tier1.channels.channel1.transactioncapacity=1000  

tier1.channels.channel1.keep-alive=30  

tier1.sinks.sink1.type=hdfs  

tier1.sinks.sink1.channel=channel1  

tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test  

tier1.sinks.sink1.hdfs.filetype=datastream  

tier1.sinks.sink1.hdfs.writeformat=text  

tier1.sinks.sink1.hdfs.rollinterval=0  

tier1.sinks.sink1.hdfs.rollsize=10240  

tier1.sinks.sink1.hdfs.rollcount=0  

tier1.sinks.sink1.hdfs.idletimeout=60  

besttone.db是我在hive中建立的資料庫,了解hive的應該了解沒多大問題。

ok,到這篇文章為止,整個從log4j生産日志,到flume收集日志,再到用hive離線分析日志,一整套流水線都講解完了。