天天看點

GCP: Pub/Sub的使用

1、簡介

Pub/Sub 是一種全托管式實時消息傳遞服務,可讓您在獨立的應用之間發送和接收消息,它是一個PAAS服務。

2、概覽

  • 主題(Topic):相當于一個消息的中轉站,釋出者釋出消息後,消息存儲在主題中。
  • 釋出者(Publisher):釋出消息的應用
  • 訂閱者(Subscriber):接收消息者

如下圖,首先對主題建立了兩個訂閱者(subscriber1, subscriber2),釋出者(publisher)向主題中釋出一條消息(Hello,World!), 接着,這兩個訂閱者都收到了此消息(Hello,World!)。

GCP: Pub/Sub的使用

3、訂閱消息

(1)訂閱是如何工作的

  • 多個訂閱者訂閱一個主題的情況,如果在一個主題中釋出一條消息,那所有的訂閱者都會收到此消息。如果想要多個訂閱者分工處理不同的内容,可以在消息中加自定義特性 (Attribute),在訂閱者邏輯中可以根據此特性隻處理目前訂閱者感興趣的消息。
  • 隻有經過确認過的消息`message.ack()`,才不會再被傳送,如果您在确認時限之前未确認消息,Pub/Sub 會重新發送該消息。是以,Pub/Sub 可能會發送重複的消息,如果訂閱者處理消息發生異常,且消息未被确認,那Pub/Sub 會重新發送該消息。
  • 确認過的消息,會在Pub/Sub被删除。
  • 在給定訂閱者建立之前釋出的消息通常不會針對此訂閱進行傳送。是以,如果某主題沒有訂閱,則釋出到該主題的消息将不會傳送給任何訂閱者。

(2)至少傳送一次

通常,Pub/Sub 會按照消息釋出的順序将每條消息傳送一次,但有時可能并不按順序傳送消息,或者會将消息傳送多次。 一般來說,如果要實施多次傳送,訂閱者需要在處理消息時遵循幂等原則。您可以使用 Cloud Dataflow PubsubIO 将 Pub/Sub 消息流隻處理一次。PubsubIO 會根據自定義消息辨別符或由 Pub/Sub 配置設定的消息辨別符來删除重複的消息。是以,處理消息的邏輯必須是幂等的(所謂幂等,通俗點說,就是函數執行一次,和執行數次,産生的結/效果是一樣的)。

(3)對消息排序

通常情況下,pub/sub不完全像隊列一樣嚴格地保證消息先進先出,因為保證消息順序會對吞吐量産生嚴重限制,pub/sub僅保證第一次傳送消息時是按順序進行的,後序的消息不一定是按順序排列的,所有消息都允許随時嘗試重新傳送,這樣允許一次向訂閱者發送多條消息。

如果想使消息有順序的話,可以在自定義特性 (Attribute)中加時間戳或序列号。如果主題有10條這樣的消息0,1,2,3,4,5 那收到消息順序,下面幾種情況都可能會發生。

  • 0,1,5,4,2,3  # 第一條消息始終是0,順序不能保證
  • 0,1,2,4,2,3,0  # 0 也可能被發送了2次
  • 0,1,1,2,4,3,5 ,6  # 後序的消息可能被發送多次

4、訂閱者接收消息的兩種方式

訂閱者可以有兩種方式拿到消息,一是設定public endpoint,讓pub/sub被動地推送消息給你,二是主要向pub/sub發送拉取(pull)請求。

4.1 推送傳送

(1)關于推送

Pub/Sub 将根據收到成功響應的速率來動态調整推送請求的速率, 推送訂閱受一組配額和資源限制的限制,系統會自動調整推送傳送的速率,以最大限度地提高傳送速率,同時不會使推送端點過載,它是通過一套算法來控制的。

(2)Cloud Function和App Engine通常都是使用推送傳送的,也是遵循至少傳送一次的原則。

(3)對于推送訂閱,Pub/Sub 不會發送否定确認(有時稱為 NACK)。如果 Webhook 未傳回成功代碼,則 Pub/Sub 會重試傳送,直到消息在訂閱的消息保留期限過後失效為止。

(4)推送不能一次處理多條消息,沒法使用批處理,但拉取和釋出消息可以。

4.2 拉取傳送

(1)異步拉取(用得最多,實時處理消息那種)

可以在應用中使用長時間運作的消息偵聽器接收消息,并且一次确認一條消息,不建議使用cron job,效率不高。

使用異步拉取不需要應用阻止新消息,進而在應用中實作更高的吞吐量。

如果訂閱者用戶端處理和确認消息的速度可能比 Pub/Sub 将消息發送到用戶端的速度要慢,可考慮使用訂閱者的流控制功能來控制訂閱者接收消息的速率。https://cloud.google.com/pubsub/docs/pull

(2)同步拉取

在某些情況下,異步拉取并不非常适合您的應用。例如,應用邏輯可能依賴輪詢模式來檢索消息,或者需要對用戶端在任何給定時間檢索的消息數量進行精确限制。為了支援此類應用,該服務支援同步拉取方法,用于拉取和确認固定數量的消息,但這會帶來一些消息傳送的延遲。

如下是例子的代碼

from google.cloud import pubsub_v1


def sub_data():
    project_id = 'cong-proj'
    # topic_name = 'hello_topic'
    subscription_name = 'sub_one'
    subscription1 = pubsub_v1.SubscriberClient()
    # topic_path = subscription1.topic_path(project_id, topic_name)
    subscription_path = subscription1.subscription_path(project_id, subscription_name)

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscription1.subscribe(subscription_path, callback=callback)
    print("Listening for messages on {}..\n".format(subscription_path))
    try:
        streaming_pull_future.result()
    except:  # noqa
        streaming_pull_future.cancel()           

5、釋出消息

5.1 批處理以平衡延遲和吞吐量

消息可以根據請求大小(以位元組為機關)、消息數量和時間分批。

batch_settings = pubsub_v1.types.BatchSettings(

    max_bytes=1024, max_latency=1  # One kilobyte  # One second

)

publisher = pubsub_v1.PublisherClient(batch_settings)           

5.2 重試請求

如果釋出失敗,系統會自動重試,但無法保證能夠重試的錯誤除外。

publisher = pubsub_v1.PublisherClient(client_config=retry_settings)           

如下是例子的代碼

def publish_data():
    project_id = 'cong-proj'
    topic_name = 'hello_topic'

    # batch_settings = pubsub_v1.types.BatchSettings(max_messages=10, max_latency=30)

    publisher = pubsub_v1.PublisherClient() # batch_settings 
    # publisher.from_service_account_file("""C:\CongStudy\pubsub-demo\cong-pubsub.json""")
    topic_path = publisher.topic_path(project_id, topic_name)

    data = "Message number 1".encode('utf-8')
    future = publisher.publish(topic_path, data)
    print(future.result())



 def get_message(message):
    # 消息實際上是以Base64的字元串的形式存于主題中,如要在訂閱器中使用消息,可以使用如下代碼
    pubsub_message = base64.b64decode(message).decode('utf-8')


    # hell,world base64加密後為 aGVsbCx3b3JsZA==, 它就會存于主題中
    # print(base64.b64encode('hell,world'.encode('utf-8')).decode('utf-8')) # aGVsbCx3b3JsZA==

    # aGVsbCx3b3JsZA == 解密後為 hell,world
    # print(base64.b64decode('aGVsbCx3b3JsZA==').decode('utf-8')) # hell,world
    


           

6、重放和完全清除消息

  • 還原至某一時間戳
  • 還原至快照

7、參考連結

  • https://cloud.google.com/pubsub/docs/quickstarts
  • https://blog.gcp.expert/google-cloud-pub-sub-aws-sqs-comparison/
gcp

繼續閱讀