天天看點

flume實時寫資料到HA模式下的hdfs

上一篇文章寫到flume實時抽取mysql資料到kafka和hdfs,但是之前沒有考慮到在hdfs是在HA模式下的情況,如果在HA模式下,我們指定了寫入位址為: hdfs://cdh2:8020/flume/oracle/topic/test_%Y%m%d,當cdh2是Active狀态下是沒有問題的,但是當cdh2變成Standby狀态後,則資料無法正常寫入;為了解決這種問題,我們在flume寫入到hdfs時,位址寫hdfs的nameservices;實作方式如下:

一,把hadoop的配置檔案hdfs-site.xml,和core-site.xml複制到flume的conf目錄下;

由于我的叢集是CDH管控的,是以hadoop的配置檔案,是存儲在/etc/hadoop/conf/目錄下的,不在hadoop的安裝目錄下,我複制如下:

cp /etc/hadoop/conf/hdfs-site.xml /opt/cloudera/parcels/CDH/lib/flume-ng/conf/

cp /etc/hadoop/conf/core-site.xml /opt/cloudera/parcels/CDH/lib/flume-ng/conf/

注:如果flume沒有安裝在hadoop所在的叢集内,記得把上述兩個配置檔案移動到flume安裝主機所在的conf目錄下

二,配置flume的相關配置檔案

a1.channels = ch-1

a1.sources = src-1

a1.sinks = HDFS

###########sql source#################

# For each one of the sources, the type is defined

a1.sources.src-1.type = org.keedio.flume.source.SQLSource

a1.sources.src-1.hibernate.connection.url = jdbc:mysql://10.1.40.104:3306/ibrain

# Hibernate Database connection properties

a1.sources.src-1.hibernate.connection.user = root

a1.sources.src-1.hibernate.connection.password = root

#這個參數很重要,預設false,如果設為false就不會自動查詢

a1.sources.src-1.hibernate.connection.autocommit = true

#聲明mysql的hibernate方言

a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

#聲明mysql驅動

a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

#查詢間隔,機關毫秒

a1.sources.src-1.run.query.delay=5000

a1.sources.src-1.hibernate.columns.to.select = *

#表裡面的某個字段,用來判斷增量

a1.sources.src-1.hibernate.incremental.column.name =submit_time

#聲明儲存flume狀态的檔案夾位置

a1.sources.src-1.status.file.path = /opt/flume-test/

#聲明儲存flume狀态的檔案位置

a1.sources.src-1.status.file.name = sqlT.status

# Custom query

#第一次啟動時的初始值,第二次啟動時則去讀取狀态檔案中的值

a1.sources.src-1.start.from = 2012-07-28 00:00:0.0000000

#sql語句自定義,但是要注意:增量隻能針對查詢字段的第一個字段,如下面的SUBMIT_TIME,經測試系統預設如此.

#$@$表示增量列上一次查詢的值,記錄在status檔案中

#查詢sql不能加";",不然會報錯

a1.sources.src-1.custom.query = select SUBMIT_TIME,ID,ENTRANCE_GUARD_ID,ENTRANCE_GUARD_TYPE,ENTRANCE_GUARD_STATUS,ID_CARD,NAME,EXAM_SITE_ID,FACE_IDENTIFY_RESULT,FACE_IDENTIFY_MESSAGE,FACE_BASE64,ENTRY_TYPE from T_ZCKJ_MJ_MJSJ  where  SUBMIT_TIME > to_timestamp('$@$','yyyy-mm-dd hh24:mi:ss.ff6')

#設定分批參數

a1.sources.src-1.batch.size = 1000

a1.sources.src-1.max.rows = 1000

#設定c3p0連接配接池參數

a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider

a1.sources.src-1.hibernate.c3p0.min_size=1

a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################

#設定通道為記憶體模式

a1.channels.ch-1.type = memory

a1.channels.ch-1.capacity = 1000

a1.channels.ch-1.transactionCapacity = 1000

a1.channels.ch-1.byteCapacityBufferPercentage = 20

a1.channels.ch-1.byteCapacity = 68435456

a1.channels.ch-1.keep-alive = 60

a1.channels.ch-1.capacity = 1000000

#配置hdfs sink    資料持久化備份

a1.sinks.HDFS.type = hdfs

#

a1.sinks.HDFS.hdfs.path = hdfs://nameserviceBackup/flume/oracle/test_topic1/test_%Y%m%d

a1.sinks.HDFS.hdfs.filePrefix=prefix_%Y%m%d

a1.sinks.HDFS.hdfs.fileType = DataStream

a1.sinks.HDFS.hdfs.writeFormat = Text

#設定檔案存儲資料多大的時候生成下一個檔案,建議設定成128M和塊大小相同

a1.sinks.HDFS.hdfs.rollSize = 268435456

#設定滾動時間,每隔多少時間生成一個檔案.如果設定成0,則禁止滾動,可以使所有資料被寫到一個檔案中.機關是s

a1.sinks.HDFS.hdfs.rollInterval = 3600

#設定檔案多少行時,滾動生成下一個檔案,設定成0時禁止滾動

a1.sinks.HDFS.hdfs.rollCount = 0

#綁定sources sinks

a1.sources.src-1.channels=ch-1

a1.sinks.HDFS.channel = ch-1

注:由于我的hdfs的nameservices為nameserviceBackup

三,啟動程式

bin/flume-ng agent \

-c conf \

-n a1 \           #--配置檔案中agent的名稱

-f /opt/flume.conf \

-Dflume.root.logger=DEBUG,console

注:由于我上述的配置檔案放在的目錄為:/opt/flume.conf

繼續閱讀