RocketMQ基础概念之消息生产与消息消费

前言


最近打算开始学习消息队列相关内容,计划先从简单的整体框架出发宏观把握,再深入到源码层面。打算从rocketMQ出发进行展开,本篇先从基础的整体框架,消息生产方式和消息消费方式进行学习。

一、RocketMQ简介

目前主流的MQ主要有RocketMQ、kafka、RabbitMQ,消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要的优势有流量削峰、应用解耦、数据分发(提升性能)和蓄流压测(通过堆积消息进行压测)。

二、RocketMQ集群部署结构

2.1 集群各部分

RocketMQ集群架构图

简单介绍一下RocketMQ集群的几个部分

  • Producer:消息的生产者
  • Consumer:消息消费者
  • Broker:暂存和传输消息,分为Master与Slave两部分,一个Master可以对应多个Slave,但一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,Broker Id为0表示Master,非0表示Slave。
  • NameServer:是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。

2.2 安装

集群的安装较为简单,网上资料也比较多,这边就简单总结一下几个常见使用命令。

启动消息队列

1
2
3
4
# 启动mqnamesrv
nohup sh mqnamesrv &
# 启动broker
nohup sh mqbroker -n localhost:9876 &

发送消息

1
2
3
4
# 设置环境变量
export NAMESRV_ADDR=localhost:9876
# 使用安装包的demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

接受消息

1
2
3
4
# 设置环境变量
export NAMESRV_ADDR=localhost:9876
# 接受消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

关闭消息队列

1
2
sh mqshutdown namesrv
sh mqshutdown broker

三、消息生产与消费

3.1 普通消息

普通消息也叫做无序消息,producer只管发送消息,consumer只管消费消息,但是不管顺序,比如说producer依次发送1-10的消息,但是consumer接受到的消息就不是这个顺序了,是大规模并发地发送和消费,吞吐量很高,适合大部分场景。

3.1.1 消息发送

普通消息有三种发送方式,分别为:同步发送、异步发送和单向(Oneway)发送。分别来介绍一下。

同步发送

这种消息的发送方式是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。此种方式应用场景非常广泛,例如重要通知邮件报名短信通知营销短信系统等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer的地址
producer.setNamesrvAddr("10.1.13.111:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("Test", "TagA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

异步发送

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group2");
// 设置NameServer的地址
producer.setNamesrvAddr("10.1.13.111:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
int index = i;
Message msg = new Message("Test", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("第"+index+"条发送成功"+sendResult.getMsgId());
}

@Override
public void onException(Throwable throwable) {
System.out.println("第"+index+"条发送失败:");
throwable.printStackTrace();
}
});
}
Thread.sleep(5000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

这里与同步发送的区别在于producer.send()方法的参数中多了SendCallback()回调。注意,这里有一个坑,与同步方法不同,异步方法不能提前关闭Producer实例,因为这样即使返回了成功或者失败都不能被接受到,所以这边最好先等待一段时间再关闭这个实例,否则会抛出异常。

单向发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group2");
// 设置NameServer的地址
producer.setNamesrvAddr("10.1.13.111:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("Test", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

这边只要将send()方法改成sendOneway(msg)方法即可。

3.1.2 消息消费

可以通过两种方式来进行消息的消费,一种是集群模式,另一种是广播模式。

集群模式

集群模式采用平摊的方式来消费,同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。这个模式也是默认的模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("10.1.13.111:9876");
// 订阅Topic
consumer.subscribe("Test", "TagA");
//集群模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}

关键是consumer.setMessageModel(MessageModel.CLUSTERING);这句

广播模式

与集群模式的其他代码类似,只有一句不同,这边就不贴出了。

1
2
// 广播模式订阅消息
consumer.setMessageModel(MessageModel.BROADCASTING);

3.2 顺序消息

顺序消息还能细分为两种

局部有序消息:将相同顺序的消息发送到同一个消息队列,这样消费者从队列中获取数据肯定是相对有序的。

全局有序消息:将所有的消息发送到一个消息队列,消费者从单个队列中拉取消息,消息有序。

从3.1中我们知道普通消息是不能保证消息的有序性的,也就是发送消息编号为1、2、3、4、5,而消费消息的编号可能就成了2、1、5、4、3,这种顺序,而不能保证发送的顺序。比如在下单的这么一种场景下,可能有这么几步:下单、确认、付款,那么这种消息的处理就要求是有顺序的。要想保证有序就要从两个方面出发,一个是有序发送到broker,另一个是有序的进行消费。这边的做法是生产者实现MessageQueueSelector接口,选择一个消息队列进行发送,因为每个Topic默认初始化4个MessageQueue,如果不进行选择就不能保证发送到同一个的MessageQueue中

3.2.1 消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer的地址
producer.setNamesrvAddr("10.1.13.111:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("Test", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
int index = id % list.size();
return list.get(index);
}
},1);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

这边的重点是发送消息时的MessageQueueSelector()对消息队列做了选择,选择同一个Topic下的一个消息队列进行消息的发送,而不是多个。

3.2.2 消息消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("10.1.13.111:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅Topic
consumer.subscribe("Test", "TagA");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
// 设置自动提交
consumeOrderlyContext.setAutoCommit(true);
for (MessageExt msg : list) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}

监听方式与之前不同,采用MessageListenerOrderly(),而不是MessageListenerConcurrently()

3.3 延时消息

延时消息在电商里或者抢票软件里用的是比较多的,比如说提交了一个订单,半小时后再去检查这个消息,如果没有付款则删除该订单。注意是消息消费的延时。

3.3.1 消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer的地址
producer.setNamesrvAddr("10.1.13.111:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("Test1", "TagB", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延时等级为3级,也就是10s。
// 等级为:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
msg.setDelayTimeLevel(3);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}

关键是设置一个延迟函数msg.setDelayTimeLevel(3);即可。

3.3.2 消息消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("10.1.13.111:9876");
// 订阅Topic
consumer.subscribe("Test1", "TagB");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}

在等待延迟时间到达后才会进行消费。

3.4 事务消息

先来看一下事务消息的整体流程

事务消息流程

3.4.1 消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void main(String[] args) throws InterruptedException, MQClientException {
//创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("10.1.13.111:9876");
//启动消息生产者
producer.start();
//服务器回调producer,检查本地事务分支成功还是失败
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
if (StringUtils.equals("TagA", message.getTags())) {
// 提交事务,它允许消费者消费此消息。
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", message.getTags())) {
// 回滚事务,它代表该消息将被删除,不允许被消费
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
// 中间状态,它代表需要检查消息队列来确定状态。
return LocalTransactionState.UNKNOW;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("MQ检查消息Tag【"+messageExt.getTags()+"】的本地事务执行结果");
return LocalTransactionState.COMMIT_MESSAGE;
}
});

String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
TimeUnit.SECONDS.sleep(1);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}

生产结果

执行本地事务
SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2278B1DF50000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=HadoopMaster, queueId=3], queueOffset=0]
执行本地事务
SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2278B21EB0001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=HadoopMaster, queueId=0], queueOffset=1]
执行本地事务
SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000018B4AAC2278B25F80002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=HadoopMaster, queueId=1], queueOffset=2]
MQ检查消息Tag【TagC】的本地事务执行结果

3.4.2 消息消费

消息消费结果

Hello RocketMQ 2
Hello RocketMQ 0

最终的结果为只消费TagA和TagC的消息,因为只有这两个topic的消息被成功提交,而TagB的消息被回滚了,这类消息将被删除,不允许被消费者消费。


参考

https://help.aliyun.com/document_detail/29547.html