天天看點

flume mysql 增量,Flume實時采集MySQL增量資料

1 tier1.sources = sqlSource 2 tier1.channels = sqlChannel 3 tier1.sinks = kafkaSink 4 # 源配置

5 # 源,類型已定義

6 tier1.sources.sqlSource.type = org.keedio.flume.source.SQLSource

7 tier1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://132.35.231.131:3306/zhaoxj_db

8 #

9 tier1.sources.sqlSource.hibernate.connection.user = zhaoxj

10 tier1.sources.sqlSource.hibernate.connection.password = ***** ****

11 tier1.sources.sqlSource.hibernate.connection.autocommit = true

12 tier1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

13 tier1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc驅動程式

14 # agent.sources.sqlSource.table = employee1

15 # 要導入到的列kafka(預設*導入整行)

16 # agent.sources。 sqlSource.columns.to.select = *

17 # 查詢延遲,每個配置的毫秒數将發送查詢

18 tier1.sources.sqlSource.run.query.delay = 10000

19 # 源查詢狀态配置

20 tier1.sources.sqlSource.status.file.path =/var/log/水槽

21 tier1.sources.sqlSource.status.file.name = sqlSource.status

22 #

23 # 遞增起始值跨度>

24 tier1.sources.sqlSource.start.from = 0

25 # 這裡需要引号轉移字元\\\\轉義,$ @ $将替換為:1.當狀态檔案中的增量辨別符不存在時,即第一個查詢,替換為start.from指定的值; 2.當增量辨別符存在時,将其替換為狀态檔案中存儲的值

26 tier1.sources.sqlSource.custom.query = select * from test_line_feed其中t_id \\ u> \\\\" $ @ $ \\\\"

27 tier1.sources.sqlSource.batch.size = 1000

28 # 每個查詢資料1000 ,查詢語句将被自動縫合。限制為1000

29 tier1.sources.sqlSource.max.rows = 1000

30 # 字段分隔符跨度>

31 tier1.sources.sqlSource.delimiter.entry = |

32 # 連接配接池配置跨度>

33 tier1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider

34 tier1.sources.sqlSource.hibernate.c3p0.min_size = 1

35 tier1.sources.sqlSource.hibernate.c3p0.max_size = 10

36

37 # 頻道配置跨度>

38 # 通道可以是定義如下。

39 tier1.sources.sqlSource.channels = sqlChannel

40

41 tier1.channels.sqlChannel.type = 記憶體

42 tier1.channels.sqlChannel.capacity = 500

43

44 # 接收器配置跨度>

45 # Kafka使用的水槽下沉

46 tier1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

47 tier1.sinks.kafkaSink.flumeBatchSize = 640

48 tier1.sinks.kafkaSink.kafka.bootstrap.servers = 132.35.231.160:9092,132.35.231.161:9092,132.35.231.162:9092

49 tier1.sinks.kafkaSink.kafka.topic = zhaoxj_test

50

51 # 關聯配置跨度>

52 tier1.sources.sqlSource.channels = sqlChannel

53 tier1.sinks.kafkaSink.channel = sqlChannel