RocketMQ系列(三)消息的生產與消費

前面的章節,我們已經把RocketMQ的環境搭建起來了,是一個兩主兩從的異步集群。接下來,我們就看看怎麼去使用RocketMQ,在使用之前,先要在NameServer中創建Topic,我們知道RocketMQ是基於Topic的消息隊列,在生產者發送消息的時候,要指定消息的Topic,這個Topic的路由規則是怎樣的,這些都要在NameServer中去創建。

Topic的創建

我們先看看Topic的命令是如何使用的,如下:

./bin/mqadmin updateTopic -h

usage: mqadmin updateTopic -b <arg> | -c <arg>  [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t
       <arg> [-u <arg>] [-w <arg>]
 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false)
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false)
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false)
 -w,--writeQueueNums <arg>   set write queue nums

其中有一段,-b <arg> | -c <arg>,說明這個Topic可以指定集群,也可以指定隊列,我們先創建一個Topic指定集群,因為集群中有兩個隊列broker-abroker-b,看看我們的消息是否在兩個隊列中負載;然後再創建一個Topic指向broker-a,再看看這個Topic的消息是不是只在broker-a中。

創建兩個Topic,

./bin/mqadmin updateTopic -c 'RocketMQ-Cluster' -t cluster-topic -n '192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876'

./bin/mqadmin updateTopic -b 192.168.73.130:10911 -t broker-a-topic

第一個命令創建了一個集群的Topic,叫做cluster-topic;第二個命令創建了一個只在broker-a中才有的Topic,我們指定了-b 192.168.73.130:10911,這個是broker-a的地址和端口。

生產者發送消息

我們新建SpringBoot項目,然後引入RocketMQ的jar包,

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

然後配置一下生產者的客戶端,在這裏使用@Configuration這個註解,具體如下:

@Configuration
public class RocketMQConfig {

    @Bean(initMethod = "start",destroyMethod = "shutdown")
    public DefaultMQProducer producer() {
        DefaultMQProducer producer = new
                DefaultMQProducer("DefaultMQProducer");
											producer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
        return producer;
    }
}
  • 首先創建一個生產者組,名字叫做DefaultMQProducer;
  • 然後指定NameServer,192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;
  • 最後在@Bean註解中指定初始化的方法,和銷毀的方法;

這樣,生產者的客戶端就配置好了,然後再寫個Test類,在Test類中向MQ中發送消息,如下,

@SpringBootTest
class RocketmqDemoApplicationTests {

    @Autowired
    public DefaultMQProducer defaultMQProducer;

    @Test
    public void producerTest() throws Exception {

        for (int i = 0;i<5;i++) {
            Message message = new Message();
            message.setTopic("cluster-topic");
            message.setKeys("key-"+i);
            message.setBody(("this is simpleMQ,my NO is "+i).getBytes());

            SendResult sendResult = defaultMQProducer.send(message);
            System.out.println("SendStatus:" + sendResult.getSendStatus());
            System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
        }
    }
}
  • 我們先自動注入前面配置DefaultMQProducer;
  • 然後在Test方法中,循環5次,發送5個消息,消息的Topic指定為cluster-topic,是集群的消息,然後再設置消息的key和內容,最後調用send方法發送消息,這個send方法是同步方法,程序運行到這裡會阻塞,等待返回的結果;
  • 最後,我們打印出返回的結果和broker的名字;

運行一下,看看結果:

SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-a

5個消息發送都是成功的,而發送的隊列有4個是broker-b,1個broker-a,說明兩個broker之間還是有負載的,負載的規則我們猜測是隨機。

我們再寫個測試方法,看看broker-a-topic這個Topic的發送結果是什麼樣子的,如下:

@Test
public void brokerTopicTest() throws Exception {

    for (int i = 0;i<5;i++) {
        Message message = new Message();
        message.setTopic("broker-a-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is broker-a-topic's MQ,my NO is "+i).getBytes());

        defaultMQProducer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("SendStatus:" + sendResult.getSendStatus());
                System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

        System.out.println("異步發送 i="+i);

    }
}
  • 消息的Topic指定的是broker-a-topic,這個Topic我們只指定了broker-a這個隊列;
  • 發送的時候我們使用的是異步發送,程序到這裏不會阻塞,而是繼續向下執行,發送的結果正常或者異常,會調用對應的onSuccess和onException方法;
  • 我們在onSuccess方法中,打印出發送的結果和隊列的名稱;

運行一下,看看結果:

異步發送 i=0
異步發送 i=1
異步發送 i=2
異步發送 i=3
異步發送 i=4
SendStatus:SEND_OK
SendStatus:SEND_OK
SendStatus:SEND_OK
SendStatus:SEND_OK
BrokerName:broker-a
SendStatus:SEND_OK
BrokerName:broker-a
BrokerName:broker-a
BrokerName:broker-a
BrokerName:broker-a

由於我們是異步發送,所以最後的日誌先打印了出來,然後打印出返回的結果,都是發送成功的,並且隊列都是broker-a,完全符合我們的預期。

消費者

生產的消息已經發送到了隊列當中,再來看看消費者端如何消費這個消息,我們在這個配置類中配置消費者,如下:

@Bean(initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer pushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("cluster-topic","*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            if (msgs!=null&&msgs.size()>0) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                    System.out.println(context.getMessageQueue().getBrokerName());
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    } );
    return consumer;
}
  • 我們創建了一個消費者組,名字叫做DefaultMQPushConsumer;
  • 然後指定NameServer集群,192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;
  • 消費者訂閱的Topic,這裏我們訂閱的是cluster-topic,後面的*號是對應的tag,代表我們訂閱所有的tag;
  • 最後註冊一個併發執行的消息監聽器,實現裡邊的consumeMessage方法,在方法中,我們打印出消息體的內容,和消息所在的隊列;
  • 如果消息消費成功,返回CONSUME_SUCCESS,如果出現異常等情況,我們要返回RECONSUME_LATER,說明這個消息還要再次消費;

好了,這個訂閱了cluster-topic的消費者,配置完了,我們啟動一下項目,看看消費的結果如何,

this is simpleMQ,my NO is 2
broker-b
this is simpleMQ,my NO is 3
broker-b
this is simpleMQ,my NO is 1
broker-b
this is simpleMQ,my NO is 0
broker-a
this is simpleMQ,my NO is 4
broker-b

結果符合預期,cluster-topic中的5個消息全部消費成功,而且隊列是4個broker-b,1個broker-a,和發送時的結果是一致的。

大家有問題歡迎評論區討論~

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※超省錢租車方案

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

※推薦台中搬家公司優質服務,可到府估價