天天看點

RocketMQ之雙Master方式部署以及簡單使用

1.1、伺服器環境

192.168.100.24 root nameServer1,brokerServer1 Master1

192.168.100.25 root nameServer2,brokerServer2 Master2

1.2、Hosts添加資訊

192.168.100.24 rocketmq-nameserver1

192.168.100.24 rocketmq-master1

192.168.100.25 rocketmq-nameserver2

192.168.100.25 rocketmq-master2

1.3、上傳解壓【兩台機器】

# 上傳alibaba-rocketmq-3.2.6.tar.gz檔案至/usr/local

# tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local

# mv alibaba-rocketmq alibaba-rocketmq-3.2.6

# ln -s alibaba-rocketmq-3.2.6 rocketmq

1.4、建立存儲路徑【兩台機器】

# mkdir /usr/local/rocketmq/store

# mkdir /usr/local/rocketmq/store/commitlog

# mkdir /usr/local/rocketmq/store/consumequeue

# mkdir /usr/local/rocketmq/store/index

1.5、RocketMQ配置檔案【兩台機器】

# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties 

# vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties

#所屬叢集名字  
brokerClusterName=rocketmq-cluster  
#broker名字,注意此處不同的配置檔案填寫的不一樣  
brokerName=broker-a|broker-b  
#0 表示 Master,>0 表示 Slave  
brokerId=0  
#nameServer位址,分号分割  
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876  
#在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數  
defaultTopicQueueNums=4  
#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉  
autoCreateTopicEnable=true  
#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉  
autoCreateSubscriptionGroup=true  
#Broker 對外服務的監聽端口  
listenPort=10911  
#删除檔案時間點,預設淩晨 4點  
deleteWhen=04  
#檔案保留時間,預設 48 小時  
fileReservedTime=120  
#commitLog每個檔案的大小預設1G  
mapedFileSizeCommitLog=1073741824  
#ConsumeQueue每個檔案預設存30W條,根據業務情況調整  
mapedFileSizeConsumeQueue=300000  
#destroyMapedFileIntervalForcibly=120000  
#redeleteHangedFileInterval=120000  
#檢測實體檔案磁盤空間  
diskMaxUsedSpaceRatio=88  
#存儲路徑  
storePathRootDir=/usr/local/rocketmq/store  
#commitLog 存儲路徑  
storePathCommitLog=/usr/local/rocketmq/store/commitlog  
#消費隊列存儲路徑存儲路徑  
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue  
#消息索引存儲路徑  
storePathIndex=/usr/local/rocketmq/store/index  
#checkpoint 檔案存儲路徑  
storeCheckpoint=/usr/local/rocketmq/store/checkpoint  
#abort 檔案存儲路徑  
abortFile=/usr/local/rocketmq/store/abort  
#限制的消息大小  
maxMessageSize=65536  
#flushCommitLogLeastPages=4  
#flushConsumeQueueLeastPages=2  
#flushCommitLogThoroughInterval=10000  
#flushConsumeQueueThoroughInterval=60000  
#Broker 的角色  
#- ASYNC_MASTER 異步複制Master  
#- SYNC_MASTER 同步雙寫Master  
#- SLAVE  
brokerRole=ASYNC_MASTER  
#刷盤方式  
#- ASYNC_FLUSH 異步刷盤  
#- SYNC_FLUSH 同步刷盤  
flushDiskType=ASYNC_FLUSH  
#checkTransactionMessageEnable=false  
#發消息線程池數量  
#sendMessageThreadPoolNums=128  
#拉消息線程池數量  
#pullMessageThreadPoolNums=128        

1.6、修改日志配置檔案【兩台機器】

# mkdir -p /usr/local/rocketmq/logs 

# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g'*.xml

1.7、修改啟動腳本參數【兩台機器】

# vim /usr/local/rocketmq/bin/runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"

# vim /usr/local/rocketmq/bin/runserver.sh

1.8、啟動NameServer【兩台機器】

# cd /usr/local/rocketmq/bin

# nohup sh mqnamesrv &

對應的關閉指令是:sh mqshutdown namesrv

1.9、啟動BrokerServer A【192.168.100.24】

# cd /usr/local/rocketmq/bin 

# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &# netstat -ntlp

# jps

# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log

# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

1.10、啟動BrokerServer B【192.168.100.25】

# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &

# netstat -ntlp

對應的關閉Broker的指令是:sh mqshutdown broker

1.11、部署RocketMQ Console(沒有的可以聯系我)

1.12、資料清理

# sh mqshutdown broker

# sh mqshutdown namesrv

# --等待停止# rm -rf /usr/local/rocketmq/store

# --按照上面步驟重新開機NameServer與BrokerServer

二、簡單示例:

所需jar包(Maven):

<dependency>  
      <groupId>com.alibaba.rocketmq</groupId>  
      <artifactId>rocketmq-client</artifactId>  
      <version>3.2.6</version>  
    </dependency>  
    <dependency>  
      <groupId>com.alibaba.rocketmq</groupId>  
      <artifactId>rocketmq-common</artifactId>  
      <version>3.2.6</version>  
    </dependency>  
    <dependency>  
      <groupId>com.alibaba.rocketmq</groupId>  
      <artifactId>rocketmq-remoting</artifactId>  
      <version>3.2.6</version>  
    </dependency>        

Producer類:

public class producer {  
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {  
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");  
        producer.setNamesrvAddr("192.168.100.24:9876;192.168.100.25:9876");  
        producer.start();  
        for (int i = 0;i<100;i++){  
            Message msg = new Message("TopicQuickStart","TagA",  
                    ("Hello RocketMQ" + i).getBytes());  
            SendResult sendResult = producer.send(msg);  
            System.out.println(sendResult);  
  
        }  
    }  
}        

Consumer類:

public class Consumer {  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");  
        consumer.setNamesrvAddr("192.168.100.24:9876;192.168.100.25:9876");  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicQuickStart","*");  
  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {  
                System.out.println(Thread.currentThread().getName() + "Receive New Messages:" + list);  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer Started");  
    }  
  
}        

三、檢視RocketMQ控制台結果:

RocketMQ之雙Master方式部署以及簡單使用

四、對于2m-2s-async模式的broker啟動指令:

# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b.properties >/dev/null 2>&1 &

# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &

# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties >/dev/null 2>&1 &

mq