1、軟體介紹
1.1、flume
1.1.1、flume介紹
1)flume概念
1、flume是一個分布式的日志收集系統,具有高可靠、高可用、事務管理、失敗重新開機等功能。資料處理速度快,完全可以用于生産環境;
2、flume的核心是agent。agent是一個java程序,運作在日志收集端,通過agent接收日志,然後暫存起來,再發送到目的地;
3、agent裡面包含3個核心元件:source、channel、sink,介紹如下:
source元件是專用于收集日志的,可以處理各種類型各種格式的日志資料,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義等。source元件把資料收集來以後,臨時存放在channel中。
channel 元件是在agent中專用于臨時存儲資料的,可以存放在memory、jdbc、file、自定義。channel中的資料隻有在sink發送成功之後才會被删除。
sink元件是用于把資料發送到目的地的元件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、Elasticsearch、自定義等。
4、在整個資料傳輸過程中,流動的是事件event。事件event是Flume的基本資料機關,它攜帶日志資料(位元組數組形式)并且攜帶有頭資訊,這些Event由Agent外部的Source生成;
5、flume事務保證是在event級别;
6、flume可以支援多級flume的agent ,支援扇入(fan-in)、扇出(fan-out);
2)flume資料流模型
Flume以agent為最小的獨立運作機關。一個agent就是一個JVM。單agent由Source、Sink和Channel三大元件構成。
Flume支援使用者建立多級流,也就是說,多個agent可以協同工作。常見模型如下:
1、單agent模型:
2、多agent模型:
圖(一)
圖(二)
圖(三)
3)flume讀取檔案方式
對于直接讀取檔案Source, 主要有兩種方式:
1、Exec source
可通過寫Unix command的方式組織資料,最常用的就是tail -F [file]。可以實作實時傳輸,但在flume不運作和腳本錯誤時,會丢資料,也不支援斷點續傳功能。因為沒有記錄上次檔案讀到的位置,進而沒辦法知道,下次再讀時,從什麼地方開始讀。特别是在日志檔案一直在增加的時候。flume的source挂了。等flume的source再次開啟的這段時間内,增加的日志内容,就沒辦法被source讀取到了。不過flume有一個execStream的擴充,可以自己寫一個監控日志增加情況,把增加的日志,通過自己寫的工具把增加的内容,傳送給flume的node。再傳送給sink的node。要是能在tail類的source中能支援,在node挂掉這段時間的内容,等下次node開啟後在繼續傳送,那就更完美了。
2、Spooling Directory Source
SpoolSource是監測配置的目錄下新增的檔案,并将檔案中的資料讀取出來,可實作準實時。需要注意兩點:1、拷貝到spool目錄下的檔案不可以再打開編輯。2、spool目錄下不可包含相應的子目錄。在實際使用的過程中,可以結合log4j使用,使用log4j的時候,将log4j的檔案分割機制設為1分鐘一次,将檔案拷貝到spool的監控目錄。log4j有一個TimeRolling的插件,可以把log4j分割的檔案到spool目錄。基本實作了實時的監控。Flume在傳完檔案之後,将會修改檔案的字尾,變為.COMPLETED(字尾也可以在配置檔案中靈活指定)。
Exec與Spooling Directory比較:
ExecSource可以實作對日志的實時收集,但是存在Flume不運作或者指令執行出錯時,将無法收集到日志資料,無法保證日志資料的完整性。SpoolSource雖然無法實作實時的收集資料,但是可以使用以分鐘的方式分割檔案,趨近于實時。如果應用無法實作以分鐘切割日志檔案的話,可以兩種收集方式結合使用。
4)flume channel介紹
Channel有多種方式:有MemoryChannel,JDBC Channel, MemoryRecoverChannel, FileChannel。MemoryChannel可以實作高速的吞吐,但是無法保證資料的完整性。MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。FileChannel保證資料的完整性與一緻性。在具體配置FileChannel時,建議FileChannel設定的目錄和程式日志檔案儲存的目錄設成不同的磁盤,以便提高效率。
5)flume特性
1、高可靠性
作為生産環境運作的軟體,高可靠性是必須的。從單agent來看,Flume使用基于事務的資料傳遞方式來保證事件傳遞的可靠性。Source和Sink分别被封裝進一個事務。事件被存放在Channel中直到該事件被處理,Channel中的事件才會被移除。這是Flume提供的點到點的可靠機制。
從多級流來看,前一個agent的sink和後一個agent的source同樣有它們的事務來保障資料的可靠性。
事務流程圖如下:
2、可恢複性
還是靠Channel。推薦使用FileChannel,将事件持久化在本地檔案系統裡,但性能相對MemoryChannel較差。
1.1.2、flume配置說明
1、source常見配置(詳細配置見官方文檔)
1)Avro source
flume可以監聽avro-client發送過來的内容,然後進行處理。
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2)Spool source,
Spool監測配置的目錄下新增的檔案,并将檔案中的資料讀取出來。需要注意兩點:
1、拷貝到spool目錄下的檔案不可以再打開編輯;
2、spool目錄下不可包含相應的子目錄;
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home /flume-1.6.0-bin/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3)Exec source
EXEC執行一個給定的指令獲得輸出的源,如使用tail指令。
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume-1.6.0-bin/log_exec_tail
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4)Syslogtcp source
Syslogtcp監聽TCP的端口做為資料源
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2、sink常見配置(詳細配置見官方文檔)
1)Hadoop sink
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://m1:9000/user/flume/syslogtcp
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2)File Roll Sink
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/flume-1.6.0-bin/logs
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3)elasticsearch Sink
配置檔案内容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks. k1.type = elasticsearch
a1.sinks. k1.hostNames =127.0.0.1:9300
a1.sinks. k1.indexName = test_index
a1.sinks. k1.indexType = test_type_1
a1.sinks. k1.clusterName =vie61_yanshi
a1.sinks. k1.batchSize = 10
a1.sinks. k1.ttl = 5d
a1.sinks. k1.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、Flume Sink Processors常見配置(詳細配置見官方文檔)
1)Failover SinkProcessor
将多個sink歸為一個組,先向組中優先級高的sink發送消息,如果發送失敗,則會向組中優先級低的sink發送消息。
配置檔案内容如下:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type =failover
a1.sinkgroups.g1.processor.priority.k1= 5
a1.sinkgroups.g1.processor.priority.k2= 10
a1.sinkgroups.g1.processor.maxpenalty= 10000
2)Load balancing SinkProcessor
将多個sink歸為一個組,向組中sink負載均衡的發送消息。負載均衡的方式分為:循環周遊(round_robin)、随機(random)和自定義實作。
配置檔案内容如下:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type =load_balance
a1.sinkgroups.g1.processor.backoff =true
a1.sinkgroups.g1.processor.selector =random
4、channel常見配置(詳細配置見官方文檔)
1)Memory Channel
此種方式将event内容放在緩存中。優點是效率高,缺點是放在channel中的資料丢失後不可恢複。
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2)File Channel
此種方式将event内容放在檔案系統中。優點是資料達到channel後即使flume挂了,在flume重新開機後資料依然可以恢複發送。缺點是效率較低。
配置檔案内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers eventsin memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/mnt/flume/checkpoint
a1.channels.c1.dataDirs =/mnt/flume/data
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5、Fan Out常見配置(詳細配置見官方文檔)
Fan Out(扇出)分為兩種方式:replicating(複制)和multiplexing(複用)。
1)replicating(複制)
複制會将一個source的event内容發送給所有配置的channel。
配置檔案内容如下:
# List the sources, sinks andchannels for the agent
<Agent>.sources =<Source1>
<Agent>.sinks = <Sink1><Sink2>
<Agent>.channels =<Channel1> <Channel2>
# set list of channels for source(separated by space)
<Agent>.sources.<Source1>.channels= <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel= <Channel1>
<Agent>.sinks.<Sink2>.channel= <Channel2>
<Agent>.sources.<Source1>.selector.type= replicating
2)multiplexing(複用)
複用會将一個source的event内容發送給比對到結果的一個子集channels。
配置檔案内容如下:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type= multiplexing
<Agent>.sources.<Source1>.selector.header= <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1>= <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2>= <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3>= <Channel2>
#...
<Agent>.sources.<Source1>.selector.default= <Channel2>
具體示例:
# list the sources, sinks andchannels in the agent
agent_foo.sources =avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1avro-forward-sink2
agent_foo.channels = mem-channel-1file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels= mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel= mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel= file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type= multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header= State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA= mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ= file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY= mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default= mem-channel-1
1.1.3、flume擴充說明
Flume對source、channel和sink提供了多種實作方式,但因為需求的複雜性,可能這些方式滿足不了我們的需求。好在flume提供了靈活的擴充方式,我們通過實作flume提供的接口,就可以友善的實作自定義的source、sink等。具體實作請參考官方源碼。
1、 source擴充
public class MySource extendsAbstractSource implements Configurable, PollableSource {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp","defaultValue");
// Process the myProp value (e.g. validation, convert to another type,...)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external client
}
@Override
public void stop () {
// Disconnect from external client and do any additional cleanup
// (e.g. releasing resources or nulling-out field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
try {
// This try clause includes whatever Channel/Event operations you wantto do
// Receive new data
Event e = getSomeData();
// Store the Event into this Source's associated Channel(s)
getChannelProcessor().processEvent(e);
status = Status.READY;
} catch (Throwable t) {
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}
2、 sink擴充
public class MySink extendsAbstractSink implements Configurable {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp","defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
}
3、 Interceptor擴充
public class MyInterceptor implementsInterceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
}
@Override
public List<Event>intercept(List<Event> events) {
}
@Override
public void close() {
// no-op
}
public static class Builder implementsInterceptor.Builder {
@Override
publicvoid configure(Context context) {
}
@Override
publicInterceptor build() {
returnnew MyInterceptor();
}
}
}
4、 RPC clients – Avro
通過Avro client可以實作将日志資訊直接發送到flume,前提是flume的source端收集日志方式需要配置為Avro方式。示例如下:
public class MyApp {
public static void main(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initialize client with the remote Flume agent's host and port
client.init("host.example.org", 41414);
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
String sampleData = "Hello Flume!";
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// Setup the RPC connection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of theabove line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of theabove line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
1.2、Elasticsearch
1.2.1、Elasticsearch介紹
1)Elasticsearch介紹
Elasticsearch是一個實時的分布式搜尋和分析引擎。它可以幫助你用前所未有的速度去處理大規模資料。它可以用于全文搜尋,結構化搜尋以及分析,當然你也可以将這三者進行組合。
Elasticsearch是一個建立在全文搜尋引擎Apache Lucene™基礎上的搜尋引擎,可以說Lucene是當今最先進,最高效的全功能開源搜尋引擎架構。
Elasticsearch使用Lucene作為内部引擎,但是在使用它做全文搜尋時,隻需要使用統一開發好的API即可,而不需要了解其背後複雜的Lucene的運作原理。
當然Elasticsearch并不僅僅是Lucene這麼簡單,它不但包括了全文搜尋功能,還可以進行以下工作:
1、分布式實時檔案存儲,并将每一個字段都編入索引,使其可以被搜尋;
2、實時分析的分布式搜尋引擎;
3、可以擴充到上百台伺服器,處理PB級别的結構化或非結構化資料;
這麼多的功能被內建到一台伺服器上,你可以輕松地通過用戶端或者任何你喜歡的程式語言與ES的RESTful API進行交流。
2)Elasticsearch優點
1、 Elasticsearch是分布式的。不需要其他元件,分發是實時的,被叫做”Push replication”;
2、Elasticsearch完全支援 Apache Lucene 的接近實時的搜尋;
3、處理多租戶(multitenancy)不需要特殊配置,而Solr則需要更多的進階設定;
4、Elasticsearch采用 Gateway 的概念,使得完備份更加簡單;
5、各節點組成對等的網絡結構,某些節點出現故障時會自動配置設定其他節點代替其進行工作。
3)Elasticsearch使用案例
1、維基百科使用Elasticsearch來進行全文搜做并高亮顯示關鍵詞,以及提供search-as-you-type、did-you-mean等搜尋建議功能;
2、英國衛報使用Elasticsearch來處理訪客日志,以便能将公衆對不同文章的反應實時地回報給各位編輯;
3、StackOverflow将全文搜尋與地理位置和相關資訊進行結合,以提供more-like-this相關問題的展現;
4、GitHub使用Elasticsearch來檢索超過1300億行代碼;
5、每天,GoldmanSachs使用它來處理5TB資料的索引,還有很多投行使用它來分析股票市場的變動。
但是Elasticsearch并不隻是面向大型企業的,它還幫助了很多類似DataDog以及Klout的創業公司進行了功能的擴充。
1.2.2、Elasticsearch配置說明
1、索引模闆(Index templates)
索引可使用預定義的模闆進行建立,這個模闆稱作Indextemplates。模闆設定包括settings和mappings,通過模式比對的方式使得多個索引重用一個模闆。
1)定義模闆:
curl -XPUT localhost:9200/_template/template_1 -d '
{
"template" :"te*",
"settings" : {
"number_of_shards" :5
},
"mappings" : {
"type1" : {
"_source" :{"enabled" : false },
"properties": { }
}
}
}
'
說明:上述定義的模闆template_1将對用te開頭的新索引都是有效。
模闆中也可以包含别别名的定義,如下:
curl -XPUT localhost:9200/_template/template_1 -d '
{
"template" :"te*",
"settings" : {
"number_of_shards" :1
},
"aliases" : {
"alias1" : {},
"alias2" : {
"filter" : {
"term":{"user" : "kimchy" }
},
"routing":"kimchy"
},
"{index}-alias" : {}
}
}
2)删除模闆
curl -XDELETElocalhost:9200/_template/template_1
3)檢視模闆
curl -XGET localhost:9200/_template/template_1
4)模闆配置檔案
除了以上方式,索引模闆也可以在檔案中進行配置。索引模闆的配置檔案需要在每個主節點的config目錄下,目錄結構為:config/templates/template_1.json,
template_1.json的示例如下:
{
"template-logstash" : {
"template" :"logstash*",
"settings" : {
"index.number_of_shards" :5,
"number_of_replicas" :1,
"index" : {
"store" : {
"compress" :{
"stored" : true,
"tv": true
}
}
}
},
"mappings" : {
"_default_" : {
"properties" : {
"dynamic" :"true",
},
},
"loadbalancer" : {
"_source" : {
"compress" : true,
},
"_ttl" : {
"enabled" : true,
"default" :"10d"
},
"_all" : {
"enabled" : false
},
"properties" : {
"@fields" : {
"dynamic" :"true",
"properties" : {
"client" : {
"type" :"string",
"index" :"not_analyzed"
},
"domain" : {
"type" :"string",
"index" :"not_analyzed"
},
"oh" : {
"type" :"string",
"index" :"not_analyzed"
},
"responsetime" :{
"type" :"double",
},
"size" : {
"type" :"long",
"index" :"not_analyzed"
},
"status" : {
"type" :"string",
"index" :"not_analyzed"
},
"upstreamtime" :{
"type" :"double",
},
"url" : {
"type" :"string",
"index" :"not_analyzed"
}
}
},
"@source" : {
"type" :"string",
"index" :"not_analyzed"
},
"@timestamp" : {
"type" :"date",
"format" :"dateOptionalTime"
},
"@type" : {
"type" :"string",
"index" :"not_analyzed",
"store" :"no"
}
}
}
}
}
}
5)_source字段說明
_source字段是自動生成的,以JSON格式存儲索引檔案。_source字段沒有建索引,是以不可搜尋。當執行“get”或者“search”操作時,預設會傳回_source字段。
_source字段消耗性能,是以可以屏蔽(disable)掉。例如:
{
"tweet":{
"_source":{"enabled":false}
}
}
enabale:false的情況下,預設檢索隻傳回ID。
如果覺得enabale:true時,索引的膨漲率比較大的情況下可以通過下面一些輔助設定進行優化:
Compress:是否進行壓縮,建議一般情況下将其設為true
“includes” : ["author", "name"],
“excludes” : ["sex"]
上面的includes和 excludes主要是針對預設情況下面_source一般是儲存全部Bulk過去的資料,我們可以通過include,excludes在字段級别上做出一些限索。
2、環境搭建
2.1、flume安裝
1)下載下傳apache-flume-1.6.0-bin.tar.gz
官網下載下傳位址:http://flume.apache.org/download.html
2)安裝flume
1、将軟體包拷貝到伺服器上,如/root/yiran目錄下;
2、解壓軟體包,指令:tar –xvf apache-flume-1.6.0-bin.tar.gz;
3、配置*.conf檔案,如example.conf,放在/root/yiran/apache-flume-1.6.0-bin/conf目錄下,内容示例如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks. k1.type = elasticsearch
a1.sinks. k1.hostNames =127.0.0.1:9300
a1.sinks. k1.indexName = test_index
a1.sinks. k1.indexType = test_type_1
a1.sinks. k1.clusterName = vie61_yanshi
a1.sinks. k1.batchSize = 10
a1.sinks. k1.ttl = 5d
a1.sinks. k1.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
# Use a channel which buffers eventsin memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity =100
# Bind the source and sink to thechannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、配置flume-env.sh檔案,位于/root/yiran/apache-flume-1.6.0-bin/conf目錄下,操作如下:
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
修改JAVA_HOME
JAVA_HOME=/usr/java/jdk1.7.0_71
JAVA_OPTS="-Xms512m–Xmx1024m-Dcom.sun.management.jmxremote"
5、從elasticsearch安裝包下将寫入elasticsearch需要的核心包拷貝到/root/yiran/apache-flume-1.6.0-bin/lib(當收集的日志需要寫入es時才進行第5步操作,否則不用進行第5步);
elasticsearch-1.6.2.jar;
lucene-core-4.10.4.jar;
3)啟動flume
完成上述5個步驟後,則安裝完成。然後可以啟動flume,指令如下:
cd /root/yiran/apache-flume-1.6.0-bin;
./bin/flume-ng agent --conf conf--conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console &;
說明:--conf指明配置檔案目錄名稱;--conf-file指明要運作的配置檔案;--name指明agent名稱,保持與*.conf配置檔案裡面命名一緻;-Dflume.root.logger指明日志列印級别;
2.2、Elasticsearch安裝
1) 下載下傳elasticsearch-1.6.2.tar.gz
官網下載下傳位址:https://www.elastic.co/downloads/elasticsearch;
2)安裝elasticsearch
1、将軟體包拷貝到伺服器上,如/root/yiran目錄下;
2、解壓軟體包,指令:tar –xvf elasticsearch-1.6.2.tar.gz;
3、修改/root/yiran/elasticsearch-1.6.2/config下elasticsearch.yml檔案:
如将node.name的值設定為“test-node”,表示目前這個es服務節點名字為test-node;
修改cluster.name的值為vie61_yanshi;
修改network.host為本機ip;
4、 根據需要可配置/root/yiran/elasticsearch-1.6.2/bin目錄下elasticsearch.in.sh檔案;
3)啟動elasticsearch
完成上述4個步驟後,則安裝完成。然後可以啟動elasticsearch,指令如下:
nohup ./elasticsearch &
2.3、kibana安裝
1)下載下傳kibana-4.1.10-linux-x64.tar.gz
官網下載下傳位址:https://www.elastic.co/downloads/past-releases
2)安裝kibana
1、将軟體包拷貝到伺服器上,如/root/yiran目錄下;
2、解壓軟體包,指令:tar –xvf kibana-4.1.10-linux-x64.tar.gz;
3、修改/root/yiran/kibana-4.1.10-linux-x64/config目錄下kibana.yml檔案;
修改host為本機ip;
修改elasticsearch_url為要通路的elasticsearch的位址,如:http://localhost:9200;
3)啟動kibana
完成上述3個步驟後,則安裝完成。然後可以啟動kibana,指令如下:
nohup ./kibana &
3、執行個體示範
3.1、執行個體說明
日志記錄方式用logback,示例展示了将tomcat通路日志和使用者行為日志收集到flume,然後由flume将日志資訊寫入到elasticsearch中,最後通過kibana對資料進行統計分析。架構圖如下:
3.2、flume配置
環境說明:
1)用logback記錄日志;
2) 部署兩套flume環境,本地flume環境命名為agent1(ip:192.168.27.73),遠端flume環境命名為agent3(ip:192.168.77.113);
3) Agent1收集日志資料,然後将資料通過Avro方式傳遞到agent3,最後agent3将日志資料寫入到elasticsearch;
4) Source1收集使用者行為日志,source2收集tomcat通路日志;
5) 安裝步驟見“2.1節flume安裝”,*.conf檔案見下面配置說明。
配置說明:
1) agent3配置檔案命名為sink_to_es.conf,内容如下:
#定義agent各元件名稱
agent1.sources = source1 source2
agent1.sinks = sink1 sink2
agent1.channels = channel1 channel2
#定義source1配置資訊,監聽44444端口
agent1.sources.source1.type = avro
agent1.sources.source1.bind =192.168.77.113
agent1.sources.source1.port = 44444
#定義source2配置資訊,監聽33333端口
agent1.sources.source2.type = avro
agent1.sources.source2.bind =192.168.77.113
agent1.sources.source2.port = 33333
#定義sink1的配置資訊,将資訊寫入到es中,索引名為test_index,索引類型為test_type_1,叢集名為vie61_yanshi
agent1.sinks.sink1.type =elasticsearch
agent1.sinks.sink1.hostNames =127.0.0.1:9300
agent1.sinks.sink1.indexName =test_index
agent1.sinks.sink1.indexType =test_type_1
agent1.sinks.sink1.clusterName =vie61_yanshi
agent1.sinks.sink1.batchSize = 10
agent1.sinks.sink1.ttl = 5d
agent1.sinks.sink1.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
#定義sink2的配置資訊,将資訊寫入到es中,索引名為test_index_tomcat,索引類型為test_type_1,叢集名為vie61_yanshi
agent1.sinks.sink2.type =elasticsearch
agent1.sinks.sink2.hostNames =127.0.0.1:9300
agent1.sinks.sink2.indexName =test_index_tomcat
agent1.sinks.sink2.indexType =test_type_1
agent1.sinks.sink2.clusterName =vie61_yanshi
agent1.sinks.sink2.batchSize = 10
agent1.sinks.sink2.ttl = 5d
agent1.sinks.sink2.serializer =org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
#定義channel1的配置資訊,資訊存儲在緩存中
agent1.channels.channel1.type =memory
agent1.channels.channel1.capacity =1000
agent1.channels.channel1.transactionCapacity= 100
#定義channel2的配置資訊,資訊存儲在緩存中
agent1.channels.channel2.type =memory
agent1.channels.channel2.capacity =1000
agent1.channels.channel2.transactionCapacity= 100
#将source1 and sink1 綁定到 channel1
agent1.sources.source1.channels =channel1
agent1.sinks.sink1.channel = channel1
#将source2 and sink2 綁定到 channel2
agent1.sources.source2.channels =channel2
agent1.sinks.sink2.channel = channel2
2) agent1配置檔案命名為sink_to_agent.conf,提供兩種實作方式,任選其中一種即可。
第一種:通過讀取生成的日志檔案,将其中的内容收集到flume。假設tomcat通路日志路徑為/root/yiran/apache-tomcat-8.0.36/logs/tomcat,使用者記錄檔路徑為/root/yiran/apache-tomcat-8.0.36/logs/operator。内容如下:
#定義agent各元件名稱
agent1.sources= source1 source2
agent1.sinks= sink1 sink2
agent1.channels= channel1 channel2
#定義source1配置資訊,讀取operator檔案夾下日志内容,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉
agent1.sources.source1.type= org.apache.flume.source.SpoolDirectoryTailFileSource
agent1.sources.source1.spoolDir= /root/yiran/apache-tomcat-8.0.36/logs/operator
agent1.sources.source1.fileSuffix= .COMPLETED
agent1.sources.source1.deletePolicy= never
agent1.sources.source1.ignorePattern= ^$
agent1.sources.source1.targetPattern= .*(\\d){4}-(\\d){2}-(\\d){2}.*
agent1.sources.source1.targetFilename= yyyy-MM-dd
agent1.sources.source1.trackerDir= .flumespooltail
agent1.sources.source1.consumeOrder= oldest
agent1.sources.source1.batchSize= 100
agent1.sources.source1.inputCharset= UTF-8
agent1.sources.source1.decodeErrorPolicy= REPLACE
agent1.sources.source1.deserializer= LINE
agent1.sources.source1.interceptors= i1 i2
agent1.sources.source1.interceptors.i1.type= regex_filter
agent1.sources.source1.interceptors.i1.regex= ^@@.*@@$
agent1.sources.source1.interceptors.i2.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder
[email protected]@
#定義source2配置資訊,讀取tomcat檔案夾下日志内容,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉
agent1.sources.source2.type= org.apache.flume.source.SpoolDirectoryTailFileSource
agent1.sources.source2.spoolDir= /root/yiran/apache-tomcat-8.0.36/logs/tomcat
agent1.sources.source2.fileSuffix= .COMPLETED
agent1.sources.source2.deletePolicy= never
agent1.sources.source2.ignorePattern= ^$
agent1.sources.source2.targetPattern= .*(\\d){4}-(\\d){2}-(\\d){2}.*
agent1.sources.source2.targetFilename= yyyy-MM-dd
agent1.sources.source2.trackerDir= .flumespooltail
agent1.sources.source2.consumeOrder= oldest
agent1.sources.source2.batchSize= 100
agent1.sources.source2.inputCharset= UTF-8
agent1.sources.source2.decodeErrorPolicy= REPLACE
agent1.sources.source2.deserializer= LINE
agent1.sources.source2.interceptors= i3 i4
agent1.sources.source2.interceptors.i3.type= regex_filter
agent1.sources.source2.interceptors.i3.regex= ^@@.*@@$
agent1.sources.source2.interceptors.i4.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder
[email protected]@
#定義sink1的配置資訊,将資訊寫入到agent3中的source1中
agent1.sinks.sink1.type= avro
agent1.sinks.sink1.hostname= 192.168.77.113
agent1.sinks.sink1.port= 44444
#定義sink2的配置資訊,将資訊寫入到agent3中的source2中
agent1.sinks.sink2.type= avro
agent1.sinks.sink2.hostname= 192.168.77.113
agent1.sinks.sink2.port= 33333
#定義channel1的配置資訊,資訊存儲在緩存中
agent1.channels.channel1.type= memory
agent1.channels.channel1.capacity= 1000
agent1.channels.channel1.transactionCapacity= 100
#定義channel2的配置資訊,資訊存儲在緩存中
agent1.channels.channel2.type= memory
agent1.channels.channel2.capacity= 1000
agent1.channels.channel2.transactionCapacity= 100
#将source1 andsink1 綁定到 channel1
agent1.sources.source1.channels= channel1
agent1.sinks.sink1.channel= channel1
#将source2 andsink2 綁定到 channel2
agent1.sources.source2.channels= channel2
agent1.sinks.sink2.channel= channel2
說明:
1)agent1中定義了兩個攔截器,第一個類型為“regex_filter”,是系統自帶的攔截器,作用是隻接受“regex”正規表達式定義格式的消息,如“^@@.*@@$”。第二個類型為org.apache.flume.interceptor.JsonParseInterceptor$Builder,是一個自定義的攔截器,作用是解析接收到的json格式的内容,其内容分割标志預設為:@@,可以根據格式自行定義。實作如下,詳細内容見(flume-ng-core-extend工程):
public classJsonParseInterceptor implements Interceptor {
private staticfinalLogger logger= LoggerFactory.getLogger(JsonParseInterceptor.class);
private finalString splitFlag;
private JsonParseInterceptor(StringsplitFlag) {
this.splitFlag=splitFlag;
}
@Override
public voidinitialize() {
}
@Override
public Event intercept(Eventevent) {
Stringbody= newString(event.getBody(),Charsets.UTF_8);
String[] bodyArray = body.split(splitFlag);
logger.info("body content: " +body +" length:" +bodyArray.length);
if(bodyArray !=null&&bodyArray.length > 1){
Stringcontent= bodyArray[1]; //取得body内容,為json格式資料
try{
Map<String,String> jsonMap=(Map<String, String>) JSON.parse(content);
event.setHeaders(jsonMap);
event.setBody(null);
logger.info("body content: "+jsonMap);
}catch(Exceptione){
logger.error("Could not parse json data, ",e);
return null;
}
}else{
return null;
}
returnevent;
}
@Override
public List<Event> intercept(List<Event>events) {
List<Event> out = Lists.newArrayList();
for (Eventevent :events) {
Event outEvent = intercept(event);
if (outEvent !=null){
out.add(outEvent);
}
}
returnout;
}
@Override
public voidclose() {
// no-op
}
public staticclassBuilder implementsInterceptor.Builder {
private StringsplitFlag =null;
@Override
public void configure(Context context) {
splitFlag = context.getString("splitFlag",Constants.DEFAULT_SPLITFLAG);
}
@Override
public Interceptor build() {
// TODO Auto-generated method stub
return new JsonParseInterceptor(splitFlag);
}
}
public staticclassConstants {
public static String DEFAULT_SPLITFLAG ="@@";
}
}
3) agent1中使用了自定義的Source(SpoolDirectoryTailFileSource,詳見flume-ng-core-extend工程),此source将exec和spoolingsource結合起來,可以實作如下效果:
l 監控指定檔案夾下的日志檔案;
l 實時讀取日志檔案的内容;
l 實作斷點續傳功能,即flume發生故障後,重新開機會從故障前的位置繼續讀取日志内容;
第二種:擴充logback接口,通過Avroclient将日志資訊傳遞到flume。内容如下:
#定義agent各元件名稱
agent1.sources = source1 source2
agent1.sinks = sink1 sink2
agent1.channels = channel1 channel2
#定義source1配置資訊,監聽44444端口,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉
agent1.sources.source1.type = avro
agent1.sources.source1.bind =192.168.77.113
agent1.sources.source1.port = 44444
agent1.sources.source1.interceptors =i1 i2
agent1.sources.source1.interceptors.i1.type= regex_filter
agent1.sources.source1.interceptors.i1.regex= ^@@.*@@$
agent1.sources.source1.interceptors.i2.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder
#定義source2配置資訊,監聽33333端口,隻接受“^@@.*@@$”格式的消息,其他消息過濾掉
agent1.sources.source2.type = avro
agent1.sources.source2.bind =192.168.77.113
agent1.sources.source2.port = 33333
agent1.sources.source2.interceptors =i3 i4
agent1.sources.source2.interceptors.i3.type= regex_filter
agent1.sources.source2.interceptors.i3.regex= ^@@.*@@$
agent1.sources.source2.interceptors.i4.type= org.apache.flume.interceptor.JsonParseInterceptor$Builder
#定義sink1的配置資訊,通過Avro方式将資訊寫入到agent3的source1中
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname =192.168.77.113
agent1.sinks.sink1.port = 44444
#定義sink2的配置資訊,通過Avro方式将資訊寫入到agent3的source2中
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.hostname =192.168.77.113
agent1.sinks.sink2.port = 33333
#定義channel1的配置資訊,資訊存儲在緩存中
agent1.channels.channel1.type =memory
agent1.channels.channel1.capacity =1000
agent1.channels.channel1.transactionCapacity= 100
#定義channel2的配置資訊,資訊存儲在緩存中
agent1.channels.channel2.type =memory
agent1.channels.channel2.capacity =1000
agent1.channels.channel2.transactionCapacity= 100
#将source1 and sink1 綁定到 channel1
agent1.sources.source1.channels =channel1
agent1.sinks.sink1.channel = channel1
#将source2 and sink2 綁定到 channel2
agent1.sources.source2.channels =channel2
agent1.sinks.sink2.channel = channel2
說明:
1、攔截器的定義見第一種方式裡面說明;
2、此種方式需要擴充logback,核心類如下,詳細内容見(logback-monitor工程):
public classFlumeRpcAppender<E> extends OutputStreamAppender<E> {
protected Stringconfigfile;
public static final String CLIENT_APPNAME="client.appname";
public static final String POOL_SIZE="pool.size";
private final Properties props = new Properties();
RpcClientclient;
@Override
public void start() {
genProps();
client = RpcClientFactory.getInstance(props);
Map<String,String>header= configHeader(props);
int poolSize = 10;
StringstrPoolSize= props.getProperty(POOL_SIZE);
if(strPoolSize !=null && !strPoolSize.trim().isEmpty() ){
poolSize = Integer.parseInt(strPoolSize.trim());
}
ExecutorServiceexecutorService= Executors.newFixedThreadPool(poolSize);
setOutputStream(new FlumeRpcOutputStream(client,header,executorService));
super.start();
}
private void genProps(){
if(this.configfile!=null){
ClassLoadercl =FlumeRpcAppender.class.getClassLoader();
try {
props.load(cl.getResourceAsStream(getConfigfile()));
}catch(IOExceptione) {
e.printStackTrace();
}
}else{
throw new RuntimeException("configfile is null.");
}
}
privateMap<String,String> configHeader(Propertiesprops){
Map<String,String>header=newHashMap<String,String>();
header.put(CLIENT_APPNAME,(String)props.remove(CLIENT_APPNAME));
return header;
}
@Override
public void stop() {
client.close();
super.stop();
}
public String getConfigfile(){
return configfile;
}
public void setConfigfile(String configfile) {
this.configfile =configfile;
}
}
3、使用者行為日志會通過Avro的方式發送到source1的44444端口,內建使用者行為日志需要做如下配置:
1)在web應用中引入logback-monitor工程,通過maven方式或将logback-monitor打包為jar放入web應用lib目錄下;
2)将logback.xml和logback-monitor.properties檔案放到web應用class目錄。具體内容如下:
logback.xml内容:
<?xml version="1.0"encoding="UTF-8"?>
<configuration>
<!--always a good activate OnConsoleStatusListener -->
<statusListenerclass="ch.qos.logback.core.status.OnConsoleStatusListener" />
<appendername="STDOUT"class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS}[%thread] %-5level %logger{56} - %msg%n</pattern>
</encoder>
</appender>
<appendername="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/root/yiran/apache-tomcat-8.0.36/logs/operator/operator.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%msg</pattern>
</encoder>
</appender>
<!--flume收集日志配置 -->
<appendername="flumeRpcAppender"
class="com.logback.core.FlumeRpcAppender">
<encoder>
<pattern>%msg</pattern>
<charset>UTF-8</charset>
</encoder>
<configfile>logback-monitor.properties</configfile>
</appender>
<rootlevel="INFO">
<appender-ref ref="FILE"/>
<appender-refref="flumeRpcAppender"/>
</root>
</configuration>
說明:
1)msg格式示例:@@{"userName":"王五", "sex": "女","age": "23","createDate":"2016-07-28T23:23:32.999+0800"}@@;
2)應用程式在寫入日志時要按照msg消息示例格式進行日志記錄。
logback-monitor.properties内容:
client.appname=something
client.type=DEFAULT_FAILOVER
hosts=h1
hosts.h1=127.0.0.1:44444
pool.size=20
說明:如有多個flume接收端,可以如下配置。通過配置client.type可以實作負載均衡(default_loadbalance)或失敗重發(default_failover)。
host2=h1 h2 h3
hosts.h1=127.0.0.1:44444
hosts.h2=127.0.0.1:44444
hosts.h3=127.0.0.1:44444
4、tomcat通路日志會通過Avro的方式發送到source2的33333端口,內建tomcat通路日志需要做如下配置:
1) 配置logback-monitor.properties檔案,将其放入到D:\server2\apache-tomcat-8.0.36\lib目錄下,内容如下:
client.appname=something
client.type=DEFAULT_FAILOVER
hosts=h1
hosts.h1=127.0.0.1:33333
pool.size=5
2) 配置logback-access.xml檔案,将其放入到D:\server2\apache-tomcat-8.0.36\conf目錄下,内容如下:
<?xml version="1.0"encoding="UTF-8"?>
<configuration>
<!--always a good activate OnConsoleStatusListener -->
<statusListenerclass="ch.qos.logback.core.status.OnConsoleStatusListener" />
<appendername="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>@@{"createTime":"%t{yyyy-MM-ddHH:mm:ss.SSSZ}", "remoteIP":"%h","status":"%s", "contentLength":"%b","duration":"%D", "requestURL":"%r",
"logType":"tomcatLog"}@@</pattern>
</encoder>
</appender>
<!--flume收集日志配置 -->
<appendername="flumeRpcAppender"
class="com.logback.core.FlumeRpcAppender">
<encoder>
<pattern>@@{"createTime":"%t{yyyy-MM-dd'T'HH:mm:ss.SSSZ}","remoteIP":"%h", "status":"%s","contentLength":"%b", "duration":"%D","requestURL":"%r","logType":"tomcatLog"}@@</pattern>
<charset>UTF-8</charset>
</encoder>
<configfile>logback-monitor.properties</configfile>
</appender>
<appendername="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/root/yiran/apache-tomcat-8.0.36/logs/tomcat/tomcat_access.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>@@{"createTime":"%t{yyyy-MM-dd'T'HH:mm:ss.SSSZ}","remoteIP":"%h", "status":"%s","contentLength":"%b", "duration":"%D","requestURL":"%r","logType":"tomcatLog"}@@</pattern>
</encoder>
</appender>
<appender-refref="FILE"/>
<appender-ref ref="flumeRpcAppender"/>
</configuration>
3) 将[\部署配置檔案\lib]目錄下的jar包複制到D:\server2\apache-tomcat-8.0.36\lib目錄下:
4)修改tomcat中server.xml下最後一行,添加如下配置:
<ValveclassName="ch.qos.logback.access.tomcat.LogbackValve"quiet="true"/>
通過上述步驟,則完成flume配置,然後啟動agent1和agent3,指令如下:
啟動agent1:
cd C:\Users\qxb-810\Desktop\flume_study\apache-flume-1.6.0-bin;
bin\flume-ng.cmd agent --conf conf--conf-file conf\sink_to_agent.conf --name agent1 &
啟動agent3:
cd /root/yiran/apache-flume-1.6.0-bin;
./bin/flume-ng agent --conf conf --conf-fileconf/sink_to_es.conf --name agent1 &
3.3、Elasticsearch配置
環境說明:
1) 根據“2.2節Elasticsearch安裝”說明完成安裝;
2) 用crul方式建立索引模闆,
使用者記錄檔模闆示例如下:
curl -XPUTlocalhost:9200/_template/template_flume_log -d '
{
"template" : "test_index-*",
"settings" : {
"number_of_shards" : 5
},
"mappings" : {
"test_type_1" : {
"_source" :{"enabled" : true },
"properties": {
"createDate": {
"type" : "date",
"date_format":"date_hour_minute_second_millis"
},
"sex": {
"type" : "string"
},
"userName": {
"type" : "string"
},
"age": {
"type" : "long"
}
}
}
}
}
'
Tomcat通路日志模闆如下:
curl -XPUTlocalhost:9200/_template/template_flume_tomcat_log -d '
{
"template" : "test_index_tomcat-*",
"settings" : {
"number_of_shards" : 5
},
"mappings" : {
"test_type_1" : {
"_source" : {"enabled": true },
"properties": {
"createTime": {
"type" : "date",
"date_format":"date_hour_minute_second_millis"
},
"remoteIP": {
"type" : "string"
},
"status": {
"type" : "long"
},
"contentLength": {
"type" : "string"
},
"duration": {
"type" : "long"
},
"requestURL": {
"type" : "string"
},
"logType": {
"type" : "string"
}
}
}
}
}
'
3.4、Kibana配置
環境說明:
1)根據“2.3節Kibana安裝”完成Kibana安裝;
2)通路http://192.168.77.113:5601/,完成kibana索引配置;
4、常見問題說明
1、elasticsearch啟動不成功
原因:很大可能是因為jdk版本不對
解決:更新jdk版本
2、flume向elasticsearch寫資料,flume啟動報:No nodeavailable and cluster/nodes/info] request_id [0] timed out
原因:1、可能是節點配置資訊有誤,檢查節點配置資訊,确認沒有問題重新開機es;
2、檢查elasticsearch的jdk版本與應用的jdk版本是否一緻;
3、tail 斷點續傳的問題
可以在 tail 傳的時候記錄行号,下次再傳的時候,取上次記錄的位置開始傳輸,類似:
agent1.sources.avro-source1.command =/usr/local/bin/tail -n +$(tail -n1/home/storm/tmp/n) --max-unchanged-stats=600 -F /home/storm/tmp/id.txt | awk'ARNGIND==1{i=$0;next}{i++; if($0~/檔案已截斷/)i=0; print i>> "/home/storm/tmp/n";print $1"---"i}' /home/storm/tmp/n-
需要注意如下幾點:
1)檔案被 rotation 的時候,需要同步更新你的斷點記錄“指針”;
2)需要按檔案名來追蹤檔案;
3)flume 挂掉後需要累加斷點續傳“指針”;
4)flume 挂掉後,如果恰好檔案被rotation,那麼會有丢資料的風險
隻能監控盡快拉起或者加邏輯判斷檔案大小重置指針;
5)tail 注意你的版本,請更新coreutils 包到最新。
4、flume 報錯:java.lang.OutOfMemoryError:GC overhead limit exceeded
解決:Flume 啟動時的最大堆記憶體大小預設是 20M,線上環境很容易 OOM,是以需要你在 flume-env.sh 中添加 JVM 啟動參數:
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k-Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
然後在啟動 agent 的時候一定要帶上 -c conf 選項,否則 flume-env.sh 裡配置的環境變量不會被加載生效。
5、關于elasticsearch和kibana的時區和日期問題
原因:elasticsearch原生支援date類型,json格式通過字元來表示date類型。date類型是包含時區資訊的,如果我們沒有在json代表日期的字元串中顯式指定時區,對es來說沒什麼問題,但是如果通過kibana顯示es裡的資料時,就會出現問題,資料的時間會晚8個小時。因為kibana從es裡讀取的date類型資料,沒有時區資訊,kibana會預設當作0時區來解析,但是kibana在通過浏覽器展示的時候,會通過js擷取目前用戶端機器所在的時區,也就是東八區,是以kibana會把從es得到的日期資料減去8小時。這裡就會導緻kibana經常遇到的“資料時間延遲8小時”的問題。
解決:在往es送出日期資料的時候,直接送出帶有時區資訊的日期字元串,如:“2016-07-15T12:58:17.136+0800”。
5、參考文檔
1)Kibana使用參考文檔:http://kibana.logstash.es/content/kibana/index.html
2)Elasticsearch使用參考文檔:http://es.xiaoleilu.com/030_Data/45_Partial_update.html
3)flume使用者使用參考文檔:http://flume.apache.org/FlumeUserGuide.html
4)flume開發者參考文檔:http://flume.apache.org/FlumeDeveloperGuide.html