天天看點

日志服務Python消費組實戰(二):實時分發資料

日志服務Python消費組實戰(二):實時分發資料

場景目标

使用日志服務的Web-tracking、logtail(檔案極簡)、syslog等收集上來的日志經常存在各種各樣的格式,我們需要針對特定的日志(例如topic)進行一定的分發到特定的logstore中處理和索引,本文主要介紹如何使用消費組實時分發日志到不通的目标日志庫中。并且利用消費組的特定,達到自動平衡、負載均衡和高可用性。

日志服務Python消費組實戰(二):實時分發資料

基本概念

協同消費庫(Consumer Library)是對日志服務中日志進行消費的進階模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行資料讀取的差別在于,使用者無需關心日志服務的實作細節,隻需要專注于業務邏輯,另外,消費者之間的負載均衡、failover等使用者也都無需關心。

消費組(Consumer Group) - 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的資料,消費者之間不會重複消費資料。

消費者(Consumer) - 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不同。

在日志服務中,一個logstore下面會有多個shard,協同消費庫的功能就是将shard配置設定給一個消費組下面的消費者,配置設定方式遵循以下原則:

  • 每個shard隻會配置設定到一個消費者。
  • 一個消費者可以同時擁有多個shard。

    新的消費者加入一個消費組,這個消費組下面的shard從屬關系會調整,以達到消費負載均衡的目的,但是上面的配置設定原則不會變,配置設定過程對使用者透明。

協同消費庫的另一個功能是儲存checkpoint,友善程式故障恢複時能接着從斷點繼續消費,進而保證資料不會被重複消費。

使用消費組進行實時分發

這裡我們描述用Python使用消費組進行程式設計,實時根據資料的topic進行分發。

注意:本篇文章的相關代碼可能會更新,最新版本在這裡可以找到:

Github樣例

.

日志服務Python消費組實戰(二):實時分發資料

安裝

環境

  1. 建議程式運作在源日志庫同Region下的ECS上,并使用 區域網路服務入口 ,這樣好處是網絡速度最快,其次是讀取沒有外網費用産生。
  2. 強烈推薦 PyPy3 來運作本程式,而不是使用标準CPython解釋器。
  3. 日志服務的Python SDK可以如下安裝:
pypy3 -m pip install aliyun-log-python-sdk -U           

更多SLS Python SDK的使用手冊,可以參考

這裡

程式配置

如下展示如何配置程式:

  1. 配置程式日志檔案,以便後續測試或者診斷可能的問題(跳過,具體參考樣例)。
  2. 基本的日志服務連接配接與消費組的配置選項。
  3. 目标Logstore的一些連接配接資訊

請仔細閱讀代碼中相關注釋并根據需要調整選項:

#encoding: utf8
def get_option():
    ##########################
    # 基本選項
    ##########################

    # 從環境變量中加載SLS參數與選項,根據需要可以配置多個目标
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    to_endpoint = os.environ.get('SLS_ENDPOINT_TO', endpoint)
    to_project = os.environ.get('SLS_PROJECT_TO', project)
    to_logstore1 = os.environ.get('SLS_LOGSTORE_TO1', '')
    to_logstore2 = os.environ.get('SLS_LOGSTORE_TO2', '')
    to_logstore3 = os.environ.get('SLS_LOGSTORE_TO3', '')
    consumer_group = os.environ.get('SLS_CG', '')

    # 消費的起點。這個參數在第一次跑程式的時候有效,後續再次運作将從上一次消費的儲存點繼續。
    # 可以使”begin“,”end“,或者特定的ISO時間格式。
    cursor_start_time = "2018-12-26 0:0:0"

    # 一般不要修改消費者名,尤其是需要并發跑時
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 建構一個消費組和消費者
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time)

    # bind put_log_raw which is faster
    to_client = LogClient(to_endpoint, accessKeyId, accessKey)
    put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1)
    put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2)
    put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3)

    return option, {u'ngnix': put_method1, u'sql_audit': put_method2, u'click': put_method3}           

注意,這裡使用了

functools.partial

put_log_raw

進行綁定,以便後續調用友善。

資料消費與分發

如下代碼展示如何從SLS拿到資料後根據

topic

進行轉發。

if __name__ == '__main__':
    option, put_methods = get_copy_option()

    def copy_data(shard_id, log_groups):
        for log_group in log_groups.LogGroups:
            # update topic
            if log_group.Topic in put_methods:
                put_methods[log_group.Topic](log_group=log_group)

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(ConsumerProcessorAdaptor, option, args=(copy_data, ))
    worker.start(join=True)           

啟動

假設程式命名為"dispatch_data.py",可以如下啟動:

export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore1 Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore2 Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore3 Name>
export SLS_CG=<消費組名,可以簡單命名為"dispatch_data">

pypy3 dispatch_data.py           

性能考慮

啟動多個消費者

基于消費組的程式可以直接啟動多次以便達到并發作用:

nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
...           

注意:

所有消費者使用了同一個消費組的名字和不同的消費者名字(因為消費者名以程序ID為字尾)。

因為一個分區(Shard)隻能被一個消費者消費,假設一個日志庫有10個分區,那麼最多有10個消費者同時消費。

性能吞吐

基于測試,在沒有帶寬限制、接收端速率限制(如Splunk端)的情況下,以推進硬體用

pypy3

運作上述樣例,單個消費者占用大約

10%的單核CPU

下可以消費達到

5 MB/s

原始日志的速率。是以,理論上可以達到

50 MB/s

原始日志

每個CPU核

,也就是

每個CPU核每天可以消費4TB原始日志

注意: 這個資料依賴帶寬、硬體參數和目标Logstore是否能夠較快接收資料。

高可用性

消費組會将檢測點(check-point)儲存在伺服器端,當一個消費者停止,另外一個消費者将自動接管并從斷點繼續消費。

可以在不同機器上啟動消費者,這樣當一台機器停止或者損壞的清下,其他機器上的消費者可以自動接管并從斷點進行消費。

理論上,為了備用,也可以啟動大于shard數量的消費者。

其他

限制與限制

每一個日志庫(logstore)最多可以配置10個消費組,如果遇到錯誤

ConsumerGroupQuotaExceed

則表示遇到限制,建議在控制台端删除一些不用的消費組。

監測

Https

如果服務入口(endpoint)配置為

https://

字首,如

https://cn-beijing.log.aliyuncs.com

,程式與SLS的連接配接将自動使用HTTPS加密。

伺服器證書

*.aliyuncs.com

是GlobalSign簽發,預設大多數Linux/Windows的機器會自動信任此證書。如果某些特殊情況,機器不信任此證書,可以參考

下載下傳并安裝此證書。

更多案例