天天看點

Elasticsearch service通過企業微信定期發送報告

在上一篇文章《如何選擇Elastic Stack中的Alert和Watcher》中,我們介紹了Alert和Watcher的使用場景。也提到Watcher與Kibana Alert的一個重要不同是,Watcher也可以用來排程Elasticsearch的任務。其中一個常見的用途是排程報告的定時生成和發送電子郵件。當我們在使用Elasticsearch service作為資料引擎進行各種與資料有關的搜尋和分析工作時,通常需要将資料彙總,做成各種可視化的儀表闆,定期發送各種報告(比如,營運彙總報告,安全分析報告,服務異常報告等)。本文中,我們将介紹:

  • 在Kibana上,如何生成儀表闆的PDF或PNG報告
  • 如何通過騰訊雲的serverless函數服務:
    • 定期生成報告
    • 将報告發送到企業微信

生成儀表闆的PDF或PNG報告

要自動生成 PDF 和 CSV 報告,需要生成一個 POST URL,然後使用 Watcher 或腳本送出 HTTP請求。在本文中,我們是通過騰訊雲的serverless函數服務來執行腳本,送出HTTP請求

建立一個 POST URL

建立觸發報告以生成 PDF 和 CSV 報告的 POST URL。

要為 PDF ,PNG報告建立 POST URL:

  1. 打開主菜單,然後單擊Dashboard、Visualize Library或Canvas。
  2. 打開要作為報告檢視 的儀表闆、可視化或Canvas工作闆。
  3. 從工具欄中,單擊共享 > PDF 報告,然後選擇一個選項:
    • 如果您使用的是Dashboard或Visulize Library,請單擊Copy POST URL。
    • 如果您使用的是Canvas,請單擊進階選項 > 複制 POST URL。

要為 CSV 報告建立 POST URL:

  1. 打開主菜單,然後單擊Discover。
  2. 打開您要共享的已儲存搜尋。
  3. 在工具欄中,單擊共享 > CSV 報告 > 複制 POST URL。
Elasticsearch service通過企業微信定期發送報告

擷取POST URL

當我們擷取POST URL之後,每次通路該連結,均可觸發一次生成報告的任務,并且将會記錄于Elasticsearch當中。我們可通過Kibana上的管理界面檢視任務的狀态,以及下載下傳對應的報告

Elasticsearch service通過企業微信定期發送報告

報告任務界面

例如,我們可以通過以下腳本,進行任務觸發

curl \
-XPOST \ 
-u elastic \ 
-H 'kbn-xsrf: true' \ 
'http://0.0.0.0:5601/api/reporting/generate/csv?jobParams=...'            

複制

需要注意,在通過HTTP請求觸發時,對于配置了基礎安全的叢集,需要提供使用者認證資訊。

通過騰訊雲的serverless函數服務定期生成和發送報告

Elasticsearch的Watcher功能提供了叢集内的定期報告生成和發送功能。比如:

PUT _watcher/watch/error_report
{
  "trigger" : {
    "schedule": {
      "interval": "1h"
    }
  },
  "actions" : {
    "email_admin" : { 
      "email": {
        "to": "'Recipient Name <[email protected]>'",
        "subject": "Error Monitoring Report",
        "attachments" : {
          "error_report.pdf" : {
            "reporting" : {
              "url": "http://0.0.0.0:5601/api/reporting/generate/printablePdf?jobParams=...", 
              "retries":40, 
              "interval":"15s", 
              "auth":{ 
                "basic":{
                  "username":"elastic",
                  "password":"changeme"
                }
              }
            }
          }
        }
      }
    }
  }
}           

複制

但Watcher的該功能僅能通過電子郵件發送報告。在不少場景,電子郵件的通達性不如企業微信等即時通信工具。

當我們選擇企業微信作為發送報告的工具時,需要以腳本的方式觸發報告的生成。該場景下,我們可以選擇的方案很多,這裡介紹的是通過騰訊雲的雲函數服務:

雲函數(Serverless Cloud Function,SCF)是騰訊雲為企業和開發者們提供的無伺服器執行環境,幫助您在無需購買和管理伺服器的情況下運作代碼。您隻需使用平台支援的語言編寫核心代碼并設定代碼運作的條件,即可在騰訊雲基礎設施上彈性、安全地運作代碼。SCF 是實時檔案處理和資料處理等場景下理想的計算平台。

我們可以将示例代碼部署到雲函數中,并啟用特定的觸發器進行觸發

import hashlib

import requests
from requests.auth import HTTPBasicAuth
import base64
import json
import time

# Kibana的位址
host="https://es-i4hx8bxq.kibana.tencentelasticsearch.com:5601"
username=<your username>
password=<your password>
headers = {
    'kbn-xsrf': "true",
}
# 企微的webhook
qyapi_webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=<your key>"
# dashoard位址
dashboard_url="https://es-i4hx8bxq.kibana.tencentelasticsearch.com:5601/goto/5a40604be221c859894b218a7f38e6f3"
# 針對dashboard生成報告的POST URL
report_url = "https://es-i4hx8bxq.kibana.tencentelasticsearch.com:5601/api/reporting/generate/png?jobParams=..."

def generate_report(url):

    # 發送post url指令,生成報告
    response = requests.post(url, timeout=10, headers=headers, auth=HTTPBasicAuth(username, password))
    if response.ok:
        # 下載下傳報告
        download_path = json.loads(response.text)['path']
        status = json.loads(response.text)['job']['status']
        if status == 'pending':
            time.sleep(60)

        response = requests.get(host+download_path, auth=HTTPBasicAuth(username, password))

        if response.text == 'pending':
            requests.post(qyapi_webhook, data=json.dumps({"msgtype":"text", "text": {"content": "報告生成異常"}}))
            return
        # 将報告轉碼(based64 + md5驗證)
        content = json.dumps({
            "msgtype": "image",
            "image": {
                "base64": str(base64.b64encode(response.content), encoding='utf-8'),
                "md5": str(hashlib.md5(response.content).hexdigest())
            }})
        # 将報告發送到企業微信
        requests.post(qyapi_webhook, data=json.dumps({"msgtype": "markdown", "markdown": {"content": "報告已生成,具體可以檢視[連結]("+dashboard_url+")"}}))
        response = requests.post(qyapi_webhook, data=content)
        print(response.text)
        return

    requests.post(qyapi_webhook, data=json.dumps({"msgtype":"text", "text": {"content": "報告生成異常"}}))


def main_handler(event, context):
    generate_report(report_url)
if __name__ == '__main__':
    main_handler("", "")           

複制

注意:示例中設定了一個60秒的sleep,是因為根據儀表闆加載資料所需時間的不同,報告需要1~2分鐘的生産時間。為了保證報告生産任務完成之後再讀取報告,需要設定一個等待時間。我們也可以把該腳本拆分成兩個雲函數,一個負責生産報告,一個負責下載下傳報告,并通過企業微信發送。

Elasticsearch service通過企業微信定期發送報告

示例:企微收到的報告