天天看点

flume mysql 增量,Flume实时采集MySQL增量数据

1 tier1.sources = sqlSource 2 tier1.channels = sqlChannel 3 tier1.sinks = kafkaSink 4 # 源配置

5 # 源,类型已定义

6 tier1.sources.sqlSource.type = org.keedio.flume.source.SQLSource

7 tier1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://132.35.231.131:3306/zhaoxj_db

8 #

9 tier1.sources.sqlSource.hibernate.connection.user = zhaoxj

10 tier1.sources.sqlSource.hibernate.connection.password = ***** ****

11 tier1.sources.sqlSource.hibernate.connection.autocommit = true

12 tier1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

13 tier1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc驱动程序

14 # agent.sources.sqlSource.table = employee1

15 # 要导入到的列kafka(默认*导入整行)

16 # agent.sources。 sqlSource.columns.to.select = *

17 # 查询延迟,每个配置的毫秒数将发送查询

18 tier1.sources.sqlSource.run.query.delay = 10000

19 # 源查询状态配置

20 tier1.sources.sqlSource.status.file.path =/var/log/水槽

21 tier1.sources.sqlSource.status.file.name = sqlSource.status

22 #

23 # 递增起始值跨度>

24 tier1.sources.sqlSource.start.from = 0

25 # 这里需要引号转移字符\\\\转义,$ @ $将替换为:1.当状态文件中的增量标识符不存在时,即第一个查询,替换为start.from指定的值; 2.当增量标识符存在时,将其替换为状态文件中存储的值

26 tier1.sources.sqlSource.custom.query = select * from test_line_feed其中t_id \\ u> \\\\" $ @ $ \\\\"

27 tier1.sources.sqlSource.batch.size = 1000

28 # 每个查询数据1000 ,查询语句将被自动缝合。限制为1000

29 tier1.sources.sqlSource.max.rows = 1000

30 # 字段分隔符跨度>

31 tier1.sources.sqlSource.delimiter.entry = |

32 # 连接池配置跨度>

33 tier1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider

34 tier1.sources.sqlSource.hibernate.c3p0.min_size = 1

35 tier1.sources.sqlSource.hibernate.c3p0.max_size = 10

36

37 # 频道配置跨度>

38 # 通道可以是定义如下。

39 tier1.sources.sqlSource.channels = sqlChannel

40

41 tier1.channels.sqlChannel.type = 内存

42 tier1.channels.sqlChannel.capacity = 500

43

44 # 接收器配置跨度>

45 # Kafka使用的水槽下沉

46 tier1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

47 tier1.sinks.kafkaSink.flumeBatchSize = 640

48 tier1.sinks.kafkaSink.kafka.bootstrap.servers = 132.35.231.160:9092,132.35.231.161:9092,132.35.231.162:9092

49 tier1.sinks.kafkaSink.kafka.topic = zhaoxj_test

50

51 # 关联配置跨度>

52 tier1.sources.sqlSource.channels = sqlChannel

53 tier1.sinks.kafkaSink.channel = sqlChannel