
Flume data collection tool: Event

A business system is the client side plus the backend services.

Source, Channel, Event


The running Flume process is called an agent. A Flume collection system is a data transmission pipeline, simple or complex, formed by connecting agents together. Every agent has three components: Source, Channel, and Sink.


Source is essentially read (reading data in).

Channel buffers the data in between (for decoupling).

Sink is essentially write (writing data out).

Event

An event is the form in which data is encapsulated inside a channel.

Therefore, after the Source component obtains the raw data, it must wrap it into events and put them into the channel.

After the Sink component takes events out of the channel, it converts them into whatever output form the target storage requires.

The event wrapper object consists of two parts: Headers and Body.

headers is a map (Map<String, String>) used to carry KV-form metadata (flags, descriptions, and so on).

body is a byte array (byte[]) that carries the actual data content.
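For illustration, a minimal sketch of assembling an event in code with Flume's EventBuilder (the header key and body content here are made up):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventDemo {
    public static void main(String[] args) {
        // headers: KV metadata about the record
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp", "1610064000000");
        // body: the raw data content as bytes
        Event event = EventBuilder.withBody(
                "1,zs,mi6,addcart".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(event.getHeaders());
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
    }
}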

Interceptor

Interceptors give users a way to plug their own data-processing logic into the flow.

An interceptor works right after the source component: events produced by the source are passed through the interceptor, which processes them as needed.

Moreover, interceptors can be composed into an interceptor chain.

Flume ships with a number of built-in, commonly used interceptors.

Users can also develop custom interceptors for their own data-processing needs (a full custom interceptor implementation appears in Case 3 below).

This is one of Flume's extension points for customization.

channel selector

One source can feed multiple channels; the policy for how events are distributed among these n channels is decided by the configured channel selector.

There are two channel selector implementations: replicating and multiplexing.
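As a quick sketch, the two types are chosen in config like this (complete working examples appear in the cases later in this section):

# replicating: every mapped channel gets a copy of each event
a1.sources.r1.selector.type = replicating

# multiplexing: route each event by the value of a header key
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = logtype
a1.sources.r1.selector.mapping.applog = c1
a1.sources.r1.selector.default = c2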

sink processor

If sinks and channels are in a one-to-one relationship, no dedicated sink processor is needed.

If one channel is configured with multiple sinks, those sinks can be configured as a sink group.

How events are handed out among the multiple sinks in a group is decided by the configured sink processor.

There are two kinds of sink processors: load_balance (with round_robin or random selection) and failover.

Load balance: for example, one sink takes events 1-10 and the other takes 11-20.

Failover: typically two sinks are configured but only one is doing the work; if it goes down, the other one takes over.

Transaction: the transaction mechanism, at least once

Flume does not implement exactly-once semantics, but keeps getting closer to it (it can only guarantee that nothing is lost).

at least once: data is never lost, but duplicates are possible.

at most once: data is never duplicated, but may be lost.

A transaction guarantees one thing: a piece of work split into multiple steps either succeeds as a whole or fails as a whole.
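In code, this is what the channel transaction looks like on the put side; a sketch following the transaction pattern in Flume's developer guide (the take side in a sink is symmetrical):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public class TxSketch {
    // Put one event into a channel under a transaction: the step either
    // commits as a whole or rolls back as a whole.
    static void putWithTx(Channel channel, Event event) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            channel.put(event);
            tx.commit();
        } catch (RuntimeException e) {
            // undo the put; the caller will retry, which can produce
            // duplicates -- hence at-least-once, not exactly-once
            tx.rollback();
            throw e;
        } finally {
            tx.close();
        }
    }
}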

Case 1 (annotated)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# which channel(s) to connect this source to
a1.sources.r1.channels = c1
# the concrete source type; exec runs a shell command and collects its output
a1.sources.r1.type = exec
# how many events per batch
a1.sources.r1.batchSize = 100
# the command to run. tail -F tracks by file name, tail -f tracks by inode; renaming a file does not change its inode. If the logging system is writing to 1.txt and that file gets renamed, a new 1.txt is created and writing continues there, which -F will follow
a1.sources.r1.command = tail -F /root/logs/a.log


# a channel that keeps its data in memory
a1.channels.c1.type = memory
# maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# maximum number of events per transaction; must be at least the upstream batch size
a1.channels.c1.transactionCapacity = 200

# which channel this sink drains
a1.sinks.k1.channel = c1
# a sink type that prints the collected data to the console
a1.sinks.k1.type = logger

---------------------------------------------------------------------------------------

Inside the Flume home, create a directory for collection configs, e.g. agentsconf (any name works), and in it create a file exec-m-logger.conf to hold the collection configuration.

# startup command
bin/flume-ng agent -c conf/ -f agentsconf/exec-m-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c: Flume's own conf directory
-f: the collection config file to run
-n a1: the name of the agent defined in the config file
-Dflume.root.logger=INFO,console: log level and destination; not required on every start
           

Case 3: a custom interceptor extracts the timestamp from the data for use in file naming, and the data is written to HDFS

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.batchSize = 100
a1.sources.r1.command = tail -F /root/a.log
# attach the custom interceptor
a1.sources.r1.interceptors = i1
# full class name of the custom interceptor's builder; note the $ before the inner class name
a1.sources.r1.interceptors.i1.type = cn._51doit.flnm.demo01.EventTimeStampInterceptor$EventTimeStampInterceptorBuilder
# interceptor parameter
a1.sources.r1.interceptors.i1.split_by = ,
# interceptor parameter
a1.sources.r1.interceptors.i1.ts_index = 2


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200


# which channel to connect
a1.sinks.k1.channel = c1
# the concrete sink type
a1.sinks.k1.type = hdfs
# HDFS path to write to; a date-varying directory can follow the path
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/doitedu01/
# file name prefix
a1.sinks.k1.hdfs.filePrefix = DoitEduData
# file name suffix
a1.sinks.k1.hdfs.fileSuffix = .log
# roll interval in seconds (stop writing the current file and roll over to the next one)
a1.sinks.k1.hdfs.rollInterval = 60
# roll by file size (bytes)
a1.sinks.k1.hdfs.rollSize = 268435456
# roll by event count (0 disables count-based rolling)
a1.sinks.k1.hdfs.rollCount = 0
# batch size
a1.sinks.k1.hdfs.batchSize = 100
# compression codec; supported: gzip, bzip2, lzop, snappy
a1.sinks.k1.hdfs.codeC = gzip
# file format; three kinds: SequenceFile (binary KV, not human-readable but compact), DataStream (keeps whatever format the stream came in), CompressedStream (compressed)
a1.sinks.k1.hdfs.fileType = CompressedStream
# whether to use the local timestamp; default is false. Setting it to true uses the local machine time, which is risky: due to latency, data from 23:59 may land in the next day's directory
a1.sinks.k1.hdfs.useLocalTimeStamp = false

Put the interceptor jar into Flume's lib directory.
# Lab notes
Simulated data:
for i in {1..1000000}; do echo "${i},zhangsan,`date +%s`000,iphone8plus,submit_order" >> a.log; sleep 0.5; done

# startup command
bin/flume-ng agent -c conf/ -f agentsconf/exec-m-logger.conf -n a1 -Dflume.root.logger=INFO,console 
           

Source code of the custom interceptor jar

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class EventTimeStampInterceptor implements Interceptor {
    private String splitby;
    private Integer ts_index;

    public EventTimeStampInterceptor(String splitby, Integer ts_index) {
        this.splitby = splitby;
        this.ts_index = ts_index;
    }

    // Initialization method; called once before the intercept logic is invoked
    public void initialize() {
    }

    // The interception logic lives here.
    // Assume the data being collected looks like: id,name,timestamp,devicetype,event
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String line = new String(body);
        // extract the event timestamp from the event content
        String[] split = line.split(this.splitby);
        String timestampStr = split[this.ts_index];
        // put the timestamp into the header
        event.getHeaders().put("timestamp", timestampStr);
        return event;
    }

    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    // Cleanup method; called once before this interceptor instance is destroyed
    public void close() {
    }

    // The builder is what Flume uses to construct our custom interceptor instance
    public static class EventTimeStampInterceptorBuilder implements Interceptor.Builder {

        String splitby;
        Integer ts_index;

        // Flume calls this method to create our custom interceptor instance
        public Interceptor build() {
            return new EventTimeStampInterceptor(splitby, ts_index);
        }

        // Flume passes the loaded config parameters in through this method
        public void configure(Context context) {
            splitby = context.getString("split_by", ",");
            ts_index = context.getInteger("ts_index", 2);
        }
    }
}

           

Cascade case: the receiving source is avro, and correspondingly the upstream sink is also avro

Multiple upstream agents write to one downstream agent, which in turn writes to HDFS.

Key point: the upstream sink and the downstream source must match up; in effect the upstream sink is the client and the downstream source is the server.

# upstream config
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.batchSize = 100
a1.sources.r1.command = tail -F /root/a.log

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1
# an avro sink serializes events so the downstream avro source can receive them
a1.sinks.k1.type = avro
# host name of the receiving machine
a1.sinks.k1.hostname = linux03
# port number (your choice)
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100


# downstream config
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
# the source type is avro
a1.sources.r1.type = avro
# bind address; 0.0.0.0 means all local IPs
a1.sources.r1.bind = 0.0.0.0
# port number
a1.sources.r1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200


a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/doitedu03/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = DoitEduData
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true

Startup commands
bin/flume-ng agent -c conf/ -f agentsconf/shangyou.conf -n a1 -Dflume.root.logger=INFO,console 
bin/flume-ng agent -c conf/ -f agentsconf/xiayou.conf -n a1 -Dflume.root.logger=INFO,console 
Data-generating scripts
for i in {1..1000000}; do echo "${i},zhangsan,`date +%s`000,iphone8plus,submit_order" >> a.log; sleep 0.5; done
for i in {1..1000000}; do echo "${i},wwwwwwwwwwww,`date +%s`000,iphone8plus,submit_order" >> a.log; sleep 0.5; done
           

Taildir source: read from two file directories and deliver the data to a specified destination.

a1.sources = r1
a1.channels = c1
a1.sinks = k1


a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
# names of the two file groups to collect (group names, not file names)
a1.sources.r1.filegroups = g1 g2
# path pattern of the files in group g1
a1.sources.r1.filegroups.g1 = /root/logs/wxlog/event.*
# path pattern of the files in group g2
a1.sources.r1.filegroups.g2 = /root/logs/applog/event.*
# the value of a chosen header key can later decide which channel an event goes to; for group g1 the header key k is set to wxlog
a1.sources.r1.headers.g1.k = wxlog
# for group g2 the header key k is set to applog
a1.sources.r1.headers.g2.k = applog
a1.sources.r1.batchSize = 100

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger




# start/stop command
bin/flume-ng agent -c conf/ -f agentsconf/wx.conf -n a1 -Dflume.root.logger=INFO,console 
# data-generating scripts
for i in {1..1000000}; do echo "${i},wxwxwxwxwxwxwxwxwxw" >> applog/event.log; sleep 0.5; done
for i in {1..1000000}; do echo "${i},ppppppppppppppppppp" >> wxlog/event.log; sleep 0.5; done
           

Replicating selector: one source connected to multiple channels

Upstream uses a replicating selector: one source feeds multiple channels and sinks, so the data can be fanned out to multiple downstream agents (e.g. sent to HDFS and to local storage at the same time). In effect, each log record read is copied multiple times.

Downstream:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200



a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger


# upstream

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2


a1.sources.r1.channels = c1 c2
a1.sources.r1.type = exec
a1.sources.r1.batchSize = 100
a1.sources.r1.command = tail -F /root/a.log
# the replicating selector: distribute every event to both c1 and c2
a1.sources.r1.selector.type = replicating
# channels listed here are allowed to fail; without this setting, delivery to every channel must succeed
a1.sources.r1.selector.optional = c2

# channel c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

# channel c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 200

# sink k1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100

# sink k2
a1.sinks.k2.channel = c2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100


Data-generating script
for i in {1..1000000}; do echo "${i},wxwxwxwxwxwxwxwxwxw" >> /root/a.log; sleep 0.5; done
Run commands
bin/flume-ng agent -c conf/ -f agentsconf/xiayou.conf -n a1 -Dflume.root.logger=INFO,console 
bin/flume-ng agent -c conf/ -f agentsconf/shangyou.conf -n a1 -Dflume.root.logger=INFO,console 
           

Multiplexing selector, with a TAILDIR source

This example collects data from two different directories (different paths) at the same time and stores it, categorized, in different places (in this example, printed on two consoles).

# downstream
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200


# this prints to the console; to write to HDFS instead, change this sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger


# upstream

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2


a1.sources.r1.channels = c1 c2
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1 g2
a1.sources.r1.filegroups.g1 = /root/logs/wxlog
a1.sources.r1.filegroups.g2 = /root/logs/applog
a1.sources.r1.headers.g1.logtype = wxlog
a1.sources.r1.headers.g2.logtype = applog
a1.sources.r1.batchSize = 100
# declare the multiplexing selector
a1.sources.r1.selector.type = multiplexing
# the header key to route on: the logtype added above
a1.sources.r1.selector.header = logtype
# mapping: events whose logtype is wxlog go to c1
a1.sources.r1.selector.mapping.wxlog = c1
# mapping: events whose logtype is applog go to c2
a1.sources.r1.selector.mapping.applog = c2
# events that match none of the mappings go to c2 (hence "default")
a1.sources.r1.selector.default = c2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 200


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100

a1.sinks.k2.channel = c2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100



# data-generating scripts
for i in {1..1000000}; do echo "${i},wxwxwxwxwxwxwxwxwxw" >> /root/logs/applog; sleep 0.5; done
for i in {1..1000000}; do echo "${i},pppppppppppppppppppppp" >> /root/logs/wxlog; sleep 0.5; done

# run command
bin/flume-ng agent -c conf/ -f agentsconf/shangyou -n a1 -Dflume.root.logger=INFO,console 
           

Hands-on exercise

# read data from one file, intercept it, split it into two classes, and write them into two different directories

Requirement: the log contains records like the following

1,zs,mi6,addcart,13845934468300,app
2,aa,mi6,addcart,13845934468300,app
3,aa,mi6,addcart,13845934468300,app
4,aa,mi6,addcart,13845934468300,app
5,bb,oppo6,addcart,13845934468300,wx
6,bb,oppo6,addcart,13845934468300,wx
7,bb,oppo6,addcart,13845934468300,wx
8,bb,oppo6,addcart,13845934468300,wx
9,bb,oppo6,addcart,13845934468300,wx

需要将不同類型的資料寫入不同的hdfs目錄:
hdfs://hdp01:8020/logdata/app/2021-01-08

hdfs://hdp01:8020/logdata/wx/2021-01-08




Data-simulation script
#!/bin/bash
for i in {1..100000}
do
if [ $(($RANDOM % 2)) -eq 0 ] 
 then
   echo  "${i},aa,mi6,addcart,`date +%s`000,app" >> /root/logs/event.log
 else 
   echo  "${i},bb,mi6,addcart,`date +%s`000,wx" >> /root/logs/event.log
fi
sleep 0.2
done


Approach:

1. Use a custom interceptor to extract the routing info from each record and put it into the header
2. Use a multiplexing selector to do multi-way mapping based on that header



Upstream config file
# upstream
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2


a1.sources.r1.channels = c1 c2
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /root/logs/event.*
a1.sources.r1.batchSize = 100

# define an interceptor chain
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = cn._51doit.flnm.demo02.EventTimeStampInterceptor$EventTimeStampInterceptorBuilder
# custom headerName: the interceptor writes a KV pair into the header, and here the key is named flag
a1.sources.r1.interceptors.i1.headerName = flag
a1.sources.r1.interceptors.i1.split_by = ,
a1.sources.r1.interceptors.i1.ts_index = 5

a1.sources.r1.interceptors.i2.type = cn._51doit.flnm.demo02.EventTimeStampInterceptor$EventTimeStampInterceptorBuilder
a1.sources.r1.interceptors.i2.headerName = timestamp
a1.sources.r1.interceptors.i2.split_by = ,
a1.sources.r1.interceptors.i2.ts_index = 4

a1.sources.r1.selector.type = multiplexing
# the selector reads the flag header set above (taken from field index 5); its value will be wx or app
a1.sources.r1.selector.header = flag
# if flag is app, route to c1
a1.sources.r1.selector.mapping.app = c1
# if flag is wx, route to c2
a1.sources.r1.selector.mapping.wx = c2
# anything that is neither wx nor app also goes to c2
a1.sources.r1.selector.default = c2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 200


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100

a1.sinks.k2.channel = c2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100



# downstream: writes to HDFS


a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 200
a1.channels.c1.keep-alive = 60



a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/doitedu02/app/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = DoitEduData
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = false


Startup command
bin/flume-ng agent -c conf/ -f agentsconf/wx -n a1 -Dflume.root.logger=INFO,console 
           

Sink round-robin and random

round_robin (round-robin selection)

random (random selection)

The round-robin case below relies on the defaults, so nothing extra needs to be configured.

Multiple sinks attached to one channel with no sink group configured: the sinks simply cooperate, one takes a batch and then the other takes the next. It amounts to load balancing (once one sink has taken a batch of data, the other does not take the same data, so nothing is duplicated and nothing is lost).
Upstream:
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.batchSize = 100
a1.sources.r1.command = tail -F /root/logs/event.log


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdp02.doitedu.cn
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100


a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hdp03.doitedu.cn
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100



Downstream:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

           

The following is the random case.

Multiple sinks attached to one channel, here configured as a sink group with the random selector: the two sinks contend for the data, and the same sink may well be picked again next time. It is still load balancing (data taken by one sink is not taken by the other, so nothing is duplicated and nothing is lost).
Upstream:
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.batchSize = 100
a1.sources.r1.command = tail -F /root/logs/event.log


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdp02.doitedu.cn
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100


a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hdp03.doitedu.cn
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100

# define the sink group and its sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random





Downstream:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

           

Sink failover

This is a high-availability pattern, mainly used in cascades (multiple upstream agents connecting to one downstream). The downstream gets a standby: both the primary and the standby are running, but the standby runs idle. Upstream, sink1 connects to the primary and sink2 to the standby; if the primary goes down, the flow switches over to sink2 and the standby.

# upstream
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.batchSize = 100
a1.sources.r1.command = tail -F /root/logs/event.log


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100


a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100

# define the sink group and its sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# the processor type that enables failover between sinks
a1.sinkgroups.g1.processor.type = failover
# priorities (user-chosen): a larger number means higher priority; the higher-priority sink connects to the primary
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 1
# maximum penalty time (ms) during which a failed sink is kept out of service
a1.sinkgroups.g1.processor.maxpenalty = 10000



Downstream:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 200

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

Startup command and data-generating command
bin/flume-ng agent -c conf/ -f agentsconf/shangyou -n a1 -Dflume.root.logger=INFO,console 
for i in {1..1000000}; do echo "${i},pppppppppppppppppppppp" >> /root/logs/event.log; sleep 0.5; done
           
