RocketMQ原理学习之消息存储

前言


消息存储是消息队列中最为重要的一部分,因为对其有高可靠性的要求,就要求数据进行持久化存储。作为数据的存储介质,无非就是数据库和文件系统两种,比如ActiveMQ就是将数据存储在DB中,而rocketMQ则是将数据存放在文件系统中。本篇主要从保证rocketMQ的读写性能出发,分析其中的原理。

一、数据库和文件系统存储对比

数据库作为存储介质:我总结有两点不是很好的地方。其一是在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。其二是在可靠性方面,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。

文件系统作为存储介质:将消息存储在文件系统中一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

二、基础概念

要了解rocketMQ的消息存储之前,先来了解几个基本概念。

顺序读写与随机读写:顺序读写的速度远远超过随机读写的速度,原因之一是顺序读写和随机读写我们默认它们传输时间是一样的,但是随机读写需要多次寻道和旋转延迟,而这个时间可能是传输时间的许多倍。原因之二是顺序读写会触发预读,而随机读写不会触发预读。何为预读?根据局部性原理,当一个数据被用到时,其附近的数据也会马上被使用。当顺序读的时候,磁盘会向后连续读一页或者几页加载到内存中。这两个原因总结来说,就是提高磁盘的IO效率。

Page cache(页面缓存)与Buffer cache(块缓存):Page cache是针对文件系统的,是文件的缓存。Buffer cache是针对磁盘块(即扇区)的缓存。page cache本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面,这样的目的就是提高cache的命中率

内核态、用户态:Linux操作系统分为用户态和内核态,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。内核态特殊的软件程序,控制计算机的硬件资源,例如协调CPU资源,分配内存资源,并且提供稳定的环境供应用程序运行。用户态就是提供应用程序运行的空间,为了使应用程序访问到内核管理的资源例如CPU,内存,I/O。内核必须提供一组通用的访问接口,这些接口就叫系统调用。

内存映射mmap:可参考这篇文章。我的理解是,内存映射的好处就是在于将文件直接映射到用户空间,而不需要经过内核空间,减少了文件的拷贝次数。因为常规来说,我们想把文件拷贝到内存中,如果没有内存映射,文件需要先拷贝到内核空间中的一个缓存区中,再拷贝到用户空间。正是有了mmap,建立起了一个硬盘的文件到用户空间的映射,从而能够直接将文件从硬盘拷贝到用户空间中

三、Rocket中的消息存储

整体架构

RocketMQ消息的存储是由ConsumeQueueCommitLog配合完成 的,消息真正的物理存储文件是CommitLogConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。

CommitLog:是一个文件集合,每个文件大小为1G,存储满后存下一个,所有消息的内容都持久化到它的上面。

这边可以引申出一个问题:为什么CommitLog的大小是一个为1G的文件。原因有两个:一个是将所有消息存在一个文件中,便于顺序读写,有利于提高磁盘的IO。另一个原因是由于RocketMQ中用到了mmap,提高拷贝速度,而其实现是基于Java中的MappedByteBuffer实现的,而MappedByteBuffer一次只能映射1.5~2G 的文件至用户态的虚拟内存,所以这边选择1G作为文件大小,不宜超过1.5~2G的这个范围。

ConsumeQueue:注意,一个CommitLog不只能对应一个ConsumeQueue,比如下图,同一个topic的消息的索引能存在于4个ConsumeQueue中。

commitLog与ConsumeQueue的对应关系

下面介绍一下RocketMQ中的mmap内存映射技术—MappedByteBuffer

一般来说,一台服务器把本机磁盘文件的内容发送到客户端分为以下几步:

1
数据--->内核态(内核IO缓冲区)--->用户态(进程私有空间)--->网络驱动内核--->网卡--->数据

mmap内存映射和普通标准IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。也就是数据能直接从磁盘到用户空间中,看着就像是直接读入到内存中。这种技术也称为“零拷贝”技术。

但是也有一些限制之处

a.Mmap映射的内存空间释放的问题;由于映射的内存空间本身就不属于JVM的堆内存区(Java Heap),因此其不受JVM GC的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而unmap()方法是FileChannelImpl类里实现的私有方法,无法直接显示调用。RocketMQ中的做法是,通过Java反射的方式调用“sun.misc”包下的Cleaner类的clean()方法来释放映射占用的内存空间;
b.MappedByteBuffer内存映射大小限制;因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限制,但其大小也受到OS虚拟内存大小的限制。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了;
c.使用MappedByteBuffe的其他问题;会存在内存占用率较高和文件关闭不确定性的问题。

刷盘机制

同步刷盘:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PageCache后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

异步刷盘:在返回写成功状态时,消息可能只是被写入了内存的PageCache,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

同步刷盘和异步刷盘

总结以上内容:在消息存储中主要做了存取两方面的优化,在存的方面,让消息存成一个大文件,为了读取的速度考虑,并且可以根据具体的业务场景选择不同的刷盘机制。在取的方面,考虑ConsumeQueue索引文件,也能支持集群模式下多个用户共同消费,并且用了零拷贝技术。

补充

Commitlog文件存储目录为${ROCKET_HOME}/store/commitlog目录,每一个文件默认1G,一个文件写满后再创建另一个,以该文件中第一个偏移量为文件名,偏移量20位用0补齐。这样的好处就是根据物理偏移量能快速定位到消息,源码中使用MappedFile、MappedFileQueue来封装存储文件,MappedFileQueue是MappedFile的容器,一个MappedFile对应着一个Commitlog文件。

同步刷盘的简单描述就是,消息生产者在消息服务端将消息内容追加到内存映射文件中(内存)后,需要同步将内存的内容立刻刷写到磁盘。通过调用内存映射文件(MappedByteBuffer的force方法)可将内存中的数据写入磁盘。

异步刷盘根据是否开启transientStorePoolEnable机制,刷盘实现会有细微差别。如果transientStorePoolEnable为true,RocketMQ会单独申请一个与目标物理文件(commitlog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到与物理文件的内存映射内存中,再flush到磁盘。如果transientStorePoolEnable为false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中。


参考

https://www.jianshu.com/p/6d0c118c17de