MQ《MQ高手课》笔记

  1. 基础篇(1-8)
    1. 0.怎样学好消息队列
    2. 1.为什么需要消息队列?
      1. 1.异步:服务端异步化处理
      2. 2.削峰:流量控制
        1. 1.用MQ进行流量控制
        2. 2.用令牌桶进行流量控制
      3. 3.解耦:服务解耦
      4. 小结
      5. 提问
    3. 2.该如何选择消息队列?
      1. 1.选择 MQ 的基本标准
      2. 2.RabbitMQ
        1. 1.特点
        2. 2.缺点
      3. 3.RocketMQ
      4. 4.Kafka
      5. 5.第二梯队的 MQ
      6. 小结
      7. 提问
    4. 3.消息模型:主题和队列有什么区别?
      1. 1.主题和队列有什么区别?
        1. 1.队列模型:一条消息发一个
        2. 2.发布-订阅模型:一条消息发多个
      2. 2.RabbitMQ 的消息模型
        1. 1.Exchang模块
        2. 2.Exchang模块的几种工作模式
      3. 3.RocketMQ的消息模型
        1. 1.标准发布订阅模型的问题
        2. 2.RocketMQ的消息模型
      4. 4.Kafka 的消息模型
      5. 小结
      6. 提问
    5. 4.如何利用事务消息实现分布式事务?
      1. 1.什么是分布式事务?
        1. 1.事务的概念
        2. 2.事务的四个特性
        3. 3.分布式事务的实现方案
      2. 2.MQ如何实现分布式事务
      3. 3.RocketMQ 的分布式事务实现
      4. 小结
      5. 提问
    6. 5.如何确保消息不会丢失?
      1. 1.如何检测消息已丢失
      2. 2.确保消息可靠传递
        1. 1.生产阶段
        2. 2.存储阶段
        3. 3.消费阶段
      3. 小结
      4. 提问
    7. 6.如何处理消费过程中的重复消息?
      1. 1.消息重复无法避免
      2. 2.用幂等性解决重复消息问题
      3. 小结
      4. 提问
    8. 7.消息积压了该如何处理?
      1. 1.避免消息积压:优化性能
        1. 1.发送端性能优化
        2. 2.消费端性能优化
      2. 2.处理消息积压
      3. 小结
      4. 提问
    9. 8.答疑解惑(一) : 网关如何接收服务端的秒杀结果?(暂略)
      1. 小结
      2. 提问
  2. 进阶篇(9-28)
    1. 9.学习开源代码该如何入手?
      1. 1.通过文档来了解开源项目
        1. 1.看官方文档
        2. 2.看论文
      2. 2.用以点带面的方式来阅读源码
      3. 小结
      4. 提问
    2. 10. 如何使用异步设计提升系统性能?
      1. 1.异步设计如何提升系统性能?
        1. 1.一个非常直观的例子
        2. 2.业务场景举例
        3. 3.同步实现的性能瓶颈
        4. 4.采用异步实现解决等待问题
      2. 2.简单实用的异步框架: CompletableFuture
      3. 小结
      4. 提问
    3. 11.如何实现高性能的异步网络传输?
      1. 1.同步网络框架
      2. 2.异步网络框架
      3. 3.Netty
      4. 4.NIO
      5. 小结
      6. 提问
    4. 12.序列化与反序列化:如何通过网络传输结构化的数据?
      1. 1.常用的序列化实现
      2. 2.实现高性能的序列化和反序列化
      3. 小结
      4. 提问
    5. 13.传输协议:应用程序之间对话的语言
      1. 1.如何“断句”
      2. 2.用双工收发协议提升吞吐量
        1. 1.半双工
        2. 2.全双工
      3. 小结
      4. 提问
    6. 14.内存管理:如何避免内存溢出和频繁的垃圾回收?
      1. 1.自动内存管理机制的实现原理
      2. 2.为什么在高并发下程序会卡死?
      3. 3.高并发下的内存管理技巧
      4. 小结
      5. 提问
    7. 15.Kafka如何实现高性能IO?
      1. 1.批量消息减少请求次数,提升服务端处理能力
      2. 2.使用顺序读写提升磁盘 IO 性能
      3. 3.利用 PageCache 加速消息读写
      4. 4.ZeroCopy:零拷贝技术
      5. 小结
      6. 提问
    8. 16.缓存策略:如何使用缓存来减少磁盘IO?
      1. 1.选择只读缓存还是读写缓存?
      2. 2.只读缓存:如何保持数据最新
      3. 3.内存有限:该保留哪些数据
      4. 小结
      5. 提问
    9. 17.如何正确使用锁保护共享数据,协调异步线程?
      1. 小结
      2. 提问
    10. 18.如何用硬件同步原语(CAS)替代锁?
      1. 小结
      2. 提问
    11. 19.数据压缩:时间换空间的游戏
      1. 小结
      2. 提问

基础篇(1-8)

0.怎样学好消息队列

1.消息队列常见使用问题:比如选择哪款消息队列更适合你的业务系统?如何保证系统的高可靠、高可用和高性能?如何保证消息不重复、不丢失?如何做到水平扩展?等等。

2.消息队列很适合拿来做源码分析:涉及的底层技术是非常全面的,比如:高性能通信、海量数据存储、高并发等。

3.文章总体设置:

  • 基础篇:消息队列的使用方法和最佳实践。包括消息队列基础知识、技术选型、高级功能等。
  • 进阶篇:前半部分:关键知识点,常问内容。后半部分:分析一些开源消息队列的源代码,每篇选择一个开源的消息队列。
  • 案例篇:实践两个微型的项目

4.学习消息队列的顺序:

  1. 概念:了解消息的基本概念,比如主题、订阅、分区等。
  2. 源码:深入到源码中去。a.此时需了解其中必备的底层技术,比如高性能的网络传输、内存管理和锁的使用;b.同时也需学习MQ一些高级特性的实现原理,比如如何实现事务消息、消息队列如何支撑海量 IoT 设备同时在线。
  3. 实践:落到代码层面。可以用MQ去实现业务系统,也可以使用实现MQ的底层技术,去实现其他的中间件系统。

5.整体的MQ生态体系:包含MQ产品、标准和协议、应用场景、编程语言、实现技术。如下图所示。

image-20210415190428402

对于实现消息队列中涉及的重要的实现技术,像网络通信序列化反序列化分布式事务内存管理等,这部分内容是这门课程中的精粹,需要你重点学习。

6.MQ学习资源

1.为什么需要消息队列?

本篇主题:为什么需要消息队列,消息队列主要用来解决什么问题。

消息队列的主要功能是收发消息,但是它的作用不仅仅只是解决应用之间的通信问题。

在日常开发中,MQ主要用来解决以下问题

  • 异步
  • 解耦
  • 削峰

1.异步:服务端异步化处理

异步的核心目的就是:加快响应速度

例如:如何设计一个秒杀系统?这个问题基本都少不了MQ。

秒杀系统要解决的核心问题:用有限的服务器资源,处理短时间内的海量请求。

秒杀系统的业务步骤:

  1. 风险控制;
  2. 库存锁定;
  3. 生成订单;
  4. 短信通知;
  5. 更新统计数据。

没有优化的情况下,以上5个步骤顺序执行。

但其实,这5个步骤,只有前2个步骤能决定是否秒杀成功,所以秒杀请求只需处理这2个关键步骤即可,处理完直接响应结果,后续步骤放到MQ里,从而节省时间,节省服务器资源,加快响应速度。我们甚至可以先把服务器都用了支持秒杀,秒杀结束后,再把服务器挪去消费消息。

最终优化如下,前服务端只完成前2个步骤,确定结果马上返回。后续请求数据放入MQ。如下图:

image-20210415190631903

在这个场景用,MQ用来实现服务的异步处理。好处是:

  • 可以更快地返回结果;
  • 减少等待,自然实现了步骤之间的并发,提升系统总体的性能。

2.削峰:流量控制

实现了异步处理后,我们还面临一个问题:如何避免过多的请求压垮我们的秒杀系统?

流量控制的目的:程序需要有自我保护能力。也就是说,即使是海量的请求过来,它还能在自身范围内尽可能处理请求有拒绝处理不了的请求的能力。而不是大流量一来就挂掉,直接丧失处理能力

1.用MQ进行流量控制

设计思路:用消息队列隔离网关和后端服务,从而进行流量控制、保护后端服务。

直接用MQ拦截请求。加入MQ后,流程变为:

  1. 网关收到请求,放到MQ。
  2. 后端从MQ获取请求,完成后续处理后返回结果。

如下图所示:

image-20210415190655858

此时,海量请求被拦截到MQ,后端服务按自己最大的处理能力处理请求即可。超时的请求可以直接丢弃,APP 将超时无响应的请求处理为秒杀失败即可。

该设计的优缺点:

  • 优点:能根据下游的处理能力自动调节流量。
  • 缺点:
    • 增加了系统调用链环节:导致总体的响应时延变长。
    • 增加了系统的复杂度:上下游系统都要将同步调用改为异步消息。
2.用令牌桶进行流量控制

更轻量的流量控制,是令牌桶。

令牌桶原理:单位时间内只发放固定数量的令牌到令牌桶中。规定服务在处理请求之前必须先从令牌桶中拿出一个令牌。若令牌桶中没有令牌,则拒绝请求。

image-20210415190718033

具体流程:网关在处理 APP 请求时,增加一个获取令牌的逻辑即可。无需破坏原有的调用链。

实现方式:简单地用一个有固定容量的消息队列加一个“令牌发生器”来实现:

  • 生产令牌:令牌发生器按照预估的处理能力,匀速生产令牌并放入令牌队列(如果队列满了则丢弃令牌)
  • 消费令牌:网关在收到请求时去令牌队列消费一个令牌,获取到令牌则继续调用后端秒杀服务,如果获取不到令牌则直接返回秒杀失败

3.解耦:服务解耦

服务解耦:实现系统应用之间的解耦。

为什么要解耦:

  • 业务耦合,牵一发动全身:假如一个订单完成后,订单数据要发到很多系统,如支付系统、风控系统、客户系统、统计系统等。这些系统,每个系统都只用到订单数据的一部分。此时订单系统和其他系统是耦合的,任意一个下游系统变更,订单系统和它们之间的接口可能都需要调试或变更。也就是其他系统一变,订单系统可能都要重写进行一次上线。

引入 MQ 后,订单服务在订单变化时发送一条消息到 MQ 的一个主题 Order 中即可,所有下游系统都订阅主题 Order,这样每个下游系统都可以获得一份实时完整的订单数据。

小结

MQ的主要作用:异步、解耦、削峰。

当然,MQ 还有其他适用场景,包括:

  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;
  • 连接流计算任务和数据;
  • 用于将消息广播给大量接收者。

简单的说,我们在单体应用里面需要用队列解决的问题,在分布式系统中大多都可以用消息队列来解决。

MQ 也有缺点,包括:

  • 引入消息队列带来的延迟问题;
  • 增加了系统的复杂度
  • 可能产生数据不一致的问题。

提问

消息队列主要用了解决什么问题?最初的目的就是,解决应用间通信。其次就是,异步,只执行关键步骤,加快响应速度,还可挪用服务器。削峰,两种流量控制方式。解耦。

2.该如何选择消息队列?

本篇讲常见的开源 MQ 的选型。每个 MQ 都有自己的优缺点和适用场景。

1.选择 MQ 的基本标准

一般有以下标准:

  • 必须是开源的产品:坑都被大家踩过了,不用重复踩坑。
  • 流行且社区活跃度高:流行的产品就稳定。而且流行的产品,一般跟周边的生态结合都比较好。如Flink 内置了 Kafka 的 Data Source。
  • MQ 本身的基本要求:
    • 消息的可靠传递:确保不丢消息
    • Cluster:支持集群,确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;
    • 性能:具备足够好的性能,能满足绝大多数场景的性能要求。

下面来看下常见的MQ。

2.RabbitMQ

1.特点

使用 Erlang 语言编写。

支持 AMQP 协议:少数几个支持 AMQP 协议的消息队列。实现上比较轻量级。

特色功能:支持路由配置。支持非常灵活的路由配置,和其他MQ不同,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,你可以理解为交换机

Exchange 特色模块:作用和交换机也非常相似,根据配置的路由规则生产者发出的消息分发到不同的队列中。路由的规则非常灵活,甚至可以自己来实现。

支持冷门语言:RabbitMQ的客户端支持的编程语言可能是所有消息队列中最多的。

2.缺点

对消息堆积的支持不好:在它的设计理念里面,消息队列是一个管道,大量的消息积压时,会导致 RabbitMQ 的性能急剧下降,应当尽量避免。

性能比较差:每秒钟几万到十几万条。跟 Kafka 和 RocketMQ 比很差。根据官方给出的测试数据,综合日常使用的经验,依据硬件配置的不同,每秒钟几万到十几万条。

语言比较冷门:Erlang 语言比较小众,而且学习曲线非常陡峭。很难做二次开发。

3.RocketMQ

阿里巴巴内部使用,2012 年开源,后捐赠给 Apache 软件基金会,2017 成为 Apache 的顶级项目。Java 语言开发。

具备一个现代的 MQ 应该有的几乎全部功能和特性,且还在持续成长。

使用 Java 语言开发,很容易进行扩展或者二次开发。

响应速度很快:对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级

性能优越:达到每秒钟几十万条。比RabbitMQ高了一个数量级。

缺点:作为国产的 MQ,相比 Kafka等,在国际上还没有那么流行,与周边生态系统的集成和兼容程度要差一些

4.Kafka

由 LinkedIn 开发,目前也是 Apache 的顶级项目。最初的设计目的是用于处理海量的日志。 Scala 和 Java 语言开发。

早期版本缺点较多:早期为了获得极致的性能,在设计方面做了很多的牺牲,比如不保证消息的可靠性,可能会丢失消息,也不支持集群。此时还不能说是一个合格的 MQ。

后期版本功能已完整:后期已逐步补齐短板。现在Kafka已经比较成熟,在数据可靠性、稳定性和功能特性等方面都可以满足绝大多数场景的需求。

生态兼容好与周边生态系统的兼容性是最好的。几乎所有的相关开源软件系统都会优先支持 Kafka。

特点:设计上大量使用了批量和异步的思想,这种设计使得 Kafka 能做到超高的性能。

性能优越:达到每秒钟几十万条。尤其是异步收发的性能,是三者中最好的,但与 RocketMQ 并没有量级上的差异,大约每秒钟可以处理几十万条消息。

使用配置比较好的服务器对 Kafka 进行过压测,在有足够的客户端并发进行异步批量发送,并且开启压缩的情况下,Kafka 的极限处理能力可以超过每秒 2000 万条消息。

缺点同步收发消息的响应时延比较高。这是异步批量的设计带来的问题是。当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计。所以甚至会导致,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高,因为很久才能攒齐一波嘛。所以,Kafka 不太适合在线业务场景。

5.第二梯队的 MQ

简单说一下,了解即可:

  • ActiveMQ:最老的 MQ。十年前 MQ 的唯一选择。性能跟最新的 MQ 比起来非常差,存在的意义只是兼容老系统。
  • ZeroMQ:严格来说并不能称之为一个 MQ,而是一个基于消息队列的多线程网络库,如果你的需求是将消息队列的功能集成到你的系统进程中,可以考虑使用 ZeroMQ。
  • Pulsar:新兴的开源消息队列产品,最早是由 Yahoo 开发,目前处于成长期,流行度和成熟度相对没有那么高。Pulsar 采用存储和计算分离的设计,有可能会引领未来消息队列的一个发展方向,建议持续关注。

小结

选型建议

  • RabbitMQ:
    • 性能:万级。
    • 特色:支持 AMQP 协议。生产者和队列之间有Exchange交换机,可配置路由。
    • 缺点:对消息堆积不友好。设计理念上MQ只是一个管道,消息堆积会造成性能急速下降。
    • 选型:熟悉它就选它。
    • 语言:Erlang
  • RocketMQ
    • 性能:几十万级。
    • 特色:响应速度快。专门对响应时延做优化
    • 缺点:国产 MQ,生态兼容性差一点。
    • 选型:在线、金融能快速响应场景,就选它。
    • 语言:Java
  • Kafka
    • 性能:几十万级。
    • 特色:异步收发性能高。大量使用批量和异步的思想
    • 缺点:同步时延高。都是批量思想“惹的祸”。
    • 选型:大数据、流计算相关场景,就选它。
    • 语言:Java 和 Scala 语言开发

另外,新兴的 Pulsar 采用存储和计算分离的设计,可以持续关注。

提问

RabbitMQ、RocketMQ、Kafka怎么选型?

3.消息模型:主题和队列有什么区别?

本篇主题是,消息队列中的队列、主题、分区等基础概念。

早期,有国际组织指定过消息队列的标准,比如早期的 JMS 和 AMQP。但是,由于消息队列发展太迅猛,标准跟不上进化速度,所以这些标准实际上已经被废弃了。

这就导致了,每种消息队列都有自己的一套消息模型

而像队列(Queue)、主题(Topic)或是分区(Partition)这些名词概念,在每个消息队列模型中都会涉及一些,但是含义又还不太一样。

下面我们就来看下这些概念,从消息队列的演进讲起。

1.主题和队列有什么区别?

好的架构不是设计出来的,而是演进出来的。下面我们来看现代消息队列十几年的演进路线。

1.队列模型:一条消息发一个

队列模型:是最初的消息模型。早期的消息队列,就是按照“队列”的数据结构来设计的。特点是,消息严格有序

具体实现生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队,如下图所示,:

image-20210415201133763

特点一条消息只能被一个消费者消费

  • 多个生产者发消息时:此时队列内的消息顺序就是发送的顺序。
  • 多个消费者收消息时:消费者间是竞争关系,一条消息只能被一个消费者消费,消息被消费完就没有了。

缺点

  • 不支持一条消息发给多个消费者。

比如,一份订单数据,要同时发给风控系统、分析系统、支付系统,此时队列模型无法支持这种模式。

如何解决,一条消息,可以给多个消费者消费的问题?容易想到的做法是,为每个消费者创建一条队列,生产者每个消费者发一份就行了。但是这个做法很蠢,有两个问题:

  1. 浪费资源:同一份消息复制了多份。

  2. 生产者和消费者耦合:生产者必须知道具体有哪些消费者需要消费。

显然不行。为了解决这个问题,我们演化出来了另外一种消息模型:“发布 - 订阅模型(Publish-Subscribe Pattern)”。

2.发布-订阅模型:一条消息发多个

总体设计如下图:

image-20210415202359837

讲下概念:

  • 发布者(Publisher):消息的发送方。
  • 订阅者(Subscriber):消息的接收方。
  • 主题(Topic):服务端存放消息的容器。

发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。每份订阅中,订阅者都可以接收到主题的所有消息。

消息模型和发布订阅模型,最大的区别其实就是,一份消息数据能不能被消费多次的问题。

实际上,发布 - 订阅模型中如果只有一个订阅者,那它和队列模型就基本是一样的了。也就是说,发布 - 订阅模型在功能层面上是可以兼容队列模型的。

现代的消息队列大多是用发布 - 订阅模型,当然也有例外(RabbitMQ)。

2.RabbitMQ 的消息模型

RabbitMQ就是这种例外。

RabbitMQ使用的是队列模型

所以我们也就不难明白,前面说到的,消息积压严重时,RabbitMQ性能会大幅下降。

1.Exchang模块

那么,既然使用了队列模型,RabbitMQ是怎么解决一个消息发给多个消费者的问题呢?答案是,Exchange 模块。

Exchange 模块:Exchange 位于生产者和队列之间。生产者将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。如下图所示:

image-20210415203903812

一份消息如何发给多个消费者:配置 Exchange 即可。此时,每个队列中都放着一份完整的消息数据,可以给一个消费者消费。

所以我们看到,RabbitMQ的Exchange和RocketMQ的Topic虽然都直接接收生产者的消息,都担任了分发的工作,但是两者有本质的区别,因为消息模型不一样

2.Exchang模块的几种工作模式

以下工作模式,本质上都只是交换机不同的转发方式而已。术语上和本文中的术语不一样,注意区分:

  1. Simple:简单模式。最简单的队列。

    image-20210415215221307

  2. Work queues:工作模式。支持多个消费者,轮询消费。

    image-20210415215232174

  3. Publish/subscribe:发布订阅模式。多了一个交换机。生产者消费者提前绑定到交换机。

    image-20210415215304233

  4. Routing:路由模式。多了 routing key匹配。生产者和消费者的 routingKey 相等时匹配。

    image-20210415215325565

  5. Topics:主题模式。多了通配符的支持。*代表一个词,#代表多个词。

    image-20210415215355249

  6. Header:用键值对取代 routing key匹配,用header中的 key/value(键值对)匹配队列

  7. RPC:使用队列实现远程调用。通过两个队列分别实现调用和响应的通信。

    image-20210415215532616

交换机的对应类型:

  • Publish/subscribe工作模式:fanout
  • Routing工作模式: direct
  • Topics工作模式:topic
  • Header工作模式:headers

3.RocketMQ的消息模型

1.标准发布订阅模型的问题

“请求-确认”机制:为了确保消息不会丢失(比如网络故障),消息队列中是必须有请求确认的。分为以下两步:

  • 生产者 -> 服务端(Broker):服务端将消息写进队列后,给生产者发送确认响应。否则,生产者重发消息。
  • 服务端 -> 消费者:消费者完成消费业务逻辑后,给服务端发送确认消息。否则,服务端生产者重发消息。

“请求-确认”机制带来的问题消息必须一个一个消费,也就是说同一时刻只能有一个消费者消费消息。因为为了确保消息的有序性,在前一条消息没收到消费成功的响应之前,下一条消息是不能被消费的。

这个问题如何解决?来看RocketMQ的做法。

2.RocketMQ的消息模型

RocketMQ是发布订阅模型。

为了解决上述,同一时刻只能有一个消费者消费消息的问题,RocketMQ在主题里引入了“队列”的概念。每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。消息在队列内有序,队列间无序。注意到,这个的队列,和前面队列模型的队列,完全不是一个概念。

(1)改进思路

我们来复习下前面的标准订阅模型,如下图:

image-20210415202359837

发布者把消息发到主题,消息在主题中按先后顺序排队,订阅者消费主题。

可以看到,上面的模式,是有改进空间的:

  1. 主题Topic:大队列可以拆为很多小队列。原理是,主题中的某个消息,并不是跟其他所有消息绝对有序的,即某个元素只是跟队列中的某一部分元素有顺序关系而已,所有消息都放到同一个主题,但其实并不是所有消息都有严格的先后顺序。所以我们可以把主题,拆成很多小队列,从而可以实现并发处理。例如,大队列是1A一2二B三3C,很显然,我们可以拆为123、一二三、ABC这几个小队列,来进行并发处理。RocketMQ主题中的队列是支持水平扩展的,不过需要手动操作。
  2. 消费者:主题拆为多个小队列了,那么消费者也就可以拆为消费组去并发消费这些小队列。只需保证同一个队列串行,队列间并行的原则,即可保证消息执行的顺序性。即同一个队列,某个时刻,只允许一个消费者占用。同一个消费组,可以并行消费多个小队列。此时同一个队列内消息有序,不同队列间消息无序
  3. 生产者:此时,如果需要保证某几个消息的执行有先后顺序,只需保证这些有序的消息都被发送到同一个小队列即可。具体可根据Hash值发送到指定队列ID,例如同一个订单的风控、库存、统计消息,根据订单ID,哈希发送到同一队列。如果没有顺序要求,那么轮询发送即可。

(2)Rocket消息模型

最终模型如下:

  • 订阅者:通过消费组(Consumer Group)来体现。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
  • 消费组:消费组中包含多个消费者,同一个组内的消费者是竞争关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。
  • Topic:由于同一个消息,可能被多个消费组订阅。所以,消费完的消息并不会被立即删除。所以每个队列上会维护每个消费组的消费位置(Consumer Offset)
  • 消费位置:消费组到主题中某个队列的映射。在这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

消费者消费消息时,一直不返回结果怎么办:会有一个超时,超时之前会阻塞,超时之后就解除锁定,允许其他消费者来拉消息,由于消费位置没变,下次再有消费者来这个队列拉消息,返回的还是上一条消息。

总体如下图所示:

image-20210415220916831

可能会有疑问,如果仅仅是为了把主题中的大队列拆成很多小队列,那为什么不直接用很多个主题来存小队列呢?

个人的理解,还是要结合实际的业务来看。这个拆分,可能是用来对应微服务场景的。例如,

  • 生产者:某个产生订单数据的服务。
  • 主题:里面的多个小队列,就可以理解为,一个小队列对应一个订单。这样,订单间可以并行来并发处理,订单内可以串行以保证业务顺序。
  • 消费组:不同的消费组对应不同的消费系统,如风控系统、统计系统等。消费组内的多个消费者,就可以对应每个系统的多个微服务。

这样,就完成了,N个订单的并发处理。原来是订单一个一个排队处理,虽然能保证业务顺序,但是排队很久。现在以订单为单位,拆开消息,并发处理,效率起飞。

再提一遍,RocketMQ改进的核心时,解决了由于请求确认机制,导致了,每个时刻只能有一个消费者消费主题的问题。

一些小Tips,帮助理解:

  • 消费者组和队列数没有关系。RocketMQ和Kafka都可以支持水平扩容队列数量,但是都需要手动操作。
  • producer会往所有队列发消息,但不是“同一条消息每个队列都发一次”,每条消息只会往某个队列里面发送一次。
  • 每队列每消费组维护一个消费位置(offset),记录这个消费组在这个队列上消费到哪儿了。消费者是不记录消费位置的,它消费的时候只管去找Broker要消息,Broker必须知道消费到哪儿了,好找出下一条或下一批消息给客户端。
  • 如何把相关的N的消息发送到一个队列:按照订单ID或者用户ID,用一致性哈希算法,计算出队列ID,指定队列ID发送,这样可以保证相同的订单/用户的消息总被发送到同一个队列上,就可以确保严格顺序了。
  • 会有一个超时,超时之前会阻塞,超时之后就解除锁定,允许其他消费者来拉消息,由于消费位置没变,下次再有消费者来这个队列拉消息,返回的还是上一条消息。
  • consumer和queue不是强关联的,但是在任何一个时刻,某个queue在同一个consumer group中最多只能有一个consumer占用。
  • producer和queue不需要关联,简单点儿说,就是发到哪个queue都可以。RocketMQ的默认策略是轮询选择每个queue。

4.Kafka 的消息模型

Kafka 的消息模型和 RocketMQ 是完全一样的.

所有 RocketMQ 中对应的概念,和生产消费过程中的确认机制,都完全适用于 Kafka。

唯一的区别是,在 Kafka 中,队列这个概念的名称不一样,对应的名称是“分区(Partition)”,含义和功能是没有任何区别。

小结

队列和主题这两个概念,实际上对应着两种不同的消息模型:队列模型和发布订阅模型。

这两种消息模型其实并没有本质上的区别,都可以通过一些扩展或者变化来互相替代。

RabbitMQ 采用队列模型,但是它一样可以实现发布 - 订阅的功能。RocketMQ 和 Kafka 采用的是发布 - 订阅模型,并且二者的消息模型是基本一致的。

消本篇息模型和相关的概念是业务层面的模型,但业务模型不等于就是实现层面的模型。比如说 MySQL 和 Hbase 的业务模型中,存放数据的单元都是“表”,但是在实现层面,没有哪个数据库是以二维表的方式去存储数据的,MySQL 使用 B+ 树来存储数据,而 HBase 使用的是 KV 的结构来存储。

同样,像 Kafka 和 RocketMQ 的业务模型基本是一样的,并不是说他们的实现就是一样的,实际上这两个消息队列的实现是完全不同的

提问

消息模型有哪些?主题和队列有什么区别?先讲两个消息模型。再讲其实在RocketMQ中也有队列,概念和队列模型中的队列完全不一样而已。

RabbitMQ是什么消息模型?具体实现原理?队列模型。补充exchange的多种情况。

RabbitMQ的Exchange和RocketMQ的Topic有什么区别?队列有什么区别?两者消息模型都不一样。图都画出来就清楚了。

RocketMQ是什么消息模型?RocketMQ的消息是有序的吗?发布订阅模型。一画图就清楚了。注意消费组、队列的概念。队列内串行,队列间并行,如果业务上几个消息有先后关系,生产者可Hash后指定发到同一队列。

消费者一直不返回消费成功的消息怎么办?

怎样避免消息丢失?RocketMQ层面会进行消息确认。消费者一直不返回响应,队列中的消费位置不变,超时后其他消费者还可以重复消费。

怎样避免消息重复消费?RocketMQ层面会进行消息确认

前面讲到,在消费的时候,为了保证消息的不丢失和严格顺序,每个队列只能串行消费,无法做到并发,否则会出现消费空洞的问题。那如果放宽一下限制,不要求严格顺序,能否做到单个队列的并行消费呢?如果可以,该如何实现?

  • 把消息队列的先进先出,改成数组的随机访问。
  • 用offset来控制消息组具体要消费哪条消息。
  • mq不主动删除消息,消息设置过期时间,过期后只能确认不能重新该消费。
  • 只保留最大可设置天数的消息,超过该天数则删除。
  • 还要维护客户端确认信息,如果有客户端没确认,需要有重发机制。

4.如何利用事务消息实现分布式事务?

为什么消息队列也需要事务呢?消息队列中的“事务”,主要是解决消息生产者和消息消费者的数据一致性问题。即某个业务流程中用到了消息,且需要整个业务流程要么全部成功、要么全部失败,且不可以读到事务中间的数据。此时需要关注,失败的时候,消息已经异步发出去了,怎么办?如果有一条消息,可以把发消息的步骤放到最后。如果有多条消息呢?

以下举例,买商品发布订单,如下图所示:

image-20210417100629430

整个业务场景分为两步:

  1. 创建订单
  2. 发送消息,清空购物车

在分布式系统里,任何一步都可能执行失败。可能有两种情况:

  1. 先创订单成功,后消费消息失败:此时好处理,消息不断重试,保证消费成功就行。
  2. 先消息消费成功,后创订单失败:此时消息已经发出去了,业务端消费成功了,很难撤回,这个时候要回滚,岂不是要配套对应的回滚业务代码??太麻烦了!

问题的关键点就是,需要保障订单库和购物车库的数据一致性,即生产者和消费者的数据一致性。

RocketMQ / Kafka的解决思路:既然整条消息消费完很难回滚,那么我先发一半不就行了。即,发一个“半消息”。这样就完成了消费端的解耦,把回滚操作提前到了MQ里。后文会详解。

1.什么是分布式事务?

1.事务的概念

对若干的数据进行更新,这些更新操作,要么都成功、要么都失败。更新的数据,不仅仅局限于DB,可以是文件、可以是远端的一个服务,或是以其他形式存储的数据。

2.事务的四个特性

一个严格意义的事务实现,必须要保证该事务具有4个属性

  • 原子性:一个事务操作不可再分割。要么成功、要么失败。
  • 一致性:即众多数据的状态是一致的要么都是已更新完的状态,要么都是未更新完的状态。这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据。即不可读到中间状态的数据
  • 隔离性:并发执行的各事务之间不可互相影响。
  • 持久性:事务一旦完成提交,结果就是确定的。后续的其他操作和故障都不会对事务的结果产生任何影响。

一般单体DB都能完整实现事务的ACID。但是,分布式系统要严格实现ACID几乎不可能,实现代价达到我们无法接收。

3.分布式事务的实现方案

所以,一般来说,在分布式系统中的分布式事务实现是不完整的事务实现。根据不同场景,有所取舍。

常见的分布式事务实现

  • 2PC:Two-phase Commit,即二阶段提交。DB本身支持,性能太差。
  • TCC:Try-Confirm-Cancel。阿里的框架?
  • 事务消息:需要消息队列提供相应功能的支持。
    • 适用场景:需要异步更新数据,且对数据实时性要求不太高的场景。比如前面的下订单后删除购物车商品,没哟及时清空也影响不大。后续数据保证最终一致性就行。
  • 其他:本地消息表。其实就是手动模拟事务消息的“半消息”。

2.MQ如何实现分布式事务

上述事务消息的实现,需要MQ提供响应的功能支持。RocketMQ 和 Kafka 都提供了这种支持。

实现思路:就是前面说到的半消息,第一次 try 先存在MQ里提供回滚的机会,第二次 confirm 再把消息发给消费者。本质上就是用 MQ 来实现了TCC。

半消息:包含的内容是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

具体举例,如下图所示:

image-20210418105428033

具体流程:

  1. 订单系统在 MQ 上开启一个事务。
  2. 发送半消息。这个半消息,消息内容是完整的,但是事务提交前,消费者不可见。
  3. 执行本地事务。
  4. 提交或回滚:提交事务消息后,消费者就能看到消息了。回滚事务消息,则删除该消息。提交事务消息失败怎么办
    • Kafka:直接报错。我们可以在业务代码里反复重试,直到提交成功。
    • RabbitMQ:增加了事务反查的机制。见后文。
  5. 投递消息:如果投递失败,则无限重试,直至保证消息被消费。

3.RocketMQ 的分布式事务实现

RocketMQ 提供的事务反查机制:如果生产者提交半消息后,后续没有再提交或回滚该消息。即,Try 后没有 comfirm,此时,RocketMQ 会定期去生产者上反查这个事务的状态,来决定事务消息是投递还是回滚。

很显然,生产者的业务代码需要提供实现一个反查本地事务状态的接口,告诉MQ你自己这个事务到底是成功还是败了。这个反查的业务逻辑,我们要做到无状态。即不要和微服务绑定,最好是和DB绑定。因为微服务是随时可能宕机的。

以上述业务举例,这个反查接口,我们可以跟进消息的订单ID,查询DB里面有没有这个订单,没有的话就说明该订单被本地回滚了,此时返回给RabbitMQ失败即可,RabbitMQ会删除相应的消息。

RabbitMQ 总体流程图如下:

image-20210418111615636

小结

本篇主要讲了,事务的 ACID 四个特性,以及如何使用消息队列来实现分布式事务。

分布式事务解决方案,主要有2TC、TCC、事务消息等。

事务消息需要MQ的功能支持,具体方式是发送半消息、以及后续的提交或回滚。

生产者提交/回滚事务消息失败,即MQ长时间未收到提交/回滚消息时,Kafka直接报错、RocketMQ提供了事务反查机制。

提问

事务的四个特性?原子性、一致性、隔离性、持久性。操作不可再分、数据一致不能读到中间状态的数据、事物间互不影响、结果不可再更改。

什么是分布式事务?我们先来看事务的四个特性:隔离性、一致性、持久性、原子性。然后对照到分布式事务一一分析,其实不能完全达到,只是一种妥协…

分布式事务有哪些方案?分别解释各自特点、适用场景。2PC、TCC、最终一致性(RocketMQ发一个半消息)。

说说RocketMQ 和 Kafka的如何实现分布式事务?

RocketMQ 的这种事务消息是不是完整地实现了事务的 ACID 四个特性?如果不是,哪些特性没有实现?在数据一致性上,只能保证数据的最终一致性。

5.如何确保消息不会丢失?

现在主流的 MQ 都提供了完善的机制,保证消息不丢。绝大部分丢消息,都是因为开发者不熟悉 MQ,配置有问题导致的。

消息队列保证消息传递可靠性的原理,大家都大同小异。

本篇主题,消息队列是怎么保证消息可靠传递的。

1.如何检测消息已丢失

最尴尬的就是丢了消息还不知道。我们来看下怎么检测消息丢失

  • 分布式链路追踪系统:如果基础设施完善,直接使用分布式链路追踪系统,可以很方便地追踪每一条消息。
  • 利用消息队列有序性:如果没有上述系统,则可利用消息队列的有序性来验证是否有消息丢失

利用消息队列有序性原理:很简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号是否连续即可。如果序号不连续,就知道是丢消息了,而且知道丢了哪一条消息。

实现方案:大多数消息队列的客户端都支持拦截器机制,可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性。这样可以不用侵入业务代码。

实现注意事项

  • 多个生产者分开检测:如果你的系统中 Producer 是多实例的,由于不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
  • 多个分区分开检测:像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性
  • 消费者数量最好和MQ分区数量一致:Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

2.确保消息可靠传递

下面我们来看,从生产者到MQ到消费者整个流程中,哪里会丢消息,如何避免

整个消息的流程,如下图所示:

image-20210418142659181

整体可分为三个阶段

  • 生产阶段:消息从 Producer 创建,经过网络传输发送到 Broker 端。
  • 存储阶段:消息在 Broker 端存储,如果是集群,消息会在该阶段被复制到其他的副本上。
  • 消费阶段:Consumer 从 Broker 上拉取消息,经网络传输发送到 Consumer 上。
1.生产阶段

使用请求确认机制:代码调用发消息方法时,MQ的客户端把消息发送到 Broker,Broker 收到消息后给客户端返回一个确认响应客户端收到响应后发送完成。

有些消息队列的 Producer 在长时间没收到确认响应后,会自动重试,若重试再失败,则会以返回值或者异常的方式告知用户。

编写代码的关键点正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

以下用Kafka举例:

同步发送时,注意捕获异常

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println(" 消息发送成功。");
} catch (Throwable e) {
    System.out.println(" 消息发送失败!");
    System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这里需要特别注意,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果

producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println(" 消息发送成功。");
    } else {
        System.out.println(" 消息发送失败!");
        System.out.println(exception);
    }
});
2.存储阶段

一般只要 Broker 正常运行,就不会丢消息。异常情况是,Broker 出现故障,比如进程死掉了或者服务器宕机时。

和MySQL类似,可以配置 Broker 的参数,确保消息不丢

  • 单个节点的 Broker:配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应。例如,在 RocketMQ 中,需要将刷盘方式参数 flushDiskType 配置为 SYNC_FLUSH 同步刷盘
  • 集群组成的 Broker:配置 Broker 集群参数,消息至少发送到 2 个以上的节点,再给客户端回复发送确认响应。后续会讲,集群模式下,消息队列是如何通过消息复制来确保消息的可靠性的。
3.消费阶段

使用请求确认机制:客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后才会给 Broker 发送消费确认响应。若Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

编写代码的关键点不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认

以下以 Python 语言消费 RabbitMQ 消息为例:

def callback(ch, method, properties, body):
    print(" [x] 收到消息 %r" % body)
    # 在这儿处理收到的消息
    database.save(body)
    print(" [x] 消费完成 ")
    # 完成消费业务逻辑后发送消费确认响应
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_consume(queue='hello', on_message_callback=callback)

小结

检测消息丢失:用链路追踪。或者用MQ自带的拦截器,检测消息有序性。需要分消费者、分区检测。

保证消息丢失:

  • 生产阶段:同步和异步发消息,都要做好异常处理。
  • 存储阶段:单节点配置刷到磁盘再确认,多节点配置发送到两个以上节点再确认。
  • 消费阶段:处理完业务逻辑再确认,不要一收到就确认。另外考虑消息重复消费问题,幂等性。

提问

怎样知道消息已丢?

MQ如何确保消息可靠传递?分三个阶段…….发送阶段:原理是请求确认。关键点是同步和异步发送,都需要做好异常检测。存储阶段:配置好单机和集群MQ的参数。消费阶段:不要一收到消息就回应,而是处理完业务之后再响应。同时注意幂等性。

无论是 Broker 还是 Consumer 都是有可能收到重复消息,如何处理?

6.如何处理消费过程中的重复消息?

重复主要产生在两个地方

  • Producer 发送未收到 Broker 响应:可能会重试,再发送。
  • Broker 未收到 Consumer 消费的响应:下次其他Consumer再次拉取时,拉到的还是同一条消息。

显然,在大多数业务系统里,消费重复的消息,是会有问题的。

MQ 是无法保证消息不重复的,因为网络可能会断。下面具体来看。

1.消息重复无法避免

ISO 的 MQTT 协议给出了三个消息质量标准

  • At most once: 至多一次。消息在传递时,最多会被送达一次。也就是说,可能会丢消息。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但允许有少量重复消息。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢也不允许重复,这个是最高的等级。

现在绝大部分 MQ 提供的都是至少一次,包括 RocketMQ、RabbitMQ 和 Kafka 。

Kafka 所谓的支持“Exactly once”,和上面提到的实际上不是一个东西,它是 Kafka 提供的另外一个特性。另外Kafka 中支持的事务也和我们通常意义理解的事务有一定的差异。在 Kafka 中,事务和 Excactly once 主要是为了配合流计算使用的特性,后面文章会讲。

Kafka 的团队是一个非常善于包装和营销的团队,他们用“事务”和“Exactly once”来包装它的新的特性,实际上它实现的这个事务和 Exactly once 并不是我们通常理解的那两个特性。

以上。既然 MQ 无法保证消除重复消息,那么我们只有在消费代码层面来处理

2.用幂等性解决重复消息问题

即,在消费端,让消费消息的操作具备幂等性

幂等概念:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。

幂等操作:执行任意多次所产生的影响均与一次执行的影响相同。

如何实现幂等性:从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。不过一般不是所有的业务都是天生幂等的,需要一些设计技巧。

以下是几种常用的设计幂等操作的方法

  • 重复插入就报错:数据库唯一约束,用总的业务ID,限制记录只能插入一条。插入多条时就插入失败。
  • 满足条件才更新:比如,数据版本控制,更新余额前,只有消息中的数据版本号和当前数据版本号一直,才更新。
  • 标记检测消息消费状态:通用性最强,实现起来比较麻烦。“全局唯一 ID”,在执行数据更新操作之前,先检查一下是否执行过这个更新操作。具体实现是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。关键点在于要保证“检查消费状态,更新数据、设置消费状态”三个操作必须作为一组操作,以保证原子性。实现起来很麻烦,会涉及到分布式事务和分布式锁,一般不推荐。

小结

消息重复主要是重试引起的,包括生产者的重试发送,Broker 的重复提供。

MQ 无法避免,所以我们只能在业务层面解决。保证幂等性。常用设计方法:

  • 重复插入就报错:DB唯一性约束。
  • 满足条件才更新:数据版本控制等。
  • 标记检测消息消费状态:通用性最强。实现上会涉及分布式锁等,比较麻烦。

提问

消息为什么会重复?如何解决该问题?网会断。业务代码实现幂等性。

为什么大部分消息队列都选择只提供 At least once 的服务质量,而不是级别更高的 Exactly once 呢?消息队列实现Exactly once级别会引入新的复杂度。而且即使花大力气做到了Exactly once级别,consumer也还是要做幂等。在consumer从消息队列取消息这里,如果consumer消费成功,但是ack失败,consumer还是会取到重复的消息,所以消息队列花大力气做成Exactly once并不能解决业务侧消息重复的问题。

7.消息积压了该如何处理?

在实际运用中,消息积压这个问题,应该是最常遇到的问题了。

消息积压,肯定是系统中某一部分出现了性能问题。我们来看下。

1.避免消息积压:优化性能

主要是考虑发送端接收端的性能优化:

  • 对于MQ本身:MQ其实是不用太考虑的,主流的 MQ,单节点都能达到每秒几万到几十万次,还等通过水平扩展Broker继续提升。
  • 对于业务端:单节点达到每秒几百到几千次,已经算性能相当可以了。

所以,我们更需关注的是,消息的收发两端的配合。

1.发送端性能优化

其实业务逻辑一般在发送消息前就处理完了。一般发消息慢,就看下发消息前的业务逻辑是不是耗时太多。

发送端的主要耗时如下:

  • 发送网络请求前的耗时:发送端准备数据、序列化消息、构造请求等逻辑的时间;
  • 网络耗时:发送消息和返回响应在网络传输中的耗时;
  • Broker 处理消息的时延

那么发消息前的业务逻辑,主要通过以下两点来提升性能

  • 并发发送:时延低,吞吐量小。适合在线业务。如果发送端是微服务,直接发就行。因为所有RPC框架都是支持并发的。
  • 批量发送:吞吐量大,但时延高。适合离线系统。如果发送端数据都来自于DB,直接批量读取,批量发送即可。
2.消费端性能优化

一般具体使用中,大部分原因就是消费端消费太慢。

临时的积压不怕,MQ本来就是用削峰的。最怕的是,消费速度跟不上生产速度,消息越积越多。此时,要么 MQ存储被填满无法服务,要么就消息丢失。

所以,设计的关键点:一定要保证消费端的消费性能要高于生产端的发送性能

两个思路

  • 纵向:优化消费业务逻辑。
  • 横向:水平扩容。注意,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。其实如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的,因为每个分区只支持单线程消费

注意,不要为了提升消费性能,把消息放到内存队列里临时存储,此时节点宕机的话消息就丢了,你就等着哭吧。如下图所示:

image-20210418163941195

2.处理消息积压

业务场景:一般是,日常运行时,少量积压很快就能消费掉。但是,某一个时刻,突然就开始积压消息并且积压持续上涨。

排查思路:一般情况可能是,发送太快消费太慢。极少情况可能是,消费失败导致一条消息反复消费

  • 看监控:大部分消息队列都内置了监控的功能,只要通过监控数据,和日常的数据比较,很容易确定是前者还是后者。
  • 看日志:优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

解决思路

  • 水平扩容:如果确实是业务需要,比如业务搞活动,那么短时间内优化代码显然不现实,水平扩容吧。
  • 系统降级:资源不够水平扩容,只能关闭一些不重要的服务,减少发送方数据量。

其他场景:发送速度和消费速度和日常差不多,但是消息就是积压。此时要看下是不是消费失败导致的一条消息被反复消费。消息反复消费失败,可能就会卡住主队列。有的消息中间件提供了“死信队列”的功能,丢到里面,防止卡死主队列。比如 RabbitMQ。

小结

预防积压:增加批量或增加并发。增加并发时注意保持分区数量和消费者数量一致。

处理积压:看监控,和日常数据比较,看日志。水平扩容、系统降级。

如果消费者消费异常,即使多次消费也无法成功处理(如消息格式异常),导致一直无法成功ack此条消息,这种场景一般要怎么处理?有的消息中间件提供了“死信队列”的功能,它会自动把这种反复消费都失败的消息丢到一个特殊的死信队列中,避免一条消息卡主队列的情况。

提问

如何处理消息积压?

在消费端是否可以通过批量消费的方式来提升消费性能?这种方法有什么局限性?1、要求消费端能够批量处理或者开启多线程进行单条处理。2、批量消费一旦某一条数据消费失败会导致整批数据重复消费。3、对实时性要求不能太高,批量消费需要Broker积累到一定消费数据才会发送到Consumer。

8.答疑解惑(一) : 网关如何接收服务端的秒杀结果?(暂略)

小结

提问

进阶篇(9-28)

9.学习开源代码该如何入手?

本篇主要讲,如何学习开源软件的代码。

1.通过文档来了解开源项目

1.看官方文档

看文档:了解一个项目最佳的方式,就是先看它的文档。快速地掌握这个软件整体的结构,它有哪些功能特性,它涉及到的关键技术、实现原理和它的生态系统等等。对整体架构有个初步的了解。

看官方文档:尽量看原版文档官方网站,不要看翻译的中文文档。翻译的文档,一是实时性不高。而是有些地方翻译不够准确。

Kafka 的官网为例子,来说下怎么来看它的文档:

  1. 跑起来玩下:看Quick Start文档。跑起来,有个直观体会。
  2. 项目介绍和基本概念:看Introduction文档。一般会介绍一些基本概念或者名词。比如,Kafka的Topic、Producer、 Consumer、Partition 这些概念。有些开源项目会单独有一个 Basic Concepts 文档来讲这些基础概念。这个文档非常重要,因为这些开源社区的开发者都有个很不好的爱好:发明概念。很多开源项目都会自己创造一些名词或者概念,了解这些基本概念才有可能看懂它项目的其他文档。
  3. 使用场景和功能特性:对项目有个基本的了解之后呢,接下来你可以看一下它的使用场景、功能特性以及相关的生态系统的介绍。在 Kafka 中功能相关的内容在Use casesEcoSystem两篇文章中,有些项目中会有类似名为 Features 的文档介绍功能和特性。项目的生态系统,也就是 EcoSystem,一般会介绍它这个项目适用的一些典型的使用场景,在某个场景下适合与哪些其他的系统一起来配合使用等。如果说你的系统不是特别特殊或者说冷门的话,你大概率可以在 EcoSystem 里面找到和你类似的场景,可以少走很多的弯路。

通过以上,我们了解到了:

  • 这个项目是干什么的?
  • 能解决哪些问题?
  • 适合在哪些场景使用?
  • 有哪些功能?
  • 如何使用?
2.看论文

以上了解到整个项目,我们是不是就开始看源代码了呢?还不合适。

论文是灵魂:一般这些开源的项目,都是大厂或者科学家先提出理论,然后项目对这个理论进行实践。对于这样的开源项目,它背后的这篇论文就是整个项目的灵魂,你如果能把这篇论文看完并且理解透了,这个项目的实现原理也就清楚了。

对于 Kafka 来说,它的灵魂是这篇博文The Log: What every software engineer should know about real-time data’s unifying abstraction,对应的中文译稿在这里:《日志:每个软件工程师都应该知道的有关实时数据的统一抽象》。

这篇博文被评为程序员史诗般必读文章,无论是不是想了解 Kafka 的实现原理,都强烈推荐你好好读一下上面这篇博文。

看完文档和论文,我们可以开始看源码了。

2.用以点带面的方式来阅读源码

源代码特点:

  • 文章是线性结构:可以从前往后读。
  • 书是树状结构:可以根据目录来读。大体上也可以从前往后读。
  • 源代码是网壮结构:泛泛去读,很容易迷失在这个网里。

阅读源码的方式带着问题去读源码,最好是带着问题的答案去读源码。*每次读源码之前,确定一个具体的问题,比如:

  • RocketMQ 的消息是怎么写到文件里的?
  • Kafka 的 Coordinator 是怎么维护消费位置的?

粒度应该细到每个问题的答案就是一两个流程就可以回答。

怎么找问题看文档

怎么找答案:确定问题后,能找到答案更好。一般来说,核心功能都会有专门的文档来说明它的实现原理,比如在 Kafka 的文档中,DESIGNIMPLEMENTATION两个章节中,介绍了 Kafka 很多功能的实现原理和细节。一些更细节的非核心的功能不一定有专门的文档来说明,但是可以去找一找是否有对应的 Improvement Proposal。(Kafka 的所有 Improvement Proposals 在这里。)

Improvement Proposal:描述一个新功能的文档。每个 Improvement Proposal 都是有固定格式的,一般要说明为什么需要增加这个功能,会对系统产生那些影响和改变,还有我们最关心的设计和实现原理的简述。

有了问题和答案,就可以去读源代码了

这种方式通过一个一个的问题,在网状的源代码中,每次去读几个点组成的那一两条线。随着你通过阅读源码了解的问题越来越多,你对项目源码的理解也会越来越全面和深入。

小结

如何了解一个开源项目

  • 看官网和文档:快速开始(how-to)-》基本概念(Concepts)-》使用场景(Features/EcoSystem)-》论文
  • 看源码:找问题(文档)-》找答案(专门文档/Improvement Proposal)-》读源码

源码整理:读完后,把主要的流程用流程图或者时序图画出来,把重点的算法、原理用文字写出来。

提问

如何了解一个开源项目?

10. 如何使用异步设计提升系统性能?

异步模式设计的优点:使用异步模式设计的程序可以显著减少线程等待,从而在高吞吐量的场景中,极大提升系统的整体性能,显著降低时延。

消息队列的业务场景:MQ是需要超高吞吐量和超低时延的中间件系统。所以,其核心流程,一定会大量采用异步的设计思想。

下面我们来了解下异步设计。

1.异步设计如何提升系统性能?

1.一个非常直观的例子

异步的核心:减少线程上下文切换的成本、提升CPU使用率,也降低了线程数量、减少了等待时延。

直观的例子:太多的线程会造成频繁的cpu上下文切换。可以想象一下,假设你的小公司只有8台电脑,:

  • 你雇8个程序员一直不停的工作显然是效率最高的。
  • 考虑到程序员要休息不可能连轴转,雇佣24个人,每天三班倒,效率也还行。
  • 但是,你要雇佣10000个人,他们还是只能用这8台电脑,大部分时间不都浪费在换人、交接工作上了吗?

上面的8,就是CPU支持的线程数。

其实可以把系统理解为一个管道

  • 同步情况下:每个请求都要等待一下,等待时也占着线程不放,而线程有最大数量限制,所以请求数量有限制。比如,每个请求有100ms时延,OS最大支持200个线程,CPU总共10个线程,那么200个请求过来以后,性能主要有两方面耗损:1.线程上下文切换损耗:200个线程抢占10个CPU线程的性能损耗、线程上下文切换损耗。2.CPU利用率不高:10个线程正在运行的线程,每个都要等待100ms,也就是处理流量是有上限的,为 (1000 / 100)*10 = 100个请求每秒。也就是,线程的大部分时间都花在等待上了。也就是,水管的流量是有限的。
  • 异步情况下:每个请求都无需等待。请求处理好才回调你,等待时不消耗CPU资源CPU利用率很高。所以水管的吞吐量非常大 。所以异步非常适合 IO 等待很多的业务场景。
2.业务场景举例

假设有以下业务场景,转账:

  1. A账户减100,
  2. B账户加100。

时序图如下图,可以看到,Transfer服务就只是我们上述提到的管道而已,大部分时间花在等待Account服务的响应上:

补图。

3.同步实现的性能瓶颈

同步实现如下:

Transfer(accountFrom, accountTo, amount) {
  // 先从 accountFrom 的账户中减去相应的钱数
  Add(accountFrom, -1 * amount)
  // 再把减去的钱数加到 accountTo 的账户中
  Add(accountTo, amount)
  return OK
}

性能分析:

  • 业务处理时延:假设微服务 Add 的平均响应时延是 50ms,那么执行 Transfer 服务的时延就是两次Add,总共100ms。
  • 处理上限:每处理一个请求需要耗时 100ms,并在这 100ms 过程中是需要独占一个线程的。那么,每个线程每秒钟最多可以处理 10 个请求。假设服务器最多可支持10000个线程,那么处理上限为10,000 (个线程)* 10(次请求每秒) = 100,000 次每秒。
  • 请求排队延迟:请求超过上述100000次上限,请求只能阻塞或者排队。此时处理时间:排队的等待时延 + 处理时延 (100ms)。也就是说,在大量请求的情况下,由于同步消耗的线程数量太多,导致达到上限,导致有排队等待时间,导致微服务的平均响应时延变长了。

此时,所有的线程,大部分时间都用在等待上了。CPU使用率不高。

同步总结,服务的类型是IO密集型,此时:

  • 未达到请求线程上限频繁的线程上下文切换,消耗了CPU性能。
  • 达到请求线程上限后请求的排队等待时延,增加了新的等待时间。

如果是计算密集型,则主要是线程上下文切换的消耗。

4.采用异步实现解决等待问题

代码如下:

TransferAsync(accountFrom, accountTo, amount, OnComplete()) {
  // 异步从 accountFrom 的账户中减去相应的钱数,然后调用 OnDebit 方法。
  AddAsync(accountFrom, -1 * amount, OnDebit(accountTo, amount, OnAllDone(OnComplete())))
}
// 扣减账户 accountFrom 完成后调用
OnDebit(accountTo, amount, OnAllDone(OnComplete())) {
  //  再异步把减去的钱数加到 accountTo 的账户中,然后执行 OnAllDone 方法
  AddAsync(accountTo, amount, OnAllDone(OnComplete()))
}
// 转入账户 accountTo 完成后调用
OnAllDone(OnComplete()) {
  OnComplete()
}

参数里面多了回调。 2 个回调方法:

  • *OnDebit()*:扣减账户 accountFrom 完成后调用的回调方法;
  • *OnAllDone()*:转入账户 accountTo 完成后调用的回调方法。

整体流程图如下,可以看到,流程时序和同步的流程图没什么区别,区别只是在线程模型上由同步顺序调用改为了异步调用和回调的机制:

补图。

性能分析:

  • 低请求数量场景:和同步实现是一样,平均响应时延也是 100ms。
  • 超高请求量场景:由于不需要线程等待执行结果,只需要个位数量的线程,即可实现大吞吐量。

由于没有了线程的数量的限制,总体吞吐量上限会大大超过同步实现,并且在服务器 CPU、网络带宽资源达到极限之前,响应时延不会随着请求数量增加而显著升高,几乎可以一直保持约 100ms 的平均响应时延。

异步总结:减少甚至去除等待时间,大幅提升CPU利用率,同时也大幅减少了所需线程数量。因为请求等待时无需占用线程,只需等待回调即可。

2.简单实用的异步框架: CompletableFuture

Java8 内置的异步框架。CompletableFuture类,它的 thenXX() 系列方法几乎囊获了我们在开发异步程序的大部分功能。实际编程中也使用较多。

另一个常用的异步框架是ReactiveX 的RxJava。复杂一些,但功能更强大。

Java中也可以通过传入一个回调类的实例来变相实现类似的回调功能。

CompletableFuture 实现的转账服务示例

先定义两个服务接口,可以看到,返回值都是CompletableFuture<Void>,泛型中是实际需要返回的类型。

/**
 * 账户服务
 */
public interface AccountService {
    /**
     * 变更账户金额
     * @param account 账户 ID
     * @param amount 增加的金额,负值为减少
     */
    CompletableFuture<Void> add(int account, int amount);
}

/**
 * 转账服务
 */
public interface TransferService {
    /**
     * 异步转账服务
     * @param fromAccount 转出账户
     * @param toAccount 转入账户
     * @param amount 转账金额,单位分
     */
    CompletableFuture<Void> transfer(int fromAccount, int toAccount, int amount);
}

实现转账:

/**
 * 转账服务的实现
 */
public class TransferServiceImpl implements TransferService {
    @Inject
    private  AccountService accountService; // 使用依赖注入获取账户服务的实例
    @Override
    public CompletableFuture<Void> transfer(int fromAccount, int toAccount, int amount) {
      // 异步调用 add 方法从 fromAccount 扣减相应金额
      return accountService.add(fromAccount, -1 * amount)
      // 然后调用 add 方法给 toAccount 增加相应金额
      .thenCompose(v -> accountService.add(toAccount, amount));    
    }
}

客户端使用 CompletableFuture 也非常灵活,既可以同步调用,也可以异步调用:

public class Client {
    @Inject
    private TransferService transferService; // 使用依赖注入获取转账服务的实例
    private final static int A = 1000;
    private final static int B = 1001;
 
    public void syncInvoke() throws ExecutionException, InterruptedException {
        // 同步调用
        transferService.transfer(A, B, 100).get();
        System.out.println(" 转账完成!");
    }
 
    public void asyncInvoke() {
        // 异步调用
        transferService.transfer(A, B, 100)
                .thenRun(() -> System.out.println(" 转账完成!"));
    }
}

小结

同步总结,服务的类型是IO密集型时,此时请求有上限:

  • 未达到请求线程上限频繁的线程上下文切换,消耗了CPU性能。
  • 达到请求线程上限后请求的排队等待时延,增加了新的等待时间。

异步总结

  • 减少甚至去除等待时间,大幅提升CPU利用率,同时也大幅减少了所需线程数量。因为请求等待时无需占用线程,只需等待回调即可。

常用的异步框架:CompletableFuture

Java中也可以通过传入一个回调类的实例来变相实现类似的回调功能。

提问

同步有什么问题?IO密集型时,时间大量花在等待上。线程有上限,请求数就有上限。大量的线程导致密集的上下文切换,达到请求上限后,还多了请求排队的时延。总体来说,导致时延降低。

异步有什么好处?提升了CPU利用率。IO密集型服务,线程无需等待,去除等待时间,减少时延。在小请求量和性能和同步差不多,在大请求量下,可以用少量的线程支持,而不像同步需要大量线程,而导致受到了线程数量的限制,以及增加了线程上下文切换的成本。

11.如何实现高性能的异步网络传输?

我们开发的系统,一般分为两种:IO密集型、计算密集型。

最常用的IO资源包括网络IO和磁盘IO。上一篇中,我们已经讲过用异步来提升IO性能。随着SSD的普及,对本地磁盘IO的优化意义越来越小。主要是对网络IO的优化。

所以,本篇来讲,如何实现高性能的异步网络传输

1.同步网络框架

各个语言都会自带网络通信的基础类库。这些类库一般是同步的。

同步的方式:一个 TCP 连接建立后,用户代码会获得一个用于收发数据的通道。每个通道会在内存中开辟两片区域,分别用来做收数据发数据缓存

发数据过程:代码直接往通道写数据。写入的数据暂存到发送缓存中,然后操作系统操作网卡,把缓存中的数据发送到对端的服务器上。此时,发送的耗时,仅仅是一次写入内存的时间而已。所以,发送数据时用同步就行,没必要用异步。

收数据过程:由于不知道数据什么时候来,需要一个线程阻塞等待。数据过来时,OS先把数据写入接收缓存,然后通知接收数据的等待线程线程收到通知后结束等待,开始读取数据。处理完这一批数据后,继续阻塞等待下一批数据到来,这样周而复始。

如下图所示:

补图。

可以看到,连接数变大时,就需要大量的线程来阻塞等待。会造成频繁的上下文切换。

2.异步网络框架

怎样的API好用呢?

最好是用户实现定义好处理逻辑,然后数据来了,框架自动回调处理逻辑就行。

示意图如下:

补图。

Netty就是这样的异步框架。

3.Netty

使用示例,在本地 9999 端口,启动了一个 Socket Server 来接收数据:

// 创建一组线程
EventLoopGroup group = new NioEventLoopGroup();
 
try{
    // 初始化 Server
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(group);
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.localAddress(new InetSocketAddress("localhost", 9999));
 
    // 设置收到数据后的处理的 Handler
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new MyHandler());
        }
    });
    // 绑定端口,开始提供服务
    ChannelFuture channelFuture = serverBootstrap.bind().sync();
    channelFuture.channel().closeFuture().sync();
} catch(Exception e){
    e.printStackTrace();
} finally {
    group.shutdownGracefully().sync();
}

代码解析:

  1. 创建执行线程组:EventLoopGroup 对象,这组线程用来执行收发数据的业务逻辑。
  2. 初始化Server:初始化Socket Server,设置处理线程、channel类型、监听端口、回调方法(继承自ChannelInboundHandlerAdapter)。
  3. 启动服务:用server创建一个channelFuture,然后启动服务。

线程控制、缓存管理、连接管理这些异步网络 IO 中通用的、比较复杂的问题,Netty 已经自动处理。

4.NIO

Java原生的异步框架。Netty也是基于它来实现。

在 NIO 中,每个已经建立好的连接用一个 Channel 对象来表示。然后 NIO 提供了一个 Selector 对象,用来处理 channel。

使用方式:

  1. 把 Channel 绑定到 Selector
  2. 调用Selector.select()方法阻塞线程等待数据:在接收数据的线程,调用 Selector.select() 方法来等待数据到来。
  3. channel返回数据时回调线程:select 方法是一个阻塞方法,接收数据的线程会一直阻塞,直到Selector绑定的 Channel 中的任意一个有数据到来,就会结束等待返回数据。
  4. 线程解析返回数据:返回值是一个迭代器,你可以从这个迭代器里面获取所有 Channel 收到的数据,再执行业务逻辑。

小结

发数据直接写到内存,OS再调用网卡发出去,没有IO阻塞,不用异步。

收数据,不知道数据什么时候来,需要等待,可以用异步解决等待线程过多的问题。

Netty的用法…

NIO的用法…

提问

同步和异步网络框架有什么区别?分为收发。发数据不存在IO等待。主要是接数据的差别。

Netty怎么监听?

NIO怎么监听?

12.序列化与反序列化:如何通过网络传输结构化的数据?

上篇中,消息网络传输的问题解决了,那程序间就可以通信了吗?

还需要进行格式转换。我们知道TCP传输的是字节,而我们的消息时结构化的,如类等。

也就是网络传输和文件保存,都需要序列化与反序列化。

序列化与反序列化:字节流和结构化的数据相互转换的过程。

1.常用的序列化实现

最粗暴的就是转换成字符串,如对象可以转成 json、图片可以转换成BASE64。

选择序列化方法,主要考虑:

  1. 易读性
  2. 实现复杂度
  3. 执行性能:序列化与反序列化速度
  4. 信息密度:即同样的结构化数据,占用存储空间越小越好。

大多数情况下,易读性和信息密度是矛盾的,实现的复杂度和执行性能也是矛盾的。根据业务系统的特点来选择。

比如,电商类、社交类的应用系统,这些系统的特点是,业务复杂,需求变化快,但是对性能的要求没有那么苛刻,可以选择json:

byte [] serializedUser = JsonConvert.SerializeObject(user).getBytes("UTF-8");

需要性能的话,可以用 Kryo ,序列化性能更好,信息密度也更高,但代价就是失去了可读性:

kryo.register(User.class);
Output output = new Output(new FileOutputStream("file.bin"));
kryo.writeObject(output, user);

2.实现高性能的序列化和反序列化

消息队列对性能要求非常高,通用的序列化实现达不到性能要求,所以,很多的消息队列都选择自己实现高性能的专用序列化和反序列化。

典型的实现方法,比如Java的 .class 文件。

序列化的意义:

  1. 节省带宽资源:减小使用的带宽。
  2. 提升兼容性:即使只传输数据的内存结构,对于不同版本的代码,每种编程语言的不同版本,不同语言,都会导致兼容问题。序列化处理可以避开这些问题,保证程序的长期稳定运行,也符合中间件的使用方式

小结

选择序列化工具:从可读性、信息密度,实现复杂读、性能来考虑。有json,Kryo等。

提问

常用的序列化工具选择?从可读性、信息密度,实现复杂读、性能来考虑。有json,Kryo等。

为什么要使用序列化与反序列化?1.节省资源。2.解决兼容性问题。

13.传输协议:应用程序之间对话的语言

应用程序之间要想互相通信,一起配合来实现业务功能,还需要有一套传输协议来支持。

本篇这里的传输协议,指的是应用层的协议。设计协议,主要是考虑不要有歧义,性能好就行。

本篇说一下设计高性能传输协议的一些方法和技巧。

1.如何“断句”

收到的信息是分段零散的:对于传输层来说,收到的数据是是一段一段的字节。但是,因为网络的不确定性,收到的分段并不一定是我们发出去的分段。而且收到分段的顺序也不一定就是我们发送的顺序。

常用的几种分割方法:

  1. 统一加分隔符号:如 HTTP1 协议,它的分隔符是换行(\r\n)。分隔符需要加转义符号处理。比如:下雨天\r\n留客天\r\n天留\r\n我不留
  2. 标明长度:直接在信息前面表明长度。收到数据时,按按照长度来读取即可。比如:03 下雨天 03 留客天 02 天留 03 我不留

第二种方式实现较简单,性能也更好。目前运用比较普遍。

2.用双工收发协议提升吞吐量

1.半双工

半双工协议:同一时间只能收或只能发。简单说,就是一问一答

HTTP1 协议,就是单工协议。客户端与服务端建立一个连接后,客户端发送一个请求,直到服务端返回响应或者请求超时,这段时间内,这个连接通道上是不能再发送其他请求

如下图所示:

补图。

2.全双工

全双工:可以同时收发。且可以同时收发多条信息。

加序号:这时的问题时,收发的数据,没发保证顺序。如,同时发123,收到的顺序可能是321。这时的解决方法是,给消息加上序号。对自己发出去的请求来编号,回复对方响应的时候,带上对方请求的编号就可以了。这样就解决了双工通信的问题。

如下图所示:

补图。

小结

发送消息是连续的,不分段可能会有歧义。因此,需要断句。断句方法:加分隔符,或者加长度标志。

各个通信方式比较

  • 单工:被动接收。数据只在一个方向上传输,不能实现双方通信。如:电视、广播。
  • 半双工:一问一答。允许数据在两个方向上传输,但是同一时间数据只能在一个方向上传输,其实际上是切换的单工。如:对讲机。
  • 全双工:允许数据在两个方向上同时传输。如:手机通话。

提问

信息如何分割?

单工、半双工、全双工区别?

14.内存管理:如何避免内存溢出和频繁的垃圾回收?

1.自动内存管理机制的实现原理

一般采用“标记 - 清除”算法:

  • 标记阶段:从 GC Root 开始,你可以简单地把 GC Root 理解为程序入口的那个对象,标记所有可达的对象,因为程序中所有在用的对象一定都会被这个 GC Root 对象直接或者间接引用。
  • 清除阶段:遍历所有对象,找出所有没有标记的对象。这些没有标记的对象都是可以被回收的,清除这些对象,释放对应的内存即可。

2.为什么在高并发下程序会卡死?

高并发内存溢出的原因:高并发时,更多的请求涌进来,迅速占满内存,被迫频繁的执行垃圾回收。当垃圾回收的速度跟不上创建对象的速度时,就可能OOM了。

3.高并发下的内存管理技巧

如何降低垃圾回收的频率,减少进程暂停的时长?

  • 尽量少创建一次性对象:优化代码中处理请求的业务逻辑,尽量少的创建一次性对象,特别是占用内存较大的对象。

  • 对象池:频繁使用的对象可以考虑用对象池,即享元模式。

  • 自己进行内存管理:流计算平台 Flink,就是自行实现了一套内存管理机制,但是也带来了一些问题和 Bug,总体看来,效果并不是特别好。

小结

内存回收分为标记阶段和回收阶段。

高并发下,垃圾回收速度跟不上对象新建速度,等待时间就会延长,甚至是OOM。

高并发下,可以考虑对象池来减少内存使用。同时注意少创建一次性对象。

提问

高并发下怎么避免内存溢出?1、使用对象池;2、加内存;3、微服务扩容分流…..

15.Kafka如何实现高性能IO?

Kafka性能很强大:在一台配置比较好的服务器上,对 Kafka 做过极限的性能压测,Kafka 单个节点的极限处理能力接近每秒钟 2000 万条消息,吞吐量达到每秒钟 600MB

本篇主要讲 Kafka 是怎么进行性能优化的。

1.批量消息减少请求次数,提升服务端处理能力

在 Kafka 内部,消息都是以“批”为单位处理的。流转流程如下:

  • 发送端批量发送。攒一波一起发。在调用 send() 方法发送一条消息之后,无论是同步还是异步发送,Kafka 都不会立即把消息发出去。它会先把这条消息,存放在内存中缓存起来,然后在合适的时机把缓存中的所有消息组成一批,一次性发给 Broker。
  • Broker端批量处理。以批为单位处理,不会把消息拆开。无论是写入磁盘、从磁盘读出来、还是复制到其他副本这些流程中,批消息都不会被解开,一直是作为一条“批消息”来进行处理的。
  • 消费端批量接收。Consumer 从 Broker 拉到一批消息后,在客户端把批消息解开,再一条一条交给用户代码处理。

分批减少了请求次数:可以看到,消息的汇总成批和拆分都是在客户端进行的。不仅减轻了Broker压力,更重要的是较少了请求次数。

2.使用顺序读写提升磁盘 IO 性能

相比于网络和内存,磁盘的IO是很慢的。

顺序读可以提升IO性能:顺序读的情况下,减少了磁头寻址次数了时间。每次读,只需要寻址一次。对于SSD,顺序读比随机读性能强几倍,机械磁盘则是几十倍。因为机械磁盘需要移动磁头,寻址很慢。

Kafka 的优化

  • 顺序存储:存储设计非常简单,对于每个分区,把从 Producer 收到的消息,顺序地写入对应的 log 文件中。一个文件写满了,就开启一个新的文件这样顺序写下去。
  • 顺序消费:消费时,也是从某个全局的位置开始,即某个 log 文件中的某个位置开始,顺序地把消息读出来

3.利用 PageCache 加速消息读写

PageCache:现代OS都具有的特性。即OS在内存中给磁盘上的文件建立的缓存。所有程序读写文件时,实际操作的都是PageCache,而不是文件本身。

写文件过程:应用程序数据 -》PageCache -》OS分批写入磁盘

读文件过程

  • 命中PageChache -》直接读取。
  • 未命中PageChache -》OS引发缺页中断 -》应用程序读取线程阻塞 -》OS将文件从磁盘读入PageChache -》应用程序读取数据。

PageCache的缓存特性:OS会尽可能利用内存空间。对PageCache一般使用LRU算法管理,最近使用优先。

Kafka充分利用PageCacheLRU特性:尽量保证消息刚刚写入到服务端就能被消费,从而大幅提升PageCache命中率。

大幅提升命中率的好处是,读取速度非常快,且大幅减少了读磁盘的IO,从而提升了写磁盘的IO。

4.ZeroCopy:零拷贝技术

服务端处理消费的逻辑:

  1. 从文件中找到消息,读到内存。
  2. 把消息通过网络发给客户端。

这个过程数据做了2到3次复制

  1. 文件 -》PageCache:如果命中PageCache,则无需执行。
  2. PageCache -》应用程序内存空间:即我们的处理程序。
  3. 应用程序内存空间 -》 Socket 缓冲区:即我们调用网络应用框架的 API 发送数据的过程。

零拷贝技术,可以把复制减少一次,且无需切换到用户态:

  1. 文件 -》PageCache:如果命中PageCache,则无需执行。
  2. PageCache -》Socket 缓冲区:从文件直接到Socket缓冲区。由于无需把数据复制到用户内存空间,因此 DMA 控制器可直接完成数据复制,无需 CPU 参与,速度更快。

系统调用示例:

#include <sys/socket.h>
# 前两个参数分别是目的端和源端的文件描述符,后面两个参数是源端的偏移量和复制数据的长度,返回值是实际复制数据的长度。
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

小结

Kafka的优化:

  1. 批量读写,且拆解放到客户端。
  2. 顺序读写文件。
  3. 刚写入的消息尽量立即读出,充分利用pageCache。
  4. 使用零拷贝,减少一次复制,且避免进入用户态。

提问

Kafka有哪些提高性能的做法?1.批量读写,且拆解放到客户端。2.顺序读写文件。3.刚写入的消息尽量立即读出,充分利用pageCache。4.使用零拷贝,减少一次复制,且避免进入用户态。

16.缓存策略:如何使用缓存来减少磁盘IO?

磁盘读写速度很慢:SSD:每秒钟读写几千次。假设处理一个业务要3-5次磁盘请求的话,只能支持1000次左右请求每秒

用内存作为缓存加速:内存的随机读写速度是磁盘的 10 万倍。所以用内存来作为缓存,给磁盘访问加速。

缓存思想:把低速存储的数据,复制一份到高速存储。实践中,可以加@Cacheable 注解。

用缓存会遇到很多问题:

  • 如何提高命中率?
  • 缓存数据如何保持最新?
  • 缓存数据过期怎么办?

下面来看使用缓存的最佳实践。

1.选择只读缓存还是读写缓存?

读写缓存:例如PageCache。

  • 读写缓存天然不可靠:比如数据还未写入磁盘时,OS宕机。
  • 读写缓存实现比较复杂:需要记录哪些数据有变化、要保证数据一致性。

那么,既然不可靠,Kafka为啥还要用PageCache读写缓存

  • 消息队列读写比例1比1:此时只给一边加速没用。
  • 加入了其他保障手段:依赖不同节点上的多副本来解决数据可靠性问题,某个节点挂掉也无所谓。
  • 不涉及复杂实现:PageCache是由OS来完成的。

除了消息队列这种情况,我们应用程序一般是读多写少。

下面来看下,在构建一个只读缓存时,应该侧重考虑哪些问题。

2.只读缓存:如何保持数据最新

如何保存最新,一下是几个思路:

  • 数据和缓存同时增量更新:磁盘数据和缓存数据同步更新。如果是分布式系统,需要用到事务或者分布式一致性算法。
  • 增量更新需要考虑的问题:
    • 同步更新:磁盘或缓存更新失败时,重试还是报错?
    • 异步更新:如何保证更新时序?
  • 全量更新:很粗暴的每次把磁盘数据重新全部同步到缓存。
  • 自动过期:缓存自动过期。查不到缓存时,从磁盘拿数据。

像交易类这种业务,数据一致性很敏感。一般不用缓存,或者只用第一种方式,保证数据强一致性。

3.内存有限:该保留哪些数据

缓存肯定无法存下所有数据。那么缓存满了的时候,保留哪部分数据性价比最高呢?即缓存的命中率最高。以下是几个思路:

  • 根据业务定制:命中率最高。比如知道某些数据不会再被访问了,就及时删掉。比如右下删除已下线用户的数据。
  • 通用置换算法:一般可以用LRU算法,即最近最少使用算法。

Kafka 使用的 PageCache,是由 Linux 内核实现的,它的置换算法的就是一种 LRU 的变种算法。

小结

提问

如何保持缓存数据最新?数据缓存同步更新(同步、异步)、定期全量更新、制定过期时间。

实现一个LRU算法?

17.如何正确使用锁保护共享数据,协调异步线程?

本篇主题:如何正确使用锁?使用锁需要注意哪些事项?

内容太水,略。具体内容可参加并发编程文章内容,锁。

小结

提问

如何避免死锁?

18.如何用硬件同步原语(CAS)替代锁?

本篇主题:如何使用CAS锁。即乐观锁。

内容太水,略。

小结

提问

什么是CAS?ABA问题?修改时带上上次看到的旧值,符合时才修改。CAS底层由硬件来保证。

19.数据压缩:时间换空间的游戏

Kafka使用数据压缩时,可以大幅提升吞吐量。

数据压缩作用:1.不仅能节省存储空间。2.提升网络传输性能。

在IO和CPU间取舍:压缩和解压的操作都是计算密集型的操作,使用时注意平衡IO和CPU的消耗。

常见的压缩算法:分为有损压缩和无损压缩。常见的有ZIP,GZIP,SNAPPY,LZ4 等等。经典的压缩算法有哈夫曼编码等。

Kafka 如何处理消息压缩:

  • 可配置是否开启压缩,已经采用哪种压缩算法。
  • 开启压缩时,一批消息一起压缩。
  • 压缩和解压都是在发送和接受的客户端。

其他内容太水,略。

小结

提问

案例篇(29-35)

后续就是一些源码分析。以及如何实现RPC框架。暂时不想再跟进。之后复习时搞清楚以下问题:

个人提问汇总

  • 集群怎么搭建?怎么保证高可用
  • 宕机怎么办?
  • 消息怎么分发的?
  • 消息传输如何保证消息不丢?
  • 消息重复消费怎么办?
  • 消息消费失败怎么办?会不会阻塞主流程,死信队列?
  • 消息堆积怎么办?
  • 分布式事务怎么实现?

转载请注明来源