Kafka Connect is a tool for scalably and reliably streaming data between Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest data from databases or application servers into Kafka topics, making it available for low-latency stream processing. Export jobs can deliver data from Kafka topics into secondary storage and query systems, or into batch systems for offline analysis.
Core components of Kafka Connect:
Source: writes data from external systems into Kafka topics.
Sink: reads data from Kafka and delivers it to wherever it is needed, for example HDFS or HBase.

Connectors: the high-level abstraction that coordinates data streaming by managing tasks.
Tasks: the concrete implementation that writes data into Kafka or reads data out of Kafka; both sources and sinks do their work through tasks.
Workers: the processes that run connectors and tasks.
Converters: translate data between Kafka Connect and the external systems it sends data to or receives data from. A converter turns bytes into Kafka Connect's internal data format and turns the internal format back into bytes. Converters are decoupled from connectors, so any connector can reuse them: with the Avro converter, for example, the JDBC connector can write Avro-formatted data to Kafka, and the HDFS connector can read that Avro data back out of Kafka.
Transforms: a lightweight tool for making simple adjustments to individual messages.
Kafka Connect operating modes:
Kafka Connect has two operating modes:
standalone: in standalone mode, all of the work is performed in a single process.
distributed: distributed mode is highly scalable and provides automatic fault tolerance. You can start many worker processes with the same group.id; the available workers automatically coordinate to schedule connectors and tasks among themselves. If you add a worker, or a worker dies, the remaining workers detect the change and rebalance the connectors and tasks.
Author of this article: 張永清. Please credit the source when reposting: https://www.cnblogs.com/laoqing/p/11927958.html
In distributed mode, connectors are managed through the REST API.
Common connector management operations in the REST API:
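As a sketch (host, port, and the connector name "my-connector" are placeholders; a distributed worker listens on port 8083 by default), the typical calls look like this:

```sh
# List the connectors running in the cluster
curl http://localhost:8083/connectors

# Create a connector; the JSON body carries its configuration
curl -X POST -H "Content-Type: application/json" \
     --data '{"name":"my-connector","config":{"connector.class":"...","tasks.max":"1"}}' \
     http://localhost:8083/connectors

# Inspect, pause, resume, restart, or delete a connector by name
curl http://localhost:8083/connectors/my-connector/status
curl -X PUT    http://localhost:8083/connectors/my-connector/pause
curl -X PUT    http://localhost:8083/connectors/my-connector/resume
curl -X POST   http://localhost:8083/connectors/my-connector/restart
curl -X DELETE http://localhost:8083/connectors/my-connector
```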
How to develop your own connector:
1. Add the Maven dependency.
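For example, a sketch of the dependency on the Connect framework API (the version shown is illustrative; match it to your Kafka version):

```xml
<!-- Kafka Connect framework API used by custom connectors and tasks -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>2.3.0</version>
</dependency>
```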
2. Develop a custom Source
Developing a custom Source means extending the SourceConnector and SourceTask abstract classes and implementing their unimplemented methods (or overriding methods as needed).
A. Develop the custom SourceConnector
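A minimal sketch, using hypothetical class names (MySourceConnector, MySourceTask) rather than code from the article:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical example: a minimal SourceConnector skeleton
public class MySourceConnector extends SourceConnector {
    private Map<String, String> configProps;

    @Override
    public void start(Map<String, String> props) {
        // Save the connector configuration; validate it here if needed.
        this.configProps = new HashMap<>(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        // The Task implementation that actually reads the data.
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Split the work among tasks; here every task gets the same config.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(configProps));
        }
        return configs;
    }

    @Override
    public void stop() {
        // Release any resources held by the connector.
    }

    @Override
    public ConfigDef config() {
        // Declare the configuration options this connector accepts.
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}
```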
B. Develop the Source's corresponding Task
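A matching sketch of the hypothetical MySourceTask, which emits one placeholder record per poll:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical example: a minimal SourceTask that produces one record per poll
public class MySourceTask extends SourceTask {
    private String topic;

    @Override
    public void start(Map<String, String> props) {
        // Configuration handed over from MySourceConnector.taskConfigs().
        this.topic = props.getOrDefault("topic", "my-topic");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Called repeatedly by the worker; return the next batch of records,
        // or null if there is nothing to read right now.
        Thread.sleep(1000);
        Map<String, ?> sourcePartition = Collections.singletonMap("source", "demo");
        Map<String, ?> sourceOffset = Collections.singletonMap("position", System.currentTimeMillis());
        SourceRecord record = new SourceRecord(
                sourcePartition, sourceOffset, topic,
                Schema.STRING_SCHEMA, "hello from MySourceTask");
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {
        // Signal poll() to stop and clean up resources.
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}
```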
3. Develop a custom Sink
Developing a custom Sink means extending the SinkConnector and SinkTask abstract classes and implementing their unimplemented methods (or overriding methods as needed).
A. Develop the custom SinkConnector
B. Develop the Sink's corresponding Task
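The SinkConnector side mirrors the SourceConnector sketch above (it extends SinkConnector and its taskClass() returns the SinkTask implementation), so only a minimal, hypothetical MySinkTask is sketched here, one that simply prints each record it receives:

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical example: a minimal SinkTask that just logs each record
public class MySinkTask extends SinkTask {

    @Override
    public void start(Map<String, String> props) {
        // Open connections to the target system (HDFS, HBase, a database, ...).
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Called by the worker with a batch of records read from Kafka.
        for (SinkRecord record : records) {
            System.out.printf("topic=%s partition=%d offset=%d value=%s%n",
                    record.topic(), record.kafkaPartition(), record.kafkaOffset(), record.value());
        }
    }

    @Override
    public void stop() {
        // Close connections to the target system.
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}
```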
Open-source projects with well-implemented connectors:
https://github.com/debezium/debezium
https://github.com/confluentinc
https://docs.confluent.io/current/connect/managing/connectors.html
Here we take debezium-connector-mongodb from https://github.com/debezium/debezium as an example and run the connector in standalone mode.
Get the debezium-connector-mongodb-0.9.5.Final.jar from GitHub, place it in Kafka's libs directory, and place the MongoDB-related jars in libs as well.
Create the corresponding mongodb.properties configuration file in the config directory.
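A sketch of what mongodb.properties might contain (host, credentials, logical name, and database are placeholders; each property is explained in the table below):

```properties
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# Replica set name / host:port of the MongoDB servers
mongodb.hosts=rs0/127.0.0.1:27017
# Logical name that prefixes all Kafka topics produced by this connector
mongodb.name=mymongo
mongodb.user=debezium
mongodb.password=debezium-password
database.whitelist=mydb
tasks.max=1
```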
The configuration options are explained below:
For details, see: https://debezium.io/documentation/reference/0.10/connectors/mongodb.html
https://docs.confluent.io/current/connect/debezium-connect-mongodb/mongodb_source_connector_config.html
name: Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)

connector.class: The name of the Java class for the connector. Always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.

mongodb.hosts: The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If mongodb.members.auto.discover is set to false, then the host and port pair should be prefixed with the replica set name (e.g., rs0/localhost:27017).

mongodb.name: A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.

mongodb.user: Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.password: Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.ssl.enabled (default: false): Connector will use SSL to connect to MongoDB instances.

mongodb.ssl.invalid.hostname.allowed: When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase. If true, the connection will not prevent man-in-the-middle attacks.

database.whitelist (default: empty string): An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored. May not be used with database.blacklist.

database.blacklist: An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist will be monitored. May not be used with database.whitelist.

collection.whitelist: An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in the whitelist will be excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default the connector will monitor all collections except those in the local and admin databases. May not be used with collection.blacklist.

collection.blacklist: An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in the blacklist will be monitored. Each identifier is of the form databaseName.collectionName. May not be used with collection.whitelist.

snapshot.mode (default: initial): Specifies the criteria for running a snapshot (e.g. initial sync) upon startup of the connector. The default, initial, specifies that the connector reads a snapshot when either no offset is found or the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots and should instead proceed to tail the log.

field.blacklist: An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters.

field.renames: An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters, and the colon character (:) determines the rename mapping. Each field replacement is applied to the result of the previous one in the list, so keep this in mind when renaming multiple fields that are in the same path.

tasks.max (default: 1): The maximum number of tasks that should be created for this connector. The MongoDB connector will attempt to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, we recommend specifying a value that is equal to or greater than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.

initial.sync.max.threads: Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set. Defaults to 1.

tombstones.on.delete (default: true): Controls whether a tombstone event should be generated after a delete event. When true, the delete operation is represented by a delete event and a subsequent tombstone event. When false, only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record has been deleted.

snapshot.delay.ms: An interval in milliseconds that the connector should wait before taking a snapshot after starting up. Can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors.

snapshot.fetch.size (default: 0): Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in multiple batches of this size. The default of 0 indicates that the server chooses an appropriate fetch size.
The following advanced configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector’s configuration.
max.queue.size (default: 8192): Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Should always be larger than the maximum batch size specified in the max.batch.size property.

max.batch.size (default: 2048): Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.

poll.interval.ms (default: 1000): Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.

connect.backoff.initial.delay.ms: Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms).

connect.backoff.max.delay.ms: Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms).

connect.max.attempts (default: 16): Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. Defaults to 16, which with the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms results in just over 20 minutes of attempts before failing.

mongodb.members.auto.discover: Boolean value that specifies whether the addresses in mongodb.hosts are seeds that should be used to discover all members of the cluster or replica set (true), or whether the address(es) in mongodb.hosts should be used as is (false). The default is true and should be used in all cases except where MongoDB is fronted by a proxy.

source.struct.version (default: v2): Schema version for the source block in CDC events; Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all the connectors. By setting this option to v1 the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.

heartbeat.interval.ms: Controls how frequently heartbeat messages are sent. This property contains an interval in milliseconds that defines how frequently the connector sends messages into a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You should also leverage heartbeat messages in cases where only records in non-captured collections are changed for a longer period of time. In such a situation the connector would proceed to read the oplog from the database but never emit any change messages into Kafka, which in turn means that no offset updates will be committed to Kafka. This will cause the oplog files to be rotated out, but the connector will not notice it, so on restart some events are no longer available, which leads to the need to re-execute the initial snapshot. Set this parameter to 0 to not send heartbeat messages at all. Disabled by default.

heartbeat.topics.prefix (default: __debezium-heartbeat): Controls the naming of the topic to which heartbeat messages are sent. The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.

sanitize.field.names (default: true when the connector configuration explicitly specifies the key.converter or value.converter parameters to use Avro, otherwise false): Whether field names will be sanitized to adhere to Avro naming requirements. See Avro naming for more details.
Here we run in standalone mode; configure connect-standalone.properties as follows:
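A minimal sketch of connect-standalone.properties (the broker address, paths, and converter choice are placeholders to adapt to your environment):

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Where standalone mode stores source connector offsets
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# Directory containing the connector jars (e.g. debezium-connector-mongodb)
plugin.path=/usr/local/kafka/libs
```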
The startup command for standalone mode is as follows:
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]. Several connectors can be started at once; simply append each connector's configuration file path to the command line.
For example: connect-standalone.sh config/connect-standalone.properties mongodb.properties
Deploying in distributed mode:
1. Modify the connect-distributed.properties configuration.
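A sketch of the key settings (broker addresses, topic names, and plugin.path are placeholders; the internal topic names must match the topics created in step 2):

```properties
bootstrap.servers=broker1:9092,broker2:9092
# Workers sharing the same group.id form one Connect cluster
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Internal topics used by the distributed workers
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
# Directory containing the connector jars
plugin.path=/usr/local/kafka/libs
```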
2. Manually create the Kafka topics that cluster mode requires (a creation sketch follows the list below):
config.storage.topic: the topic that stores connector and task configurations; note that this should be a single-partition, highly replicated topic.
offset.storage.topic: stores offsets; this topic should have multiple partitions and replicas.
status.storage.topic: stores status; this topic can have multiple partitions and replicas.
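For example, a sketch using the standard kafka-topics.sh tool (topic names and sizing are illustrative and must match connect-distributed.properties; older Kafka versions use --zookeeper instead of --bootstrap-server):

```sh
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-configs \
  --partitions 1 --replication-factor 3 --config cleanup.policy=compact
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-offsets \
  --partitions 50 --replication-factor 3 --config cleanup.policy=compact
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-status \
  --partitions 10 --replication-factor 3 --config cleanup.policy=compact
```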
3. Start the workers.
The command to start distributed mode is as follows:
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
Common problems:
1. Various java.lang.ClassNotFoundException errors during startup.
When first starting a connector you tend to hit all sorts of ClassNotFoundException errors, complaining about one missing class after another. This almost always comes down to a missing jar or a dependency conflict, so resolve the conflicting dependencies or add the missing jars.
2. The key.converter.schemas.enable=false and value.converter.schemas.enable=false settings in connector.properties.
If you want to send plain JSON instead of Avro, it is simple: set key.converter.schemas.enable and value.converter.schemas.enable to false, and plain JSON data will be sent.
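In the worker (or connector) properties this looks like the following sketch, using the JSON converter that ships with Kafka Connect:

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# false = plain JSON payloads, without the schema envelope
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```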
This is the author's original article; please credit the source when reposting. Original articles belong to the author. Reposting is welcome, but copyright is reserved. If an original article is reposted without crediting the source, the author will pursue the copyright according to law; please respect the author's work.