事务消息

分布式事务:

使用场景

RocketMQ的事务遵循最终一致性,只能保证把消息发送到Broker节点,也保证能消费

张三转账给李四,因为分库分表,他俩在不同的数据库,有可能出现张三扣款成功,但是消费者消费消息的时候,李四已经销户,这时候只能再向MQ发送一个回滚消息,让张三回滚扣款,这种更适合使用tcc的模式。

对于普通消息:

修改订单状态后发短信这种场景,极端情况可能有问题,有可能发送消息超时,订单状态回滚了,过一会儿Broker又收到消息,发送了短信,但是订单状态已经不对了。

事务消息:

  • 生产者发送一个“Half”消息,并得到Half消息的UNIQ-ID,作为事务的ID

Half消息与普通消息已有,保存了真实的数据,Broker储存消息的时候,会检查消息是否为prepare状态的事务消息,

如果是,需要把原topic和queueId保存到消息属性内,再修改topic和queueID为half消息的队列,写入commitLog文件,这时候消费者订阅的是原topic,并不会消费到这条消息)

  • 再执行本地的任务,任务成功后,再进行RPC确认消息(commit/rollback, 二阶段提交),请求中包含half消息的commitLogOffset信息

如果Broker没有收到确认请求,例如:生产者在处理事务的时候宕机了

Broker内部会有一个定时服务(事务回查服务),每分钟检查处于prepare状态的half事务消息,发现消息会向生产者组内任意一个生产者发起事务状态回查的RPC请求(这里需要生产者提供回查接口),如果查到了事务信息,就会提交half消息

  • Broker根据commitLogOffset查询到halfMsg后,复制一条新消息,还原topic和queueID,重新写一条commitLog,再删除(逻辑删除)half消息
  • 消费者就能消费到这条消息了

架构

负载均衡

消费者怎么确认自己要消费的队列

消费者启动后,会将自己注册到MQ客户端的实例内(类似监听者模式),

正常来说一个JVM进程只有一个MQ客户端实例,这个实例提供了很多定时服务,

其中包括“负载均衡”定时服务,20s会触发一次Rebalance操作,每次触发会调用消费者对象的rebalance接口,

在消费者启动阶段,会将消费者订阅信息copy到负载均衡实现对象的map中,

然后负载均衡实现对象会根据订阅信息还有主题队列分布信息去计算分配给自己的队列。

分配算法:

消费者知道两组信息:

第一是消费者组下都有哪些消费者,可以通过RPC到Broker获取(Broker端通过收集客户端心跳来维护)

第二是订阅主题队列的分布信息

然后根据负载均衡算法进行分配(默认是平均分配算法)

消费者消费消息

触发消费

每个队列的消费进度是存储在队列所属的Broker节点上

对于新分配给消费者的队列,消费者会先到Broker上进行RPC请求,获取队列的消费进度,然后存储到消费者本地的offsetStore对象里

有了消费记录,会创建一个PullRequest对象,保存队列信息和拉消息位点信息,把request对象交给拉消息服务(PullMessageService)

在PullMessageService内部有一个BlockingQueue,来存放PullRequest对象,同时有自己的线程资源来异步消费blockingQueue,向Broker发起拉消息请求

拉下来一批消息以后,会更新PullRequest下一次拉消息的位点信息,重新存储到blockingQueue,形成拉消息闭环。

消费结果的处理

拉消息和消费消息都是异步的,有各自独立的线程池去完成。

拉取下来的每条消息,都会被封装成“消息消费任务”,提交给并发消费线程池,最终会被线程池内的线程执行

“消息消费任务”核心逻辑是调用用户注册在Consumer上的messageListener对象,对象上封装了用户处理消息的具体逻辑,

messageListener处理完后,会返回消息处理的结果,即:消费成功or失败,

如果消费成功,直接更新消费者本地的offsetStore,消费失败,失败的消息需要回退给Broker节点,放到重试topic中,消费者启动的时候会订阅重试topic。

此时并不会立刻放入到重试topic中,而是将重试topic和queueId放入消息内容中,并设置topic为延迟主题(SCHEDULE_TOPIC_xxxxx)

队列ID根据重试次数决定,队列ID对应延迟时间不同,比如:队列1延迟1s,队列2延迟5s

延迟队列是由Broker服务来消费,检查消息是否达到了交付时间,如果达到了消费时间,则复制消息,从消息内容还原topic和queueId,写入commitLog

消费者拉取消息

消费者消费完消息后会继续拉取消息,但是Broker服务器会进行长轮询处理。

拉消息请求进入到Broker之后,由PullMessageProcessor协议处理器处理,处理器的核心逻辑由两部分:

  • 根据啦消息请求的参数来查询指定offset位点的消息
  • 根据上一步的查询结果做处理(查询到or没查询到)

如果查询到了,直接返回客户端,如果没查询到消息,需要控制客户端拉取的频次,broker会创建一个长轮询对象,保存两个数据: 查询offset信息、NettyChannel对象,将这个对象交给长轮询服务(PullRequestHoldService)。长轮询服务的线程死循环运行,提取“长轮询对象”的拉消息参数(offset)与服务器的最大offset对比,如果最大offset更大说明有新消息,这时将查询参数再次交给PullMessageProcessor处理拉消息,这时候即使是没有消息,也返回给客户端了。

读写分离

  • 写:消息写入到master,通过HA同步程序同步到Slave节点。
  • 读:消费者到broker来拉取消息时,broker会根据拉消息结果,来推荐消费者下一次到主or从节点拉取下一批消息。如果最后一条数据是“热数据”,那下一次还是从Master拉取,如果是冷数据,下一次推荐到Slave上拉取。

冷数据or热数据:距离commitLog最大offset的大小超过系统内存的40%,认为是冷数据(消费者大概率是处于消息堆积状态)。

broker将数据存储到一组commitLog文件,当前正在写的这个commitLog文件,Broker会锁定它对应内存的映射空间(MappedByteBuffer),这个内存缓冲区创建的时候会进行预热,避免写消息时产生缺页异常,再慌忙去申请虚拟内存页对应的物理内存页。被写满的commitlog文件其对应的内存映射缓冲区回解除锁定限制,系统在内存紧张的时候,可以把虚拟内存页对应的物理内存页置换出去,释放缓存的算法一般使用LRU算法(最不经常访问的内存会被先释放掉),因为时顺序写,LRU算法在实际工作的时候会退化成FIFO逻辑(最早的内存页会被先释放)。

冷热数据判断逻辑: