前言
最近打算开始学习消息队列相关内容,计划先从简单的整体框架出发宏观把握,再深入到源码层面。打算从rocketMQ出发进行展开,本篇先从基础的整体框架,消息生产方式和消息消费方式进行学习。
一、RocketMQ简介
目前主流的MQ主要有RocketMQ、kafka、RabbitMQ,消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要的优势有流量削峰、应用解耦、数据分发(提升性能)和蓄流压测(通过堆积消息进行压测)。
二、RocketMQ集群部署结构
2.1 集群各部分

简单介绍一下RocketMQ集群的几个部分
- Producer:消息的生产者
- Consumer:消息消费者
- Broker:暂存和传输消息,分为Master与Slave两部分,一个Master可以对应多个Slave,但一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,Broker Id为0表示Master,非0表示Slave。
- NameServer:是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。
2.2 安装
集群的安装较为简单,网上资料也比较多,这边就简单总结一下几个常见使用命令。
启动消息队列
1 | 启动mqnamesrv |
发送消息
1 | 设置环境变量 |
接受消息
1 | 设置环境变量 |
关闭消息队列
1 | sh mqshutdown namesrv |
三、消息生产与消费
3.1 普通消息
普通消息也叫做无序消息,producer只管发送消息,consumer只管消费消息,但是不管顺序,比如说producer依次发送1-10的消息,但是consumer接受到的消息就不是这个顺序了,是大规模并发地发送和消费,吞吐量很高,适合大部分场景。
3.1.1 消息发送
普通消息有三种发送方式,分别为:同步发送、异步发送和单向(Oneway)发送。分别来介绍一下。
同步发送
这种消息的发送方式是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
1 | public static void main(String[] args) throws Exception { |
异步发送
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
1 | public static void main(String[] args) throws Exception { |
这里与同步发送的区别在于producer.send()方法的参数中多了SendCallback()回调。注意,这里有一个坑,与同步方法不同,异步方法不能提前关闭Producer实例,因为这样即使返回了成功或者失败都不能被接受到,所以这边最好先等待一段时间再关闭这个实例,否则会抛出异常。
单向发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
1 | public static void main(String[] args) throws Exception { |
这边只要将send()方法改成sendOneway(msg)方法即可。
3.1.2 消息消费
可以通过两种方式来进行消息的消费,一种是集群模式,另一种是广播模式。
集群模式
集群模式采用平摊的方式来消费,同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。这个模式也是默认的模式。
1 | public static void main(String[] args) throws Exception { |
关键是consumer.setMessageModel(MessageModel.CLUSTERING);这句
广播模式
与集群模式的其他代码类似,只有一句不同,这边就不贴出了。
1 | // 广播模式订阅消息 |
3.2 顺序消息
顺序消息还能细分为两种
局部有序消息:将相同顺序的消息发送到同一个消息队列,这样消费者从队列中获取数据肯定是相对有序的。
全局有序消息:将所有的消息发送到一个消息队列,消费者从单个队列中拉取消息,消息有序。
从3.1中我们知道普通消息是不能保证消息的有序性的,也就是发送消息编号为1、2、3、4、5,而消费消息的编号可能就成了2、1、5、4、3,这种顺序,而不能保证发送的顺序。比如在下单的这么一种场景下,可能有这么几步:下单、确认、付款,那么这种消息的处理就要求是有顺序的。要想保证有序就要从两个方面出发,一个是有序发送到broker,另一个是有序的进行消费。这边的做法是生产者实现MessageQueueSelector接口,选择一个消息队列进行发送,因为每个Topic默认初始化4个MessageQueue,如果不进行选择就不能保证发送到同一个的MessageQueue中。
3.2.1 消息发送
1 | public static void main(String[] args) throws Exception { |
这边的重点是发送消息时的MessageQueueSelector()对消息队列做了选择,选择同一个Topic下的一个消息队列进行消息的发送,而不是多个。
3.2.2 消息消费
1 | public static void main(String[] args) throws Exception { |
监听方式与之前不同,采用MessageListenerOrderly(),而不是MessageListenerConcurrently()。
3.3 延时消息
延时消息在电商里或者抢票软件里用的是比较多的,比如说提交了一个订单,半小时后再去检查这个消息,如果没有付款则删除该订单。注意是消息消费的延时。
3.3.1 消息发送
1 | public static void main(String[] args) throws Exception { |
关键是设置一个延迟函数msg.setDelayTimeLevel(3);即可。
3.3.2 消息消费
1 | public static void main(String[] args) throws Exception { |
在等待延迟时间到达后才会进行消费。
3.4 事务消息
先来看一下事务消息的整体流程

3.4.1 消息发送
1 | public static void main(String[] args) throws InterruptedException, MQClientException { |
生产结果
执行本地事务
SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2278B1DF50000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=HadoopMaster, queueId=3], queueOffset=0]
执行本地事务
SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2278B21EB0001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=HadoopMaster, queueId=0], queueOffset=1]
执行本地事务
SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2278B25F80002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=HadoopMaster, queueId=1], queueOffset=2]
MQ检查消息Tag【TagC】的本地事务执行结果
3.4.2 消息消费
消息消费结果
Hello RocketMQ 2
Hello RocketMQ 0
最终的结果为只消费TagA和TagC的消息,因为只有这两个topic的消息被成功提交,而TagB的消息被回滚了,这类消息将被删除,不允许被消费者消费。
参考