天天看點

III 23 rabbitmq

MOM(message oriented middleware)

消息中間件(是在消息的傳遞過程中儲存消息的容器,消息中間件再将消息從它的源中繼到它的目标時,充當中間人的作用,隊列的主要目的是提供路由并保證消息的傳遞,如果發送消息時接收者不可用,消息隊列會保留消息,直到可以成功的傳遞它為止,消息隊列儲存消息也是有期限的);

消息中間件特點:

采用異步處理模式(消息發送者可以發送一個消息而無須等待響應,消息發送者将消息發送到一條虛拟的通道上(主題topic或隊列queue),消息接收者訂閱或監聽該通道,一條消息可能最終轉發給一個或多個消息接收者,這些接收者都無需對消息發送者作出同步回應,整個過程是異步的;例如使用者資訊注冊,注冊完畢後,過段時間才收到郵件或短信(pub/sub模型));

應用程式和應用程式調用關系為松耦合關系(發送者和接收者不必了解對方,隻需要确認消息;發送者和接收者不必同時線上;例如線上交易系統,為保證資料的最終一緻,在支付系統處理完成後會把支付結果放到消息中間件裡,通知訂單系統修改訂單支付狀态,兩個系統通過消息中間件)

消息傳遞服務模型:

<a href="http://s1.51cto.com/wyfs02/M00/87/BE/wKioL1fgtrnzpWqvAAA1u8p3WkE026.jpg" target="_blank"></a>

消息中間件的傳遞模型(PTP點對點模型;pub/sub釋出/訂閱模型):

PTP模型用于消息producer和消息consumer之間點到點的通信,消息producer将消息發送到由某個名字辨別的特定consumer,這個名字實際是對應于消息服務中的一個queue,在消息傳遞給消費者之前它被存儲在這個queue中,隊列消息可放在記憶體中,也可是持久的,以保證在消息服務出故障時仍能傳遞消息;

PTP模型特性(每個消息對應一個consumer;發送者和接收者沒有時間依賴;接收者确認消息接收和處理成功);

<a href="http://s4.51cto.com/wyfs02/M01/87/C2/wKiom1fgtsKQFDFIAAAQg6_SVf8318.jpg" target="_blank"></a>

pub/sub模型支援向一個特定的消息topic生産消息,0或多個subscriber可能對接收來自特定消息topic的消息感興趣,這種模型下,pub和sub彼此不知道對方,類似匿名公告闆,可了解為broadcast廣播;多個consumer可擷取消息,在pub和sub之間存在時間依賴性(publisher和subscriber隻有建立訂閱關系,subscriber才能收到消息),釋出者需要建立一個訂閱subscription,以便能讓consumer訂閱;subscriber必須保持一定的活動狀态以接收消息,除非subscriber建立了持久的訂閱,這種情況下,在subscriber未連接配接時釋出的消息将在subscriber重新連接配接時重新釋出;

pub/sub模型特性(每個消息可以有多個subscriber;consumer隻有訂閱後才能接收到消息;持久訂閱和非持久訂閱(時間長短的差別));

注:持久訂閱(長連接配接,訂閱關系建立後,消息就不會消失(就算consumer不線上或當機消息不會消失),會暫存在MOM上);非持久訂閱(長連接配接,訂閱者為了接收消息,必須一直線上,consumer不線上或當機将收不到消息;當隻有一個subscriber時相當于是PTP模式)

<a href="http://s2.51cto.com/wyfs02/M01/87/BE/wKioL1fgtszhN2A_AAAgCflz4gg062.jpg" target="_blank"></a>

網際網路消息中間件應用場景:

1、使用者資訊注冊,注冊完後過段時間才收到郵件、短信(pub/sub模型,push或pull):

<a href="http://s1.51cto.com/wyfs02/M02/87/C2/wKiom1fgttfCEw9fAAAcBYkrD5o828.jpg" target="_blank"></a>

2、日志分析使用,pv分析計算、使用者行為分析(push或pull,時效性要求高時push):

<a href="http://s2.51cto.com/wyfs02/M02/87/BE/wKioL1fgtuDg7eQ-AAAgnvKOIb4258.jpg" target="_blank"></a>

3、資料複制(将資料從源頭複制到多個目的地,一般要求順序或保證因果順序,用于跨機房資料傳輸、搜尋、離線資料計算等;push或pull,對時間要求高時用push):

<a href="http://s2.51cto.com/wyfs02/M00/87/C2/wKiom1fgtuzy_3fKAAAlGj6bm3Y564.jpg" target="_blank"></a>

4、延遲消息發送和暫存,把消息中間件當成可靠的消息暫存地,定時進行消息投遞(例如模拟使用者秒殺通路,進行系統性能壓測,pull):

<a href="http://s2.51cto.com/wyfs02/M02/87/BE/wKioL1fgtvXQnYirAAAc1xobTL0120.jpg" target="_blank"></a>

5、消息廣播(緩存資料同步更新,往應用推送資料,例如更新本地緩存、變更商品價格;push):

<a href="http://s1.51cto.com/wyfs02/M00/87/C2/wKiom1fgtwGxg9alAAAZ9VGDzoc125.jpg" target="_blank"></a>

消息中間件分類(push推;pull拉):

push推消息模型(消息producer将消息發送給消息傳遞服務mom,消息傳遞服務又将消息推給消息consumer);

pull拉消息模型(消息producer将消息發送給消息傳遞服務,消息consumer從消息傳遞服務上請求拉取消息);

Push

Pull

MOM

消息存儲;處理請求;儲存推送軌迹;儲存訂閱關系;消費者LB;集中式

消息存儲;處理請求;分布式

Consumer

處理響應和請求

處理響應和請求;儲存pull狀态,如拉取位置的offset;異常情況下的消息暫存和recover

實時性

較好,收到資料後可立即發送給consumer

取決于pull的間隔時間

Consumer故障

服務端堆積消息;重複推送耗費資源;儲存推送軌迹壓力大

對中間件無影響

其它

對消息推送有更多控制;能實作多樣化的推送機制;當消費者數量增多時,推送壓力大,性能天花闆;consumer處理能力差異,導緻堆消息

需要在consumer端實作消息過濾,浪費資源;需要在不同consumer之間協調作LB

rabbitmq是在AMQP(advanced message queuing protocol)協定标準基礎上實作的,遵循mozilla public license開源協定,采用erlang實作的工業級mq server;

http://www.rabbitmq.com/

AMQP是一個異步消息傳遞所使用的應用層協定規範,作為線路層協定而不是API(如jms,java message service),amqp能無視消息的來源任意發送和接收消息;amqp的原始用途隻是為金融界提供一個可以彼此協作的消息協定,而現在的目标則是為通用消息隊列架構,提供通用建構工具;是以面向消息的消息中間件系統,如pub/sub隊列沒有作為基本元素實作,反而通過發送簡化的amq實體,使用者被賦予了建構這些實體的能力,這些實體也是規範的一部分,形成了線上路層協定頂端的一個層級;amqp模型統一了消息模式,如pub/sub、列隊、事務及流資料,并且添加了額外的特性,如更易于擴充基于内容的路由;

百科上的AMQP:AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計;基于此協定的用戶端與消息中間件可傳遞消息,并不受用戶端/中間件不同産品,不同的開發語言等條件的限制;Erlang中的實作有 RabbitMQ等;

RabbitMQ是由RabbitMQ Technologies Ltd開發并且提供商業支援的,該公司在2010年4月被SpringSource(VMWare的一個部門)收購,在2013年5月被并入Pivotal;其實VMWare的Pivotal和EMC本質上是一家的,不同的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現在并沒有上市;

amqp model(virtual host=exchange+MQ;一個server可有多個virtual host):

<a href="http://s3.51cto.com/wyfs02/M00/87/BE/wKioL1fgtybDmJCOAABhEbsiM8A288.jpg" target="_blank"></a>

rabbitmq的整體架構(兩大核心元件exchange和queue;server即broker):

<a href="http://s4.51cto.com/wyfs02/M02/87/C2/wKiom1fgtzHAgTmNAACptXpXVOk961.jpg" target="_blank"></a>

注:

RabbitMQ Server:也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對于普通的應用來說這已經足夠了。當然對于商業系統來說,可以再做一層資料一緻性的guard,就可以徹底保證系統的一緻性了。

Client A &amp; B:也叫Producer,資料的發送方。create messages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(标簽)。payload顧名思義就是傳輸的資料。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

Client 1,2,3:也叫Consumer,資料的接收方。Consumers attach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,隻有payload,label已經被删掉了。對于Consumer來說,它是不知道誰發送的這個資訊的。是協定本身不支援。但是當然了如果Producer發送的payload包含了Producer的資訊就另當别論了。

Exchanges are where producers publish theirmessages.

Queues are where the messages end up and arereceived by consumers

Bindings are how the messages get routedfrom the exchange to particular queues.

Connection:就是一個TCP的連接配接,Producer和Consumer都是通過TCP連接配接到RabbitMQ Server的,程式的起始處就是建立這個TCP連接配接

Channels:虛拟連接配接,它建立在上述的TCP連接配接中,資料流動都是在Channel中進行的,也就是說,一般情況是程式起始建立TCP連接配接,第二步就是建立這個Channel;

為什麼使用Channel,而不是直接使用TCP連接配接?對于OS來說,建立和關閉TCP連接配接是有代價的,頻繁的建立關閉TCP連接配接對于系統的性能有很大的影響,而且TCP的連接配接數也有限制,這也限制了系統處理高并發的能力;但是,在TCP連接配接中建立Channel是沒有上述代價的,對于Producer或者Consumer來說,可以并發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的資料可以Publish 10K的資料包,當然對于不同的硬體環境,不同的資料包大小這個資料肯定不一樣,但是我隻想說明,對于普通的Consumer或者Producer來說,這已經足夠了,如果不夠用,你考慮的應該是如何細化split你的設計

rabbitmq重要術語:

server(broker,接受client(producer和consumer)連接配接,實作amqp消息隊列和路由功能的程序);

virtual host(是虛拟概念,類似于權限控制組,一個virtualhost裡面可有若幹個exchange和queue,但是權限控制的最小粒度是virtual host);

exchange(接收producer發送的消息,并根據binding規則将消息路由給server中的queue,exchangetype決定了exchange路由消息的行為,exchange type有direct exchange、fanout exchange、topic exchange、headers exchange四種,不同類型的exchange路由的行為是不一樣的);

message queue(用于存儲還未被consumer消費的消息);

message(由header和body組成,header是由producer添加的各種屬性的集合,包括message是否被持久化,由哪個messagequeue接收,優先級是多少等,而body是真正需要傳輸的app資料);

BindingKey(所謂綁定就是将一個特定的exchange和一個特定的queue綁定起來,關鍵字為BindingKey);

exchange type:

direct exchange(直接互動式處理路由鍵(bindingkey,可了解為是一個隊列名字),需要将一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全比對,這是一個完整的比對;如果一個隊列綁定到該交換機上要求路由鍵dog,則隻有被标記為dog的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,隻會轉發dog);

<a href="http://s3.51cto.com/wyfs02/M01/87/BE/wKioL1fgt1riwaQEAABGgJEiZfU883.jpg" target="_blank"></a>

fanout exchange(廣播式路由鍵,隻需将隊列綁定到交換機上,一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上;類似子網廣播,子網内的每台主機都獲得了一份複制的消息;fanout交換機轉發消息是最快的);

<a href="http://s5.51cto.com/wyfs02/M00/87/C2/wKiom1fgt2SDIP9DAABSVJq5HQM927.jpg" target="_blank"></a>

topic exchange(主題式交換器,通過消息的路由關鍵字和綁定關鍵字的模式比對,将消息路由到被綁定的隊列中,這種路由器類型可被用來支援經典的pub/sub消息傳輸類型(使用主題名字空間作為消息尋址模式,将消息傳遞給那些部分或全部比對主題模式的多個consumer);工作方式:綁定關鍵字用0個或多個标記構成,每一個标記之間用點分隔,綁定關鍵字必須用這種形式明确說明,支援通配符(*比對一個詞組,#比對0個或多個詞組),例如綁定關鍵字*.stock.#,比對路由關鍵字usd.stock和eur.stock.db,不比對stock.nasdaq);

<a href="http://s1.51cto.com/wyfs02/M02/87/BE/wKioL1fgt2_hyPRKAACEtCBS3So963.jpg" target="_blank"></a>

headers exchange:

A headers exchange is designed to forrouting on multiple attributes that are more easily expressed as messageheaders than a routing key. Headers exchanges ignore the routing key attribute.Instead, the attributes used for routing are taken from the headers attribute.A message is considered matching if the value of the header equals the valuespecified upon binding.

It is possible to bind a queue to a headersexchange using more than one header for matching. In this case, the brokerneeds one more piece of information from the application developer, namely,should it consider messages with any of the headers matching, or all of them?This is what the "x-match" binding argument is for. When the"x-match" argument is set to "any", just one matchingheader value is sufficient. Alternatively, setting "x-match" to"all" mandates that all the values must match.

Headers exchanges can be looked upon as"direct exchanges on steroids". Because they route based on headervalues, they can be used as direct exchanges where the routing key does nothave to be a string; it could be an integer or a hash (dictionary) for example.

rabbitmq配置(一般情況下,預設配置足夠,若有特殊配置通過以下兩個檔案):

/etc/rabbitmq/rabbitmq-env.conf   #(define ports, file locations and names (taken from the shell, or set in the environment configuration file,rabbitmq-env.conf/rabbitmq-env-conf.bat),常用的變量見:http://www.rabbitmq.com/configure.html#define-environment-variables)

/etc/rabbitmq/rabbitmq.config   #(defines server component settings for permissions, limits and clusters, and also plugin settings.)

rabbitmq-env.conf常用的參數:

RABBITMQ_NODE_IP_ADDRESS=

RABBITMQ_NODE_PORT=

RABBITMQ_DIST_PORT=   #(Port to use for clustering. Ignored if your config file sets inet_dist_listen_min orinet_dist_listen_max)

RABBITMQ_NODENAME=

RABBITMQ_CONF_ENV_FILE=   #(Location of the file that contains environment variable definitions (without the RABBITMQ_ prefix).Note that the file name on Windows is different from other operating systems.)

rabbitmq.config這是個标準的erlang配置檔案,必須符合erlang配置檔案标準,erlangtuple,結構為{key,value},key為atom類型,value為一個term,關鍵字參數有:

tcp_listeners   #(List of ports on which to listen for AMQP connections (without SSL). Can contain integers(meaning "listen on all interfaces") or tuples such as {"127.0.0.1",5672} to listen on one or more interfaces.Default: [5672];設定rabbitmq監聽的port,要與rabbitmq-env.conf中RABBITMQ_NODE_PORT一緻);

disk_free_limit   #(Disk free space limit of the partition on which RabbitMQ is storing data. When available diskspace falls below this limit, flow control is triggered. The value may be setrelative to the total amount of RAM (e.g. {mem_relative, 1.0}). The value mayalso be set to an integer number of bytes. Or, alternatively, in informationunits (e.g "50MB"). By default free disk space must exceed 50MB. See the Disk Alarms documentation.Default: 50000000;disk低水位線,若disk容量低于指定值則停止接收資料);

vm_memory_high_watermark   #(Memory threshold at which the flow control is triggered. See the memory-based flow control documentation.Default: 0.4即記憶體問題的40%;記憶體低水位線,若低于該水位線,則開啟流控機制)

[

   {rabbit, [{tcp_listeners, [5673]}]}

].

rabbitmq指令:

#service rabbitmq-server start|stop|restart|reload

#rabbitmqctl add_vhost VHOSTNAME   #(建立虛拟主機)

#rabbitmqctl delete_vhost VHOSTNAME   #(删除虛拟主機)

#rabbitmqctl list_vhosts   #(周遊所有虛拟主機資訊)

#rabbitmqctl add_user USERNAMEPASSWORD   #(添加使用者)

#rabbitmqctl change_password USERNAMENEW_PASSWORD   #(修改使用者密碼)

#rabbitmqctl set_permissions -p VHOSTNAMEUSER ".*" ".*" ".*"   #(vhost+user,綁定權限,并賦予rw權限)

#rabbitmqctl list_queues   (顯示所有隊列)

操作:

[root@server1 ~]# uname -rm

2.6.32-431.el6.x86_64 x86_64

[root@server1 ~]# cat /etc/redhat-release

Red Hat Enterprise Linux Server release 6.5(Santiago)

[root@server1 ~]# wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo   #(aliyum此yum源用來安裝依賴的包,如tk、erlang-*等,epel這個yum源用來安裝rabbitmq-server)

[root@server1 ~]# rpm -Uvh --force epel-release-6-8.noarch.rpm   #(下載下傳位址:http://mirrors.ustc.edu.cn/fedora/epel/6/x86_64/)

Preparing...                ###########################################[100%]

  1:epel-release          ########################################### [100%]

[root@server1 ~]# yum makecache

……

[root@server1 ~]# yum -y install rabbitmq-server

Installed:

 rabbitmq-server.noarch 0:3.1.5-1.el6                                                                                              

Dependency Installed:

 SDL.x86_64 0:1.2.14-7.el6_7.1                                    erlang.x86_64 0:R14B-04.3.el6                                    

 erlang-appmon.x86_64 0:R14B-04.3.el6                             erlang-asn1.x86_64 0:R14B-04.3.el6                              

 erlang-common_test.x86_64 0:R14B-04.3.el6                         erlang-compiler.x86_640:R14B-04.3.el6                           

 erlang-cosEvent.x86_64 0:R14B-04.3.el6                           erlang-cosEventDomain.x86_64 0:R14B-04.3.el6                    

 erlang-cosFileTransfer.x86_64 0:R14B-04.3.el6                    erlang-cosNotification.x86_64 0:R14B-04.3.el6                   

 erlang-cosProperty.x86_64 0:R14B-04.3.el6                         erlang-cosTime.x86_640:R14B-04.3.el6                           

 erlang-cosTransactions.x86_64 0:R14B-04.3.el6                     erlang-crypto.x86_64 0:R14B-04.3.el6                            

 erlang-debugger.x86_64 0:R14B-04.3.el6                           erlang-dialyzer.x86_64 0:R14B-04.3.el6                          

 erlang-diameter.x86_64 0:R14B-04.3.el6                            erlang-docbuilder.x86_640:R14B-04.3.el6                        

 erlang-edoc.x86_64 0:R14B-04.3.el6                               erlang-erl_docgen.x86_64 0:R14B-04.3.el6                        

 erlang-erl_interface.x86_64 0:R14B-04.3.el6                       erlang-erts.x86_640:R14B-04.3.el6                              

 erlang-et.x86_64 0:R14B-04.3.el6                                 erlang-eunit.x86_64 0:R14B-04.3.el6                             

 erlang-examples.x86_64 0:R14B-04.3.el6                            erlang-gs.x86_640:R14B-04.3.el6                                

 erlang-hipe.x86_64 0:R14B-04.3.el6                               erlang-ic.x86_64 0:R14B-04.3.el6                                

 erlang-inets.x86_64 0:R14B-04.3.el6                              erlang-inviso.x86_640:R14B-04.3.el6                            

 erlang-jinterface.x86_64 0:R14B-04.3.el6                          erlang-kernel.x86_640:R14B-04.3.el6                            

 erlang-megaco.x86_64 0:R14B-04.3.el6                             erlang-mnesia.x86_64 0:R14B-04.3.el6                            

 erlang-observer.x86_64 0:R14B-04.3.el6                            erlang-odbc.x86_640:R14B-04.3.el6                              

 erlang-orber.x86_64 0:R14B-04.3.el6                              erlang-os_mon.x86_64 0:R14B-04.3.el6                            

 erlang-otp_mibs.x86_64 0:R14B-04.3.el6                           erlang-parsetools.x86_64 0:R14B-04.3.el6                        

 erlang-percept.x86_64 0:R14B-04.3.el6                             erlang-pman.x86_640:R14B-04.3.el6                              

 erlang-public_key.x86_64 0:R14B-04.3.el6                          erlang-reltool.x86_640:R14B-04.3.el6                            

 erlang-runtime_tools.x86_64 0:R14B-04.3.el6                       erlang-sasl.x86_640:R14B-04.3.el6                              

 erlang-snmp.x86_64 0:R14B-04.3.el6                               erlang-ssh.x86_64 0:R14B-04.3.el6                               

 erlang-ssl.x86_64 0:R14B-04.3.el6                                erlang-stdlib.x86_64 0:R14B-04.3.el6                            

 erlang-syntax_tools.x86_64 0:R14B-04.3.el6                       erlang-test_server.x86_64 0:R14B-04.3.el6                       

 erlang-toolbar.x86_64 0:R14B-04.3.el6                            erlang-tools.x86_64 0:R14B-04.3.el6                             

 erlang-tv.x86_64 0:R14B-04.3.el6                                 erlang-typer.x86_64 0:R14B-04.3.el6                             

 erlang-webtool.x86_64 0:R14B-04.3.el6                             erlang-wx.x86_640:R14B-04.3.el6                                

 erlang-xmerl.x86_64 0:R14B-04.3.el6                               unixODBC.x86_640:2.2.14-14.el6                                 

 wxBase.x86_64 0:2.8.12-1.el6.centos                               wxGTK.x86_640:2.8.12-1.el6.centos                              

 wxGTK-gl.x86_64 0:2.8.12-1.el6.centos                            

Complete!

[root@server1 ~]# rpm -ql rabbitmq-server

/etc/logrotate.d/rabbitmq-server

/etc/rabbitmq

/etc/rc.d/init.d/rabbitmq-server

/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server

/usr/lib/rabbitmq/bin

/usr/lib/rabbitmq/bin/rabbitmq-defaults

/usr/lib/rabbitmq/bin/rabbitmq-env

/usr/lib/rabbitmq/bin/rabbitmq-plugins

/usr/lib/rabbitmq/bin/rabbitmq-server

/usr/lib/rabbitmq/bin/rabbitmqctl

/usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5

/usr/sbin/rabbitmq-server

/usr/sbin/rabbitmqctl

[root@server1 ~]# service rabbitmq-server start

Starting rabbitmq-server: SUCCESS

rabbitmq-server.

[root@server1 ~]# netstat -tnulp | grep:5672

tcp       0      0 :::5672                     :::*                        LISTEN      11303/beam 

[root@server1 ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins list  #(rabbitmq_web_dispath,rabbitmq_web_stomp,rabbitmq_web_stomp_example用于監控)

[ ] amqp_client                       3.1.5

[ ] cowboy                           0.5.0-rmq3.1.5-git4b93c2d

[ ] eldap                             3.1.5-gite309de4

[ ] mochiweb                         2.7.0-rmq3.1.5-git680dba8

[ ] rabbitmq_amqp1_0                  3.1.5

[ ] rabbitmq_auth_backend_ldap        3.1.5

[ ] rabbitmq_auth_mechanism_ssl       3.1.5

[ ] rabbitmq_consistent_hash_exchange 3.1.5

[ ] rabbitmq_federation               3.1.5

[ ] rabbitmq_federation_management    3.1.5

[ ] rabbitmq_jsonrpc                  3.1.5

[ ] rabbitmq_jsonrpc_channel          3.1.5

[ ] rabbitmq_jsonrpc_channel_examples 3.1.5

[ ] rabbitmq_management               3.1.5

[ ] rabbitmq_management_agent         3.1.5

[ ] rabbitmq_management_visualiser    3.1.5

[ ] rabbitmq_mqtt                     3.1.5

[ ] rabbitmq_shovel                   3.1.5

[ ] rabbitmq_shovel_management        3.1.5

[ ] rabbitmq_stomp                    3.1.5

[ ] rabbitmq_tracing                  3.1.5

[ ] rabbitmq_web_dispatch             3.1.5

[ ] rabbitmq_web_stomp                3.1.5

[ ] rabbitmq_web_stomp_examples       3.1.5

[ ] rfc4627_jsonrpc                   3.1.5-git5e67120

[ ] sockjs                           0.3.4-rmq3.1.5-git3132eb9

[ ] webmachine                       1.10.3-rmq3.1.5-gite9359c7

[root@server1 ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management

The following plugins have been enabled:

 mochiweb

 webmachine

 rabbitmq_web_dispatch

 amqp_client

 rabbitmq_management_agent

 rabbitmq_management

Plugin configuration has changed. RestartRabbitMQ for changes to take effect.

[root@server1 ~]# rabbitmqctl list_queues  

Listing queues ...

...done.

[root@server1 ~]# rabbitmqctl list_vhosts

Listing vhosts ...

/

[root@server1 ~]# rabbitmqctl add_vhost test   (新添加vhost;/為預設就有)

Creating vhost "test" ...

[root@server1 ~]# rabbitmqctl list_vhosts

test

[root@server1 ~]# rabbitmqctl add_user jowin 123456

Creating user "jowin" ...

[root@server1 ~]# rabbitmqctl list_users

Listing users ...

guest        [administrator]

jowin         []

[root@server1 ~]# rabbitmqctl set_permissions -p test jowin ".*" ".*"".*"

Setting permissions for user"jowin" in vhost "test" ...

MetaQ(全稱Metamorphosis)是一個高性能、高可用、可擴充的分布式消息中間件,思路起源于LinkedIn的Kafka,但并不是Kafka的一個Copy;MetaQ具有消息存儲順序寫、吞吐量大和支援本地和XA事務等特性,适用于大吞吐量、順序消息、廣播和日志資料傳輸等場景,目前在淘寶和支付寶有着廣泛的應用;

metaq是一款完全隊列模型的消息中間件,用java編寫,可在多種平台上部署,典型的pull機制;consumer-side支援java、C++程式設計語言;單台server可支援1w+消息隊列,通過擴容,隊列數幾乎可任意橫向擴充;每個隊列可持久化、長度無限(取決于disk空間);可從任意位置開始消費;

http://metaq.taobao.org/

總體架構:

<a href="http://s3.51cto.com/wyfs02/M02/87/BE/wKioL1fguA2THWVAAAA-TObaTzg374.jpg" target="_blank"></a>

metaq特點:

producer、MOM(broker)、consumer都可分布式;

消息存儲順序寫;支援消息順序;

性能極高,吞吐量大;

consumer拉取,随機讀,批量拉取資料;

資料遷移、擴容對使用者透明;

消費狀态儲存在consumer-side;

metaq術語:

broker(metaq的服務端,在消息中間件中通常稱為broker);

topic(消息的主題,可了解為隊列名,由使用者定義并在中間件配置,producer發送消息到某個topic下,consumer從某個topic下消費消息);

offset(消息在中間件broker上的每個分區都組織成一個檔案清單,consumer拉取資料需要知道資料在檔案中的offset,是絕對偏移量,中間件會将offset轉化為具體檔案的相對偏移量);

partition(分區,同一個topic下有多個partition,例如meta-test這個topic可分10個partition,分别有兩台broker提供,那可能每台broker提供5個partition,若broker id分别為0和1,所有分區為0-0、0-1、0-2、0-3、0-4、1-0、1-1、1-2、1-3、1-4);

metaq主要配置項:

[root@server1 ~]# vim /usr/local/taobao/metamorphosis-server-wrapper/conf/server.ini

[system]

brokerId=0  #(必須是叢集内唯一,0-1024)

numPartitions=1

serverPort=8123   #(伺服器port)

;; hostName=   #(預設取本機IP,多網卡需指明)

;; dataPath=   #(預設資料存儲路徑)

;; dataLogPath=   #(日志資料檔案路徑,預設與dataPath一樣)

dashboardHttpPort=8120

unflushThreshold=0   #(每隔多少條消息做一次disk sync,強制将更改的資料刷入disk,預設0表示強制每次寫入都sync,當為0時伺服器會自動啟用groupcommit技術,将多個消息合并成一次sync來提升io性能,經測試group commit情況下消息發送者的tps沒有受到太大影響,但server負載會上升很多;若為1000表示在掉電時最多允許丢失1000條消息,

unflushInterval=10000   #(間隔多少ms做一次disk sync,預設10s,在掉電時最多丢失10s内發送過來的消息,不可設為0或&lt;0)

maxSegmentSize=1073741824

maxTransferSize=1048576

deletePolicy=delete,168   #(資料删除政策,機關小時,預設超過7天就删除,此處為168小時,若為其它機關要注明,例如10s、10m)

deleteWhen=0 0 6,18 * * ?   #(何時執行删除政策的cron表達式,預設是此處設定,每天6:00和18:00執行)

flushTxLogAtCommit=1   #(事務日志的同步設定,0表示os決定,1表示每次commit都同步,2表示每隔1s同步一次;此參數嚴重影響性能,可根據需要的性能和可靠性權衡作出合理選擇,建議設為2,有問題最多丢失1s内運作的事務,這個級别對大多數服務是可靠的;最安全的是設為1這将嚴重影響事務性能;而0的安全級别最低,在安全級别上1&gt;=2&gt;0,而在性能上0&gt;=2&gt;1)

stat=true

updateConsumerOffsets=true

[zookeeper]

;; zk.zkEnable=True   #(是否注冊到zk,預設True)

zk.zkConnect=10.96.20.113:2181,10.96.20.114:2181   #(zk的伺服器清單)

zk.zkSessionTimeoutMs=30000   #(zk心跳逾時,機關ms,預設30s)

zk.zkConnectionTimeoutMs=30000   #(zk連接配接逾時時間,機關ms,預設30s)

zk.zkSyncTimeMs=5000

[topic=test]

[topic=meta-test]

metaq的叢集實作:

所有的broker注冊到zookeeper;producer連接配接zookeeper并傳回可用的broker清單,選擇一個broker發送消息

metaq主要指令:

#./metaServer.sh start &amp;

#./metaServer.sh stop

#./metaServer.sh restart &amp;

#./metaServer.sh reload &amp;

#./metaServer.sh stats

http://apache.fayea.com/zookeeper/zookeeper-3.4.6/

https://github.com/killme2008/Metamorphosis/releases

metaq操作:

準備:

server1(eth0:10.96.20.113;eth1:192.168.10.113)

server2(eth0:10.96.20.114;eth1:192.168.10.114)

jdk-8u51-linux-x64.rpm

zookeeper-3.4.6.tar.gz

metaq-server-1.4.6.2.tar.gz

server1與server2執行相同操作(安裝java運作環境;安裝配置zookeeper;安裝配置metaq):

[root@server1 ~]# rpm -ivh jdk-8u51-linux-x64.rpm

[root@server1 ~]# vim /etc/profile.d/java.sh

JAVA_HOME=/usr/java/jdk1.8.0_51

CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib

PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin

export JAVA_HOMECLASSPATH PATH RESIN_HOME

[root@server1 ~]# . !$

. /etc/profile.d/java.sh

[root@server1 ~]# java -version

java version "1.8.0_51"

Java(TM) SE Runtime Environment (build1.8.0_51-b16)

Java HotSpot(TM) 64-Bit Server VM (build25.51-b03, mixed mode)

[root@server1 ~]# tar xf zookeeper-3.4.6.tar.gz -C /usr/local/

[root@server1 ~]# cd /usr/local

[root@server1 local]# ln -sv zookeeper-3.4.6/ zookeeper

`zookeeper' -&gt; `zookeeper-3.4.6/'

[root@server1 local]# cd zookeeper

[root@server1 zookeeper]# mkdir dataDir dataLogDir

[root@server1 zookeeper]# echo 1 &gt; dataDir/myid   #(server2上為#echo 2&gt; dataDir/myid)

[root@server1 zookeeper]# cat dataDir/myid

1

[root@server1 zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg

[root@server1 zookeeper]# vim conf/zoo.cfg  #(8880為選舉port,7770是心跳傳遞port)

dataDir=/usr/local/zookeeper/dataDir

dataLogDir=/usr/local/zookeeper/dataLogDir

clientPort=2181

server.1=10.96.20.113:8880:7770

server.2=10.96.20.114:8880:7770

[root@server1 zookeeper]# cd bin

[root@server1 bin]#./zkServer.sh start

JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@server1 bin]# netstat -tnulp | grepjava

tcp       0      0::ffff:10.96.20.113:7770    :::*                        LISTEN      9542/java          

tcp       0      0 :::2181                     :::*                        LISTEN      9542/java          

tcp       0      0 :::39498                    :::*                        LISTEN      9542/java  

[root@server1 bin]# tail zookeeper.out

[root@server1 bin]# ./zkCli.sh -server 10.96.20.113:2181

Connecting to 10.96.20.113:2181

2016-09-19 02:01:31,347 [myid:] - INFO  [main:Environment@100] - Clientenvironment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT

2016-09-19 02:01:31,352 [myid:] - INFO  [main:Environment@100] - Clientenvironment:host.name=server1

[main-SendThread(10.96.20.113:2181):ClientCnxn$SendThread@1235]- Session establishment complete on server 10.96.20.113/10.96.20.113:2181,sessionid = 0x15741ac19b00000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:Nonepath:null

[zk: 10.96.20.113:2181(CONNECTED) 0] ls /

[zk: 10.96.20.113:2181(CONNECTED) 1] ls /zookeeper

[quota]

[zk: 10.96.20.113:2181(CONNECTED) 2] ls /zookeeper/quota

[]

[zk: 10.96.20.113:2181(CONNECTED) 3] quit

Quitting...

2016-09-19 02:03:16,018 [myid:] - INFO  [main:ZooKeeper@684] - Session:0x15741ac19b00000 closed

2016-09-19 02:03:16,019 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512]- EventThread shut down

[root@server1 bin]# cd

[root@server1 ~]# tar xf metaq-server-1.4.6.2.tar.gz -C /usr/local

[root@server1 ~]# cd /usr/local/taobao/metamorphosis-server-wrapper/conf

[root@server1 conf]# vim server.ini   #(注意以下資訊,brokerId為全局唯一,server2上的brokerId為1;dashboardHttpPort用于監控;zk.zkConnect=多個server間用逗号分隔;[topic-jowin]是新添加的一行)

brokerId=0

serverPort=8123

zk.zkConnect=10.96.20.113:2181,10.96.20.114:2181

[topic=jowin]

[root@server1 conf]# cd ../bin

[root@server1 bin]# ./metaServer.sh start &amp;

[INFO] [main] 09-19 02:20:26,837[MetaMorphosisBroker] - Starting metamorphosis server...

[INFO] [main] 09-19 02:20:26,838[MetaMorphosisBroker] - Start metamorphosis server successfully

[INFO] [main] 09-19 02:20:32,021 [Server] -Starting dashboard http server at port 8120

[INFO] [main] 09-19 02:20:32,062 [Server] -jetty-7.6.1.v20120215

[INFO] [main] 09-19 02:20:32,146[AbstractConnector] - Started [email protected]:8120

[INFO] [main] 09-19 02:20:32,147 [Server] -Started dashboard http server successfully.

tcp       0      0 :::24402                    :::*                        LISTEN      9830/java          

tcp       0      0 :::8120                     :::*                        LISTEN      9830/java          

tcp       0      0 :::8123                     :::*                        LISTEN      9830/java          

tcp       0      0 :::19552                    :::*                        LISTEN      9830/java          

tcp       0      0 :::9123                     :::*                        LISTEN      9830/java          

tcp       0      0 :::2181                     :::*                        LISTEN      9542/java          

tcp       0      0 :::39498                    :::*                        LISTEN      9542/java     

[root@server1 bin]# ./metaServer.sh stats

[2016-09-19 02:21:20,263] INFO 即将啟動RemotingController...

配置為:

BaseConfig [callBackExecutorPoolSize=1,callBackExecutorQueueSize=20000, dispatchMessageThreadCount=0, idleTime=10,keepAlive=true, linger=0, maxCallBackCountPerConnection=100000,maxCallBackExecutorPoolSize=30, maxReadBufferSize=0,maxScheduleWrittenBytes=43253760, rcvBufferSize=65536, readBufferSize=131072,readThreadCount=0, reuseAddr=true, scanInvalidCallBackInterval=300,selectorPoolSize=1, sndBufferSize=65536, soLinger=true, tcpNoDelay=true,wireFormatType=metamorphosis, writeThreadCount=0](com.taobao.gecko.service.impl.BaseRemotingController)

[2016-09-19 02:21:20,332] INFO Creating 1rectors... (com.taobao.gecko.core.nio.impl.SelectorManager)

[2016-09-19 02:21:20,501] WARN TheController started at null ... (com.taobao.gecko.core.core.impl.AbstractController)

STATS

pid 9693

broker_id 0

port 8123

uptime 56

version 1.4.6.2

slave false

curr_connections 1

threads 34

cmd_put 0

cmd_get 0

cmd_offset 0

tx_begin 0

tx_xa_begin 0

tx_commit 0

tx_rollback 0

get_miss 0

put_failed 0

total_messages 0

topics 3

config_checksum1414045505

END

 [2016-09-19 02:21:21,940] INFO Controller hasbeen stopped. (com.taobao.gecko.core.core.impl.AbstractController)

[root@server1 bin]# cd/usr/local/zookeeper/bin

[zk: 10.96.20.113:2181(CONNECTED) 0] ls /

[zookeeper, meta]

[zk: 10.96.20.113:2181(CONNECTED) 1] ls/meta

[brokers]

[zk: 10.96.20.113:2181(CONNECTED) 2] ls/meta/brokers

[topics-sub, ids, topics-pub, topics]

[zk: 10.96.20.113:2181(CONNECTED) 3] ls /meta/brokers/topics

[meta-test, test,jowin]

[zk: 10.96.20.113:2181(CONNECTED) 4] ls /meta/brokers/ids  

[0, 1]

[zk: 10.96.20.113:2181(CONNECTED) 5] quit

2016-09-19 02:34:31,031 [myid:] - INFO  [main:ZooKeeper@684] - Session:0x15741c7b5e40002 closed

2016-09-19 02:34:31,032 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512]- EventThread shut down

[root@server1 bin]# ./zkCli.sh -server 10.96.20.114:2181

[zk: 10.96.20.114:2181(CONNECTED) 0] get /meta/brokers/ids/0  #(server1和server2各兩塊網卡)

meta://192.168.10.113:8123

cZxid = 0x10000007f

ctime = Mon Sep 19 02:27:07 PDT 2016

mZxid = 0x200000004

mtime = Mon Sep 19 02:29:51 PDT 2016

pZxid = 0x20000000f

cversion = 6

dataVersion = 2

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 26

numChildren = 2

[zk: 10.96.20.114:2181(CONNECTED) 1] get /meta/brokers/ids/1

meta://192.168.10.114:8123

cZxid = 0x200000032

ctime = Mon Sep 19 02:41:10 PDT 2016

mZxid = 0x20000004b

mtime = Mon Sep 19 02:42:03 PDT 2016

pZxid = 0x200000055

[zk: 10.96.20.114:2181(CONNECTED) 2] quit

2016-09-19 02:44:26,186 [myid:] - INFO  [main:ZooKeeper@684] - Session:0x25741c7b6f30003 closed

2016-09-19 02:44:26,188 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512]- EventThread shut down

注:若在一個server上部署兩個zookeeper、broker(metaq),要改如下資訊:

zookeeper程式解壓至不同的目錄(/usr/local/zookeeper{1,2}/);

zookeeper的myid一定要不同(/usr/local/zookeeper1/dataDir/myid為1;/usr/local/zookeeper2/dataDir/myid為2);

zoo.cfg配置檔案中:

clientPort要不同(例如zookeeper1為2181、zookeeper2為2182);

server.1=127.0.0.1:8880:7770

server.2=127.0.0.1:8881:7771

vim /usr/local/taobao{1,2}/metamorphosis-server-wrapper/bin/metaServer.sh

PID_FILE="$PID_DIR/.run.pid"   #(pid檔案不能相同)

vim /usr/local/taobao{1,2}/metamorphosis-server-wrapper/bin/env.sh

export JMX_PORT=9123   #(JMX_PORT不能相同)

本文轉自 chaijowin 51CTO部落格,原文連結:http://blog.51cto.com/jowin/1854431,如需轉載請自行聯系原作者