RocketMQ 消息发送
hanpy

简单的记录一下 RocketMQ 消息发送的方式

消息发送方式

RocketMQ支持3种消息发送方式:

  • 单向
  • 同步
  • 异步

单向发送

producer 只负责发送消息,不等待 broker 发回响应结果,而且也没有回调函数触发,这也就意味着 producer 只发送请求不等待响应结果。发送消息的过程耗时非常短,一般在微秒级别。

一般用在日志收集这种对可靠性要求并不高的场景

同步发送

同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。该方式的消息可靠性最高,但消息发送效率太低。

适合一些发送比较重要的消息场景,比如说重要的通知邮件、营销短信等等

需要注意:这种方式具有内部重试机制,默认为2次,发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

异步发送

producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式。需要注意的是,不等待 broker 响应,并不意味着 broker 不响应,而是通过回调接口来接收 broker 的响应。所以要记住一点,异步发送同样可以对消息的响应结果进行处理。

适合于一些比较注重响应时间的场景,比如上传视频之后的转码,上传成功马上返回成功,转码成功之后通过回调函数通知调用方。

RocketMQ内部只对同步模式做了重试,异步发送模式是没有自动重试的,需要自己手动实现

消息批量发送

发送限制:

  1. 同一批发送的消息的Topic必须相同
  2. 批量发送的消息必须具有相同的刷盘策略
  3. 批量发送的消息不能是延时消息与事务消息

一批发送的消息总大小不能超过 4MB ,如果超出了可以将消息拆分再批量发送,还可以修改Producer端与Broker端的属性,Producer端需要在发送之前设置Producer 的maxMessageSize 属性, Broker 端需要修改其加载的配置文件中的maxMessageSize属性。

消息的生产

Producer 可以将消息写入到某 Broker 中的某 Queue 中,大致过程如下:

  1. Producer 发送消息之前,会先向 NameServer 发出获取消息 Topic 的路由信息的请求
  2. NameServer 返回该 Topic 的路由表及 Broker 列表
  3. Producer 根据指定的 Queue 选择策略,从 Queue 列表中选出一个队列,用于后续存储消息
  4. Produer 对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
  5. Producer 向选择出的 Queue 所在的 Broker 发出 RPC 请求,将消息发送到选择出的 Queue

Queue 选择策略

轮询算法
默认选择算法。该算法保证了每个Queue中可以均匀的获取到消息。
如果某个Broker上的Queue投递延迟较严重,就会导致Producer的缓存队列中出现较大的消息积压,影响消息的投递性能。

最小投递延迟算法
会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。
这个算法会造成Queue上的分配不均匀。投递延迟小的Queue其可能会存在大量的消息。而对该Queue的消费者压力会增大,降低消息的消费能力,可能会导致MQ中消息的堆积。