天天看點

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊
https://developer.aliyun.com/topic/download?id=1295 · 更多精彩内容,請下載下傳閱讀全本《Elastic Stack實戰手冊》 https://developer.aliyun.com/topic/download?id=1295 https://developer.aliyun.com/topic/es100 · 加入創作人行列,一起交流碰撞,參與技術圈年度盛事吧 https://developer.aliyun.com/topic/es100

創始人:劉曉國

你是否考慮分析和可視化地理資料? 為什麼不嘗試 Elastic Stack? 也就是所謂的 ELK(Elasticsearch + Logstash + Kibana)或 Elatic Stack 不僅是 NoSQL資料庫。 它是一個整體系統,可以實時存儲,搜尋,分析和可視化來自任何來源的資料。 在這種情況下,我們将使用有關華沙公共交通位置的開放資料。

本文中,我将介紹如何使用 Elastic Stack 和 Kafka 來監控公共交通的車輛。我們将使用 Docker 來部署所有需要的元件。下面是整個系統的架構圖:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

整個應用的架構如上:

  • 汽車或公交的資料上傳到一個資料平台。它提供 REST API 接口來被調用。
  • Python 應用定時從 data portal 進行抓取資料,并同時發送到 Kafka
  • Kafaka 的資料發送到 Logstash 進行加工,并導入到 Elasticsearch 中
  • 在 Kibana 中對資料進行呈現

安裝

Python

我們有一個應用是用 python 語言寫的。你需要安裝 python3 來運作該應用。

API key

為了測試這個應用,我們必須得到相應的華沙公共交通資訊的 API key。

我們可以在位址

https://api.um.warszawa.pl/#

進行申請。

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選上面的 “登入” 連結,并進行腦力測試:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

最終得到如上所示的 API key:86882ed9-4533-4630-b03b-47b3d68ae5e5。這個 key 将在一下的 python 應用中使用。

Elastic Stack 及 Kafka

你需要安裝 Docker 來實作 Elastic Stack 及 Kafka 的安裝。

本展示的所有的源碼可以在位址

https://github.com/liu-xiao-guo/wiadro-danych-kafka-to-es-ztm

進行下載下傳。

docker-compose 包含 Elasticsearch,Kibana,Zookeeper,Kafka,Logstash 和應用程式 Kafka Streams (由于一些原因,在本展示中将不被采用)。

docker-compose.yml

version: '3.3'
services:
    elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
        restart: unless-stopped
        environment:
        - discovery.type=single-node
        - bootstrap.memory_lock=true
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ulimits:
            memlock:
                soft: -1
                hard: -1
        volumes:
        - esdata:/usr/share/elasticsearch/data
        ports:
        - 9200:9200
 
    kibana:
        image: docker.elastic.co/kibana/kibana:7.7.0
        restart: unless-stopped
        depends_on:
            - elasticsearch
        ports:
            - 5601:5601
        volumes:
            - kibanadata:/usr/share/kibana/data
 
    zookeeper:
        image: 'bitnami/zookeeper:3'
        ports:
            - '2181:2181'
        volumes:
            - 'zookeeper_data:/bitnami'
        environment:
            - ALLOW_ANONYMOUS_LOGIN=yes
            
    kafka:
        image: 'bitnami/kafka:2'
        ports:
            - '9092:9092'
            - '29092:29092'
        volumes:
            - 'kafka_data:/bitnami'
        environment:
            - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
            - ALLOW_PLAINTEXT_LISTENER=yes
            - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
            - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
        depends_on:
            - zookeeper
 
    ztm_kafka_streams:
        image: "maciejszymczyk/ztm_stream:1.0"
        environment:
          - APPLICATION_ID_CONFIG=awesome_overrided_ztm_stream_app_id
          - BOOTSTRAP_SERVERS_CONFIG=kafka:9092
        depends_on:
          - kafka
 
    logstash:
        image: docker.elastic.co/logstash/logstash:7.7.0
        volumes:
            - "./pipeline:/usr/share/logstash/pipeline"
        environment:
            LS_JAVA_OPTS: "-Xmx256m -Xms256m"
        depends_on:
            - elasticsearch
            - kafka
  
volumes:
    esdata:
        driver: local
    kibanadata:
        driver: local
    zookeeper_data:
        driver: local
    kafka_data:
        driver: local           

我們在自己電腦的 console 中打入如下的指令:

docker-compose up           

我們可以看到如下的畫面:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

從上面我們可以看出來 Logstash 已經被成功地啟動。

我們在浏覽器的位址欄中輸入位址

http://localhost:5601
在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

我們可以看到 Kibana 已經成功啟動,這也意味着 Elasticsearch 被成功地運作起來了。

配置及運作

Logstash

我們使用如下的 pipeline 來實作對資料的處理:

pipeline/kafka_to_es.conf

input {
   kafka {
        topics => "ztm-input"
        bootstrap_servers => "kafka:9092"
        codec => "json"
   }
}
 
filter {
    mutate {
        convert => {"Lat" => "float"}
        convert => {"Lon" => "float"}
 
        add_field => ["location", "%{Lat},%{Lon}"]
        remove_field => ["Lat", "Lon"]
    }
}
output {
    stdout {
        codec => rubydebug
    }
 
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "ztm"
   }
}           

它從 Kafaka 的 "ztm-input" topic 擷取資料,并把相應的 Lat 及 Lon 字段合并成為一個 location 字段。在 output 的部分,我們把資料導入到 Elasticsearch 之中。

Elasticsearch

我們使用了索引生命周期管理機制, 而不是将記錄放入諸如 ztm-2020.05.24 之類的索引中。 它使你可以自動執行索引的壽命。 它會自動進行彙總,并根據你配置政策的方式更改索引屬性(熱-熱-冷架構)。 假設我希望在索引達到1GB或30天過去後進行 rollover,我們在 Kibana 中執行如下的指令:

PUT _ilm/policy/ztm_policy
{
  "policy": {
    "phases": {
      "hot":{
        "actions": {
          "rollover": {
            "max_size": "1gb",
            "max_age": "30d"
          }
        }
      }
    }
  }
}           

你還需要一個模闆,該模闆具有 ztm_policy 将連接配接到的适當 mapping。 如果沒有 mapping,Elasticsearch 将不會猜測到 location 字段為 geo_point 的資料類型,并且時間字段将是純文字。

PUT _template/ztm_template
{
  "index_patterns": ["ztm-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.lifecycle.name":"ztm_policy",
    "index.lifecycle.rollover_alias": "ztm"
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "bearing": {
        "type": "float"
      },
      "brigade": {
        "type": "keyword"
      },
      "distance": {
        "type": "float"
      },
      "lines": {
        "type": "keyword"
      },
      "location": {
        "type": "geo_point"
      },
      "speed": {
        "type": "float"
      },
      "time": {
        "type": "date",
        "format":"MMM dd, yyyy K:mm:ss a"
      },
      "vehicleNumber": {
        "type": "keyword"
      }
    }
  }
}           

現在該使用适當的别名建立第一個索引了。

PUT ztm-000001
{
  "aliases": {
    "ztm": {
      "is_write_index":true
    }
  }
}           

我們在 Kibana 中運作上面的三個指令。

Python 腳本

首先,我們必須獲得所需要的 API key。這個在上面我們已經講述了。

ztm.py

import requests 
import json
import time
from kafka import KafkaProducer
 
token = '86882ed9-4533-4630-b03b-47b3d68ae5e5'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e927d-4ad3-9500-4ab9e55deb59'
sleep_time = 15
 
bus_params = {
    'apikey':token,
    'type':1,
    'resource_id': resource_id
    }
tram_params = {
    'apikey':token,
    'type':2,
    'resource_id': resource_id
    }
 
while True:
    try:
        r = requests.get(url = url, params = bus_params)
        data = r.json() 
        producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
                                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                                key_serializer=lambda x: x
                                )
 
        print('Sending records...')
        for record in data['result']:
            print(record)
            future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
            result = future.get(timeout=60)
    except:
        print("¯\_(ツ)_/¯")
    time.sleep(sleep_time)           

上面的代碼其實是蠻簡單的。它定時從 API portal 擷取公交系統的位置資訊,并轉發到 Kafka。

我們使用如下的指令來運作上面的應用:

python3 ztm.py           
在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

這個時候,我們可以在螢幕上看到所獲得很多的關于公交系統車輛的資訊。

我們可以轉到運作 docker-compopse up 指令的那個 console,我們可以看到如下的資訊:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

它表明我們的 Logstash 是在正常工作。

在 Kibana 中展示

打開 Kibana,并使用如下的指令:

GET _cat/indices           
在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

從上面,我們可以看到一個叫做 ztm-000001 的索引,并且它裡面含有已經收集上來的車輛資訊。

為了分析這個索引,我們必須建立一個 index pattern:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選 Create index pattern:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選 Next step:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選上面的 Create index pattern 按鈕。這樣就完成了建立 index pattern。

為了對資料可視化,我們點選 Visualization:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選上面的 Create new visualization:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選 Maps:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選 Add layer:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選 Documents:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊
在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

向下滾動:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

點選上面的 Save & close 按鈕:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

在上面,我們配置每隔2秒自動擷取資料。點選 Apply 按鈕。

我們聚焦華沙地區:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

這樣在地圖上,我們可以清楚地看到每個車輛的運作情況。

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

我們甚至可以針對一個 Brigade 進行搜尋:

在 Docker 上使用 Elastic Stack 和 Kafka—Elastic Stack 實戰手冊

參考: