
Writing CSV files into Kafka with Flume

Source data files: https://pan.baidu.com/s/1UiM8qmYY8MFKJaSLwIlPqQ

Extraction code: apk6

1. Create a jobkb09 directory under Flume's conf directory:

mkdir /opt/flume160/conf/jobkb09

2. Go into the jobkb09 directory, create a tmp directory inside it, and put all the source data files there, for example:
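
A sketch of this step, assuming the downloaded CSV files sit in the current directory (user_friends.csv is the only filename confirmed later in this walkthrough; the rest come from the Baidu share):

mkdir /opt/flume160/conf/jobkb09/tmp
cp ./*.csv /opt/flume160/conf/jobkb09/tmp/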

3. Create the Kafka topics (a sample creation command follows the list below):

events
event_attendees
train
user_friends
users
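
A sample creation command for one of these topics; the partition count, replication factor, and ZooKeeper address (port 2181) are assumptions to adjust for your cluster, and newer Kafka versions take --bootstrap-server 192.168.134.104:9092 instead of --zookeeper:

kafka-topics.sh --create --zookeeper 192.168.134.104:2181 --topic user_friends --partitions 1 --replication-factor 1

Repeat for events, event_attendees, train, and users.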

4. Write an agent configuration file for each requirement:

train-flume-kafka.conf:

train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSink

train.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/train
train.sources.trainSource.deserializer=LINE
train.sources.trainSource.deserializer.maxLineLength=320000
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user*
train.sources.trainSource.interceptors.head_filter.excludeEvents=true

train.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/train
train.channels.trainChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/train

train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.134.104:9092
train.sinks.trainSink.topic=train

train.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel
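
The spoolDir referenced above has to exist before the agent starts, and creating the channel directories up front does no harm; the agents below need the same treatment with their own subdirectories. A sketch for this agent:

mkdir -p /opt/flume160/conf/jobkb09/dataSourceFile/train
mkdir -p /opt/flume160/conf/jobkb09/checkPointFile/train
mkdir -p /opt/flume160/conf/jobkb09/dataChannelFile/train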


eventsattend-flume-kafka.conf:

eventattend.sources=eventAttendSource
eventattend.channels=eventAttendChannel
eventattend.sinks=eventAttendSink

eventattend.sources.eventAttendSource.type=spooldir
eventattend.sources.eventAttendSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/eventAttend
eventattend.sources.eventAttendSource.includePattern=eventAttend_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
eventattend.sources.eventAttendSource.deserializer=LINE
eventattend.sources.eventAttendSource.deserializer.maxLineLength=320000
eventattend.sources.eventAttendSource.interceptors=head_filter
eventattend.sources.eventAttendSource.interceptors.head_filter.type=regex_filter
eventattend.sources.eventAttendSource.interceptors.head_filter.regex=^event*
eventattend.sources.eventAttendSource.interceptors.head_filter.excludeEvents=true

eventattend.channels.eventAttendChannel.type=file
eventattend.channels.eventAttendChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/eventAttend
eventattend.channels.eventAttendChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/eventAttend

eventattend.sinks.eventAttendSink.type=org.apache.flume.sink.kafka.KafkaSink
eventattend.sinks.eventAttendSink.batchSize=640
eventattend.sinks.eventAttendSink.brokerList=192.168.134.104:9092
eventattend.sinks.eventAttendSink.topic=event_attendees


eventattend.sources.eventAttendSource.channels=eventAttendChannel
eventattend.sinks.eventAttendSink.channel=eventAttendChannel


events-flume-kafka.conf:

events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSink

events.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=320000
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
events.sources.eventsSource.interceptors=head_filter
events.sources.eventsSource.interceptors.head_filter.type=regex_filter
events.sources.eventsSource.interceptors.head_filter.regex=^event_id*
events.sources.eventsSource.interceptors.head_filter.excludeEvents=true

events.channels.eventsChannel.type=file
events.channels.eventsChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/events
events.channels.eventsChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/events

events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink.batchSize=640
events.sinks.eventsSink.brokerList=192.168.134.104:9092
events.sinks.eventsSink.topic=events

events.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel


user-flume-kafka.conf (note: despite the filename, this agent's sink writes the users data to HDFS rather than to Kafka):

users.sources=userSource
users.channels=userChannel
users.sinks=userSink


users.sources.userSource.type=spooldir
users.sources.userSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/user
users.sources.userSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.userSource.deserializer=LINE
users.sources.userSource.deserializer.maxLineLength=10000
users.sources.userSource.interceptors=head_filter
users.sources.userSource.interceptors.head_filter.type=regex_filter
users.sources.userSource.interceptors.head_filter.regex=^user_id*
users.sources.userSource.interceptors.head_filter.excludeEvents=true


users.channels.userChannel.type=file
users.channels.userChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/user
users.channels.userChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/user


users.sinks.userSink.type=hdfs
users.sinks.userSink.hdfs.fileType=DataStream
users.sinks.userSink.hdfs.filePrefix=users
users.sinks.userSink.hdfs.fileSuffix=.csv
users.sinks.userSink.hdfs.path=hdfs://192.168.134.104:9000/kb09file/user/users/%Y-%m-%d
users.sinks.userSink.hdfs.useLocalTimeStamp=true
users.sinks.userSink.hdfs.batchSize=640
users.sinks.userSink.hdfs.rollCount=0
users.sinks.userSink.hdfs.rollSize=120000000
users.sinks.userSink.hdfs.rollInterval=20


users.sources.userSource.channels=userChannel
users.sinks.userSink.channel=userChannel
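
Once this agent has processed a users file, the output can be checked on HDFS; the date directory below is just an example and depends on the day the agent ran:

hdfs dfs -ls hdfs://192.168.134.104:9000/kb09file/user/users/2020-12-08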


userFriend-flume-kafka.conf:

user_friend.sources=userFriendSource
user_friend.channels=userFriendChannel
user_friend.sinks=userFriendSink


user_friend.sources.userFriendSource.type=spooldir
user_friend.sources.userFriendSource.spoolDir=/opt/flume160/conf/jobkb09/dataSourceFile/userFriend
user_friend.sources.userFriendSource.deserializer=LINE
user_friend.sources.userFriendSource.deserializer.maxLineLength=320000
user_friend.sources.userFriendSource.includePattern=userFriend_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
user_friend.sources.userFriendSource.interceptors=head_filter
user_friend.sources.userFriendSource.interceptors.head_filter.type=regex_filter
user_friend.sources.userFriendSource.interceptors.head_filter.regex=^user,friends*
user_friend.sources.userFriendSource.interceptors.head_filter.excludeEvents=true


user_friend.channels.userFriendChannel.type=file
user_friend.channels.userFriendChannel.checkpointDir=/opt/flume160/conf/jobkb09/checkPointFile/userFriend
user_friend.channels.userFriendChannel.dataDirs=/opt/flume160/conf/jobkb09/dataChannelFile/userFriend


user_friend.sinks.userFriendSink.type=org.apache.flume.sink.kafka.KafkaSink
user_friend.sinks.userFriendSink.batchSize=640
user_friend.sinks.userFriendSink.brokerList=192.168.134.104:9092
user_friend.sinks.userFriendSink.topic=user_friends


user_friend.sources.userFriendSource.channels=userFriendChannel
user_friend.sinks.userFriendSink.channel=userFriendChannel


5. Start each Flume agent and feed the CSV files into their corresponding Kafka topics

In the flume160 directory, taking the user_friends topic as an example:

Start the Flume agent:

flume-ng agent --name user_friend --conf ./conf/ --conf-file ./conf/jobkb09/userFriend-flume-kafka.conf -Dflume.root.logger=INFO,console

Then, from the /opt/flume160/conf/jobkb09/tmp directory, copy user_friends.csv into /opt/flume160/conf/jobkb09/dataSourceFile/userFriend/ as userFriend_xxxx-xx-xx.csv (the name must match the source's includePattern):

cp ./user_friends.csv /opt/flume160/conf/jobkb09/dataSourceFile/userFriend/userFriend_2020-12-08.csv

For the other topics, just change the CSV filename, the directory paths, and the topic name accordingly.
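
To confirm that the records actually reached Kafka, a console consumer can be pointed at the topic (this works the same way for any of the topics above):

kafka-console-consumer.sh --bootstrap-server 192.168.134.104:9092 --topic user_friends --from-beginning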

6. Check the offsets (i.e. the amount of data in each topic)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.134.104:9092 --topic user_friends --time -1 --offsets 1
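# prints one line per partition in the form topic:partition:offset; with --time -1 this is the
# log-end offset, i.e. the number of records written to the topic so far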