源資料檔案:https://pan.baidu.com/s/1UiM8qmYY8MFKJaSLwIlPqQ
提取碼:apk6
1.在flume的conf目錄下建立jobkb09目錄:
# Create the job directory; -p also creates missing parents and does not fail if it already exists.
mkdir -p /opt/flume160/conf/jobkb09
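2.進入jobkb09目錄,在其中建立tmp目錄,并将源資料檔案均放入其中;同時在jobkb09目錄下建立各agent的spoolDir目錄(dataSourceFile/train、dataSourceFile/eventAttend、dataSourceFile/events、dataSourceFile/user、dataSourceFile/userFriend),因為spooldir source要求該目錄在啟動agent之前已經存在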
3.建立Kafka topic:
events :
event_attendees:
train:
user_friends:
users:
4.編寫對應需求的agent配置檔案:
train-flume-kafka.conf:
# Flume agent "train": spooling-directory source -> file channel -> Kafka sink (topic "train").
train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSink
# --- Source: read CSV files dropped into the spool directory, one event per line.
train.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/train
train.sources.trainSource.deserializer=LINE
# Raise the LINE deserializer's per-line character limit so long CSV rows are not cut short.
train.sources.trainSource.deserializer.maxLineLength=320000
# Only ingest files named train_YYYY-MM-DD.csv; "[.]" matches a literal dot
# (a bare "." would match any character).
# NOTE(review): confirm the installed Flume version supports includePattern.
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}[.]csv
# --- Interceptor: drop the CSV header row (the line beginning with "user").
# The pattern is a regular expression, not a glob: a trailing "*" would only make
# the final "r" optional, so it is omitted.
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user
# excludeEvents=true discards matching events instead of keeping only them.
train.sources.trainSource.interceptors.head_filter.excludeEvents=true
# --- Channel: file-backed channel with its own checkpoint and data directories.
train.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/train
train.channels.trainChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/train
# --- Sink: publish each event to the Kafka topic "train".
train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.134.104:9092
train.sinks.trainSink.topic=train
# --- Wiring: a source lists its channels (plural key); a sink has exactly one channel.
train.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel
eventsattend-flume-kafka.conf:
# Flume agent "eventattend": spooling-directory source -> file channel -> Kafka sink (topic "event_attendees").
eventattend.sources=eventAttendSource
eventattend.channels=eventAttendChannel
eventattend.sinks=eventAttendSink
# --- Source: read CSV files dropped into the spool directory, one event per line.
eventattend.sources.eventAttendSource.type=spooldir
eventattend.sources.eventAttendSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/eventAttend
# Only ingest files named eventAttend_YYYY-MM-DD.csv; "[.]" matches a literal dot
# (a bare "." would match any character).
# NOTE(review): confirm the installed Flume version supports includePattern.
eventattend.sources.eventAttendSource.includePattern=eventAttend_[0-9]{4}-[0-9]{2}-[0-9]{2}[.]csv
eventattend.sources.eventAttendSource.deserializer=LINE
# Raise the LINE deserializer's per-line character limit so long CSV rows are not cut short.
eventattend.sources.eventAttendSource.deserializer.maxLineLength=320000
# --- Interceptor: drop the CSV header row (the line beginning with "event").
# The pattern is a regular expression, not a glob: a trailing "*" would only make
# the final "t" optional, so it is omitted.
eventattend.sources.eventAttendSource.interceptors=head_filter
eventattend.sources.eventAttendSource.interceptors.head_filter.type=regex_filter
eventattend.sources.eventAttendSource.interceptors.head_filter.regex=^event
# excludeEvents=true discards matching events instead of keeping only them.
eventattend.sources.eventAttendSource.interceptors.head_filter.excludeEvents=true
# --- Channel: file-backed channel with its own checkpoint and data directories.
eventattend.channels.eventAttendChannel.type=file
eventattend.channels.eventAttendChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/eventAttend
eventattend.channels.eventAttendChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/eventAttend
# --- Sink: publish each event to the Kafka topic "event_attendees".
eventattend.sinks.eventAttendSink.type=org.apache.flume.sink.kafka.KafkaSink
eventattend.sinks.eventAttendSink.batchSize=640
eventattend.sinks.eventAttendSink.brokerList=192.168.134.104:9092
eventattend.sinks.eventAttendSink.topic=event_attendees
# --- Wiring: a source lists its channels (plural key); a sink has exactly one channel.
eventattend.sources.eventAttendSource.channels=eventAttendChannel
eventattend.sinks.eventAttendSink.channel=eventAttendChannel
events-flume-kafka.conf:
# Flume agent "events": spooling-directory source -> file channel -> Kafka sink (topic "events").
events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSink
# --- Source: read CSV files dropped into the spool directory, one event per line.
events.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/events
events.sources.eventsSource.deserializer=LINE
# Raise the LINE deserializer's per-line character limit so long CSV rows are not cut short.
events.sources.eventsSource.deserializer.maxLineLength=320000
# Only ingest files named events_YYYY-MM-DD.csv; "[.]" matches a literal dot
# (a bare "." would match any character).
# NOTE(review): confirm the installed Flume version supports includePattern.
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}[.]csv
# --- Interceptor: drop the CSV header row (the line beginning with "event_id").
# The pattern is a regular expression, not a glob: a trailing "*" would only make
# the final "d" optional, so it is omitted.
events.sources.eventsSource.interceptors=head_filter
events.sources.eventsSource.interceptors.head_filter.type=regex_filter
events.sources.eventsSource.interceptors.head_filter.regex=^event_id
# excludeEvents=true discards matching events instead of keeping only them.
events.sources.eventsSource.interceptors.head_filter.excludeEvents=true
# --- Channel: file-backed channel with its own checkpoint and data directories.
events.channels.eventsChannel.type=file
events.channels.eventsChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/events
events.channels.eventsChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/events
# --- Sink: publish each event to the Kafka topic "events".
events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink.batchSize=640
events.sinks.eventsSink.brokerList=192.168.134.104:9092
events.sinks.eventsSink.topic=events
# --- Wiring: a source lists its channels (plural key); a sink has exactly one channel.
events.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel
user-flume-kafka.conf:
# Flume agent "users": spooling-directory source -> file channel -> Kafka sink (topic "users").
users.sources=userSource
users.channels=userChannel
users.sinks=userSink
# --- Source: read CSV files dropped into the spool directory, one event per line.
users.sources.userSource.type=spooldir
users.sources.userSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/user
# Only ingest files named users_YYYY-MM-DD.csv; "[.]" matches a literal dot
# (a bare "." would match any character).
# NOTE(review): confirm the installed Flume version supports includePattern.
users.sources.userSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}[.]csv
users.sources.userSource.deserializer=LINE
# Raise the LINE deserializer's per-line character limit so long CSV rows are not cut short.
users.sources.userSource.deserializer.maxLineLength=10000
# --- Interceptor: drop the CSV header row (the line beginning with "user_id").
# The pattern is a regular expression, not a glob: a trailing "*" would only make
# the final "d" optional, so it is omitted.
users.sources.userSource.interceptors=head_filter
users.sources.userSource.interceptors.head_filter.type=regex_filter
users.sources.userSource.interceptors.head_filter.regex=^user_id
# excludeEvents=true discards matching events instead of keeping only them.
users.sources.userSource.interceptors.head_filter.excludeEvents=true
# --- Channel: file-backed channel with its own checkpoint and data directories.
users.channels.userChannel.type=file
users.channels.userChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/user
users.channels.userChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/user
# --- Sink: publish each event to the Kafka topic "users".
# This file is user-flume-kafka.conf and the walkthrough creates a "users" topic,
# so the sink is a Kafka sink configured the same way as the other four agents
# (the earlier HDFS sink settings wrote to HDFS and never reached Kafka).
users.sinks.userSink.type=org.apache.flume.sink.kafka.KafkaSink
users.sinks.userSink.batchSize=640
users.sinks.userSink.brokerList=192.168.134.104:9092
users.sinks.userSink.topic=users
# --- Wiring: a source lists its channels (plural key); a sink has exactly one channel.
users.sources.userSource.channels=userChannel
users.sinks.userSink.channel=userChannel
userFriend-flume-kafka.conf:
# Flume agent "user_friend": spooling-directory source -> file channel -> Kafka sink (topic "user_friends").
user_friend.sources=userFriendSource
user_friend.channels=userFriendChannel
user_friend.sinks=userFriendSink
# --- Source: read CSV files dropped into the spool directory, one event per line.
user_friend.sources.userFriendSource.type=spooldir
user_friend.sources.userFriendSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/userFriend
user_friend.sources.userFriendSource.deserializer=LINE
# Raise the LINE deserializer's per-line character limit so long CSV rows are not cut short.
user_friend.sources.userFriendSource.deserializer.maxLineLength=320000
# Only ingest files named userFriend_YYYY-MM-DD.csv; "[.]" matches a literal dot
# (a bare "." would match any character).
# NOTE(review): confirm the installed Flume version supports includePattern.
user_friend.sources.userFriendSource.includePattern=userFriend_[0-9]{4}-[0-9]{2}-[0-9]{2}[.]csv
# --- Interceptor: drop the CSV header row (the line beginning with "user,friends").
# The pattern is a regular expression, not a glob: a trailing "*" would only make
# the final "s" optional, so it is omitted.
user_friend.sources.userFriendSource.interceptors=head_filter
user_friend.sources.userFriendSource.interceptors.head_filter.type=regex_filter
user_friend.sources.userFriendSource.interceptors.head_filter.regex=^user,friends
# excludeEvents=true discards matching events instead of keeping only them.
user_friend.sources.userFriendSource.interceptors.head_filter.excludeEvents=true
# --- Channel: file-backed channel with its own checkpoint and data directories.
user_friend.channels.userFriendChannel.type=file
user_friend.channels.userFriendChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/userFriend
user_friend.channels.userFriendChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/userFriend
# --- Sink: publish each event to the Kafka topic "user_friends".
user_friend.sinks.userFriendSink.type=org.apache.flume.sink.kafka.KafkaSink
user_friend.sinks.userFriendSink.batchSize=640
user_friend.sinks.userFriendSink.brokerList=192.168.134.104:9092
user_friend.sinks.userFriendSink.topic=user_friends
# --- Wiring: a source lists its channels (plural key); a sink has exactly one channel.
user_friend.sources.userFriendSource.channels=userFriendChannel
user_friend.sinks.userFriendSink.channel=userFriendChannel
5.啟動flume agent,将csv檔案傳輸到各自對應的kafka topic中
在flume160目錄下:
以user_friends topic為例:
啟動flume agent:
# Start the Flume agent. --name must equal the property prefix used in the config file
# ("user_friend"); the relative --conf / --conf-file paths assume the current directory is flume160.
# -Dflume.root.logger=INFO,console sends INFO-level logs to the terminal.
flume-ng agent --name user_friend --conf ./conf/ --conf-file ./conf/jobkb09/userFriend-flume-kafka.conf -Dflume.root.logger=INFO,console
在/opt/flume160/conf/jobkb09/tmp目錄下,将該目錄中的user_friends.csv檔案複制到/opt/flume160/conf/jobkb09/dataSourceFile/userFriend目錄,并命名為userFriend_xxxx-xx-xx.csv(日期格式需符合配置檔案中的includePattern)
# Stage the copy under a hidden temporary name, then rename it into place.
# The spooling-directory source expects a file to be complete and unchanging once it
# becomes visible; it skips names starting with ".", and a rename within the same
# directory is atomic, so Flume never sees a half-written file.
cp ./user_friends.csv /opt/flume160/conf/jobkb09/dataSourceFile/userFriend/.userFriend_2020-12-08.csv.tmp \
  && mv /opt/flume160/conf/jobkb09/dataSourceFile/userFriend/.userFriend_2020-12-08.csv.tmp \
        /opt/flume160/conf/jobkb09/dataSourceFile/userFriend/userFriend_2020-12-08.csv
對其他topic,修改相對應的csv檔案名和路徑名以及topic名即可
6.檢視偏移量(資料量)
# Query the broker for the offsets of topic user_friends. "-time -1" asks for the latest
# offset of each partition; summed over partitions this is the number of messages written
# (assuming nothing has been removed by retention — verify if the topic is old).
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.134.104:9092 --topic user_friends -time -1 --offsets 1