RocketMQ 中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的 store
目录中,主要存储的文件包括 Commitlog
文件、 ConsumeQueue
文件、 IndexFile
等。
消息存储结构
目录结构
- abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动Broker的情况下,发现这个文件是存在的,则说明之前Broker的关闭是非正常关闭。
- checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
- commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的
- config:存放着Broker运行期间的一些配置数据
- consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中
- index:其中存放着消息索引文件indexFile
- lock:运行期间使用到的全局资源锁
CommitLog
CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:$ {user.home} \store$ { commitlog} \ $ { fileName}
。
文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
一个 Broker 中只包含一个 CommitLog 目录,无论当前Broker中存放着多少Topic的消息,这些消息都是被顺序写入到了 CommitLog 下面,这些消息在Broker中存放时并没有被按照Topic进行分类存放。
文件的消息单元存储详细信息
每个消息单元中包含消息总长度MsgLen、消息的物理位置physicalOffset、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度 TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消息在Queue中存储的偏移量QueueOffset等近 20 余项消息相关属性。
序号 | 字段 | 大小(字节) | 含义 |
---|---|---|---|
1 | msgSize | 4 | 代表这个消息的大小 |
2 | MAGICCODE | 4 | MAGICCODE = daa320a7 |
3 | BODY CRC | 4 | 消息体BODY CRC 当broker重启recover时会校验 |
4 | queueId | 4 | |
5 | flag | 4 | |
6 | QUEUEOFFSET | 8 | 这个值是个自增值不是真正的consume queue的偏移量, 可以代表这个consumeQueue队列或者tranStateTable队列中消息的个数, 若是非事务消息或者commit事务消息,可以通过这个值查找到consumeQueue中数据, QUEUEOFFSET * 20才是偏移地址; 若是PREPARED或者Rollback事务,则可以通过该值从tranStateTable中查找数据 |
7 | PHYSICALOFFSET | 8 | 代表消息在commitLog中的物理起始地址偏移量 |
8 | SYSFLAG | 4 | 指明消息是事物事物状态等消息特征,二进制为四个字节从右往左数: 当4个字节均为0(值为0)时表示非事务消息; 当第1个字节为1(值为1)时表示表示消息是压缩的(Compressed); 当第2个字节为1(值为2)表示多消息(MultiTags); 当第3个字节为1(值为4)时表示prepared消息;当第4个字节为1(值为8)时表示commit消息; 当第3/4个字节均为1时(值为12)时表示rollback消息;当第3/4个字节均为0时表示非事务消息 |
9 | BORNTIMESTAMP | 8 | 消息产生端(producer)的时间戳 |
10 | BORNHOST | 8 | 消息产生端(producer)地址(address:port) |
11 | STORETIMESTAMP | 8 | 消息在broker存储时间 |
12 | STOREHOSTADDRESS | 8 | 消息存储到broker的地址(address:port) |
13 | RECONSUMETIMES | 8 | 消息被某个订阅组重新消费了几次(订阅组之间独立计数), 因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了, 成功消费一次记录为0; |
14 | PreparedTransaction Offset | 8 | 表示是prepared状态的事物消息 |
15 | messagebodyLength | 4 | 消息体大小值 |
16 | messagebody | 消息体内容 | |
17 | topicLength | 1 | topic名称内容大小 |
18 | topic | topic的内容值 | |
19 | propertiesLength | 2 | 属性值大小 |
20 | properties | propertiesLength大小的属性数据 |
Consumequeue
由于不同的主题的消息不连续的存储在commitlog文件中,如果只是检索该消息文件速度就会很慢,为了加快消息的检索和节省磁盘空间,每一个consumequeue条目存储了消息的关键信息commitog文件中的偏移量、消息长度、tag的hashcode值。
ConsumeQueue 文件类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。
性能提升
consumequeue中的数据是顺序存放的,还引入了 PageCache
的预读取机制,使得对 consumequeue 文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。
PageCache机制,页缓存机制,是OS对文件的缓存机制,用于加速对文件的读写操作。
Write:OS会先将数据写入到PageCache中,随后会以异步方式将Cache中的数据刷盘到物理磁盘
Read:首先会从PageCache中读取,若没有命中,则OS在从物理磁盘上加载该数据到PageCache的同时,也会顺序 对其相邻数据块中的数据进行预读取。
IndexFile
index 存的是索引文件,可以提高根据主题与消息检索 消息的速度 ,使用 Hash 索引机制,具体是 Hash 槽与 Hash 冲突的链表结构。
每个indexFile文件由三部分构成:indexHeader,slots槽位,indexes索引数据。
indexHeader
indexHeader 固定占用 40 个字节,其中存放着如下数据:
- beginTimestamp:该indexFile中第一条消息的存储时间
- endTimestamp:该indexFile中最后一条消息存储时间
- beginPhyoffset:该indexFile中第一条消息在commitlog中的偏移量commitlog offset
- endPhyoffset:该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
- hashSlotCount:已经填充有index的slot数量(并不是每个slot槽下都挂载有index索引单元,这里统计的是所有挂载了index索引单元的slot槽的数量)
- indexCount:该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所有index索引单元的数量之和)
Hash 槽
一个 IndexFile 默认包含 500W 个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引。
key 的 hash值 % 500w 的结果即为 slot 槽位,然后将该 slot 值修改为该 index 索引单元的 indexNo,根据这个 indexNo 可以计算出该 index 单元在 indexFile 中的位置。为了解决取模重复很高的问题,在每个index索引单元中增加了preIndexNo,用于指定该slot中当前index索引单元的前一个index索引单元。而slot中始终存放的是其下最新的index索引单元的indexNo,
indexNo 是一个在 indexFile 中的流水号,从 0 开始依次递增。即在一个 indexFile 中所有 indexNo 是以此递增的。indexNo 在 index 索引单元中是没有体现的,其是通过 indexes 中依次数出来的。
indexes索引数据
index索引单元默写 20 个字节,其中存放着以下四个属性:
- keyHash:消息中指定的业务key的hash值
- phyOffset:当前key对应的消息在commitlog中的偏移量commitlog offset
- timeDiff:当前key对应消息的存储时间与当前indexFile创建时间的时间差
- preIndexNo:当前slot下当前index索引单元的前一个index索引单元的indexNo
indexFile的创建
indexFile的创建时机
- 当第一条带 key 的消息发送来后,系统发现没有 indexFile,此时会创建第一个 indexFile 文件
- 当一个 indexFile 中挂载的 index 索引单元数量超出2000w个时,会创建新的 indexFile。当带key的消息发送到来后,系统会找到最新的 indexFile,并从其indexHeader的最后 4 字节中读取到indexCount。若indexCount >= 2000w时,会创建新的indexFile。
Checkpoint
checkpoint文件的作用是记录commitlog、consumequeue、index文件的刷盘时间点,文件固定长度4k,其中只用了该文件的前24个字节。查看其存储格式
- physicMsgTimestamp:commitlog文件刷盘时间点
- logicsMsgTimestamp:消息的消费队列文件刷盘时间点
- indexMsgTimestamp:索引文件刷盘时间点
Config
config 文件夹中 存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在在此。
- topics.json:topic 配置属性
- subscriptionGroup.json:消息消费组配置信息。
- delayOffset.json:延时消息队列拉取进度。
- consumerOffset.json:集群消费模式消息消进度。consumerFilter.json :主题消息过滤信息。