天天看點

Confluent介紹(二)--confluent platform quickstart

下載下傳

http://www.confluent.io/download,打開後,顯示最新版本3.0.0,然後在右邊填寫資訊後,點選Download下載下傳。

Confluent介紹(二)--confluent platform quickstart

之後跳轉到下載下傳頁面,選擇zip 或者 tar都行, 下載下傳完成後上傳linux系統,解壓即完成安裝。

  • zip and tar archives – 推薦OS X 和 Quickstart
  • deb packages via apt – 推薦安裝服務在 Debian/Ubuntu系統
  • rpm packages via yum – 推薦安裝服務在 RHEL/CentOS/Fedora系統
  • deb/rpm packages with installer script

Confluent 目前還不支援Windows系統。Windows使用者可以下載下傳和使用zip 和 tar包,但最好直接運作jar檔案 ,而不是使用包裝腳本。

Requirements

唯一需要的條件是java 版本>=1.7。

Confluent Platform Quickstart

你可以快速的運作Confluent platform在單台伺服器上。在這篇quickstart,我們将介紹如何運作ZooKeeper,Kafka,和Schema Registry,然後如何讀和寫一些Avro資料從/到Kafka。

(如果你想跑一個資料管道用Kafka Connect和Control Center,參考The Control Center QuickStart Guide.)我們随後也會介紹。

1.下載下傳和安裝Confluent platform。在這篇quickstart 我們使用zip包,也有很多其他安裝方式,見上。

$ wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
$ unzip confluent-3.0.0-2.11.zip
$ cd confluent-3.0.0      

 下邊展示的是安裝目錄裡上層層級結構:

confluent-3.0.0/bin/        # Driver scripts for starting/stopping services
confluent-3.0.0/etc/        # Configuration files
confluent-3.0.0/share/java/ # Jars      

如果你通過deb或者rpm安裝,目錄結構如下:

/usr/bin/                  # Driver scripts for starting/stopping services, prefixed with <package> names
/etc/<package>/            # Configuration files
/usr/share/java/<package>/ # Jars      

2.啟動Zookeeper。因為這是長期運作的服務,你應該運作它在一個獨立的終端(或者在後邊運作它,重定向輸出到一個檔案中)。你需要有寫權限到/var/lib在這一步以及之後的步驟裡:

# The following commands assume you exactly followed the instructions above.
# This means, for example, that at this point your current working directory
# must be confluent-3.0.0/.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties      

3.啟動Kafka,同樣在一個獨立的終端。

$ ./bin/kafka-server-start ./etc/kafka/server.properties      

4.啟動Schema Registry,同樣在一個獨立的終端。

$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties      

5.現在所有需要的服務都已啟動,我們發送一些Avro資料到Kafka的topic中。雖然這一步一般會得到一些資料從一些應用裡,這裡我們使用Kafka提供的例子,不用寫代碼。我們在本地的Kafka叢集裡,寫資料到topic “test”裡,讀取每一行Avro資訊,校驗Schema Registry .

$ ./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic test \
         --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'      

一旦啟動,程序等待你輸入一些資訊,一條一行,會發送到topic中一旦按下enter鍵。試着輸入一些資訊:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}      

輸入完成後,可以使用Ctrl+C來終止程序。

Note:如果一個空行你按下Enter鍵,會被解釋為一個null值,引起錯誤。然後僅僅需要做的是啟動producer程序,接着輸入資訊。

6.現在我們可以檢查,通過Kafka consumer控制台讀取資料從topic。在topic ‘test'中,Zookeeper執行個體,會告訴consumer解析資料使用相同的schema。最後從開始讀取資料(預設consumer隻讀取它啟動之後寫入到topic中的資料)

$ ./bin/kafka-avro-console-consumer --topic test \
         --zookeeper localhost:2181 \
         --from-beginning      

你會看到你之前在producer中輸入的資料,以同樣的格式。

consumer不會退出,它可以監聽寫入到topic中的新資料。保持consumer運作,然後重複第5步,輸入一些資訊,然後按下enter鍵,你會看到consumer會立即讀取到寫入到topic中的資料。

當你完成了測試,可以用Ctrl+C終止程序。

7.現在讓我們嘗試寫一些不相容的schema的資料到topic ’test‘中,我們重新運作producer指令,但是改變schema。

$ ./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic test \
         --property value.schema='{"type":"int"}'      

 現在輸入一個整數按下enter鍵,你會看到以下的異常:

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "int"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
       at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
       at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174)
       at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51)
       at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
       at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49)
       at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:155)
       at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:94)
       at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

      

 當producer試圖發送一些資訊,它會檢查schema用Schema Registry。當傳回錯誤時說明現在的schema無效,因為它不能相容之前設定的schema。控制台列印出錯誤資訊并退出,但是你自己的應用可以更加人性化處理這類問題。但最重要的是,我們保證不讓不相容的資料寫入到Kafka中。

8.當你完成這一系列測試,你可以使用ctrl+c來關閉服務,以啟動時相反的順序。

這一簡單的教程包含了Kafka和Schema Registry這一些核心的服務。你也可以參考以下document:

  • Confluent Control Center documentation
  • Kafka Streams documentation
  • Kafka Connect documentation
  • Schema Registry documentation
  • Kafka REST Proxy documentation
  • Camus documentation

本片博文為作者原創,轉載請注明出處,部分譯自confluent官網

一張網頁,要經曆怎樣的過程,才能抵達使用者面前?

一位新人,要經曆怎樣的曆練,才能站在技術之巅?