天天看點

mysql同步資料到flume,然後flume同步資料到kafka

自定義插件包下載位址 https://download.csdn.net/download/a1213353721/10935724

下面是flume的配置資訊,我是在CDH叢集環境上配置的,經測試可用

tier1.sources = s-1

tier1.channels = c-1

tier1.sinks = k-1 k-2

# Key part of the failover setup: both sinks must belong to one sink group

tier1.sinkgroups = g-1

tier1.sinkgroups.g-1.sinks = k-1 k-2

# Processor type is failover

tier1.sinkgroups.g-1.processor.type = failover

# Priority: the larger the number, the higher the priority; every sink must have a distinct priority

tier1.sinkgroups.g-1.processor.priority.k-1 = 5

tier1.sinkgroups.g-1.processor.priority.k-2 = 10

# Failed-sink penalty ceiling, in milliseconds (10 s here); tune faster or slower for your workload

tier1.sinkgroups.g-1.processor.maxpenalty = 10000

########## Channel definition

# Data volume is small, so keep events in memory; JDBC, Kafka, or file channels also work

tier1.channels.c-1.type = memory

# Maximum number of events held in the channel queue

tier1.channels.c-1.capacity = 100000

# Max length of the putList/takeList queues; the sink pulls batchSize events out of capacity
# into this queue, so keep it smaller than capacity and larger than the sink's batchSize.

# Official definition: The maximum number of events the channel will take from a source or give to a sink per transaction.

tier1.channels.c-1.transactionCapacity = 1000

tier1.channels.c-1.byteCapacityBufferPercentage = 20

### byteCapacity defaults to 80% of the JVM's available max memory; may be left unset

#tier1.channels.c-1.byteCapacity = 800000

#########sql source#################

# Channel used by source s-1; must match the sinks' channel or no data flows

tier1.sources.s-1.channels=c-1

######### For each one of the sources, the type is defined

tier1.sources.s-1.type = org.keedio.flume.source.SQLSource

tier1.sources.s-1.hibernate.connection.url = jdbc:mysql://10.20.1.18:3306/rdms

######### Hibernate Database connection properties

tier1.sources.s-1.hibernate.connection.user = root

tier1.sources.s-1.hibernate.connection.password = bigdata

tier1.sources.s-1.hibernate.connection.autocommit = true

tier1.sources.s-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

tier1.sources.s-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

# Delay between polling queries, in milliseconds

tier1.sources.s-1.run.query.delay=10000

# Directory and file name for the incremental-status tracking file

tier1.sources.s-1.status.file.path = /usr/lib/flume-ng/

#tier1.sources.s-1.status.file.name = SQLSource.status

tier1.sources.s-1.status.file.name.prefix = SQLSource

tier1.sources.s-1.status.file.name.suffix = .status

######## Custom query

# Initial incremental value substituted for the $@$ placeholder on the first run

tier1.sources.s-1.start.from = 0

#tier1.sources.s-1.table = imsi_test

tier1.sources.s-1.table.prefix = imsi

# When a full custom.query is used, none of the prefix/suffix properties take effect;
# if the incremental 'id' column is aliased, order.by must match the alias (or column name).

#tier1.sources.s-1.custom.query = select id as 'tid',area as 'fee' from imsi_test where id > $@$ order by id asc

# When the prefix/suffix query form is used, order.by must stay commented out

#tier1.sources.s-1.order.by = tid

tier1.sources.s-1.custom.query.prefix = select * from

# NOTE: a properties value must be a single line; $@$ is the plugin's incremental placeholder

tier1.sources.s-1.custom.query.suffix = where id > $@$ order by id asc

tier1.sources.s-1.batch.size = 100

tier1.sources.s-1.max.rows = 100

tier1.sources.s-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider

tier1.sources.s-1.hibernate.c3p0.min_size=5

tier1.sources.s-1.hibernate.c3p0.max_size=20

######### sinks 1

# Channel used by sink k-1; must match the source's channel or no data is drained

tier1.sinks.k-1.channel = c-1

tier1.sinks.k-1.type = org.apache.flume.sink.kafka.KafkaSink

# Use kafka.topic (same as k-2); the bare "topic" property is the deprecated alias

tier1.sinks.k-1.kafka.topic = topic_kafkajdbc

tier1.sinks.k-1.kafka.bootstrap.servers = admin:9092,dn02:9092,nn01:9092

tier1.sinks.k-1.kafka.producer.acks = 1

# Number of events processed per batch

tier1.sinks.k-1.batchSize = 100

######### sinks 2

# Channel used by sink k-2; must match the source's channel or no data is drained

tier1.sinks.k-2.channel = c-1

tier1.sinks.k-2.type = org.apache.flume.sink.kafka.KafkaSink

tier1.sinks.k-2.kafka.topic = topic_kafkajdbc

tier1.sinks.k-2.kafka.bootstrap.servers = admin:9092,dn02:9092,nn01:9092

tier1.sinks.k-2.kafka.producer.acks = 1

tier1.sinks.k-2.batchSize = 100