天天看點

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊
https://developer.aliyun.com/topic/download?id=1295 · 更多精彩内容,請下載下傳閱讀全本《Elastic Stack實戰手冊》 https://developer.aliyun.com/topic/download?id=1295 https://developer.aliyun.com/topic/es100 · 加入創作人行列,一起交流碰撞,參與技術圈年度盛事吧 https://developer.aliyun.com/topic/es100

創作人:劉曉國

目前 COVID-19 有很多的地方發表資料。

在本文中,我們使用 Elastic Stack 來分析相關的資料,并對資料進行分析。我們将使用 Kibana 來對所有的資料進行可視化分析。

特别指出的是:我們将使用 Elasticsearch 的 processors 來對我們的資料進行解析。

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

資料來源

我們将使用在位址:

https://www.datawrapper.de/_/Gnfyw/

下載下傳我們想要的資料。我們點選連結 Get the data 來下載下傳我們想要的資料。

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們下載下傳後的資料就像是下面的:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

上面的資料是一個 csv 格式的檔案。我們把下載下傳後的資料存入到一個我們喜歡的目錄中,并指令為 covid19.csv 。針對 CSV 格式的資料導入,我們可以采用 Logstash 來把它導入到 Elasticsearch 中。具體如何操作,我們可以參照以下文章:

  • Logstash:導入 zipcode CSV 檔案和 Geo Search 體驗
https://blog.csdn.net/UbuntuTouch/article/details/100606857
  • Logstash: 應用實踐 - 裝載 CSV 文檔到 Elasticsearch

當然我們也可以直接通過 Kibana 提供的便利,直接把資料導入到 Elasticsearch 中:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

針對我們今天的這個 COVID-19 資料,我們将使用 Filebeat 來對它處理。

安裝

  • 安裝 Elasticsearch 及 Kibana
  • 安裝 Filebeat
  • 打開 Kibana
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

雖然 csv 部署 log 範疇,但是在上面的 logs 裡含有針對各個作業系統的 filebeat 的安裝指南。我們點選上面的 Add log data 按鈕:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們點選 System logs:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

在上面我們選擇我們的作業系統,并按照上面的安裝指令來完成和我們 Elasticsearch 版本相比對的 Filbeat 的安裝。我們可以不啟動相應的子產品,隻做相應的安裝即可。

配置 Filebeat 及導入資料到 Elasticsearch

為了能夠把我們的資料導入到 Elasticsearch 中,我們可以采用 Filebeat。為此,我們必須來配置 filebeat。我們首先來建立一個定制的 filebeat_covid19.yml 配置檔案,并把這個檔案存于和我們上面的 covid19.csv 同一個目錄中:

filebeat_covid19.yml

filebeat.inputs:
- type: log
  paths:
    - /Users/liuxg/data/covid19/covid19.csv
  exclude_lines: ['^Lat']
 
output.elasticsearch:
  hosts: ["http://localhost:9200"]
  index: covid19
 
setup.ilm.enabled: false
setup.template.name: covid19
setup.template.pattern: covid19           

在上面我們定義了一個 type 為 log 的 filebeat.inputs。我們定了我們的 log 的路徑。你需要根據自己的實際路徑來修改上面的路徑。

我們定義了資料的 index 為 covid19 的索引。值得注意的是,由于 csv 檔案的第一行是資料的 header,我們需要去掉這一行。為此,我們采用了 exclude_lines: ['^Lat'] 來去掉第一行。

等我們定義好上面的配置後,我們運作如下的指令:

./filebeat -e -c ~/data/covid19/filebeat_covid19.yml           

上面的指令将把我們的資料導入到 Elasticsearch 中。

Filebeat 的 registry 檔案存儲 Filebeat 用于跟蹤上次讀取位置的狀态和位置資訊。如果由于某種原因,我們想重複對這個 csv 檔案的處理,我們可以删除如下的目錄:

  • data/registry 針對 .tar.gz and .tgz 歸檔檔案安裝
  • /var/lib/filebeat/registry 針對 DEB 及 RPM 安裝包
  • c:\ProgramData\filebeat\registry 針對 Windows zip 檔案

如果上面的指令成功後,我們可以在 Kibana 中檢視新生産的 covid19 索引:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

如果可以對資料進行查詢:

GET covid19/_search           
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

在上面,它顯示了 message 字段。

顯然這個資料是最原始的資料,它并不能讓我們很友善地對這個資料進行分析。那麼我們該如何處理呢?我們可以通過 Elasticsearch 的 ingest node 提供的強大的 processors 來幫我們處理這個資料。

利用 Processors 來加工資料

去掉無用的字段

在我們的文檔裡,我們可以看到有很多我們并不想要的字段,比如 ecs, host,log 等等。我們想把這些字段去掉,那麼我們該如何做呢?我們可以通過定義一 個 pipleline 來幫我們處理。為此,我們定義一個如下的 pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    }
  ]
}           

上面的 pipeline 定義了一個叫做 remove 的 processor。它檢查 log,input, ecs, host 及 agent 都不為空的情況下,删除字段 log, input,ecs, host 及 agent。我們在 Kibana 中執行上面的指令。

為了能夠使得我們的 pipleline 起作用,我們通過如下指令來執行:

POST covid19/_update_by_query?pipeline=covid19_parser           

當我們執行完上面的指令後,我們重新檢視我們的文檔:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

在上面我們可以看出來,所有的我們不想要的字段都已經被去掉了

替換引号

我們可以看到導入的 message 資料為:

"""37.1232245,-78.4927721,"Virginia, US",Virginia,",",US,221,0,0"""           

顯然,這裡的資料有很多的引号"字元,我們想把這些字元替換為符号'。為此,我們需要 gsub processors 來幫我們處理。我重新修改我們的 pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    }    
  ]
}           

在 Kibana 中運作上面的指令,并同時執行:

POST covid19/_update_by_query?pipeline=covid19_parser           

經過上面的 pipeline 的處理後,我們重新來檢視我們的文檔:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

從上面的顯示中,我們也看出來我們已經成功都去掉了引号。我們的 message 的資訊如下:

"37.1232245,-78.4927721,'Virginia, US',Virginia,',',US,221,0,0"           

解析資訊

在上面我們已經很成功地把我們的資訊轉換為我們所希望的資料類型。接下來我們來使用 grok 來解析我們的資料。grok 的資料解析,基本上是一種正則解析的方法。我們首先使用Kibana所提供的 Grok Debugger 來幫助我們分析資料。我們将使用如下的 grok pattern 來解析我們的message:

%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們點選 Grok Debugger,并把我們的相應的文檔拷入到相應的輸入框中,并用上面的 grok pattern 來解析資料。上面顯示,它可以幫我們成功地解析我們想要的資料。顯然這個被解析的資訊更适合我們做資料的分析。為此,我們需要重新修改 pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    },
    {
     "grok": {
        "field": "message",
        "patterns": [
          "%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
        ]
      }
    }        
  ]
}           

我們運作上面的 pipeline,并使用如下的指令來重新對資料進行分析:

POST covid19/_update_by_query?pipeline=covid19_parser           

我們重新來檢視文檔:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

在上面我們可以看到新增加的 country,infected,address 等等的字段。

添加location字段

在上面我們可以看到 lon 及 lat 字段。這些字段是文檔的經緯度資訊。這些資訊并不能為我們所使用,因為首先他們是分散的,并不處于一個通過叫做 location 的字段中。為此,我們需要建立一個新的 location 字段。為此我們更新 pipeline 為:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    },
    {
     "grok": {
        "field": "message",
        "patterns": [
          "%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
        ]
      }
    },
    {
      "set": {
        "field": "location.lat",
        "value": "{{lat}}"
      }
    },
    {
      "set": {
        "field": "location.lon",
        "value": "{{lon}}"
      }
    }              
  ]
}           

在上面我們設定了一個叫做 location.lat 及 location.lon 的兩個字段。它們的值分别是 {{lat}} 及 {{lon}}。我們執行上面的指令。

由于 location 是一個新增加的字段,在預設的情況下,它的兩個字段都會被 Elasticsearch 設定為 text 的類型。為了能夠讓我們的資料在地圖中進行顯示,它必須是一個 geo_point 的資料類型。為此,我們必須通過如下指令來設定它的資料類型:

PUT covid19/_mapping
{
  "properties": {
    "location": {
      "type": "geo_point"
    }
  }
}           

執行上面的指令,我們再使用如下的指令來對我們的資料重新進行處理:

POST covid19/_update_by_query?pipeline=covid19_parser           

等執行完上面的指令後,我們重新來檢視我們的文檔:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

從上面我們可以看到一個叫做 location 的新字段。它含有 lon 及 lat 兩個字段。我們同時也可以檢視 covid19 的 Mapping。

GET covid19/_mapping           

我們可以發現 Location 的資料類型為:

"location" : {
     "type" : "geo_point"
  }           

它顯示 location 的資料類型是對的。

到目前為止,我們已經成功地把資料導入到 Elasticsearch 中。我們接下來針對 covid19 來進行資料分析。

展示并分析資料

建立 index pattern

為了對 covid19 進行分析,我們必須建立一個 index pattern:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

點選上面的 Create index pattern 按鈕:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

點選 Next step:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

點選 Create index pattern:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

分析資料

建立 Maps visualization

我們打開 Visualization:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

點選 Create visualization:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

選擇 Maps:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

選擇 Documents:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們接着定制這個 Map:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

這樣顔色越深,代表感染的越嚴重。點選 Save & close:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們把上面的 Visualization 儲存為 covid-1。

找出感染和死亡最多的國家

同樣地,我們建立一個叫做 Horizontal Bar 的

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

點選 Horizontal Bar:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

選擇 covid-19:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們儲存該 Visualization 為 covid-2。

同樣地,我們得到死亡最多的前十個國家:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們儲存該 Visualization 為 covid-3。

找出感染人數最多的地區

我們可以選擇一個 pie 統計:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們儲存上面的 Visualization 為 covid-3。

建立一個表格

我們最早得到的表格也是非常不錯的。我們想把之前的那個表格的内容也放到我們的Dashboard 裡,這樣,我們可以在 Dashboard 裡進行搜尋。我們首先點選 Discover:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們選中我們的 Index pattern covid19*。同時在左下方選擇我們想要的字段,并點選 add。我們添加 city, country, death, infected 等字段:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們儲存這個表格為 covid-4。

建立 Dashboard

根據之前的 Visualization,我們來建立一個 Dashboard:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

我們把之前建立的 Visualization 一個一個地添加進來,并形成我們最終的 Dashboard:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊
運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

在上面圖中,如果我們點選 China,那麼所有的圖形将變為:

運用 Elastic Stack 分析COVID-19資料—Elastic Stack 實戰手冊

從上面我們可以看出來整個表格都發送了變化,而且我們的餅狀圖也發生了相應的變化。

參考:

繼續閱讀