1、簡介
Pub/Sub 是一種全托管式實時消息傳遞服務,可讓您在獨立的應用之間發送和接收消息,它是一個PAAS服務。
2、概覽
- 主題(Topic):相當于一個消息的中轉站,釋出者釋出消息後,消息存儲在主題中。
- 釋出者(Publisher):釋出消息的應用
- 訂閱者(Subscriber):接收消息者
如下圖,首先對主題建立了兩個訂閱者(subscriber1, subscriber2),釋出者(publisher)向主題中釋出一條消息(Hello,World!), 接着,這兩個訂閱者都收到了此消息(Hello,World!)。
3、訂閱消息
(1)訂閱是如何工作的
- 多個訂閱者訂閱一個主題的情況,如果在一個主題中釋出一條消息,那所有的訂閱者都會收到此消息。如果想要多個訂閱者分工處理不同的内容,可以在消息中加自定義特性 (Attribute),在訂閱者邏輯中可以根據此特性隻處理目前訂閱者感興趣的消息。
- 隻有經過确認過的消息`message.ack()`,才不會再被傳送,如果您在确認時限之前未确認消息,Pub/Sub 會重新發送該消息。是以,Pub/Sub 可能會發送重複的消息,如果訂閱者處理消息發生異常,且消息未被确認,那Pub/Sub 會重新發送該消息。
- 确認過的消息,會在Pub/Sub被删除。
- 在給定訂閱者建立之前釋出的消息通常不會針對此訂閱進行傳送。是以,如果某主題沒有訂閱,則釋出到該主題的消息将不會傳送給任何訂閱者。
(2)至少傳送一次
通常,Pub/Sub 會按照消息釋出的順序将每條消息傳送一次,但有時可能并不按順序傳送消息,或者會将消息傳送多次。 一般來說,如果要實施多次傳送,訂閱者需要在處理消息時遵循幂等原則。您可以使用 Cloud Dataflow PubsubIO 将 Pub/Sub 消息流隻處理一次。PubsubIO 會根據自定義消息辨別符或由 Pub/Sub 配置設定的消息辨別符來删除重複的消息。是以,處理消息的邏輯必須是幂等的(所謂幂等,通俗點說,就是函數執行一次,和執行數次,産生的結/效果是一樣的)。
(3)對消息排序
通常情況下,pub/sub不完全像隊列一樣嚴格地保證消息先進先出,因為保證消息順序會對吞吐量産生嚴重限制,pub/sub僅保證第一次傳送消息時是按順序進行的,後序的消息不一定是按順序排列的,所有消息都允許随時嘗試重新傳送,這樣允許一次向訂閱者發送多條消息。
如果想使消息有順序的話,可以在自定義特性 (Attribute)中加時間戳或序列号。如果主題有10條這樣的消息0,1,2,3,4,5 那收到消息順序,下面幾種情況都可能會發生。
- 0,1,5,4,2,3 # 第一條消息始終是0,順序不能保證
- 0,1,2,4,2,3,0 # 0 也可能被發送了2次
- 0,1,1,2,4,3,5 ,6 # 後序的消息可能被發送多次
4、訂閱者接收消息的兩種方式
訂閱者可以有兩種方式拿到消息,一是設定public endpoint,讓pub/sub被動地推送消息給你,二是主要向pub/sub發送拉取(pull)請求。
4.1 推送傳送
(1)關于推送
Pub/Sub 将根據收到成功響應的速率來動态調整推送請求的速率, 推送訂閱受一組配額和資源限制的限制,系統會自動調整推送傳送的速率,以最大限度地提高傳送速率,同時不會使推送端點過載,它是通過一套算法來控制的。
(2)Cloud Function和App Engine通常都是使用推送傳送的,也是遵循至少傳送一次的原則。
(3)對于推送訂閱,Pub/Sub 不會發送否定确認(有時稱為 NACK)。如果 Webhook 未傳回成功代碼,則 Pub/Sub 會重試傳送,直到消息在訂閱的消息保留期限過後失效為止。
(4)推送不能一次處理多條消息,沒法使用批處理,但拉取和釋出消息可以。
4.2 拉取傳送
(1)異步拉取(用得最多,實時處理消息那種)
可以在應用中使用長時間運作的消息偵聽器接收消息,并且一次确認一條消息,不建議使用cron job,效率不高。
使用異步拉取不需要應用阻止新消息,進而在應用中實作更高的吞吐量。
如果訂閱者用戶端處理和确認消息的速度可能比 Pub/Sub 将消息發送到用戶端的速度要慢,可考慮使用訂閱者的流控制功能來控制訂閱者接收消息的速率。https://cloud.google.com/pubsub/docs/pull
(2)同步拉取
在某些情況下,異步拉取并不非常适合您的應用。例如,應用邏輯可能依賴輪詢模式來檢索消息,或者需要對用戶端在任何給定時間檢索的消息數量進行精确限制。為了支援此類應用,該服務支援同步拉取方法,用于拉取和确認固定數量的消息,但這會帶來一些消息傳送的延遲。
如下是例子的代碼
from google.cloud import pubsub_v1
def sub_data():
project_id = 'cong-proj'
# topic_name = 'hello_topic'
subscription_name = 'sub_one'
subscription1 = pubsub_v1.SubscriberClient()
# topic_path = subscription1.topic_path(project_id, topic_name)
subscription_path = subscription1.subscription_path(project_id, subscription_name)
def callback(message):
print("Received message: {}".format(message))
message.ack()
streaming_pull_future = subscription1.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))
try:
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()
5、釋出消息
5.1 批處理以平衡延遲和吞吐量
消息可以根據請求大小(以位元組為機關)、消息數量和時間分批。
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, max_latency=1 # One kilobyte # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
5.2 重試請求
如果釋出失敗,系統會自動重試,但無法保證能夠重試的錯誤除外。
publisher = pubsub_v1.PublisherClient(client_config=retry_settings)
如下是例子的代碼
def publish_data():
project_id = 'cong-proj'
topic_name = 'hello_topic'
# batch_settings = pubsub_v1.types.BatchSettings(max_messages=10, max_latency=30)
publisher = pubsub_v1.PublisherClient() # batch_settings
# publisher.from_service_account_file("""C:\CongStudy\pubsub-demo\cong-pubsub.json""")
topic_path = publisher.topic_path(project_id, topic_name)
data = "Message number 1".encode('utf-8')
future = publisher.publish(topic_path, data)
print(future.result())
def get_message(message):
# 消息實際上是以Base64的字元串的形式存于主題中,如要在訂閱器中使用消息,可以使用如下代碼
pubsub_message = base64.b64decode(message).decode('utf-8')
# hell,world base64加密後為 aGVsbCx3b3JsZA==, 它就會存于主題中
# print(base64.b64encode('hell,world'.encode('utf-8')).decode('utf-8')) # aGVsbCx3b3JsZA==
# aGVsbCx3b3JsZA == 解密後為 hell,world
# print(base64.b64decode('aGVsbCx3b3JsZA==').decode('utf-8')) # hell,world
6、重放和完全清除消息
- 還原至某一時間戳
- 還原至快照
7、參考連結
- https://cloud.google.com/pubsub/docs/quickstarts
- https://blog.gcp.expert/google-cloud-pub-sub-aws-sqs-comparison/