从 Broker 获取消息的方式有两种,pull
拉取方式和 push
推动方式。消费者组对于消费的模式又分为两种,广播消费( Broadcasting
)和集群消费( Clustering
)
RoketMQ 中所说的“消费”可以为两个步骤:
- Consumer 从 Broker 拉取消息到本地,并保存到本地的消息缓存队列( ProcessQueue )。这个步骤中,消费的主体是 RocketMQ 的 Consumer 模块。
- Consumer 从本地的消息缓存队列取出消息,并调用上层应用程序指定的回调函数对消息进行处理。这个步骤中,消费的主体是上层应用程序。
消费类型
拉取式消费(pull)
Consumer 主动从 Broker 中拉取消息,主动权由 Consumer 控制。一旦获取了批量消息,就会启动消费过程。不过,该方式的实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。
因为是由客户端发起请求,所以不存在数据积压的问题。缺点是可能不够及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么控制等等。
推送式消费(push)
这个是典型的发布-订阅模式,Consumer 向其关联的 Queue 注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是 Consumer 去 Queue 中拉取消息。而这些都是基于 Consumer 与 Broker 间的长连接的
优点
有消息就推给消费者。延迟小,几乎可以做到实时
缺点
- 加大Server端的工作量,进而影响Server的性能,
- Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题(比如消息堆积)。
- 有的消费者机器配置好处理能力强,有的配置低处理能力低,但是server推相同数量级消息给消费者,就会导致消费者强的等待,弱的处理效率跟不上,从而导致崩溃。
- server资源相比消费者的资源肯定是更宝贵
- 总结下就是客户端慢消费(设计到io等耗时操作)时会放大缺点。
消费模式
广播消费(Broadcasting)
相同 Consumer Group 的每个 Consumer 实例都接收同一个 Topic 的全量消息。即每条消息都会被发送到 Consumer Group 中的每个 Consumer。
可以理解为同组各自消费。
即同一 Topic 下,同一消息会被多个实例各自都消费一次,消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
广播消费模式中的 ConsumerGroup 概念没有太大的意义。这适用于一些分发消息的场景。
适用场景&注意事项
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过
集群消费(Clustering)
相同Consumer Group 的每个 Consumer 实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个 Consumer。
可以理解成同组公共消费,公共资源我拿了你就没有。
即同一 Topic 下,一个 ConsumerGroup 下如果有多个实例,那么这些实例会均摊消费这些消息
适用场景&注意事项
- 消费端集群化部署, 每条消息只需要被处理一次。
- 由于消费进度在服务端维护, 可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
消费进度保存
广播模式:
消费进度保存在 consumer 端。因为广播模式下 consumer group 中每个 consumer 都会消费所有消息,但它们的消费进度是不同。所以consumer 各自保存各自的消费进度。
集群模式:
消费进度保存在broker中。consumer group中的所有consumer共同消费同一个Topic中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是需要共享的。下图是broker中存放的各个Topic的各个Queue的消费进度。
Rebalance 机制
集群消费模式才会存在 Rebalance 的情况
Rebalance 指的是将一个 Topic 下的多个 Queue 在同一个 Consumer Group 中的多个 Consumer 间进行重新分配的过程。导致Rebalance产生的原因,无非就两个:消费者所订阅Topic的Queue数量发生变化,或消费者组中消费者的数量发生变化。
举个列子:一个Topic下 5 个队列,在只有 1 个消费者的情况下,这个消费者将负责消费这 5 个队列的消息。如果增加一个消费者,那么就可以给其中一个消费者分配 2 个队列,给另一个分配 3 个队列。
Rebalance 限制
由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。
Rebalance 带来的问题
1. 消费暂停
比如在新增了一个 Consumer 后会触发Rebalanc,这个时候之前的 Consumer 就需要暂停消费,等重新分配完才会再重新工作。
2. 消费重复
重新分配之后,加入的 Consumer 会接着之前队列所属的 Consumer 的进度 offset 继续消费,如果之前 Consumer 消费的offset没有提交,就会导致重复消费。
3. 消费突刺
由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。
Queue 分配算法
一个 Topic 中的 Queue 只能由 Consumer Group 中的一个 Consumer 进行消费,而一个 Consumer 可以同时消费多个 Queue 中的消息。queue要分配给哪个Consumer进行消费,有几种常见的策略。
平均分配策略
根据 avg = QueueCount / ConsumerCount
的计算结果进行分配的。如果能够整除,则按顺序将 avg 个 Queue 逐个分配 Consumer,如果不能整除,则将多余出的 Queue 按照 Consumer 顺序逐个分配。
环形平均策略
根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配(该算法不用事先计算每个Consumer需要分配几 个Queue,直接一个一个分即可)。
一致性hash策略
将 consumer 的 hash 值作为 Node 节点存放到 hash 环上,然后将 queue 的 hash 值也放到 hash 环上,通过顺时针方向,距离 queue 最近的那个 consume r就是该 queue 要分配的 consumer。
至少一次原则
RocketMQ 有一个原则:每条消息必须要被成功消费一次。
Consumer 在消费完消息后会向其消费进度记录器提交其消费消息的 offset,offset 被成功记录到记录器中,那么这认为条消费就被成功消费了。
对于广播消费模式来说,Consumer本身就是消费进度记录器。
对于集群消费模式来说,Broker是消费进度记录器。
消费幂等
消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。
若某操作执行多次与执行一次对系统产生的影响是相同的,则称该操作是幂等的。
消息重复的场景
发送时消息重复
消息到了 Broker 并完成持久化,因为网络或者其他原因 Broker 对 Producer 应答失败,这个时候Producer意识到消息发送失败并尝试再次发送消息,此时Broker中就可能会出现两条内容相同并且Message ID也相同的消息,那么后续Consumer就一定会消费两次该消息。
消费时消息重复
Consumer 消费之后没有对对 Broker 发送成功响应,为了保证消息至少被消费一次的原则,Broker 会再次尝试投递之前已被处理过的消息,此时消费者就会收到与之前处理过的内容相同、Message ID也相同的消息。
Rebalance 时消息重复
发生 Rebalance 的时候,因为新的 Consumer 是从之前 Consumer 的 offset 开始消费的,如果之前消费的 offset 没有成功写入,此时 Consumer 可能会收到曾经被消费过的消息
通用解决方案
设计的两个要素
- 幂等令牌:是生产者和消费者两者中的既定协议,通常指具备唯一业务标识的字符串。例如,订单号、流水号。一般由Producer随着消息一同发送来的。
- 唯一性处理:服务端通过采用一定的算法策略,保证同一个业务逻辑不会被重复执行成功多次。例如,对同一笔订单的多次支付操作,只会成功一次。
通用性解决方案
- 首先通过缓存去重。在缓存中如果已经存在了某幂等令牌,则说明本次操作是重复性操作,若缓存没有命中,则进入下一步。
- 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作,若不存在,则进入下一步。
- 在同一事务中完成三项操作:唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引的数据写入到DB中。
基本思路就是指定不会重复的唯一标识,不建议以Message ID作为处理依据,最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。
消息的清理
消息的清理是以消息为单位进行清理的,而是以commitlog文件为单位进行清理的。
commitLog文件过期时间默认为 72 小时
自动清理的情况:
- 文件过期,且到达清理时间点(默认为凌晨 4 点)后,自动清理过期文件。
- 文件过期,且磁盘空间占用率已达过期清理警戒线(默认75%)后,无论是否达到清理时间点,都会自动清理过期文件。
- 磁盘占用率达到清理警戒线(默认85%)后,开始按照设定好的规则清理文件,无论是否过期。默认会从最老的文件开始清理。
- 磁盘占用率达到系统危险警戒线(默认90%)后,Broker将拒绝消息写入
注意两点
- 删除是一个压力巨大的IO操作,删除会造成系统性能会骤然下降。默认清理时间点为凌晨 4 点,访问量最小的时间,所以尽量要保障磁盘空间的空闲率,不要使系统出现在其它时间点删除commitlog文件的情况
- 官方建议RocketMQ服务的Linux文件系统采用ext4。因为对于文件删除操作,ext4要比ext3性能更好