简单的记录一下 RocketMQ 消息发送的方式
消息发送方式
RocketMQ支持3种消息发送方式:
- 单向
- 同步
- 异步
单向发送
producer 只负责发送消息,不等待 broker 发回响应结果,而且也没有回调函数触发,这也就意味着 producer 只发送请求不等待响应结果。发送消息的过程耗时非常短,一般在微秒级别。
一般用在日志收集这种对可靠性要求并不高的场景
同步发送
同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。该方式的消息可靠性最高,但消息发送效率太低。
适合一些发送比较重要的消息场景,比如说重要的通知邮件、营销短信等等
需要注意:这种方式具有内部重试机制,默认为2次,发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。
异步发送
producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式。需要注意的是,不等待 broker 响应,并不意味着 broker 不响应,而是通过回调接口来接收 broker 的响应。所以要记住一点,异步发送同样可以对消息的响应结果进行处理。
适合于一些比较注重响应时间的场景,比如上传视频之后的转码,上传成功马上返回成功,转码成功之后通过回调函数通知调用方。
RocketMQ内部只对同步模式做了重试,异步发送模式是没有自动重试的,需要自己手动实现
消息批量发送
发送限制:
- 同一批发送的消息的Topic必须相同
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
一批发送的消息总大小不能超过 4MB
,如果超出了可以将消息拆分再批量发送,还可以修改Producer端与Broker端的属性,Producer端需要在发送之前设置Producer 的maxMessageSize
属性, Broker 端需要修改其加载的配置文件中的maxMessageSize
属性。
消息的生产
Producer 可以将消息写入到某 Broker 中的某 Queue 中,大致过程如下:
- Producer 发送消息之前,会先向 NameServer 发出获取消息 Topic 的路由信息的请求
- NameServer 返回该 Topic 的路由表及 Broker 列表
- Producer 根据指定的 Queue 选择策略,从 Queue 列表中选出一个队列,用于后续存储消息
- Produer 对消息做一些特殊处理,例如,消息本身超过4M,则会对其进行压缩
- Producer 向选择出的 Queue 所在的 Broker 发出 RPC 请求,将消息发送到选择出的 Queue
Queue 选择策略
轮询算法
默认选择算法。该算法保证了每个Queue中可以均匀的获取到消息。
如果某个Broker上的Queue投递延迟较严重,就会导致Producer的缓存队列中出现较大的消息积压,影响消息的投递性能。
最小投递延迟算法
会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的Queue。如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。
这个算法会造成Queue上的分配不均匀。投递延迟小的Queue其可能会存在大量的消息。而对该Queue的消费者压力会增大,降低消息的消费能力,可能会导致MQ中消息的堆积。