RocketMQ 消息存储
hanpy

RocketMQ 中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的 store 目录中,主要存储的文件包括 Commitlog 文件、 ConsumeQueue文件、 IndexFile等。

消息存储结构

目录结构

image image
  • abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动Broker的情况下,发现这个文件是存在的,则说明之前Broker的关闭是非正常关闭。
  • checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
  • commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的
  • config:存放着Broker运行期间的一些配置数据
  • consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中
  • index:其中存放着消息索引文件indexFile
  • lock:运行期间使用到的全局资源锁

CommitLog

CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:$ {user.home} \store$ { commitlog} \ $ { fileName}

文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

一个 Broker 中只包含一个 CommitLog 目录,无论当前Broker中存放着多少Topic的消息,这些消息都是被顺序写入到了 CommitLog 下面,这些消息在Broker中存放时并没有被按照Topic进行分类存放。

文件的消息单元存储详细信息

每个消息单元中包含消息总长度MsgLen、消息的物理位置physicalOffset、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度 TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消息在Queue中存储的偏移量QueueOffset等近 20 余项消息相关属性。

序号 字段 大小(字节) 含义
1 msgSize 4 代表这个消息的大小
2 MAGICCODE 4 MAGICCODE = daa320a7
3 BODY CRC 4 消息体BODY CRC 当broker重启recover时会校验
4 queueId 4
5 flag 4
6 QUEUEOFFSET 8 这个值是个自增值不是真正的consume queue的偏移量,
可以代表这个consumeQueue队列或者tranStateTable队列中消息的个数,
若是非事务消息或者commit事务消息,可以通过这个值查找到consumeQueue中数据,
QUEUEOFFSET * 20才是偏移地址;
若是PREPARED或者Rollback事务,则可以通过该值从tranStateTable中查找数据
7 PHYSICALOFFSET 8 代表消息在commitLog中的物理起始地址偏移量
8 SYSFLAG 4 指明消息是事物事物状态等消息特征,二进制为四个字节从右往左数:
当4个字节均为0(值为0)时表示非事务消息;
当第1个字节为1(值为1)时表示表示消息是压缩的(Compressed);
当第2个字节为1(值为2)表示多消息(MultiTags);
当第3个字节为1(值为4)时表示prepared消息;当第4个字节为1(值为8)时表示commit消息;
当第3/4个字节均为1时(值为12)时表示rollback消息;当第3/4个字节均为0时表示非事务消息
9 BORNTIMESTAMP 8 消息产生端(producer)的时间戳
10 BORNHOST 8 消息产生端(producer)地址(address:port)
11 STORETIMESTAMP 8 消息在broker存储时间
12 STOREHOSTADDRESS 8 消息存储到broker的地址(address:port)
13 RECONSUMETIMES 8 消息被某个订阅组重新消费了几次(订阅组之间独立计数),
因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了,
成功消费一次记录为0;
14 PreparedTransaction Offset 8 表示是prepared状态的事物消息
15 messagebodyLength 4 消息体大小值
16 messagebody 消息体内容
17 topicLength 1 topic名称内容大小
18 topic topic的内容值
19 propertiesLength 2 属性值大小
20 properties propertiesLength大小的属性数据

Consumequeue

由于不同的主题的消息不连续的存储在commitlog文件中,如果只是检索该消息文件速度就会很慢,为了加快消息的检索和节省磁盘空间,每一个consumequeue条目存储了消息的关键信息commitog文件中的偏移量、消息长度、tag的hashcode值。

ConsumeQueue 文件类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。

image

性能提升

consumequeue中的数据是顺序存放的,还引入了 PageCache 的预读取机制,使得对 consumequeue 文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。

PageCache机制,页缓存机制,是OS对文件的缓存机制,用于加速对文件的读写操作。
Write:OS会先将数据写入到PageCache中,随后会以异步方式将Cache中的数据刷盘到物理磁盘
Read:首先会从PageCache中读取,若没有命中,则OS在从物理磁盘上加载该数据到PageCache的同时,也会顺序 对其相邻数据块中的数据进行预读取。

IndexFile

index 存的是索引文件,可以提高根据主题与消息检索 消息的速度 ,使用 Hash 索引机制,具体是 Hash 槽与 Hash 冲突的链表结构。

每个indexFile文件由三部分构成:indexHeader,slots槽位,indexes索引数据。

indexHeader

indexHeader 固定占用 40 个字节,其中存放着如下数据:

  • beginTimestamp:该indexFile中第一条消息的存储时间
  • endTimestamp:该indexFile中最后一条消息存储时间
  • beginPhyoffset:该indexFile中第一条消息在commitlog中的偏移量commitlog offset
  • endPhyoffset:该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
  • hashSlotCount:已经填充有index的slot数量(并不是每个slot槽下都挂载有index索引单元,这里统计的是所有挂载了index索引单元的slot槽的数量)
  • indexCount:该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所有index索引单元的数量之和)
Hash 槽

一个 IndexFile 默认包含 500W 个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引。
key 的 hash值 % 500w 的结果即为 slot 槽位,然后将该 slot 值修改为该 index 索引单元的 indexNo,根据这个 indexNo 可以计算出该 index 单元在 indexFile 中的位置。为了解决取模重复很高的问题,在每个index索引单元中增加了preIndexNo,用于指定该slot中当前index索引单元的前一个index索引单元。而slot中始终存放的是其下最新的index索引单元的indexNo,

indexNo 是一个在 indexFile 中的流水号,从 0 开始依次递增。即在一个 indexFile 中所有 indexNo 是以此递增的。indexNo 在 index 索引单元中是没有体现的,其是通过 indexes 中依次数出来的。

image

indexes索引数据

index索引单元默写 20 个字节,其中存放着以下四个属性:

  • keyHash:消息中指定的业务key的hash值
  • phyOffset:当前key对应的消息在commitlog中的偏移量commitlog offset
  • timeDiff:当前key对应消息的存储时间与当前indexFile创建时间的时间差
  • preIndexNo:当前slot下当前index索引单元的前一个index索引单元的indexNo
indexFile的创建

indexFile的创建时机

  • 当第一条带 key 的消息发送来后,系统发现没有 indexFile,此时会创建第一个 indexFile 文件
  • 当一个 indexFile 中挂载的 index 索引单元数量超出2000w个时,会创建新的 indexFile。当带key的消息发送到来后,系统会找到最新的 indexFile,并从其indexHeader的最后 4 字节中读取到indexCount。若indexCount >= 2000w时,会创建新的indexFile。

Checkpoint

checkpoint文件的作用是记录commitlog、consumequeue、index文件的刷盘时间点,文件固定长度4k,其中只用了该文件的前24个字节。查看其存储格式

  • physicMsgTimestamp:commitlog文件刷盘时间点
  • logicsMsgTimestamp:消息的消费队列文件刷盘时间点
  • indexMsgTimestamp:索引文件刷盘时间点

Config

config 文件夹中 存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在在此。

  • topics.json:topic 配置属性
  • subscriptionGroup.json:消息消费组配置信息。
  • delayOffset.json:延时消息队列拉取进度。
  • consumerOffset.json:集群消费模式消息消进度。consumerFilter.json :主题消息过滤信息。