本文分享自華為雲社群《對象存儲隻能按檔案名搜尋? 用 DWR + ElasticSearch 實作檔案名、檔案内容、圖檔文字的模糊搜尋!》,作者:雲存儲開發者支援團隊 。
衆所周知,由于對象存儲的架構限制,要想檢索對象存儲裡的檔案,隻能使用字首的方式過濾,然後一頁一頁的列舉,限制多,效率低,要是桶内對象實在太多,可能光列舉就要列舉一兩天。你可能會問,不少大公司的一個桶裡都是幾億幾十億的對象,那他們都是怎麼檢索的呢?很簡單但很有效的方案——在上傳對象時候把對象資訊存到其他資料庫裡,如 Elasticsearch、MongoDB、MySOL 等,然後在資料庫裡檢索。
這種方案雖然見到有效,但修改成本極高,如果在業務設計初期沒有考慮到,或系統運作過程中想要添加些新的字段,那就隻能修改業務代碼并重新部署,要是再碰上有已分發用戶端的情況下還要推動用戶端更新才能解決。
有沒有更新簡單,不用改動業務代碼的方案呢?還真有,把存資料庫的過程轉移到對象存儲來做就好了,每次上傳對象之後,讓對象存儲幫你把對象資訊存一份到你指定的位置。本文我們嘗試通過 DWR 平台來進行解決。DWR 是華為雲推出的一個近資料計算平台,簡單來說,通過 DWR 平台,我們可以在不改動業務系統的情況下實作對對象的處理。如圖檔上傳時把圖檔轉成 JPG 格式并存儲在另一個桶裡、在擷取圖檔時給圖檔加上水印等。DWR 将這一個個的能力都封裝成了“算子”,除了官方和第三方夥伴提供的算子外,我們也可以編寫自定義算子來實作我們的其他定制類要求。
1. 架構總覽
1.1 資料庫選型
對象存儲中一個對象(Object)由對象名(Key)、中繼資料(Metadata)、對象内容(Data)三部分組成。從原始需求出發,為了實作對象的模糊搜尋,我們首先要把對象名存起來。進一步的,中繼資料中也包含了許多可以進行過略、排序用的資訊,如對象大小、最後修改時間、上傳時間、對象 Content-Type、自定義中繼資料等。其中自定義中繼資料中包含的 Key 的數目、value 類型都是可變的。為了友善存儲和檢索自定義中繼資料,不在每次想增加一個字段時都去修改資料庫,我們首先就排除了傳統的關系型資料庫。
非關系型資料庫(NoSQL)中,比較符合我們要求的是兩款文檔型資料庫——MongoDB 與 Elasticsearch。從定位上來說,MongoDB 更偏向于資料庫,可以用作資料管理和資料搜尋; CSS 則偏向于資料搜尋服務。具體到我們這個場景,從通路便捷度、最小規格價格幾個次元對比,最終選擇了選擇 CSS 服務。下表為華為雲上的 DDS、CSS、GaussDB for NoSQL 的對比,大家也可以根據自己具體場景選擇合适的服務。
1.2 存儲對象資訊的整體流程
總的流程分 3 步:
1. 上傳檔案到對象存儲
2. DWR 自動觸發,儲存需要的資訊到 CSS
3. 通過 API、kibana 等方式檢索 CSS 中存儲的資料
其中第二步還可以進行些進階的操作,例如上傳圖檔時,檢測圖檔中的文字資訊,一并存入資料庫;上傳視訊時,檢測把視訊大小、碼率、清晰度等資訊抽取出來存入資料庫…
2. 購買與配置 CSS
2.1 購買 CSS
1.配置叢集 控制台找到 CSS 服務,點選建立叢集,叢集版本選擇了 7.10.2,在此我們先選擇最低配的單節點。存儲選了超高 IO。
2.配置網絡 需要注意,安全組一定要允許 9200 端口,叢集在建立後不支援修改安全組,隻能删除重新建立。如果隻是在 VPC 内網通路可以不開安全模式,要是想開放給公網通路就必須開啟完全模式。
3.配置備份 建議開啟下資料備份,OBS 本身價格也不貴,還可以通過轉冷存儲進一步降低成本,資料多一份保護,萬一哪天誤删了不用從頭挨個列舉。
4.完成配置 至此就完成了初始的配置,點選立即申請即開始建立叢集。
2.2 初始化 Mapping
ES 中的 Mapping 大緻可以類比為資料庫中的表結構,通過定義 mapping,可以指定字段的存儲類型。我們目前需要的字段如下。可以使用 CSS 自帶的 kibana 控制台建立 Mapping。
在 CSS 控制台找到 kibana,點選跳轉後登入,側邊欄找到 Dev Tools
把下面的代碼插入進去,點選運作,記得把 your-bucket-name 替換成你實際要用的桶名,需要自己在 OBS 手動建立桶
PUT your-bucket-name
{
"mappings": {
"dynamic": true,
"properties": {
"etag": {
"type": "text"
},
"expiration": {
"type": "text"
},
"content-type": {
"type": "text"
},
"date": {
"type": "text"
},
"content-length": {
"type": "integer"
},
"bucket_name": {
"type": "text"
},
"object_name": {
"type": "text"
},
"create_time": {
"type": "integer"
}
}
}
}
3. 配置 DWR
資料工坊(Data Workroom,DWR)是一款近資料處理服務,下層調用了函數服務 FunctionGraph 的能力,自定義算子本質上就是 FunctionGraph 的一個函數,為了開發自定義算子,我們首先要在 FunctionGraph 上建立一個自定義函數并測試通過。
3.1 建立 FunctionGraph 函數
建立函數包含上傳依賴包、建立函數、建立委托、測試函數幾個步驟,都不複雜。
3.1.1 上傳依賴包
本地 Python 操作 Elasticsearch 需要通過 pip 安裝 Elasticsearch Python 依賴,相應的,我們在函數工作流中調用也需要添加對應的依賴包,我們需要安裝7.10.1 版本的 elasticsearch 。
首先需要使用你對應 python 版本建立個新的虛拟環境,如果沒有建立,而你本地已經有了部分依賴,會導緻依賴包裝不出來。
建議使用 Linux 環境打包依賴包,在 windows 環境下打包出的部分包可能不相容 functionGraph 環境
# 安裝虛拟環境包,有的話可以跳過
pip install virtualenv
# 建立 python 3.9 的虛拟環境
virtualenv fgpackage --python=3.9
# Linux 激活虛拟環境
source ./fgpackage/bin/activate
# Windows 激活虛拟環境
# .\fgpackage\Scripts\activate
# 安裝指定包到臨時目錄
pip install elasticsearch==7.10.1 --root \tmp\fgpackage
經過上面的操作,把就elasticsearch 和它們需要的依賴安裝到了 \tmp\fgpackage 下了。一層一層進入 \tmp\es_package,一直到 site-packages 一層,全選後添加到一個壓縮包内。
在函數清單頁點選函數-依賴包管理-添加依賴包
運作語言選 Python3.9,上傳剛剛打包的壓縮檔案點選确定即可。
更多可參考 官方添加依賴說明
3.1.2 建立函數
控制台找到 FuntionGraph 服務,點選建立函數。
選擇空白函數,運作時選擇為 Python 3.9 (話說把 Runtime 翻譯成運作時好奇怪,這種專有詞是不是最好别強行翻譯?);委托需要具有 VPC Administrator 與 Tenant Administrator 兩個權限,用以通路其他雲服務和 VPC 内網資源,如果有現成的可以直接選擇,沒有的話點選建立委托進入建立頁,參考下一節進行建立,然後重新整理下選擇即可。
點選完成建立。
3.1.3 建立委托
委托需要有 VPC Administrator 與 Tenant Administrator 兩個權限,如果已有可以直接跳過。上一節中的建立函數位置點選 建立委托 跳轉到委托建立頁,點選建立委托。
委托類型選擇雲服務
權限選擇 VPC Administrator 與 Tenant Administrator 兩個權限
授權範圍選擇所有資源,或跟你需要自己配置
點選完成即可。
3.1.4 填寫代碼
建立過函數後,會進入函數編輯頁面,将下面的代碼寫到編輯器裡,點選下部署,或鍵盤按 Ctrl + S 進行部署
# -*- coding:utf-8 -*-
import time
from urllib.parse import unquote_plus
from elasticsearch import Elasticsearch
from obs import ObsClient
def handler(event, context):
# 擷取桶名與對象名
region_id, bucket_name, object_name = get_obs_obj_info(event.get("Records", None)[0])
context.getLogger().info(f"bucket name: {bucket_name}, object key: {object_name}")
ak = context.getAccessKey()
sk = context.getSecretKey()
server = 'obs.' + region_id + '.myhuaweicloud.com'
context.getLogger().info("before token")
context.getLogger().info(context.getToken())
context.getLogger().info("finish token")
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server)
# 擷取對象中繼資料
object_metadata = obs_client.getObjectMetadata(bucket_name, object_name)
# 将頭域轉為字典
info_dict = {i[0]: i[1] for i in object_metadata["header"]}
info_dict["bucket_name"] = bucket_name
info_dict["object_name"] = object_name
# 為了不同系統下時區轉換導緻時間不統一,這裡不使用 OBS 裡的 last-modified 的 GMT 時間,改用時間戳
info_dict["create_time"] = int(time.time())
# 把對象大小轉為數字格式
info_dict["content-length"] = int(info_dict["content-length"])
# 去除部分無用的 header
for i in ["id-2", "request-id", "connection", "last-modified", "uploadid"]:
if i in info_dict:
info_dict.pop(i)
# 把其他算子裡包含的資訊也一起儲存下來
if "other_info" in event["dynamic_source"]:
info_dict.update(event["dynamic_source"]["other_info"])
context.getLogger().info(f"metadata to save: {info_dict}")
es_user = event["dynamic_source"]["es_user"]
es_password = event["dynamic_source"]["es_password"]
es_server_ip = event["dynamic_source"]["es_server"]
es_port = event["dynamic_source"]["es_port"]
context.getLogger().info(es_port)
if es_user != "" and es_password != "":
es_server = f"https://{es_user}:{es_password}@{es_server_ip}:{es_port}"
context.getLogger().info(es_server.replace(es_password, "xxxxxxx"))
else:
es_server = f"http://{es_server_ip}:{es_port}"
context.getLogger().info(es_server)
es = Elasticsearch([es_server], ca_certs=False, verify_certs=False)
response = es.index(index=bucket_name, body=info_dict)
context.getLogger().info(response)
return {
"statusCode": 200,
"isBase64Encoded": False,
"body": response,
"headers": {
"Content-Type": "application/json"
}
}
def get_obs_obj_info(record):
if 's3' in record:
s3 = record['s3']
return record["eventRegion"], s3['bucket']['name'], unquote_plus(s3['object']['key'])
else:
obs_info = record['obs']
return record["eventRegion"], obs_info['bucket']['name'], \
unquote_plus(obs_info['object']['key'])
3.1.5 配置函數
1.配置依賴 在代碼配置頁最下找到添加依賴包按鈕,分别添加公共依賴中的OBS 3.21.8 與 私有依賴中的fgpackage
2.配置 VPC 還是代碼配置頁,點選基本資訊的編輯按鈕,這裡要記得與 2.1 節中的 CSS 選擇同一個 VPC。
3.1.6 測試函數
在 OBS 裡建立一個桶,最好和 CSS、FunctionGraph 都在同一個 Region,我用的上海一節點,region id 為 cn-east-3 桶内随便上傳一個對象做備用。然後點選函數代碼頁中配置測試事件,把下面這段 Json 添加進去,并修改下面的配置為你的配置。其中 es_server 的值為 CSS 叢集 IP。
{
"Records": [
{
"eventRegion": "cn-east-3",
"obs": {
"bucket": {
"name": "your-bucket-name"
},
"object": {
"key": "your-object-name"
}
}
}
],
"dynamic_source": {
"es_server": "your-CSS-endpoint",
"es_user": "admin",
"es_password": "your-CSS-password",
"es_port": 9200
}
}
儲存後點選測試,如果一切配置正确,右邊會出現這樣的結果,如果提示執行失敗,就看下下面報錯,再找找前面幾步哪個寫錯了。
3.2 配置 DWR 工作流
DWR 現在還在公測中,需要點選申請公測,資訊随便填就可以,目測是自動稽核的,點完申請就通知申請成功了。
進入頁面,點選工作流選項-建立工作流
左側把自定義算子拖到中間,和 Start、End 連上線,函數選擇剛剛建立的函數,再填寫下參數。這裡的參數就是上一節 Json 檔案裡 dynamic_source 字段的參數即可。
點選儲存,寫上名字,會自動跳轉出來,建立個觸發器再
這裡我沒有寫字首和字尾,對桶内所有對象生效,如果填了字首字尾,則會隻比對指定事件。
4. 檢索
到現在所有配置都完成了,使用 OBS Browser+ 向桶裡上傳幾個測試檔案,然後用自己熟悉 Elasticsearch 調用方法嘗試下檢索,我這裡使用 CSS 自帶的 kibana 控制台。
4.1 檢索名字裡包含測試 兩個字的對象
測試代碼:
POST _search
{
"query": {
"match": {
"object_name": "測試"
}
}
}
4.2 檢索為 .mp4 結尾的對象
POST _search
{
"query": {
"regexp": {
"object_name": ".*mp4"
}
}
}
4.3 檢索大小介于 100k 到 1M 的對象
POST _search
{
"query": {
"range": {
"content-length": {
"gt":"102400",
"lt":"1048576"}
}
}
}
4.4 檢索建立時間在 2022 年 8 月 5 日與 2022 年 8 月 6 日之間的對象(時間戳)
POST _search
{
"query": {
"range": {
"create_time": {
"gt":"1659628800",
"lt":"1659715200"}
}
}
}
5. 再進一步
有的同學可能注意到了,流程介紹時我提到可以進行進階的操作,存入 CSS 的算子上頭可以拼接很多其他算子,算子的代碼包裡也留了一個小擴充,可以從上一個算子中讀取 other_info 并一起儲存,這個能幹點啥嘞。咱們舉幾個場景:
- 把對象名縮寫存起來,如一個檔案叫我的檔案.txt,隻用輸入 wdwj 就能找到該檔案的全稱
- 圖檔檔案上傳後,調用 AI 給圖檔打個标簽,把标簽存到資料庫,可以通過風景、美食、貓咪 等關鍵詞檢索到圖檔,現在華為鴻蒙、蘋果 iOS 都有這樣的功能
-
圖檔包含文字的話,把文字識别出來存入資料庫,可以通過文字搜尋圖檔,在某些業務系統裡還可以用專用的算子,如發票識别、身份證識别等。
…
這裡先把最簡單的對象名縮寫給個示例,抛磚引玉,大家可以自行嘗試更多功能。
用了xpinyin 這個庫,上傳依賴包步驟參考前面的介紹。代碼很簡單:
# coding:utf-8
from urllib.parse import unquote_plus
from xpinyin import Pinyin
def handler(event, context):
# 擷取桶名與對象名
_, _, object_name = get_obs_obj_info(event.get("Records", None)[0])
context.getLogger().info(f"Object name is {object_name}")
pinyin = Pinyin()
pinyin = pinyin.get_pinyin(object_name, '-')
short_pinyin = "".join([i[0] for i in pinyin.split("-") if i[0].isalpha()])
if "other_info" in event["dynamic_source"]:
event["dynamic_source"]["other_info"]["short_pinyin"] = short_pinyin
else:
event["dynamic_source"]["other_info"] = {"short_pinyin": short_pinyin}
context.getLogger().info(f"Object short name is {short_pinyin}")
context.getLogger().info(event)
return event
def get_obs_obj_info(record):
if 's3' in record:
s3 = record['s3']
return record["eventRegion"], s3['bucket']['name'], unquote_plus(s3['object']['key'])
else:
obs_info = record['obs']
return record["eventRegion"], obs_info['bucket']['name'], \
unquote_plus(obs_info['object']['key'])
這就配置完了,隻用去 DWR 工作流頁面建立個工作流,把這個函數加載前面:
給工作流配置個觸發器,然後把之前建立的工作流先删除掉,以免重複觸發。再上傳幾個檔案。
搜尋下包含 cs 的對象
POST _search
{
"query": {
"regexp": {
"short_pinyin": ".*cs.*"
}
}
}