上一篇文章寫到flume實時抽取mysql資料到kafka和hdfs,但是之前沒有考慮到在hdfs是在HA模式下的情況,如果在HA模式下,我們指定了寫入位址為: hdfs://cdh2:8020/flume/oracle/topic/test_%Y%m%d,當cdh2是Active狀态下是沒有問題的,但是當cdh2變成Standby狀态後,則資料無法正常寫入;為了解決這種問題,我們在flume寫入到hdfs時,位址寫hdfs的nameservices;實作方式如下:
一,把hadoop的配置檔案hdfs-site.xml,和core-site.xml複制到flume的conf目錄下;
由于我的叢集是CDH管控的,是以hadoop的配置檔案,是存儲在/etc/hadoop/conf/目錄下的,不在hadoop的安裝目錄下,我複制如下:
cp /etc/hadoop/conf/hdfs-site.xml /opt/cloudera/parcels/CDH/lib/flume-ng/conf/
cp /etc/hadoop/conf/core-site.xml /opt/cloudera/parcels/CDH/lib/flume-ng/conf/
注:如果flume沒有安裝在hadoop所在的叢集内,記得把上述兩個配置檔案移動到flume安裝主機所在的conf目錄下
二,配置flume的相關配置檔案
a1.channels = ch-1
a1.sources = src-1
a1.sinks = HDFS
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://10.1.40.104:3306/ibrain
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = root
#這個參數很重要,預設false,如果設為false就不會自動查詢
a1.sources.src-1.hibernate.connection.autocommit = true
#聲明mysql的hibernate方言
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
#聲明mysql驅動
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
#查詢間隔,機關毫秒
a1.sources.src-1.run.query.delay=5000
a1.sources.src-1.hibernate.columns.to.select = *
#表裡面的某個字段,用來判斷增量
a1.sources.src-1.hibernate.incremental.column.name =submit_time
#聲明儲存flume狀态的檔案夾位置
a1.sources.src-1.status.file.path = /opt/flume-test/
#聲明儲存flume狀态的檔案位置
a1.sources.src-1.status.file.name = sqlT.status
# Custom query
#第一次啟動時的初始值,第二次啟動時則去讀取狀态檔案中的值
a1.sources.src-1.start.from = 2012-07-28 00:00:0.0000000
#sql語句自定義,但是要注意:增量隻能針對查詢字段的第一個字段,如下面的SUBMIT_TIME,經測試系統預設如此.
#$@$表示增量列上一次查詢的值,記錄在status檔案中
#查詢sql不能加";",不然會報錯
a1.sources.src-1.custom.query = select SUBMIT_TIME,ID,ENTRANCE_GUARD_ID,ENTRANCE_GUARD_TYPE,ENTRANCE_GUARD_STATUS,ID_CARD,NAME,EXAM_SITE_ID,FACE_IDENTIFY_RESULT,FACE_IDENTIFY_MESSAGE,FACE_BASE64,ENTRY_TYPE from T_ZCKJ_MJ_MJSJ where SUBMIT_TIME > to_timestamp('$@$','yyyy-mm-dd hh24:mi:ss.ff6')
#設定分批參數
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
#設定c3p0連接配接池參數
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
################################################################
#設定通道為記憶體模式
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 1000
a1.channels.ch-1.transactionCapacity = 1000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 68435456
a1.channels.ch-1.keep-alive = 60
a1.channels.ch-1.capacity = 1000000
#配置hdfs sink 資料持久化備份
a1.sinks.HDFS.type = hdfs
#
a1.sinks.HDFS.hdfs.path = hdfs://nameserviceBackup/flume/oracle/test_topic1/test_%Y%m%d
a1.sinks.HDFS.hdfs.filePrefix=prefix_%Y%m%d
a1.sinks.HDFS.hdfs.fileType = DataStream
a1.sinks.HDFS.hdfs.writeFormat = Text
#設定檔案存儲資料多大的時候生成下一個檔案,建議設定成128M和塊大小相同
a1.sinks.HDFS.hdfs.rollSize = 268435456
#設定滾動時間,每隔多少時間生成一個檔案.如果設定成0,則禁止滾動,可以使所有資料被寫到一個檔案中.機關是s
a1.sinks.HDFS.hdfs.rollInterval = 3600
#設定檔案多少行時,滾動生成下一個檔案,設定成0時禁止滾動
a1.sinks.HDFS.hdfs.rollCount = 0
#綁定sources sinks
a1.sources.src-1.channels=ch-1
a1.sinks.HDFS.channel = ch-1
注:由于我的hdfs的nameservices為nameserviceBackup
三,啟動程式
bin/flume-ng agent \
-c conf \
-n a1 \ #--配置檔案中agent的名稱
-f /opt/flume.conf \
-Dflume.root.logger=DEBUG,console
注:由于我上述的配置檔案放在的目錄為:/opt/flume.conf