天天看點

flume+ Elasticsearch +kibana環境搭建及講解

1、軟體介紹

1.1、flume

1.1.1、flume介紹

1)flume概念

1、flume是一個分布式的日志收集系統,具有高可靠、高可用、事務管理、失敗重新開機等功能。資料處理速度快,完全可以用于生産環境;

2、flume的核心是agent。agent是一個java程序,運作在日志收集端,通過agent接收日志,然後暫存起來,再發送到目的地;

3、agent裡面包含3個核心元件:source、channel、sink,介紹如下:

source元件是專用于收集日志的,可以處理各種類型各種格式的日志資料,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義等。source元件把資料收集來以後,臨時存放在channel中。

channel 元件是在agent中專用于臨時存儲資料的,可以存放在memory、jdbc、file、自定義。channel中的資料隻有在sink發送成功之後才會被删除。

sink元件是用于把資料發送到目的地的元件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、Elasticsearch、自定義等。

4、在整個資料傳輸過程中,流動的是事件event。事件event是Flume的基本資料機關,它攜帶日志資料(位元組數組形式)并且攜帶有頭資訊,這些Event由Agent外部的Source生成;

5、flume事務保證是在event級别;

6、flume可以支援多級flume的agent ,支援扇入(fan-in)、扇出(fan-out);

2)flume資料流模型

Flume以agent為最小的獨立運作機關。一個agent就是一個JVM。單agent由Source、Sink和Channel三大元件構成。

Flume支援使用者建立多級流,也就是說,多個agent可以協同工作。常見模型如下:

1、單agent模型:

flume+ Elasticsearch +kibana環境搭建及講解

2、多agent模型:

flume+ Elasticsearch +kibana環境搭建及講解

圖(一)

flume+ Elasticsearch +kibana環境搭建及講解

圖(二)

flume+ Elasticsearch +kibana環境搭建及講解

圖(三)

3)flume讀取檔案方式

對于直接讀取檔案Source, 主要有兩種方式:

1、Exec source

可通過寫Unix command的方式組織資料,最常用的就是tail -F [file]。可以實作實時傳輸,但在flume不運作和腳本錯誤時,會丢資料,也不支援斷點續傳功能。因為沒有記錄上次檔案讀到的位置,進而沒辦法知道,下次再讀時,從什麼地方開始讀。特别是在日志檔案一直在增加的時候。flume的source挂了。等flume的source再次開啟的這段時間内,增加的日志内容,就沒辦法被source讀取到了。不過flume有一個execStream的擴充,可以自己寫一個監控日志增加情況,把增加的日志,通過自己寫的工具把增加的内容,傳送給flume的node。再傳送給sink的node。要是能在tail類的source中能支援,在node挂掉這段時間的内容,等下次node開啟後在繼續傳送,那就更完美了。

2、Spooling Directory Source

SpoolSource是監測配置的目錄下新增的檔案,并将檔案中的資料讀取出來,可實作準實時。需要注意兩點:1、拷貝到spool目錄下的檔案不可以再打開編輯。2、spool目錄下不可包含相應的子目錄。在實際使用的過程中,可以結合log4j使用,使用log4j的時候,将log4j的檔案分割機制設為1分鐘一次,将檔案拷貝到spool的監控目錄。log4j有一個TimeRolling的插件,可以把log4j分割的檔案到spool目錄。基本實作了實時的監控。Flume在傳完檔案之後,将會修改檔案的字尾,變為.COMPLETED(字尾也可以在配置檔案中靈活指定)。

Exec與Spooling Directory比較:

ExecSource可以實作對日志的實時收集,但是存在Flume不運作或者指令執行出錯時,将無法收集到日志資料,無法保證日志資料的完整性。SpoolSource雖然無法實作實時的收集資料,但是可以使用以分鐘的方式分割檔案,趨近于實時。如果應用無法實作以分鐘切割日志檔案的話,可以兩種收集方式結合使用。

4)flume channel介紹

Channel有多種方式:有MemoryChannel,JDBC Channel, MemoryRecoverChannel, FileChannel。MemoryChannel可以實作高速的吞吐,但是無法保證資料的完整性。MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。FileChannel保證資料的完整性與一緻性。在具體配置FileChannel時,建議FileChannel設定的目錄和程式日志檔案儲存的目錄設成不同的磁盤,以便提高效率。

5)flume特性

1、高可靠性

作為生産環境運作的軟體,高可靠性是必須的。從單agent來看,Flume使用基于事務的資料傳遞方式來保證事件傳遞的可靠性。Source和Sink分别被封裝進一個事務。事件被存放在Channel中直到該事件被處理,Channel中的事件才會被移除。這是Flume提供的點到點的可靠機制。

從多級流來看,前一個agent的sink和後一個agent的source同樣有它們的事務來保障資料的可靠性。

事務流程圖如下:

2、可恢複性

還是靠Channel。推薦使用FileChannel,将事件持久化在本地檔案系統裡,但性能相對MemoryChannel較差。

1.1.2、flume配置說明

1、source常見配置(詳細配置見官方文檔)

1)Avro source

flume可以監聽avro-client發送過來的内容,然後進行處理。

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2)Spool source,

Spool監測配置的目錄下新增的檔案,并将檔案中的資料讀取出來。需要注意兩點:

1、拷貝到spool目錄下的檔案不可以再打開編輯;

2、spool目錄下不可包含相應的子目錄;

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home /flume-1.6.0-bin/logs

a1.sources.r1.fileHeader = true

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3)Exec source

EXEC執行一個給定的指令獲得輸出的源,如使用tail指令。

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/flume-1.6.0-bin/log_exec_tail

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

4)Syslogtcp source

Syslogtcp監聽TCP的端口做為資料源

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2、sink常見配置(詳細配置見官方文檔)

1)Hadoop sink

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path =hdfs://m1:9000/user/flume/syslogtcp

a1.sinks.k1.hdfs.filePrefix = Syslog

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2)File Roll Sink

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5555

a1.sources.r1.host = localhost

# Describe the sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.sink.directory = /home/flume-1.6.0-bin/logs

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3)elasticsearch Sink

配置檔案内容如下:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks. k1.type = elasticsearch

a1.sinks. k1.hostNames =127.0.0.1:9300

a1.sinks. k1.indexName = test_index

a1.sinks. k1.indexType = test_type_1

a1.sinks. k1.clusterName =vie61_yanshi

a1.sinks. k1.batchSize = 10

a1.sinks. k1.ttl = 5d

a1.sinks. k1.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3、Flume Sink Processors常見配置(詳細配置見官方文檔)

1)Failover SinkProcessor

将多個sink歸為一個組,先向組中優先級高的sink發送消息,如果發送失敗,則會向組中優先級低的sink發送消息。

配置檔案内容如下:

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type =failover

a1.sinkgroups.g1.processor.priority.k1= 5

a1.sinkgroups.g1.processor.priority.k2= 10

a1.sinkgroups.g1.processor.maxpenalty= 10000

2)Load balancing SinkProcessor

将多個sink歸為一個組,向組中sink負載均衡的發送消息。負載均衡的方式分為:循環周遊(round_robin)、随機(random)和自定義實作。

配置檔案内容如下:

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type =load_balance

a1.sinkgroups.g1.processor.backoff =true

a1.sinkgroups.g1.processor.selector =random

4、channel常見配置(詳細配置見官方文檔)

1)Memory Channel

此種方式将event内容放在緩存中。優點是效率高,缺點是放在channel中的資料丢失後不可恢複。

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 100

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2)File Channel

此種方式将event内容放在檔案系統中。優點是資料達到channel後即使flume挂了,在flume重新開機後資料依然可以恢複發送。缺點是效率較低。

配置檔案内容如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers eventsin memory

a1.channels.c1.type = file

a1.channels.c1.checkpointDir =/mnt/flume/checkpoint

a1.channels.c1.dataDirs =/mnt/flume/data

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

5、Fan Out常見配置(詳細配置見官方文檔)

Fan Out(扇出)分為兩種方式:replicating(複制)和multiplexing(複用)。

1)replicating(複制)

複制會将一個source的event内容發送給所有配置的channel。

配置檔案内容如下:

# List the sources, sinks andchannels for the agent

<Agent>.sources =<Source1>

<Agent>.sinks = <Sink1><Sink2>

<Agent>.channels =<Channel1> <Channel2>

# set list of channels for source(separated by space)

<Agent>.sources.<Source1>.channels= <Channel1> <Channel2>

# set channel for sinks

<Agent>.sinks.<Sink1>.channel= <Channel1>

<Agent>.sinks.<Sink2>.channel= <Channel2>

<Agent>.sources.<Source1>.selector.type= replicating

2)multiplexing(複用)

複用會将一個source的event内容發送給比對到結果的一個子集channels。

配置檔案内容如下:

# Mapping for multiplexing selector

<Agent>.sources.<Source1>.selector.type= multiplexing

<Agent>.sources.<Source1>.selector.header= <someHeader>

<Agent>.sources.<Source1>.selector.mapping.<Value1>= <Channel1>

<Agent>.sources.<Source1>.selector.mapping.<Value2>= <Channel1> <Channel2>

<Agent>.sources.<Source1>.selector.mapping.<Value3>= <Channel2>

#...

<Agent>.sources.<Source1>.selector.default= <Channel2>

具體示例:

# list the sources, sinks andchannels in the agent

agent_foo.sources =avro-AppSrv-source1

agent_foo.sinks = hdfs-Cluster1-sink1avro-forward-sink2

agent_foo.channels = mem-channel-1file-channel-2

# set channels for source

agent_foo.sources.avro-AppSrv-source1.channels= mem-channel-1 file-channel-2

# set channel for sinks

agent_foo.sinks.hdfs-Cluster1-sink1.channel= mem-channel-1

agent_foo.sinks.avro-forward-sink2.channel= file-channel-2

# channel selector configuration

agent_foo.sources.avro-AppSrv-source1.selector.type= multiplexing

agent_foo.sources.avro-AppSrv-source1.selector.header= State

agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA= mem-channel-1

agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ= file-channel-2

agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY= mem-channel-1 file-channel-2

agent_foo.sources.avro-AppSrv-source1.selector.default= mem-channel-1

1.1.3、flume擴充說明

Flume對source、channel和sink提供了多種實作方式,但因為需求的複雜性,可能這些方式滿足不了我們的需求。好在flume提供了靈活的擴充方式,我們通過實作flume提供的接口,就可以友善的實作自定義的source、sink等。具體實作請參考官方源碼。

1、  source擴充

public class MySource extendsAbstractSource implements Configurable, PollableSource {

 private String myProp;

 @Override

 public void configure(Context context) {

   String myProp = context.getString("myProp","defaultValue");

   // Process the myProp value (e.g. validation, convert to another type,...)

   // Store myProp for later retrieval by process() method

   this.myProp = myProp;

 }

 @Override

 public void start() {

   // Initialize the connection to the external client

 }

 @Override

 public void stop () {

   // Disconnect from external client and do any additional cleanup

   // (e.g. releasing resources or nulling-out field values) ..

  }

 @Override

 public Status process() throws EventDeliveryException {

   Status status = null;

   try {

     // This try clause includes whatever Channel/Event operations you wantto do

     // Receive new data

     Event e = getSomeData();

     // Store the Event into this Source's associated Channel(s)

     getChannelProcessor().processEvent(e);

     status = Status.READY;

   } catch (Throwable t) {

     // Log exception, handle individual exceptions as needed

     status = Status.BACKOFF;

     // re-throw all Errors

     if (t instanceof Error) {

        throw (Error)t;

     }

   } finally {

     txn.close();

   }

   return status;

 }

}

2、  sink擴充

public class MySink extendsAbstractSink implements Configurable {

 private String myProp;

 @Override

 public void configure(Context context) {

   String myProp = context.getString("myProp","defaultValue");

   // Process the myProp value (e.g. validation)

   // Store myProp for later retrieval by process() method

   this.myProp = myProp;

 }

 @Override

 public void start() {

   // Initialize the connection to the external repository (e.g. HDFS) that

   // this Sink will forward Events to ..

 }

 @Override

 public void stop () {

   // Disconnect from the external respository and do any

   // additional cleanup (e.g. releasing resources or nulling-out

   // field values) ..

 }

 @Override

 public Status process() throws EventDeliveryException {

   Status status = null;

   // Start transaction

   Channel ch = getChannel();

   Transaction txn = ch.getTransaction();

   txn.begin();

   try {

     // This try clause includes whatever Channel operations you want to do

     Event event = ch.take();

     // Send the Event to the external repository.

     // storeSomeData(e);

     txn.commit();

     status = Status.READY;

   } catch (Throwable t) {

     txn.rollback();

     // Log exception, handle individual exceptions as needed

     status = Status.BACKOFF;

     // re-throw all Errors

     if (t instanceof Error) {

        throw (Error)t;

     }

   }

   return status;

 }

}

3、  Interceptor擴充

public class MyInterceptor implementsInterceptor {

           @Override

           public void initialize() {

           }

           @Override

           public Event intercept(Event event) {

           }

           @Override

           public List<Event>intercept(List<Event> events) {

           }

           @Override

           public void close() {

             // no-op

           }

           public static class Builder implementsInterceptor.Builder {                 

                   @Override

                   publicvoid configure(Context context) {

                   }

                   @Override

                   publicInterceptor build() {

                            returnnew MyInterceptor();

                   }

           }

}

4、  RPC clients – Avro

通過Avro client可以實作将日志資訊直接發送到flume,前提是flume的source端收集日志方式需要配置為Avro方式。示例如下:

public class MyApp {

 public static void main(String[] args) {

   MyRpcClientFacade client = new MyRpcClientFacade();

   // Initialize client with the remote Flume agent's host and port

   client.init("host.example.org", 41414);

   // Send 10 events to the remote Flume agent. That agent should be

   // configured to listen with an AvroSource.

   String sampleData = "Hello Flume!";

   for (int i = 0; i < 10; i++) {

     client.sendDataToFlume(sampleData);

   }

   client.cleanUp();

 }

}

class MyRpcClientFacade {

 private RpcClient client;

 private String hostname;

 private int port;

 public void init(String hostname, int port) {

   // Setup the RPC connection

   this.hostname = hostname;

   this.port = port;

   this.client = RpcClientFactory.getDefaultInstance(hostname, port);

   // Use the following method to create a thrift client (instead of theabove line):

   // this.client = RpcClientFactory.getThriftInstance(hostname, port);

 }

 public void sendDataToFlume(String data) {

   // Create a Flume Event object that encapsulates the sample data

   Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));

   // Send the event

   try {

     client.append(event);

   } catch (EventDeliveryException e) {

     // clean up and recreate the client

     client.close();

     client = null;

     client = RpcClientFactory.getDefaultInstance(hostname, port);

     // Use the following method to create a thrift client (instead of theabove line):

     // this.client = RpcClientFactory.getThriftInstance(hostname, port);

   }

 }

 public void cleanUp() {

   // Close the RPC connection

   client.close();

 }

}

1.2、Elasticsearch

1.2.1、Elasticsearch介紹

1)Elasticsearch介紹

Elasticsearch是一個實時的分布式搜尋和分析引擎。它可以幫助你用前所未有的速度去處理大規模資料。它可以用于全文搜尋,結構化搜尋以及分析,當然你也可以将這三者進行組合。

Elasticsearch是一個建立在全文搜尋引擎Apache Lucene™基礎上的搜尋引擎,可以說Lucene是當今最先進,最高效的全功能開源搜尋引擎架構。

Elasticsearch使用Lucene作為内部引擎,但是在使用它做全文搜尋時,隻需要使用統一開發好的API即可,而不需要了解其背後複雜的Lucene的運作原理。

當然Elasticsearch并不僅僅是Lucene這麼簡單,它不但包括了全文搜尋功能,還可以進行以下工作:

    1、分布式實時檔案存儲,并将每一個字段都編入索引,使其可以被搜尋;

    2、實時分析的分布式搜尋引擎;

    3、可以擴充到上百台伺服器,處理PB級别的結構化或非結構化資料;

這麼多的功能被內建到一台伺服器上,你可以輕松地通過用戶端或者任何你喜歡的程式語言與ES的RESTful API進行交流。

2)Elasticsearch優點

1、 Elasticsearch是分布式的。不需要其他元件,分發是實時的,被叫做”Push replication”;

    2、Elasticsearch完全支援 Apache Lucene 的接近實時的搜尋;

    3、處理多租戶(multitenancy)不需要特殊配置,而Solr則需要更多的進階設定;

    4、Elasticsearch采用 Gateway 的概念,使得完備份更加簡單;

    5、各節點組成對等的網絡結構,某些節點出現故障時會自動配置設定其他節點代替其進行工作。

3)Elasticsearch使用案例

    1、維基百科使用Elasticsearch來進行全文搜做并高亮顯示關鍵詞,以及提供search-as-you-type、did-you-mean等搜尋建議功能;

    2、英國衛報使用Elasticsearch來處理訪客日志,以便能将公衆對不同文章的反應實時地回報給各位編輯;

    3、StackOverflow将全文搜尋與地理位置和相關資訊進行結合,以提供more-like-this相關問題的展現;

    4、GitHub使用Elasticsearch來檢索超過1300億行代碼;

    5、每天,GoldmanSachs使用它來處理5TB資料的索引,還有很多投行使用它來分析股票市場的變動。

但是Elasticsearch并不隻是面向大型企業的,它還幫助了很多類似DataDog以及Klout的創業公司進行了功能的擴充。

1.2.2、Elasticsearch配置說明

1、索引模闆(Index templates)

索引可使用預定義的模闆進行建立,這個模闆稱作Indextemplates。模闆設定包括settings和mappings,通過模式比對的方式使得多個索引重用一個模闆。

1)定義模闆:

   curl -XPUT localhost:9200/_template/template_1 -d ' 

   { 

        "template" :"te*", 

        "settings" : { 

            "number_of_shards" :5 

        }, 

        "mappings" : { 

            "type1" : { 

                "_source" :{"enabled" : false },

                                     "properties": { }

            } 

        }  

   } 

   ' 

說明:上述定義的模闆template_1将對用te開頭的新索引都是有效。

模闆中也可以包含别别名的定義,如下:

   curl -XPUT localhost:9200/_template/template_1 -d ' 

   { 

        "template" :"te*", 

        "settings" : { 

            "number_of_shards" :1 

        }, 

        "aliases" : { 

            "alias1" : {}, 

            "alias2" : { 

                "filter" : { 

                    "term":{"user" : "kimchy" } 

                }, 

                "routing":"kimchy" 

            }, 

            "{index}-alias" : {}  

        } 

2)删除模闆

curl -XDELETElocalhost:9200/_template/template_1 

3)檢視模闆

curl -XGET localhost:9200/_template/template_1  

4)模闆配置檔案

除了以上方式,索引模闆也可以在檔案中進行配置。索引模闆的配置檔案需要在每個主節點的config目錄下,目錄結構為:config/templates/template_1.json,

template_1.json的示例如下:

   { 

     "template-logstash" : { 

        "template" :"logstash*", 

        "settings" : { 

          "index.number_of_shards" :5, 

          "number_of_replicas" :1, 

          "index" : { 

            "store" : { 

             "compress" :{ 

                "stored" : true, 

                "tv": true 

              } 

            } 

          } 

        }, 

        "mappings" : { 

          "_default_" : { 

            "properties" : { 

              "dynamic" :"true", 

            }, 

          }, 

          "loadbalancer" : { 

            "_source" : { 

              "compress" : true, 

            }, 

            "_ttl" : { 

              "enabled" : true, 

              "default" :"10d" 

            }, 

            "_all" : { 

              "enabled" : false 

            }, 

            "properties" : { 

              "@fields" : { 

                "dynamic" :"true", 

                "properties" : { 

                  "client" : { 

                    "type" :"string", 

                    "index" :"not_analyzed" 

                  }, 

                  "domain" : { 

                    "type" :"string", 

                    "index" :"not_analyzed" 

                  }, 

                  "oh" : { 

                    "type" :"string", 

                    "index" :"not_analyzed" 

                  }, 

                  "responsetime" :{ 

                    "type" :"double", 

                 }, 

                  "size" : { 

                    "type" :"long", 

                    "index" :"not_analyzed" 

                  }, 

                  "status" : { 

                    "type" :"string", 

                    "index" :"not_analyzed" 

                  }, 

                  "upstreamtime" :{ 

                    "type" :"double", 

                  }, 

                  "url" : { 

                    "type" :"string", 

                    "index" :"not_analyzed" 

                  } 

                } 

              }, 

              "@source" : { 

                "type" :"string", 

                "index" :"not_analyzed" 

              }, 

              "@timestamp" : { 

                "type" :"date", 

                "format" :"dateOptionalTime" 

              }, 

              "@type" : { 

                "type" :"string", 

                "index" :"not_analyzed", 

                "store" :"no"  

              } 

            } 

          } 

        } 

     } 

   } 

5)_source字段說明

_source字段是自動生成的,以JSON格式存儲索引檔案。_source字段沒有建索引,是以不可搜尋。當執行“get”或者“search”操作時,預設會傳回_source字段。

_source字段消耗性能,是以可以屏蔽(disable)掉。例如:

{

"tweet":{

"_source":{"enabled":false}

}

}

enabale:false的情況下,預設檢索隻傳回ID。

如果覺得enabale:true時,索引的膨漲率比較大的情況下可以通過下面一些輔助設定進行優化:

Compress:是否進行壓縮,建議一般情況下将其設為true

“includes” : ["author", "name"],

“excludes” : ["sex"]

上面的includes和 excludes主要是針對預設情況下面_source一般是儲存全部Bulk過去的資料,我們可以通過include,excludes在字段級别上做出一些限索。

2、環境搭建

2.1、flume安裝

1)下載下傳apache-flume-1.6.0-bin.tar.gz

官網下載下傳位址:http://flume.apache.org/download.html

2)安裝flume

1、将軟體包拷貝到伺服器上,如/root/yiran目錄下;

2、解壓軟體包,指令:tar –xvf apache-flume-1.6.0-bin.tar.gz;

3、配置*.conf檔案,如example.conf,放在/root/yiran/apache-flume-1.6.0-bin/conf目錄下,内容示例如下:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

# Describe the sink

#a1.sinks.k1.type = logger

a1.sinks. k1.type = elasticsearch

a1.sinks. k1.hostNames =127.0.0.1:9300

a1.sinks. k1.indexName = test_index

a1.sinks. k1.indexType = test_type_1

a1.sinks. k1.clusterName = vie61_yanshi

a1.sinks. k1.batchSize = 10

a1.sinks. k1.ttl = 5d

a1.sinks. k1.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

# Use a channel which buffers eventsin memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

# Bind the source and sink to thechannel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

4、配置flume-env.sh檔案,位于/root/yiran/apache-flume-1.6.0-bin/conf目錄下,操作如下:

mv flume-env.sh.template flume-env.sh

vi flume-env.sh

修改JAVA_HOME

JAVA_HOME=/usr/java/jdk1.7.0_71

JAVA_OPTS="-Xms512m–Xmx1024m-Dcom.sun.management.jmxremote"

5、從elasticsearch安裝包下将寫入elasticsearch需要的核心包拷貝到/root/yiran/apache-flume-1.6.0-bin/lib(當收集的日志需要寫入es時才進行第5步操作,否則不用進行第5步);

elasticsearch-1.6.2.jar;

lucene-core-4.10.4.jar;

3)啟動flume

完成上述5個步驟後,則安裝完成。然後可以啟動flume,指令如下:

cd /root/yiran/apache-flume-1.6.0-bin;

./bin/flume-ng agent --conf conf--conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console &;

說明:--conf指明配置檔案目錄名稱;--conf-file指明要運作的配置檔案;--name指明agent名稱,保持與*.conf配置檔案裡面命名一緻;-Dflume.root.logger指明日志列印級别;

2.2、Elasticsearch安裝

1) 下載下傳elasticsearch-1.6.2.tar.gz

官網下載下傳位址:https://www.elastic.co/downloads/elasticsearch;

2)安裝elasticsearch

1、将軟體包拷貝到伺服器上,如/root/yiran目錄下;

2、解壓軟體包,指令:tar –xvf elasticsearch-1.6.2.tar.gz;

3、修改/root/yiran/elasticsearch-1.6.2/config下elasticsearch.yml檔案:

如将node.name的值設定為“test-node”,表示目前這個es服務節點名字為test-node;

修改cluster.name的值為vie61_yanshi;

修改network.host為本機ip;

4、  根據需要可配置/root/yiran/elasticsearch-1.6.2/bin目錄下elasticsearch.in.sh檔案;

3)啟動elasticsearch

完成上述4個步驟後,則安裝完成。然後可以啟動elasticsearch,指令如下:

nohup ./elasticsearch &

2.3、kibana安裝

1)下載下傳kibana-4.1.10-linux-x64.tar.gz

官網下載下傳位址:https://www.elastic.co/downloads/past-releases

2)安裝kibana

1、将軟體包拷貝到伺服器上,如/root/yiran目錄下;

2、解壓軟體包,指令:tar –xvf kibana-4.1.10-linux-x64.tar.gz;

3、修改/root/yiran/kibana-4.1.10-linux-x64/config目錄下kibana.yml檔案;

修改host為本機ip;

修改elasticsearch_url為要通路的elasticsearch的位址,如:http://localhost:9200;

3)啟動kibana

完成上述3個步驟後,則安裝完成。然後可以啟動kibana,指令如下:

nohup ./kibana &

3、執行個體示範

3.1、執行個體說明

日志記錄方式用logback,示例展示了将tomcat通路日志和使用者行為日志收集到flume,然後由flume将日志資訊寫入到elasticsearch中,最後通過kibana對資料進行統計分析。架構圖如下:

flume+ Elasticsearch +kibana環境搭建及講解

3.2、flume配置

環境說明:

1)用logback記錄日志;

2)  部署兩套flume環境,本地flume環境命名為agent1(ip:192.168.27.73),遠端flume環境命名為agent3(ip:192.168.77.113);

3)  Agent1收集日志資料,然後将資料通過Avro方式傳遞到agent3,最後agent3将日志資料寫入到elasticsearch;

4)  Source1收集使用者行為日志,source2收集tomcat通路日志;

5)  安裝步驟見“2.1節flume安裝”,*.conf檔案見下面配置說明。

配置說明:

1)  agent3配置檔案命名為sink_to_es.conf,内容如下:

#定義agent各元件名稱

agent1.sources = source1 source2

agent1.sinks = sink1 sink2

agent1.channels = channel1 channel2

#定義source1配置資訊,監聽44444端口

agent1.sources.source1.type = avro

agent1.sources.source1.bind =192.168.77.113

agent1.sources.source1.port = 44444

#定義source2配置資訊,監聽33333端口

agent1.sources.source2.type = avro

agent1.sources.source2.bind =192.168.77.113

agent1.sources.source2.port = 33333

#定義sink1的配置資訊,将資訊寫入到es中,索引名為test_index,索引類型為test_type_1,叢集名為vie61_yanshi

agent1.sinks.sink1.type =elasticsearch

agent1.sinks.sink1.hostNames =127.0.0.1:9300

agent1.sinks.sink1.indexName =test_index

agent1.sinks.sink1.indexType =test_type_1

agent1.sinks.sink1.clusterName =vie61_yanshi

agent1.sinks.sink1.batchSize = 10

agent1.sinks.sink1.ttl = 5d

agent1.sinks.sink1.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

#定義sink2的配置資訊,将資訊寫入到es中,索引名為test_index_tomcat,索引類型為test_type_1,叢集名為vie61_yanshi

agent1.sinks.sink2.type =elasticsearch

agent1.sinks.sink2.hostNames =127.0.0.1:9300

agent1.sinks.sink2.indexName =test_index_tomcat

agent1.sinks.sink2.indexType =test_type_1

agent1.sinks.sink2.clusterName =vie61_yanshi

agent1.sinks.sink2.batchSize = 10

agent1.sinks.sink2.ttl = 5d

agent1.sinks.sink2.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

#定義channel1的配置資訊,資訊存儲在緩存中

agent1.channels.channel1.type =memory

agent1.channels.channel1.capacity =1000

agent1.channels.channel1.transactionCapacity= 100

#定義channel2的配置資訊,資訊存儲在緩存中

agent1.channels.channel2.type =memory

agent1.channels.channel2.capacity =1000

agent1.channels.channel2.transactionCapacity= 100

#将source1 and sink1 綁定到 channel1

agent1.sources.source1.channels =channel1

agent1.sinks.sink1.channel = channel1

#将source2 and sink2 綁定到 channel2

agent1.sources.source2.channels =channel2

agent1.sinks.sink2.channel = channel2

2)  agent1配置檔案命名為sink_to_agent.conf,提供兩種實作方式,任選其中一種即可。

第一種:通過讀取生成的日志檔案,将其中的内容收集到flume。假設tomcat通路日志路徑為/root/yiran/apache-tomcat-8.0.36/logs/tomcat,使用者記錄檔路徑為/root/yiran/apache-tomcat-8.0.36/logs/operator。内容如下:

#定義agent各元件名稱

agent1.sources= source1 source2

agent1.sinks= sink1 sink2

agent1.channels= channel1 channel2

#定義source1配置資訊,讀取operator檔案夾下日志内容,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉

agent1.sources.source1.type= org.apache.flume.source.SpoolDirectoryTailFileSource

agent1.sources.source1.spoolDir= /root/yiran/apache-tomcat-8.0.36/logs/operator

agent1.sources.source1.fileSuffix= .COMPLETED

agent1.sources.source1.deletePolicy= never

agent1.sources.source1.ignorePattern= ^$

agent1.sources.source1.targetPattern= .*(\\d){4}-(\\d){2}-(\\d){2}.*

agent1.sources.source1.targetFilename= yyyy-MM-dd

agent1.sources.source1.trackerDir= .flumespooltail

agent1.sources.source1.consumeOrder= oldest

agent1.sources.source1.batchSize= 100

agent1.sources.source1.inputCharset= UTF-8

agent1.sources.source1.decodeErrorPolicy= REPLACE

agent1.sources.source1.deserializer= LINE

agent1.sources.source1.interceptors= i1 i2

agent1.sources.source1.interceptors.i1.type= regex_filter

agent1.sources.source1.interceptors.i1.regex= ^@@.*@@$

agent1.sources.source1.interceptors.i2.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder

[email protected]@

#定義source2配置資訊,讀取tomcat檔案夾下日志内容,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉

agent1.sources.source2.type= org.apache.flume.source.SpoolDirectoryTailFileSource

agent1.sources.source2.spoolDir= /root/yiran/apache-tomcat-8.0.36/logs/tomcat

agent1.sources.source2.fileSuffix= .COMPLETED

agent1.sources.source2.deletePolicy= never

agent1.sources.source2.ignorePattern= ^$

agent1.sources.source2.targetPattern= .*(\\d){4}-(\\d){2}-(\\d){2}.*

agent1.sources.source2.targetFilename= yyyy-MM-dd

agent1.sources.source2.trackerDir= .flumespooltail

agent1.sources.source2.consumeOrder= oldest

agent1.sources.source2.batchSize= 100

agent1.sources.source2.inputCharset= UTF-8

agent1.sources.source2.decodeErrorPolicy= REPLACE

agent1.sources.source2.deserializer= LINE

agent1.sources.source2.interceptors= i3 i4

agent1.sources.source2.interceptors.i3.type= regex_filter

agent1.sources.source2.interceptors.i3.regex= ^@@.*@@$

agent1.sources.source2.interceptors.i4.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder

[email protected]@

#定義sink1的配置資訊,将資訊寫入到agent3中的source1中

agent1.sinks.sink1.type= avro

agent1.sinks.sink1.hostname= 192.168.77.113

agent1.sinks.sink1.port= 44444

#定義sink2的配置資訊,将資訊寫入到agent3中的source2中

agent1.sinks.sink2.type= avro

agent1.sinks.sink2.hostname= 192.168.77.113

agent1.sinks.sink2.port= 33333

#定義channel1的配置資訊,資訊存儲在緩存中

agent1.channels.channel1.type= memory

agent1.channels.channel1.capacity= 1000

agent1.channels.channel1.transactionCapacity= 100

#定義channel2的配置資訊,資訊存儲在緩存中

agent1.channels.channel2.type= memory

agent1.channels.channel2.capacity= 1000

agent1.channels.channel2.transactionCapacity= 100

#将source1 andsink1 綁定到 channel1

agent1.sources.source1.channels= channel1

agent1.sinks.sink1.channel= channel1

#将source2 andsink2 綁定到 channel2

agent1.sources.source2.channels= channel2

agent1.sinks.sink2.channel= channel2

說明:

1)agent1中定義了兩個攔截器,第一個類型為“regex_filter”,是系統自帶的攔截器,作用是隻接受“regex”正規表達式定義格式的消息,如“^@@.*@@$”。第二個類型為org.apache.flume.interceptor.JsonParseInterceptor$Builder,是一個自定義的攔截器,作用是解析接收到的json格式的内容,其内容分割标志預設為:@@,可以根據格式自行定義。實作如下,詳細内容見(flume-ng-core-extend工程):

public classJsonParseInterceptor implements Interceptor {

     private staticfinalLogger logger= LoggerFactory.getLogger(JsonParseInterceptor.class);

     private finalString splitFlag;

     private JsonParseInterceptor(StringsplitFlag) {

        this.splitFlag=splitFlag;

     }

     @Override

     public voidinitialize() {

     }

     @Override

     public Event intercept(Eventevent) {

      Stringbody= newString(event.getBody(),Charsets.UTF_8);

       String[] bodyArray = body.split(splitFlag);

       logger.info("body content: " +body +"  length:" +bodyArray.length);

       if(bodyArray !=null&&bodyArray.length > 1){

          Stringcontent= bodyArray[1];  //取得body内容,為json格式資料

          try{

             Map<String,String> jsonMap=(Map<String, String>) JSON.parse(content);

             event.setHeaders(jsonMap);

             event.setBody(null);

             logger.info("body content: "+jsonMap);

          }catch(Exceptione){

             logger.error("Could not parse json data, ",e);

             return null;

          }

       }else{

          return null;

       }

       returnevent;

     }

     @Override

     public List<Event> intercept(List<Event>events) {

        List<Event> out = Lists.newArrayList();

          for (Eventevent :events) {

            Event outEvent = intercept(event);

            if (outEvent !=null){

               out.add(outEvent);

            }

          }

          returnout;

     }

     @Override

     public voidclose() {

       // no-op

     }

     public staticclassBuilder implementsInterceptor.Builder {

      private StringsplitFlag =null;

      @Override

      public void configure(Context context) {

         splitFlag = context.getString("splitFlag",Constants.DEFAULT_SPLITFLAG);

      }

      @Override

      public Interceptor build() {

         // TODO Auto-generated method stub

         return new JsonParseInterceptor(splitFlag);

      }

     }

     public staticclassConstants {

       public static String DEFAULT_SPLITFLAG ="@@";

     }

}

3)  agent1中使用了自定義的Source(SpoolDirectoryTailFileSource,詳見flume-ng-core-extend工程),此source将exec和spoolingsource結合起來,可以實作如下效果:

l  監控指定檔案夾下的日志檔案;

l  實時讀取日志檔案的内容;

l  實作斷點續傳功能,即flume發生故障後,重新開機會從故障前的位置繼續讀取日志内容;

第二種:擴充logback接口,通過Avroclient将日志資訊傳遞到flume。内容如下:

#定義agent各元件名稱

agent1.sources = source1 source2

agent1.sinks = sink1 sink2

agent1.channels = channel1 channel2

#定義source1配置資訊,監聽44444端口,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉

agent1.sources.source1.type = avro

agent1.sources.source1.bind =192.168.77.113

agent1.sources.source1.port = 44444

agent1.sources.source1.interceptors =i1 i2

agent1.sources.source1.interceptors.i1.type= regex_filter

agent1.sources.source1.interceptors.i1.regex= ^@@.*@@$

agent1.sources.source1.interceptors.i2.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder

#定義source2配置資訊,監聽33333端口,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉

agent1.sources.source2.type = avro

agent1.sources.source2.bind =192.168.77.113

agent1.sources.source2.port = 33333

agent1.sources.source2.interceptors =i3 i4

agent1.sources.source2.interceptors.i3.type= regex_filter

agent1.sources.source2.interceptors.i3.regex= ^@@.*@@$

agent1.sources.source2.interceptors.i4.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder

#定義sink1的配置資訊,通過Avro方式将資訊寫入到agent3的source1中

agent1.sinks.sink1.type = avro

agent1.sinks.sink1.hostname =192.168.77.113

agent1.sinks.sink1.port = 44444

#定義sink2的配置資訊,通過Avro方式将資訊寫入到agent3的source2中

agent1.sinks.sink2.type = avro

agent1.sinks.sink2.hostname =192.168.77.113

agent1.sinks.sink2.port = 33333

#定義channel1的配置資訊,資訊存儲在緩存中

agent1.channels.channel1.type =memory

agent1.channels.channel1.capacity =1000

agent1.channels.channel1.transactionCapacity= 100

#定義channel2的配置資訊,資訊存儲在緩存中

agent1.channels.channel2.type =memory

agent1.channels.channel2.capacity =1000

agent1.channels.channel2.transactionCapacity= 100

#将source1 and sink1 綁定到 channel1

agent1.sources.source1.channels =channel1

agent1.sinks.sink1.channel = channel1

#将source2 and sink2 綁定到 channel2

agent1.sources.source2.channels =channel2

agent1.sinks.sink2.channel = channel2

說明:

1、攔截器的定義見第一種方式裡面說明;

2、此種方式需要擴充logback,核心類如下,詳細内容見(logback-monitor工程):

public classFlumeRpcAppender<E> extends OutputStreamAppender<E> {

   protected Stringconfigfile;

   public static final String CLIENT_APPNAME="client.appname";

   public static final String POOL_SIZE="pool.size";

   private final Properties props = new Properties();

   RpcClientclient;

   @Override

   public void start() {

      genProps();

      client = RpcClientFactory.getInstance(props);

      Map<String,String>header= configHeader(props);

      int poolSize = 10;

      StringstrPoolSize= props.getProperty(POOL_SIZE);

      if(strPoolSize !=null && !strPoolSize.trim().isEmpty() ){

         poolSize = Integer.parseInt(strPoolSize.trim());

      }

      ExecutorServiceexecutorService= Executors.newFixedThreadPool(poolSize);

      setOutputStream(new FlumeRpcOutputStream(client,header,executorService));

      super.start();

   }

   private void genProps(){

      if(this.configfile!=null){

         ClassLoadercl =FlumeRpcAppender.class.getClassLoader();

         try {

            props.load(cl.getResourceAsStream(getConfigfile()));

         }catch(IOExceptione) {

            e.printStackTrace();

         }

      }else{

         throw new RuntimeException("configfile is null.");

      }

   }

   privateMap<String,String> configHeader(Propertiesprops){

      Map<String,String>header=newHashMap<String,String>();

      header.put(CLIENT_APPNAME,(String)props.remove(CLIENT_APPNAME));

      return header;

   }

   @Override

   public void stop() {

      client.close();

      super.stop();

   }

   public String getConfigfile(){

      return configfile;

   }

   public void setConfigfile(String configfile) {

      this.configfile =configfile;

   }

}

3、使用者行為日志會通過Avro的方式發送到source1的44444端口,內建使用者行為日志需要做如下配置:

1)在web應用中引入logback-monitor工程,通過maven方式或将logback-monitor打包為jar放入web應用lib目錄下;

2)将logback.xml和logback-monitor.properties檔案放到web應用class目錄。具體内容如下:

logback.xml内容:

<?xml version="1.0"encoding="UTF-8"?>

<configuration>

         <!--always a good activate OnConsoleStatusListener -->

         <statusListenerclass="ch.qos.logback.core.status.OnConsoleStatusListener" />

         <appendername="STDOUT"class="ch.qos.logback.core.ConsoleAppender">

                   <encoder>

                            <pattern>%d{HH:mm:ss.SSS}[%thread] %-5level %logger{56} - %msg%n</pattern>

                   </encoder>

         </appender>

         <appendername="FILE"

                   class="ch.qos.logback.core.rolling.RollingFileAppender">

                   <rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">

                            <fileNamePattern>/root/yiran/apache-tomcat-8.0.36/logs/operator/operator.%d{yyyy-MM-dd}.log</fileNamePattern>

                            <maxHistory>30</maxHistory>

                   </rollingPolicy>

                   <encoder>

                            <pattern>%msg</pattern>

                   </encoder>

         </appender>

                   <!--flume收集日志配置 -->

         <appendername="flumeRpcAppender"

                   class="com.logback.core.FlumeRpcAppender">

                   <encoder>

                            <pattern>%msg</pattern>

                            <charset>UTF-8</charset>

                   </encoder>

                   <configfile>logback-monitor.properties</configfile>

         </appender>

         <rootlevel="INFO"> 

              <appender-ref ref="FILE"/>

              <appender-refref="flumeRpcAppender"/> 

          </root> 

</configuration>

說明:

1)msg格式示例:@@{"userName":"王五", "sex": "女","age": "23","createDate":"2016-07-28T23:23:32.999+0800"}@@;

2)應用程式在寫入日志時要按照msg消息示例格式進行日志記錄。

logback-monitor.properties内容:

client.appname=something

client.type=DEFAULT_FAILOVER

hosts=h1

hosts.h1=127.0.0.1:44444

pool.size=20

說明:如有多個flume接收端,可以如下配置。通過配置client.type可以實作負載均衡(default_loadbalance)或失敗重發(default_failover)。

host2=h1 h2 h3

hosts.h1=127.0.0.1:44444

hosts.h2=127.0.0.1:44444

hosts.h3=127.0.0.1:44444

4、tomcat通路日志會通過Avro的方式發送到source2的33333端口,內建tomcat通路日志需要做如下配置:

1)  配置logback-monitor.properties檔案,将其放入到D:\server2\apache-tomcat-8.0.36\lib目錄下,内容如下:

client.appname=something

client.type=DEFAULT_FAILOVER

hosts=h1

hosts.h1=127.0.0.1:33333

pool.size=5

2)  配置logback-access.xml檔案,将其放入到D:\server2\apache-tomcat-8.0.36\conf目錄下,内容如下:

<?xml version="1.0"encoding="UTF-8"?>

<configuration>

         <!--always a good activate OnConsoleStatusListener -->

         <statusListenerclass="ch.qos.logback.core.status.OnConsoleStatusListener" />

         <appendername="STDOUT" class="ch.qos.logback.core.ConsoleAppender">

         <encoder>

          <pattern>@@{"createTime":"%t{yyyy-MM-ddHH:mm:ss.SSSZ}", "remoteIP":"%h","status":"%s", "contentLength":"%b","duration":"%D", "requestURL":"%r",

"logType":"tomcatLog"}@@</pattern>

         </encoder>

 </appender>

                   <!--flume收集日志配置 -->

         <appendername="flumeRpcAppender"

                   class="com.logback.core.FlumeRpcAppender">

                   <encoder>

                            <pattern>@@{"createTime":"%t{yyyy-MM-dd'T'HH:mm:ss.SSSZ}","remoteIP":"%h", "status":"%s","contentLength":"%b", "duration":"%D","requestURL":"%r","logType":"tomcatLog"}@@</pattern>

                            <charset>UTF-8</charset>

                   </encoder>

                   <configfile>logback-monitor.properties</configfile>

         </appender>

         <appendername="FILE"

                   class="ch.qos.logback.core.rolling.RollingFileAppender">

                   <rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">

                            <fileNamePattern>/root/yiran/apache-tomcat-8.0.36/logs/tomcat/tomcat_access.%d{yyyy-MM-dd}.log</fileNamePattern>

                            <maxHistory>30</maxHistory>

                   </rollingPolicy>

                   <encoder>

                            <pattern>@@{"createTime":"%t{yyyy-MM-dd'T'HH:mm:ss.SSSZ}","remoteIP":"%h", "status":"%s","contentLength":"%b", "duration":"%D","requestURL":"%r","logType":"tomcatLog"}@@</pattern>

                   </encoder>

         </appender>

          <appender-refref="FILE"/> 

          <appender-ref ref="flumeRpcAppender"/>           

</configuration>

3)  将[\部署配置檔案\lib]目錄下的jar包複制到D:\server2\apache-tomcat-8.0.36\lib目錄下:

4)修改tomcat中server.xml下最後一行,添加如下配置:

<ValveclassName="ch.qos.logback.access.tomcat.LogbackValve"quiet="true"/>

通過上述步驟,則完成flume配置,然後啟動agent1和agent3,指令如下:

啟動agent1:

cd C:\Users\qxb-810\Desktop\flume_study\apache-flume-1.6.0-bin;

bin\flume-ng.cmd agent --conf conf--conf-file conf\sink_to_agent.conf --name agent1 &

啟動agent3:

cd /root/yiran/apache-flume-1.6.0-bin;

./bin/flume-ng agent --conf conf --conf-fileconf/sink_to_es.conf --name agent1 &

3.3、Elasticsearch配置

環境說明:

1)  根據“2.2節Elasticsearch安裝”說明完成安裝;

2)  用crul方式建立索引模闆,

使用者記錄檔模闆示例如下:

curl -XPUTlocalhost:9200/_template/template_flume_log -d ' 

   "template" : "test_index-*", 

   "settings" : { 

        "number_of_shards" : 5 

   }, 

   "mappings" : { 

        "test_type_1" : { 

            "_source" :{"enabled" : true },

         "properties": {

                   "createDate": {

                     "type" :   "date",

                     "date_format":"date_hour_minute_second_millis"

                   },

                   "sex": {

                     "type" :   "string"

                   },

                   "userName": {

                     "type" :   "string"

                   },

                   "age": {

                     "type" :   "long"

                   }

           }

        } 

   } 

'

Tomcat通路日志模闆如下:

curl -XPUTlocalhost:9200/_template/template_flume_tomcat_log -d ' 

   "template" : "test_index_tomcat-*", 

   "settings" : { 

        "number_of_shards" : 5 

   }, 

   "mappings" : { 

        "test_type_1" : { 

            "_source" : {"enabled": true },

         "properties": {

                   "createTime": {

                     "type" :   "date",

                     "date_format":"date_hour_minute_second_millis"

                   },

                   "remoteIP": {

                     "type" :   "string"

                   },

                   "status": {

                     "type" :   "long"

                   },

                   "contentLength": {

                     "type" :   "string"

                   },

                   "duration": {

                     "type" :   "long"

                   },

                   "requestURL": {

                     "type" :   "string"

                   },

                   "logType": {

                     "type" :   "string"

                   }

           }

        } 

   } 

'

3.4、Kibana配置

環境說明:

1)根據“2.3節Kibana安裝”完成Kibana安裝;

2)通路http://192.168.77.113:5601/,完成kibana索引配置;

4、常見問題說明

1、elasticsearch啟動不成功

原因:很大可能是因為jdk版本不對

解決:更新jdk版本

2、flume向elasticsearch寫資料,flume啟動報:No nodeavailable and cluster/nodes/info] request_id [0] timed out

原因:1、可能是節點配置資訊有誤,檢查節點配置資訊,确認沒有問題重新開機es;

           2、檢查elasticsearch的jdk版本與應用的jdk版本是否一緻;

3、tail 斷點續傳的問題

可以在 tail 傳的時候記錄行号,下次再傳的時候,取上次記錄的位置開始傳輸,類似:

agent1.sources.avro-source1.command =/usr/local/bin/tail  -n +$(tail -n1/home/storm/tmp/n) --max-unchanged-stats=600 -F  /home/storm/tmp/id.txt | awk'ARNGIND==1{i=$0;next}{i++; if($0~/檔案已截斷/)i=0; print i>> "/home/storm/tmp/n";print $1"---"i}' /home/storm/tmp/n-

需要注意如下幾點:

1)檔案被 rotation 的時候,需要同步更新你的斷點記錄“指針”;

2)需要按檔案名來追蹤檔案;

3)flume 挂掉後需要累加斷點續傳“指針”;

4)flume 挂掉後,如果恰好檔案被rotation,那麼會有丢資料的風險

   隻能監控盡快拉起或者加邏輯判斷檔案大小重置指針;

5)tail 注意你的版本,請更新coreutils 包到最新。

4、flume 報錯:java.lang.OutOfMemoryError:GC overhead limit exceeded

解決:Flume 啟動時的最大堆記憶體大小預設是 20M,線上環境很容易 OOM,是以需要你在 flume-env.sh 中添加 JVM 啟動參數:

JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k-Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

然後在啟動 agent 的時候一定要帶上 -c conf 選項,否則 flume-env.sh 裡配置的環境變量不會被加載生效。

5、關于elasticsearch和kibana的時區和日期問題

原因:elasticsearch原生支援date類型,json格式通過字元來表示date類型。date類型是包含時區資訊的,如果我們沒有在json代表日期的字元串中顯式指定時區,對es來說沒什麼問題,但是如果通過kibana顯示es裡的資料時,就會出現問題,資料的時間會晚8個小時。因為kibana從es裡讀取的date類型資料,沒有時區資訊,kibana會預設當作0時區來解析,但是kibana在通過浏覽器展示的時候,會通過js擷取目前用戶端機器所在的時區,也就是東八區,是以kibana會把從es得到的日期資料減去8小時。這裡就會導緻kibana經常遇到的“資料時間延遲8小時”的問題。

解決:在往es送出日期資料的時候,直接送出帶有時區資訊的日期字元串,如:“2016-07-15T12:58:17.136+0800”。

5、參考文檔

1)Kibana使用參考文檔:http://kibana.logstash.es/content/kibana/index.html

2)Elasticsearch使用參考文檔:http://es.xiaoleilu.com/030_Data/45_Partial_update.html

3)flume使用者使用參考文檔:http://flume.apache.org/FlumeUserGuide.html

4)flume開發者參考文檔:http://flume.apache.org/FlumeDeveloperGuide.html

繼續閱讀