JUC《Java并发编程实战》笔记

  1. 零、绪论
  2. 一、并发理论基础
    1. 1.可见性、原子性和有序性问题:并发编程Bug的源头
      1. 1.源头一:缓存导致的可见性问题
      2. 2.源头二:线程切换带来的原子性问题
      3. 3.源头三:编译优化带来的有序性问题
      4. 提问
    2. 2.Java内存模型:看Java如何解决可见性和有序性问题
      1. Java 内存模型
      2. 使用 volatile 的困惑
      3. Happens-Before 规则
      4. final关键字
      5. 总结
      6. 提问
    3. 3.互斥锁(上):解决原子性问题
      1. 如何解决原子性问题
      2. 锁模型
      3. Java 语言提供的锁技术:synchronized
      4. 锁和受保护资源的关系
      5. 总结
      6. 提问
    4. 4.互斥锁(下):如何用一把锁保护多个资源?
      1. 保护没有关联关系的多个资源
      2. 保护有关联关系的多个资源
      3. 使用锁的正确姿势
      4. 总结
      5. 提问
    5. 5.一不小心就死锁了,怎么办?
      1. 细粒度锁
      2. 死锁
      3. 如何预防死锁
        1. 1.破坏占用且等待条件
        2. 2.破坏不可抢占条件
        3. 3.破坏循环等待条件
      4. 总结
      5. 提问
    6. 6.用“等待-通知”机制优化循环等待
      1. 用 synchronized 实现等待 - 通知机制
      2. 使用代码范式
      3. 尽量使用notifyAll()
      4. 提问
    7. 7.安全性、活跃性以及性能问题
      1. 安全性问题
      2. 活跃性问题
      3. 性能问题
        1. 1.串行对性能的影响:
        2. 2.性能问题如何解决
        3. 3.性能的指标
      4. 总结
      5. 提问
    8. 8.管程:并发编程的万能钥匙
      1. 什么是管程
      2. MESA 模型
      3. wait() 的编程范式
      4. notify() 何时可以使用
      5. 三种管程的区别
      6. 总结
      7. 提问
    9. 9.Java线程(上):Java线程的生命周期
      1. 通用的线程生命周期
      2. Java中线程的生命周期
        1. 1.RUNNABLE 与 BLOCKED 的状态转换
        2. 2.RUNNABLE 与 WAITING 的状态转换
        3. 3.RUNNABLE 与 TIMED_WAITING 的状态转换
        4. 4.从 NEW 到 RUNNABLE 状态
        5. 5.从 RUNNABLE 到 TERMINATED 状态
      3. interrupt()方法
      4. 总结
      5. 提问
    10. 10.Java线程(中):创建多少线程合适?
      1. 为什么要使用多线程?
      2. 多线程的应用场景
      3. 创建多少线程合适?
      4. 总结
      5. 提问
    11. 11.Java线程(下):为什么局部变量是线程安全的
      1. 方法是如何被执行的
      2. 局部变量放在哪里
      3. 调用栈与线程
      4. 线程封闭
      5. 提问
    12. 12.如何用面向对象思想写好并发程序
      1. 一、封装共享变量
      2. 二、识别共享变量间的约束条件
      3. 三、制定并发访问策略
      4. 总结
      5. 提问
    13. 13.并发编程理论基础总结串讲
      1. 1.总结串讲
      2. 2.关键词
      3. 3.用锁的最佳实践
  3. 二、并发工具类
    1. 14.Lock和Condition(上):隐藏在并发包中的管程
      1. 再造管程的理由
      2. 如何保证可见性
      3. 什么是可重入锁
      4. 公平锁与非公平锁
      5. 用锁的最佳实践
      6. 总结
      7. 提问
      8. 关键字
    2. 15.Lock和Condition(下):Dubbo如何用管程实现异步转同步?
      1. Lock&Condition的使用范式
      2. 同步和异步
      3. Dubbo 源码分析
      4. 总结
      5. 提问
    3. 16.Semaphore:如何快速实现一个限流器
      1. 信号量模型
      2. 如何使用信号量
      3. 实现一个限流器
      4. 总结
      5. 提问
    4. 17.ReadWriteLock:如何快速实现一个完备的缓存
      1. 什么是读写锁
      2. 快速实现一个缓存
      3. 缓存初始化示例
      4. 读写锁的升级和降级
      5. 总结
      6. 提问
    5. 18.StampedLock:有没有比读写锁更快的锁?
      1. StampedLock 支持的三种锁模式
      2. StampedLock 使用注意事项
      3. 总结
      4. 提问
    6. 19.CountDownLatch和CyclicBarrier:如何让多线程步调一致?
      1. 对账:单线程实现
      2. 对账:原生管程实现
      3. 对账:CountDownLatch实现
      4. 对账:CyclicBarrier实现
      5. 总结
      6. 提问
    7. 20.并发容器:都有哪些“坑”需要我们填?
      1. 同步容器及其注意事项
      2. 并发容器容器及其注意事项
        1. (1) List
        2. (2) Map
        3. (3) Set
        4. (4)Queue
      3. 总结
      4. 提问
    8. 21.原子类:无锁工具类的典范
      1. 无锁方案的实现原理
      2. Java如何实现CAS
      3. 原子类概览
        1. (1)原子化基本数据类型
        2. (2)原子化对象引用类型
        3. (3)原子化数组
        4. (4)原子化对象属性更新器
        5. (5)原子化的累加器
      4. 总结
      5. 提问
    9. 22.Executor与线程池:如何创建正确的线程池?
      1. 线程池是一种生产者 - 消费者模式
      2. 如何使用 Java 中的线程池
      3. 使用线程池要注意什么
      4. 总结
      5. 提问
    10. 23.Future:如何用多线程实现最优的“烧水泡茶”程序?
      1. 如何获取任务执行结果
      2. 使用实例:“烧水泡茶”程序
      3. 总结
      4. 提问
    11. 24.CompletableFuture:异步编程没那么难
      1. 概述
      2. CompletableFuture 的核心优势
      3. 创建 CompletableFuture 对象
      4. 如何理解 CompletionStage 接口
        1. 1. 描述串行关系
      5. 2. 描述 AND 汇聚关系
      6. 3. 描述 OR 汇聚关系
      7. 4. 异常处理
      8. 总结
      9. 提问
    12. 25.CompletionService:如何批量执行异步任务?
      1. 概述
      2. 利用 CompletionService 实现询价系统
      3. 如何创建 CompletionService
      4. CompletionService 接口说明
      5. 利用 CompletionService 实现 Dubbo 中的 Forking Cluster
      6. 总结
      7. 提问
    13. 26.Fork/Join:单机版的MapReduce
      1. 概述
      2. 分治任务模型
      3. Fork/Join 的使用
      4. ForkJoinPool 工作原理
      5. 模拟 MapReduce 统计单词数量
      6. 注意事项
      7. 总结
      8. 提问
    14. 27.并发工具类使用注意事项
      1. 1.使用while(true) 时要格外注意终止条件
      2. 2.尽量使用signalAll(),比signal()更安全
      3. 3.Semaphore 需要锁中锁
      4. 4.锁的申请和释放要成对出现
      5. 5.回调总要关心执行线程是谁
      6. 6. 共享线程池:有福同享就要有难同当
      7. 7.线上问题定位的利器:线程栈 dump
    15. 本部分总结
  4. 三、并发设计模式
  5. 四、案例分析
  6. 五、其他并发模型
    1. 42.Actor模型:面向对象原生的并发模型
      1. Hello Actor (Akka)
      2. 消息和对象方法的区别
      3. Actor 的规范化定义
      4. 总结
    2. 43.软件事务内存:借鉴数据库的并发经验
    3. 44.协程:更轻量级的线程
      1. Golang 中的协程
    4. 45.CSP模型:Golang的主力队员
      1. 什么是 CSP 模型
      2. CSP 模型与生产者 - 消费者模式
      3. CSP 模型与 Actor 模型的区别

零、绪论

Java里的并发编程Api,如 synchronized、wait()/notify(),是对管程模型的实现。解决并发的模型有信号量管程,Java语言描述管程模式更加方便,所以用管程实现。

其实并发编程可以总结为三个核心问题:分工、同步、互斥。

  • 分工指的是如何高效地拆解任务并分配给线程。即如何分工。
  • 同步指的是线程之间如何协作、通信。即如何合作。
  • 互斥则是保证同一时刻只允许一个线程访问共享资源。即如何解决冲突。

1.分工

Java SDK 并发包里的 Executor、Fork/Join、Future 本质上都是一种分工方法。除此之外,并发编程领域还总结了一些设计模式,基本上都是和分工方法相关的,例如生产者 - 消费者、Thread-Per-Message、Worker Thread 模式等都是用来指导你如何分工的。

2.同步

在并发编程领域里的同步,主要指的就是线程间的协作。工作中遇到的线程协作问题,基本上都可以描述为这样的一个问题:当某个条件不满足时,线程需要等待,当某个条件满足时,线程需要被唤醒执行

协作一般是和分工相关的。Java SDK 并发包里的 Executor、Fork/Join、Future 本质上都是分工方法,但同时也能解决线程协作的问题。例如,用 Future 可以发起一个异步调用,当主线程通过 get() 方法取结果时,主线程就会等待,当异步执行的结果返回时,get() 方法就自动返回了。主线程和异步线程之间的协作,Future 工具类已经帮我们解决了。除此之外,Java SDK 里提供的 CountDownLatch、CyclicBarrier、Phaser、Exchanger 也都是用来解决线程协作问题的。

在 Java 并发编程领域,解决协作问题的核心技术是管程,上面提到的所有线程协作技术底层都是利用管程解决的。管程是一种解决并发问题的通用模型,除了能解决线程协作问题,还能解决下面我们将要介绍的互斥问题。可以这么说,管程是解决并发问题的万能钥匙

3.互斥

分工、同步主要强调的是性能,但并发程序里还有一部分是关于正确性的,用专业术语叫“线程安全”。

并发程序里,当多个线程同时访问同一个共享变量的时候,结果是不确定的。不确定,则意味着可能正确,也可能错误,事先是不知道的。而导致不确定的主要源头是可见性问题有序性问题原子性问题,为了解决这三个问题,Java 语言引入了内存模型,内存模型提供了一系列的规则,利用这些规则,我们可以避免可见性问题、有序性问题,但是还不足以完全解决线程安全问题。解决线程安全问题的核心方案还是互斥

所谓互斥,指的是同一时刻,只允许一个线程访问共享变量。

实现互斥的核心技术就是锁,Java 语言里 synchronized、SDK 里的各种 Lock 都能解决互斥问题。虽说锁解决了安全性问题,但同时也带来了性能问题,那如何保证安全性的同时又尽量提高性能呢?可以分场景优化,Java SDK 里提供的 ReadWriteLock、StampedLock 就可以优化读多写少场景下锁的性能。还可以使用无锁的数据结构,例如 Java SDK 里提供的原子类都是基于无锁技术实现的。

除此之外,还有一些其他的方案,原理是不共享变量或者变量只允许读。这方面,Java 提供了Thread Local 和 final关键字,还有一种 Copy-on-write的模式。

使用锁除了要注意性能问题外,还需要注意死锁问题。

总结:分工、同步、互斥。分工直接分配即可,没有要解决的问题。同步需要解决线程协作问题,一般是条件不满足时线程等待,条件满足时唤醒线程。互斥需要解决线程安全问题,即多个线程访问同一共享变量时结果是不确定的。管程用来解决以上同步和互斥遇到的问题。线程安全问题的源头是可见性问题、有序性问题、原子性问题,它们分别由 CPU /缓存、编译优化、操作系统(后续完善)引起。其中,可见性问题、有序性问题引入了Java内存模型来解决。原子性问题则引入锁来解决,可以用有锁或无锁方式。有锁时为了提供性能需要对锁进行各种优化,包括各种粒度的锁等。无锁方式有ThreadLocal、final,以及Copy-on-write。

以下是并发编程总体思维导图:

image-20201021111046332

一、并发理论基础

1.可见性、原子性和有序性问题:并发编程Bug的源头

CPU、内存、I/O 设备的技术不断在快速迭代,但有一个核心矛盾一直存在,就是这三者的速度差异。为了合理利用 CPU 的高性能,平衡这三者的速度差异,计算机体系机构操作系统编译程序都做出了贡献,主要体现为:

  1. CPU 增加了缓存,以均衡CPU与内存的速度差异;
  2. 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;
  3. 编译程序优化指令执行次序,使得从而更加合理地利用缓存

同时以上三个优化也导致了以下三个问题

  1. 缓存导致了可见性问题。CPU自己的缓存和内存不同步。
  2. 线程切换导致了原子性问题
  3. 编译优化导致了有序性问题

1.源头一:缓存导致的可见性问题

下图一目了然,线程A和线程B,在自己CPU中的变量V对其他CPU是不可见的,只有在变量V同步到内存中后,才可以看到。这就是可见性问题。

image-20201021142858717

2.源头二:线程切换带来的原子性问题

CPU只保证CPU执行指令的原子性,而高级语言的一条指令是有可能对应CPU的多条指令的。

早期的操作系统基于进程来调度 CPU,不同进程间是不共享内存空间的,所以进程要做任务切换就要切换内存映射地址,而一个进程创建的所有线程,都是共享一个内存空间的,所以线程做任务切换成本就很低了。现代的操作系统都基于更轻量的线程来调度,现在我们提到的“任务切换”都是指“线程切换”。

当同一个进程创建的线程,同时操作一个共享变量时,就会发现线程A对共享变量的操作还没完,共享变量就又被B操作了,就破坏了原子性。因为CPU只保证CPU一条指令的原子性,不保证高级语言一条指令的原子性。而高级语言的一条指令是有可能对应CPU的多条指令的。

举例1:线程 A 和线程 B 按照下图的序列执行,那么我们会发现两个线程都执行了 count+=1 的操作,但是得到的结果不是我们期望的 2,而是 1。

image-20201021144243907

举例2:在 32 位的机器上对 long 型变量进行加减操作存在并发隐患。

long类型64位,所以在32位的机器上,对long类型的数据操作通常需要多条指令组合出来,无法保证原子性,所以并发的时候会出问题

3.源头三:编译优化带来的有序性问题

编译器为了优化性能,有时候会改变程序中语句的先后顺序,例如程序中:“a=6;b=7;”编译器优化后可能变成“b=7;a=6;”。

提问

1.是什么导致了并发编程的问题?最本质的原因是什么?

cpu与内存差异 -> 缓存 -> 可见性,cpu与I/O差异 -> 进程线程分时复用 -> 线程切换 -> 原子性,更合理利用缓存 -> 编译优化 -> 有序性。

2.Java内存模型:看Java如何解决可见性和有序性问题

可见性、原子性、有序性是共性问题,所有编程语言都会遇到。

在Java语言中,用Java内存模型解决了可见性问题和有序性问题

Java 内存模型

既然导致可见性的原因是缓存,导致有序性的原因是编译优化,那解决可见性、有序性最直接的办法就是禁用缓存和编译优化,不过这样程序的性能就很低了。k

合理的方案应该是按需禁用缓存以及编译优化。那么,如何做到“按需禁用”呢?肯定是按照程序员的需求。

所以Java程序员提供了按需禁用缓存和编译优化的方法,即Java 内存模型。

Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法。具体来说,这些方法包括 volatilesynchronizedfinal 三个关键字,以及六项 Happens-Before 规则。这也是本节的重点内容。

使用 volatile 的困惑

volatile 关键字在 C 语言里就有,它最原始的意义就是禁用 CPU 缓存,它表达的是:告诉编译器,对这个变量的读写,不能使用 CPU 缓存,必须从内存中读取或者写入。

在Java中,表达的是类似的含义,不过不是禁用缓存,有差异。看如下代码:

以下代码,直觉上看,应该是 42,那实际应该是多少呢?这个要看 Java 的版本,如果在低于 1.5 版本上运行,x 可能是 42,也有可能是 0;如果在 1.5 以上的版本上运行,x 就是等于 42。因为1.5引入了一个Happens-Before 规则对这个进行了优化。

// 以下代码来源于【参考 1】
class VolatileExample {
  int x = 0;
  volatile boolean v = false;
  public void writer() {
    x = 42;
    v = true;
  }
  public void reader() {
    if (v == true) {
      // 这里 x 会是多少呢?
    }
  }
}

Happens-Before 规则

Happens-Before 并不是说前面一个操作发生在后续操作的前面,它真正要表达的是:前面一个操作的结果对后续操作是可见的

比较正式的说法是:Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则。

以下是和程序员相关的六条规则:

  1. 顺序性同一个线程中,前面的结果对后面可见。在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。
  2. 传递性不同线程间,Happens-Before具有传递性。如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。这一条很重要,意味着如果线程 B 读到了“v=true”,那么线程 A 设置的“x=42”对线程 B 是可见的。这就是 1.5 版本对 volatile 语义的增强,这个增强意义重大,1.5 版本的并发工具包(java.util.concurrent)就是靠 volatile 语义来搞定可见性的。
  3. volatile 变量规则不同线程间,volatile 的写对读可见。对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。
  4. 管程中锁的规则不同线程间,同一个锁解锁操作对后续的加锁可见。即对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。管程是一种通用的同步原语,在 Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现。管程中的锁在 Java 里是隐式实现的,加锁以及释放锁都是编译器帮我们实现的。
  5. 线程 start() 规则:这条是关于线程启动的。执行start()之前的共享变量结果对被start()的线程可见。它是指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
  6. 线程 join() 规则:这条是关于线程等待的。join()线程的结果对主线程的后续操作可见。它是指主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作。

final关键字

volatile 为的是禁用缓存以及编译优化。有没有办法告诉编译器优化得更好一点呢?这个可以有,就是final 关键字final 修饰变量时,初衷是告诉编译器:这个变量生而不变,可以可劲儿优化。

总结

Happens-Before 规则最初是在一篇叫做Time, Clocks, and the Ordering of Events in a Distributed System的论文中提出来的,在这篇论文中,Happens-Before 的语义是一种因果关系。在现实世界里,如果 A 事件是导致 B 事件的起因,那么 A 事件一定是先于(Happens-Before)B 事件发生的,这个就是 Happens-Before 语义的现实理解。

在 Java 语言里面,Happens-Before 的语义本质上是一种可见性,A Happens-Before B 意味着 A 事件对 B 事件来说是可见的,无论 A 事件和 B 事件是否发生在同一个线程里。例如 A 事件发生在线程 1 上,B 事件发生在线程 2 上,Happens-Before 规则保证线程 2 上也能看到 A 事件的发生。

Java 内存模型主要分为两部分,一部分面向编写并发程序的应用开发人员,另一部分是面向 JVM 的实现人员的。面向JVM的规则还有两个:

  • 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。
  • 对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

提问

Java内存模型是什么?用来解决什么问题?按需禁用缓存及编译优化,解决可见性、有序性。

Happens-Before 规则?6+2个。强调的是可见性。重点理解传递性。

3.互斥锁(上):解决原子性问题

本节思路:如何用一把锁保护一个资源。

如何解决原子性问题

原子性问题的源头是线程切换,而操作系统做线程切换是依赖 CPU 中断的,所以禁止 CPU 发生中断就能够禁止线程切换。

也就是说,可以通过禁止CPU中断来避免原子性问题。但这只适合早起单核CPU的情况。

同一时刻只有一个线程执行”这个条件非常重要,我们称之为互斥如果我们能够保证对共享变量的修改是互斥的,那么,无论是单核 CPU 还是多核 CPU,就都能保证原子性了

锁模型

我们想到了用锁来保证互斥:

image-20201021223831643

注意上图中,我们提前创建了保护资源R的锁,并且这个锁和受保护资源间是有关联的。这个关联关系非常重要。很多并发 Bug 的出现都是因为把它忽略了,然后就出现了类似锁自家门来保护他家资产的事情。

Java 语言提供的锁技术:synchronized

典型用法如下:

class X {
  // 修饰非静态方法
  synchronized void foo() {
    // 临界区
  }
  // 修饰静态方法
  synchronized static void bar() {
    // 临界区
  }
    
  // 修饰代码块
  Object obj = new Object();
  void baz() {
    synchronized(obj) {
      // 临界区
    }
  }
}  

我们注意到代码和上述的锁模型有所不同。因为Java做了一点小改动:

  • 无需手动加锁解锁:Java 编译器会在 synchronized 修饰的方法或代码块前后自动加上加锁 lock() 和解锁 unlock()。这样就保证他们是成对出现的。
  • 当修饰静态方法的时候,默认锁定当前类的 Class 对象,在上面的例子中就是 Class X;
  • 当修饰非静态方法的时候,默认锁定当前实例对象 this。

使用举例:

“管程中锁的规则:对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。”保证了不同线程就行addOne()不会有问题。

class SafeCalc {
  long value = 0L;
  synchronized long get() {
    return value;
  }
  synchronized void addOne() {
    value += 1;
  }
}

以上代码用我们之前的模型表示:

image-20201021224930410

锁和受保护资源的关系

我们前面提到,受保护资源和锁之间的关联关系非常重要,他们的关系是怎样的呢?一个合理的关系是:受保护资源和锁之间的关联关系是 N:1 的关系可以一把锁保护多个资源,但不能多把锁保护一个资源。这里把锁看作是访问资源的途径会更好理解一点。

现实世界里,我们可以用多把锁来保护同一个资源,但在并发领域不行,会有并发问题。不过我们可以反过来,用一把锁保护多个资源。以下是举例,我们稍稍改了一下上面的代码,把addOne()方法改为静态锁,此时value变量同时被静态对象和对象两个锁保护,就会触发并发的数据错误,访问不唯一了,两个线程可以分别用两把锁分别访问共享变量,互斥失效:

class SafeCalc {
  static long value = 0L;
  synchronized long get() {
    return value;
  }
  synchronized static void addOne() {
    value += 1;
  }
}

情况相当于下图,显然互斥失效:

image-20201021225645081

总结

临界区的代码是操作受保护资源的路径,类似于球场的入口,入口一定要检票,也就是要加锁,但不是随便一把锁都能有效。所以必须深入分析锁定的对象和受保护资源的关系,综合考虑受保护资源的访问路径,多方面考量才能用好互斥锁

synchronized 是 Java 在语言层面提供的互斥原语,其实 Java 里面还有很多其他类型的锁,但作为互斥锁,原理都是相通的:锁,一定有一个要锁定的对象,至于这个锁定的对象要保护的资源以及在哪里加锁 / 解锁,就属于设计层面的事情了。

提问

如何解决原子性问题?互斥锁。互斥即独占共享资源。

synchronized的本质是什么?管程。

一把锁可以保护多个资源吗?一个资源可以用多把锁保护吗?可。不可。锁这里理解为访问路径更直观。

4.互斥锁(下):如何用一把锁保护多个资源?

本节思路:如何用一把锁保护多个资源。

上一节中我们已经知道受保护资源和锁之间合理的关联关系应该是 N:1 的关系,也重点强调了“不能用多把锁来保护一个资源”。本次我们来看下如何保护多个资源

当我们要保护多个资源时,首先要区分这些资源是否存在关联关系

保护没有关联关系的多个资源

这种情况,不同的资源用不同的锁保护,各自管各自的,很简单。

以下是分别保护账户余额保护密码,用了两把锁:

class Account {
  // 锁:保护账户余额
  private final Object balLock
    = new Object();
  // 账户余额  
  private Integer balance;
  // 锁:保护账户密码
  private final Object pwLock
    = new Object();
  // 账户密码
  private String password;
 
  // 取款
  void withdraw(Integer amt) {
    synchronized(balLock) {
      if (this.balance > amt){
        this.balance -= amt;
      }
    }
  } 
  // 查看余额
  Integer getBalance() {
    synchronized(balLock) {
      return balance;
    }
  }
 
  // 更改密码
  void updatePassword(String pw){
    synchronized(pwLock) {
      this.password = pw;
    }
  } 
  // 查看密码
  String getPassword() {
    synchronized(pwLock) {
      return password;
    }
  }
}

当然,我们也可以用一把互斥锁来保护多个资源,例如我们可以用 this 这一把锁来管理账户类里所有的资源:账户余额和用户密码。具体实现很简单,示例程序中所有的方法都增加同步关键字 synchronized 就可以了

但是用一把锁性能太差了。我们用两把锁,取款和修改密码是可以并行的。用不同的锁对受保护资源进行精细化管理,能够提升性能。这种锁还有个名字,叫细粒度锁

保护有关联关系的多个资源

这个要具体分析。

例如银行业务里面的转账操作,账户 A 减少 100 元,账户 B 增加 100 元。这两个账户就是有关联关系的。代码描述如下:

class Account {
  private int balance;
  // 转账
  void transfer(
      Account target, int amt){
    if (this.balance > amt) {
      this.balance -= amt;
      target.balance += amt;
    }
  } 
}

像下面这样,直接加synchronized用this一把锁来保护,肯定是错的。因为锁没覆盖到所有临界区。临界区内有别人的账户,用自己的this是锁不住别人家的账户的,相当于用自家的锁来保护别人家的资产,别人的账户随时有可能被更改

class Account {
  private int balance;
  // 转账
  synchronized void transfer(
      Account target, int amt){
    if (this.balance > amt) {
      this.balance -= amt;
      target.balance += amt;// 注意这里,这个账户是从形参传进来的。这里不能访问,但在别处随时能访问。锁没覆盖到所有临界区。
    }
  } 
}

this 这把锁可以保护自己的余额 this.balance,却保护不了别人的余额 target.balance:

image-20201021233441206

使用锁的正确姿势

那么以上问题怎么解决呢?

很简单,只要我们的锁能覆盖所有受保护资源就可以了。所以我们直接用了类级别锁,它的class对象是所有线程共享的:

class Account {
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    synchronized(Account.class) {
      if (this.balance > amt) {
        this.balance -= amt;
        target.balance += amt;
      }
    }
  } 
}

上面的锁太粗了,下一节会讲优化。

总结

如何保护多个资源,关键是要分析多个资源之间的关系。如果资源之间没有关系,每个资源一把锁就可以了。如果资源之间有关联关系,就要选择一个粒度更大的锁,这个锁应该能够覆盖所有相关的资源。除此之外,还要梳理出有哪些访问路径,所有的访问路径都要设置合适的锁。

“原子性”的本质是什么?其实不是不可分割,不可分割只是外在表现,其本质是多个资源间有一致性的要求,操作的中间状态对外不可见。所以解决原子性问题,是要保证中间状态对外不可见

提问

锁如何保护多个资源?资源间无关联时?有关联时?无关联用细粒度锁。有关联用覆盖所有资源的大锁或细粒度锁,使用时注意死锁。

什么是细粒度锁?就是用多个锁细化管理的锁。

原子性的本质是什么?中间状态对外不可见。

5.一不小心就死锁了,怎么办?

细粒度锁

如何解决上一小节中,用类对象锁,效率过低的问题?

我们可以用细粒度锁,即用两把锁,分别锁住转入账户和转出账户。只有两把锁都拿到的时候,才执行操作。

image-20201022100202547

代码如下:

class Account {
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    // 锁定转出账户
    synchronized(this) {              
      // 锁定转入账户
      synchronized(target) {           
        if (this.balance > amt) {
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

但是此时会有死锁问题:两个线程都各自拿了一把锁,然后等对方释放。

死锁

死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象

上述死锁的情况,用资源分配图表示如下:

image-20201022101105682

如何预防死锁

死锁发生时再去解决,我们手段很少,基本只能重启。所以我们的思路改为如何预防死锁:

Coffman总结了发生死锁的四个条件

  1. 互斥,共享资源 X 和 Y 只能被一个线程占用;规则:所有的盘子只能一个人用
  2. 占有等待,即占有且等待。线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;规则:拿了一个盘子就一定要等着,直到拿到所有的剩下盘子。即拿了就不放。
  3. 不可抢占,其他线程不能强行抢占线程 T1 占有的资源;规则:别人拿了就不能抢
  4. 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。情景:相互在等待对方的盘子

也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。简单来说,就是资源是互斥的,拿到一部分资源后不能放弃,也不能去抢,还相互等着

我们一一来分析:

互斥条件肯定没法破坏,因为我们用锁就是为了互斥。

  1. 对于“占有且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。即一次拿完,那占有后就无需等待。
  2. 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
  3. 对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。不给循环等待的机会。

以下是示例代码,场景是转账,必须同时获得转入账户和转出账户的锁。

1.破坏占用且等待条件

即一次性申请所有资源。我们需要一个角色(Java 里面的类)来管理临界区,我们就把这个角色定为 Allocator。它有两个重要功能,分别是:同时申请资源 apply() 和同时释放资源 free()。账户 Account 类里面持有一个 Allocator 的单例(必须是单例,只能由一个人来分配资源)。当账户 Account 在执行转账操作的时候,首先向 Allocator 同时申请转出账户和转入账户这两个资源,成功后再锁定这两个资源;当转账操作执行完,释放锁之后,我们需通知 Allocator 同时释放转出账户和转入账户这两个资源。具体的代码实现如下。

class Allocator {
  private List<Object> als =
    new ArrayList<>();
  // 一次性申请所有资源
  synchronized boolean apply(
    Object from, Object to){
    if(als.contains(from) ||
         als.contains(to)){
      return false;  
    } else {
      als.add(from);
      als.add(to);  
    }
    return true;
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
  }
}
 
class Account {
  // actr 应该为单例
  private Allocator actr;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    // 一次性申请转出账户和转入账户,直到成功
    while(!actr.apply(this, target))
      ;
    try{
      // 锁定转出账户
      synchronized(this){              
        // 锁定转入账户
        synchronized(target){           
          if (this.balance > amt){
            this.balance -= amt;
            target.balance += amt;
          }
        }
      }
    } finally {
      actr.free(this, target)
    }
  } 
}

2.破坏不可抢占条件

破坏不可抢占条件看上去很简单,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

Java 在语言层次没有解决这个问题,不过在 SDK 层面还是解决了的,java.util.concurrent 这个包下面提供的 Lock 可以解决这个问题,这个之后讲。

3.破坏循环等待条件

破坏这个条件,需要对多个资源进行排序,然后按序申请资源

我们假设每个账户都有不同的属性 id,这个 id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请即可。

class Account {
  private int id;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    Account left = this       // ① 将资源排序,按序申请
    Account right = target;   // ②
    if (this.id > target.id) {// ③
      left = target;          // ④
      right = this;           // ⑤
    }                         // ⑥
    // 锁定序号小的账户
    synchronized(left){
      // 锁定序号大的账户
      synchronized(right){ 
        if (this.balance > amt){
          this.balance -= amt;
          target.balance += amt;
        }
      }
    }
  } 
}

总结

我们这一节主要讲了用细粒度锁来锁定多个资源时,要注意死锁的问题。这个就需要你能把它强化为一个思维定势,遇到这种场景,马上想到可能存在死锁问题。当你知道风险之后,才有机会谈如何预防和避免,因此,识别出风险很重要

预防死锁主要是破坏三个条件中的一个,有了这个思路后,实现就简单了。注意到上面破坏占用且等待条件时我们用了死循环 while(!actr.apply(this, target));方法,这里性能占用还是挺高的。我们后一节会尝试使用“等待-通知”的线程间通讯来解决,后续将。

我们在选择具体方案的时候,还需要评估一下操作成本,从中选择一个成本最低的方案

提问

死锁由什么引起?死锁同时满足的四个条件?分别如何预防?

使用细粒度锁时,一个操作要一次同时获取多个锁时才能操作。多个线程同时获取这些锁时就引起了竞争。竞争就导致了死锁。解决死锁太麻烦,预防是最好的。

6.用“等待-通知”机制优化循环等待

用来避免用死循环的方式等待,提高程序性能。

上例中,如果线程要求的条件(转出账本和转入账本同在文件架上)不满足,则线程阻塞自己,进入等待状态;当线程要求的条件(转出账本和转入账本同在文件架上)满足后,通知等待的线程重新执行。其中,使用线程阻塞的方式就能避免循环等待消耗 CPU 的问题。

拿就医流程对比,我们可以得到一个完整的等待 - 通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁

用 synchronized 实现等待 - 通知机制

在 Java 语言里,等待 - 通知机制可以有多种实现方式,比如 Java 语言内置的 synchronized 配合 wait()notify()notifyAll() 这三个方法就能轻松实现。

在下面这个图里,左边有一个等待队列,同一时刻,只允许一个线程进入 synchronized 保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。wait()方法工作原理图:

image-20201023105635941

当调用 wait() 方法后,当前线程就会被阻塞,并且进入到右边的等待队列中,这个等待队列也是互斥锁的等待队列。 线程在进入等待队列的同时,会释放持有的互斥锁

当线程要求的条件满足时,调用 notify() 和 notifyAll() 方法。会通知等待队列(互斥锁的等待队列)中的线程,告诉它条件曾经满足过。因为notify() 只能保证在通知时间点,条件是满足的。这一点你要格外注意。

此外,被通知的线程要想重新执行,仍然需要获取到互斥锁

我们一直强调 wait()、notify()、notifyAll() 方法操作的等待队列是互斥锁的等待队列,如果 synchronized 锁定的是 target,那么对应的一定是 target.wait()、target.notify()、target.notifyAll() 。而且 wait()、notify()、notifyAll() 这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现 wait()、notify()、notifyAll() 都是在 synchronized{}内部被调用的。如果在 synchronized{}外部调用,或者锁定的 this,而用 target.wait() 调用的话,JVM 会抛出一个运行时异常:java.lang.IllegalMonitorStateException

使用代码范式

我们使用等待-通知机制的范式要特别注意,建议用以下经典的范式,因为线程被唤醒时,只是表示条件曾经满足过,所以要再次检验条件满不满足:

while(条件不满足) {
    wait();
  }

上一节中,我们的转账时,我们委托单例,一次性拿到两个账本的锁的代码示例如下:

class Allocator {
  private List<Object> als;
  // 一次性申请所有资源
  synchronized void apply(
    Object from, Object to){
    // 经典写法
    while(als.contains(from) ||
         als.contains(to)){
      try{
        wait();
      }catch(Exception e){
      }   
    } 
    als.add(from);
    als.add(to);  
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
    notifyAll();
  }
}

尽量使用notifyAll()

notify() 是会随机地通知等待队列中的一个线程,而 notifyAll() 会通知等待队列中的所有线程。使用 notify() 的风险在于可能导致某些线程永远不会被通知到。

例如,线程1申请到A锁,线程2申请到B锁,线程3等待A锁释放,线程4等待B锁释放。如果线程1用完并释放A资源,选择通知线程4,那么此时线程4继续等待。线程3没有机会再被唤醒,会一直等待下去,因为已经没有机会再被通知了。

提问

为什么需要等待-通知机制?避免等待互斥资源时,cpu循环消耗。

等待-通知机制原理及流程?

wait()和sleep()区别?

wait()实现的典型范式?为什么要这样做?

notify()和notifyAll()有什么区别?推荐用哪个?为什么?

7.安全性、活跃性以及性能问题

这一小节是对前面6小节的一个整理归纳

并发编程中我们需要注意的问题有很多,很庆幸前人已经帮我们总结过了,主要有三个方面,分别是:安全性问题、活跃性问题和性能问题

安全性问题

那什么是线程安全呢?其实本质上就是正确性,而正确性的含义就是程序按照我们期望的执行

那如何保证线程安全呢?避免出现原子性问题可见性问题有序性问题即可。

那是不是所有的代码都要分析这三个问题呢?当然不是,其实只有一种情况需要:存在共享数据并且该数据会发生变化,通俗地讲就是有多个线程会同时读写同一数据

所以基于上诉原因,做到不共享数据或者数据状态不发生变化也可以保证线程安全。例如线程本地存储(Thread Local Storage,TLS)、不变模式等等。

但是,现实生活中,必须共享会发生变化的数据。当多个线程同时访问同一数据,并且至少有一个线程会写这个数据时,我们不采取措施,就会发生并发Bug。也叫发生了数据竞争。例如下例,多个线程同时调用add10K():

public class Test {
  private long count = 0;
  void add10K() {
    int idx = 0;
    while(idx++ < 10000) {
      count += 1;
    }
  }
}

是否加锁就能避免并发问题呢?并不是。以下get()和set()加了锁,但是add10K还是有问题:

public class Test {
  private long count = 0;
  synchronized long get(){
    return count;
  }
  synchronized void set(long v){
    count = v;
  } 
  void add10K() {
    int idx = 0;
    while(idx++ < 10000) {
      set(get()+1)      
    }
  }
}

以上,假设 count=0,当两个线程同时执行 get() 方法时,get() 方法会返回相同的值 0,两个线程执行 get()+1 操作,结果都是 1,之后两个线程再将结果 1 写入了内存。你本来期望的是 2,而结果却是 1。

这种问题,有个官方的称呼,叫竞态条件(Race Condition)。所谓竞态条件,指的是程序的执行结果依赖线程执行的顺序。而线程的执行顺序是不确定的,如果程序存在竞态条件问题,那就意味着程序执行的结果是不确定的,而执行结果不确定这可是个大 Bug。例如上面的例子,如果两个线程完全同时执行,那么结果是 1;如果两个线程是前后执行,那么结果就是 2。

可以按照下面这样来理解竞态条件。在并发场景中,程序的执行依赖于某个状态变量,也就是类似于下面这样:

if (状态变量 满足 执行条件) {
  执行操作
}

在A线程执行操作时,B线程改变了他的前置状态变量条件,导致该执行操作错误,因为前提条件被修改了。很多时候这个前置条件时隐式的,例如前面 addOne 的例子中,set(get()+1) 这个复合操作,其实就隐式依赖 get() 的结果。

面对数据竞争和竞态条件问题,如何保证线程安全?我们可以用互斥互斥的方案有很多,CPU 提供了相关的互斥指令,操作系统、编程语言也会提供相关的 API。从逻辑上来看,我们可以统一归为:

活跃性问题

所谓活跃性问题,指的是某个操作无法执行下去。包括“死锁”、“活锁”和“饥饿”

  • 死锁线程相互等待,表现形式是线程永久“阻塞”。解决方式:破坏4个死锁条件中的三个中的一个。
  • 活锁线程没阻塞,但是还是执行不下去。即两者同时互相谦让。解决方式:等待随机时间。Raft 这样知名的分布式一致性算法中也用到了它
  • 饥饿指线程因无法访问所需资源而无法执行下去的情况。例如有些线程优先级过小,一直拿不到资源。解决方式:一是保证资源充足,二是公平地分配资源,三就是避免持有锁的线程长时间执行。第二种使用相对较多。一和三实际中都难以避免。那如何公平地分配资源呢?在并发编程里,主要是使用公平锁。所谓公平锁,是一种先来后到的方案,线程的等待是有顺序的,排在等待队列前面的线程会优先获得资源

性能问题

“锁”的过度使用可能导致串行化的范围过大,这样性能反而低了,违反了我们用多线程提升性能的初衷。

1.串行对性能的影响:

有个阿姆达尔(Amdahl)定律,代表了处理器并行运算之后效率提升的能力,它正好可以解决这个问题,具体公式如下:

image-20201023164820021

公式里的 n 可以理解为 CPU 的核数,p 可以理解为并行百分比,那(1-p)就是串行百分比了,也就是我们假设的 5%。我们再假设 CPU 的核数(也就是 n)无穷大,那加速比 S 的极限就是 20。也就是说,如果我们的串行率是 5%,那么我们无论采用什么技术,最高也就只能提高 20 倍的性能。

2.性能问题如何解决

所以使用锁的时候一定要关注对性能的影响。 那怎么才能避免锁带来的性能问题呢?这个问题很复杂,Java SDK 并发包里之所以有那么多东西,有很大一部分原因就是要提升在某个特定领域的性能

不过从方案层面,我们可以这样来解决这个问题:

  1. 使用无锁的算法和数据结构:例如线程本地存储 (Thread Local Storage, TLS)、写入时复制 (Copy-on-write)、乐观锁等;Java 并发包里面的原子类也是一种无锁的数据结构;Disruptor 则是一个无锁的内存队列…
  2. 减少锁持有的时间:例如使用细粒度的锁,一个典型的例子就是 Java 并发包里的 ConcurrentHashMap,它使用了所谓分段锁的技术(这个技术后面我们会详细介绍);还可以使用读写锁,也就是读是无锁的,只有写的时候才会互斥。

3.性能的指标

指标有很多,有三个指标非常重要,就是:吞吐量延迟并发量

  1. 吞吐量:指的是单位时间内能处理的请求数量。单位“个/秒”。吞吐量越高,说明性能越好。
  2. 延迟:指的是从发出请求到收到响应的时间。单位“秒”。延迟越小,说明性能越好。
  3. 并发量:指的是能同时处理的请求数量。单位:“个”。一般来说随着并发量的增加、延迟也会增加(因为排队的人多了?)。所以延迟这个指标,一般都会是基于并发量来说的。例如并发量是 1000 的时候,延迟是 50 毫秒。

总结

并发微观上涉及到原子性问题、可见性问题和有序性问题宏观则表现为安全性、活跃性以及性能问题。

我们在设计并发程序的时候,主要是从宏观出发,也就是要重点关注它的安全性、活跃性以及性能。安全性方面要注意数据竞争竞态条件活跃性方面需要注意死锁活锁饥饿等问题,性能方面使用无锁减少锁持有时间。具体问题具体分析。

安全性、活跃性以及性能问题:线程要安全、不卡死、性能。

提问

如何保证线程安全?避免出现原子性/可见性/有序性问题。非数据竞争(线程本地数据/不变模式),数据竞争->竞态条件->互斥->锁。

活跃性问题是啥?有哪些?如何解决?某个操作无法执行。死锁,活锁,饥饿…

如何计算串行对性能的影响?串行度为5%,性能最多提高所少?阿姆达尔(Amdahl)定律

如何提升性能?无锁、减少锁持有时间

性能指标有哪些?分别什么意思?吞吐量、并发量、延迟…

8.管程:并发编程的万能钥匙

管程是一把解决并发问题的万能钥匙。

Java 语言在 1.5 之前,提供的唯一的并发原语就是管程,而且 1.5 之后提供的 SDK 并发包,也是以管程技术为基础的。除此之外,C/C++、C# 等高级语言也都支持管程。

什么是管程

Java 采用了管程技术,因为管程模型和面向对象高度契合。synchronized 关键字及 wait()notify()notifyAll() 这三个方法都是管程的组成部分。而管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。但是管程更容易使用,所以 Java 选择了管程。

管程,对应的英文是 Monitor,很多 Java 领域的同学都喜欢将其翻译成“监视器”。

所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。翻译为 Java 领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的。

管程有三个模型:Hasen 模型Hoare 模型MESA 模型

Java用的是MESA,我们以MESA为例来讲讲。

MESA 模型

如何解决线程间互斥问题?

管程解决互斥问题的思路很简单,就是将共享变量及其对共享变量的操作统一封装起来。

以下是对一个队列操作的例子,入队和出队保持互斥,只允许一个线程进入管程:

image-20201027104017871

如何解决线程间同步问题?

类似就医流程,用了两个队列:

  • 入口等待队列:当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。
  • 条件变量等待队列:管程里引入了条件变量的概念,而且每个条件变量都对应有一个等待队列。线程进入管程后,条件不满足时进入等待队列等待。条件满足后接到notifyAll(),重新进入入口等待队列。(因为可能会被其他线程敲定,所以只是表示条件曾经满足过,要重新到入口等待队列排队)。

image-20201027104515269

以下是对一个队列操作的举例:

有一个共享变量,就是队列。有两个条件变量,队列不为满或队列不为空,以此作为入队和出队的前提。

// 对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await();。
// 对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await();。
// 如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列。
// 如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列。
public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =
    lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =
    lock.newCondition();
 
  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满 
        notFull.await();
      }  
      // 省略入队操作...
      // 入队后, 通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }
      // 省略出队操作...
      // 出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

synchronized以及wait()notify()notifyAll()就相当于是只有一个条件变量的MESA模型。

wait() 的编程范式

对于 MESA 管程来说,有一个编程范式,就是需要在一个 while 循环里面调用 wait()。这个是 MESA 管程特有的

while(条件不满足) {
  wait();
}

notify() 何时可以使用

前面说过,**尽量使用 notifyAll()**。如果要用notify(),需要具备以下条件:

  1. 所有等待线程拥有相同的等待条件;
  2. 所有等待线程被唤醒后,执行相同的操作;
  3. 只需要唤醒一个线程。

也就说,所有线程是等价的(等待条件执行操作一样),且只需要唤醒一个线程。

三种管程的区别

最主要的区别就是如何唤醒的区别。假如线程B正在条件变量等待队列里面等待,线程A正在执行。此时线程B执行的条件满足了,线程A如何通知线程B,即如何给线程B notify()? 此时线程A和线程B如何协作:

  • Hasen 模型:线程A比较自私。线程A自己执行完再通知线程B。即notify()放代码最后一行。此时可以保证一个线程执行。
  • Hoare 模型:线程A是老好人。线程A立即通知线程B,自己阻塞,线程B立刻执行。线程B执行完后唤醒线程A。此时可以保证一个线程执行。但比Hasen模型多一次阻塞唤醒。
  • MESA 模型:线程A比较灵活。线程A通知线程B,然后线程A继续自己玩自己的。线程B从条件变量等待队列进入到入口等待队列,可以重新排队啦。此时可以保证一个线程执行。好处是编程灵活,不用一定要把notify()放最后一行。缺点是线程B重新执行时,条件就不一定满足了,所以要用前面的while()范式循环检测条件变量。另外MESA多了notifyAll()的便利,但是缺点时前两个模式线程都会被执行到,而MESA由于一直等待,不一定会立刻被执行到。所以多了带超时时间的wait()。

总结

并发编程里两大核心问题——互斥和同步,都可以由管程来帮你解决。学好管程,理论上所有的并发问题你都可以解决,并且很多并发工具类底层都是管程实现的。

Java 内置的管程方案(synchronized)参考了 MESA 模型,并进行了精简。只使用一个条件变量,并自动加锁解锁。而JUC包的锁提供了多个条件变量,且要自己加锁解锁。

Java管程示意:

image-20201027111645934

提问

MESA管程模型是怎样的?分别如何解决互斥、同步问题?只允许一个线程进入管程。

三种管程模型有什么差别?唤醒时如何唤醒。Hasen(哈森)执行完再notify()、Hoare(霍尔)阻塞自己等唤醒、MESA(弥撒)直接踢入入口等待队列。

synchronized的和管程什么关系?内置的精简MESA管程。只有一个条件变量。且自动加锁解锁。

synchronized和reentrantLock有什么区别和联系?都是MESA管程,前者一个变量、自动加解锁。

9.Java线程(上):Java线程的生命周期

线程是操作系统里的一个概念,Java、C# 等开发语言对其进行了封装Java 语言里的线程本质上就是操作系统的线程,它们是一一对应的。

在操作系统层面,线程也有“生老病死”。我们先了解一下通用的线程生命周期模型,这部分内容也适用于很多其他编程语言;然后再详细地学习 Java 中线程的生命周期

通用的线程生命周期

通用的线程生命周期基本上可以用五态模型”来描述。分别是:初始状态、可运行状态、运行状态、休眠状态终止状态

image-20201027144421557

以下分别进行解析:

  1. 初始状态。线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建
  2. 可运行状态,线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
  3. 运行状态。线程正在被CPU执行。有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,此时线程的状态就转换成了运行状态。
  4. 休眠状态:线程等待被唤醒。运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
  5. 终止状态:线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。

这五种状态在不同编程语言里会有简化合并。例如,C 语言的 POSIX Threads 规范,就把初始状态和可运行状态合并了;Java 语言里则把可运行状态和运行状态合并了,这两个状态在操作系统调度层面有用,而 JVM 层面不关心这两个状态,因为 JVM 把线程调度交给操作系统处理了。

除了简化合并,这五种状态也有可能被细化,比如,Java 语言里就细化了休眠状态(这个下面我们会详细讲解)。

Java中线程的生命周期

Java 语言中线程共有六种状态,分别是:

  1. NEW:初始化
  2. RUNNABLE:可运行/运行
  3. BLOCKED:阻塞
  4. WAITING:无限等待
  5. TIMED_WAITING:有限等待
  6. TERMINATED:终止

Java把通用线程的运行/可运行合并为RUNNABLE(因为线程丢给OS去玩,JVM不管),把休眠拆解为BLOCKED、WAITING、TIMED_WAITING。在操作系统层面,Java 线程中的 BLOCKED、WAITING、TIMED_WAITING 是一种状态,即前面我们提到的休眠状态。也就是说只要 Java 线程处于这三种状态之一,那么这个线程就永远没有 CPU 的使用权

对比上图通用生命周期,Java生命周期简化如下:

img

休眠被拆成了BLOCKED、WAITING、TIMED_WAITING,下面讨论这几个状态如何转换:

1.RUNNABLE 与 BLOCKED 的状态转换

只有一种场景会触发这种转换:线程等待 synchronized 的隐式锁。等待隐式锁时 RUNNABLE -> BLOCKED,获得隐式锁时 BLOCKED -> RUNNABLE。

线程调用阻塞式 API 时,Java 线程的状态会依然保持 RUNNABLE 状态。因为JVM 层面并不关心操作系统调度相关的状态。在操作系统层面,线程是会转换到休眠状态的。

在 JVM 看来,等待 CPU 使用权(操作系统层面此时处于可执行状态)与等待 I/O(操作系统层面此时处于休眠状态)没有区别,都是在等待某个资源,所以都归入了 RUNNABLE 状态

2.RUNNABLE 与 WAITING 的状态转换

有三种场景会触发这种转换:

  1. 调用无参的 lockObject.wait() 方法(获得 synchronized 隐式锁的线程)。
  2. 调用无参的 thread.join() 方法。
  3. 调用无参的 LockSupport.park() 方法。

Java 并发包中的锁,都是基于LockSupport 实现的。调用 LockSupport.park() 方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。调用 LockSupport.unpark(Thread thread) 可唤醒目标线程,目标线程的状态又会从 WAITING 状态转换到 RUNNABLE。

3.RUNNABLE 与 TIMED_WAITING 的状态转换

有五种场景会触发这种转换:

  1. 调用带超时参数object.wait(long timeout) 方法(获得 synchronized 隐式锁的线程);
  2. 调用带超时参数thread.join(long millis) 方法;
  3. 调用带超时参数Thread.sleep(long millis) 方法;
  4. 调用带超时参数LockSupport.parkNanos(Object blocker, long deadline) 方法;
  5. 调用带超时参数LockSupport.parkUntil(long deadline) 方法。

TIMED_WAITING 和 WAITING 状态的区别,仅仅是触发条件多了超时参数

4.从 NEW 到 RUNNABLE 状态

调用线程对象的 threadObj.start() 方法即可

扩展:

创建线程对象的方法:

1.方法一:继承 Thread 对象,重写 run() 方法

// 自定义线程对象
class MyThread extends Thread {
  public void run() {
    // 线程需要执行的代码
    ......
  }
}
// 创建线程对象
MyThread myThread = new MyThread();

2.方法二:实现 Runnable 接口,重写 run() 方法,并将该实现类作为创建 Thread 对象的参数。

// 实现 Runnable 接口
class Runner implements Runnable {
  @Override
  public void run() {
    // 线程需要执行的代码
    ......
  }
}
// 创建线程对象
Thread thread = new Thread(new Runner());

以上两种方法其实是一种方法。看thread::run()的源码:

public class Thread implements Runnable {
    @Override
        public void run() {
            if (target != null) {
                target.run();
            }
        }
}

从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法即可:

MyThread myThread = new MyThread();
// 从 NEW 状态转换到 RUNNABLE 状态
myThread.start();

5.从 RUNNABLE 到 TERMINATED 状态

以下两种情况,会导致线程终止:

  1. run() 方法执行完或抛异常导致线程终止。
  2. thread.interrupt()方法停止线程。另外有一个thread.stop()方法可以直接停止线程,但是已经被标记为@Deprecated

interrupt()方法

thread::stop()thread::interrupt()的区别:

  • stop()方法直接把线程干掉。如果用的是ReentrantLock,根本没有unlock的机会。
  • interrupt() 方法仅仅是通知线程停止,线程可以选择执行,也可以选择无视。

调用thread::interrupt()可以通知线程终止。线程怎么收到interrupt()通知的呢?一种是异常,另一种是主动检测

  1. 线程在等待时,被打断,切换到RUNNABLE并抛出异常:当线程 A 处于 WAITING、TIMED_WAITING 状态时,如果其他线程调用线程 A 的 interrupt() 方法,会使线程 A 返回到 RUNNABLE 状态,同时线程 A 的代码会触发 InterruptedException 异常。上面我们提到转换到 WAITING、TIMED_WAITING 状态的触发条件,都是调用了类似 wait()、join()、sleep() 这样的方法,我们看这些方法的签名,发现都会 throws InterruptedException 这个异常。这个异常的触发条件就是:其他线程调用了该线程的 interrupt() 方法。

  2. 线程在运行且被阻塞时,被打断,抛出异常或直接返回。当线程 A 处于 RUNNABLE 状态时,并且阻塞在 java.nio.channels.InterruptibleChannel 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 会触发 java.nio.channels.ClosedByInterruptException 这个异常;而阻塞在 java.nio.channels.Selector 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 的 java.nio.channels.Selector 会立即返回。

  3. 线程在运行时,被打断,需要调用 isInterrupted() 方法主动检测它自己的打断状态。上面这两种情况属于被中断的线程通过异常的方式获得了通知。还有一种是主动检测,如果线程处于 RUNNABLE 状态,并且没有阻塞在某个 I/O 操作上,例如中断计算圆周率的线程 A,这时就得依赖线程 A 主动检测中断状态了。如果其他线程调用线程 A 的 interrupt() 方法,那么线程 A 可以通过 isInterrupted() 方法,检测是不是自己被中断了。isInterrupted()是一个Native方法。

注意:在触发 InterruptedException 异常的同时,JVM 会同时把线程的中断标志位清除,所以这个时候th.isInterrupted()返回的是 false。

总结

多线程程序很难调试,出了 Bug 基本上都是靠日志,靠线程 dump 来跟踪问题,分析线程 dump 的一个基本功就是分析线程状态,大部分的死锁、饥饿、活锁问题都需要跟踪分析线程的状态。

可以通过 jstack 命令或者Java VisualVM这个可视化工具将 JVM 所有的线程栈信息导出来,完整的线程栈信息不仅包括线程的当前状态、调用栈,还包括了锁的信息。

提问

通用的线程生命周期有哪些?怎么转换?

Java线程生命周期有哪些?和通用的生命周期有什么区别?调用阻塞API时,如IO,JVM和OS层面的线程状态怎么转换?

Java线程生命周期怎么转换?

stop()和interrupt()有什么区别?

被 interrupt 的线程,怎么收到通知?休眠、运行且阻塞Api、运行

如何通过日志及工具诊断多线程运行时Bug?

10.Java线程(中):创建多少线程合适?

要解决这个问题,首先要分析以下两个问题:

  1. 为什么要使用多线程?
  2. 多线程的应用场景有哪些?

为什么要使用多线程?

用多线程是为了提高性能。

性能有两个核心指标:一个是时间维度延迟,另外一个是空间维度吞吐量延迟指的是发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行得越快,性能也就越好。 吞吐量指的是在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。这两个指标内部有一定的联系(同等条件下,延迟越短,吞吐量越大)

我们所谓提升性能,从度量的角度,主要是降低延迟,提高吞吐量。这也是我们使用多线程的主要目的。

我们如何降低延迟,提高吞吐量?我们从多线程的应用场景说起。

多线程的应用场景

要想“降低延迟,提高吞吐量”,基本上有两个方向,软件方向是优化算法,硬件方向是将硬件的性能发挥到极致。前者属于算法范畴,后者和并发编程息息相关。那计算机主要有哪些硬件呢?主要是两类:一个是 I/O,一个是 CPU。简言之,在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率

操作系统层面已经解决了单一的硬件设备的利用率问题,而我们的并发程序,往往需要 CPU 和 I/O 设备相互配合工作,也就是说,我们需要解决 CPU 和 I/O 设备综合利用率的问题综合利用率的问题,操作系统虽然没有办法完美解决,但是却给我们提供了方案,那就是:多线程

以下是多线程提升综合利用率的示例,多线程将综合利用率从50%提升至100%:

image-20201027173010415

image-20201027173035688

以上,单位时间处理的请求数量翻了一番,也就是说吞吐量提高了 1 倍。此时可以逆向思维一下,如果 CPU 和 I/O 设备的利用率都很低,那么可以尝试通过增加线程来提高吞吐量

另外,单核时代,如果是计算密集型,多线程反而会增加线程切换,降低利用率。而多核时代,多线程可以提高cpu核的利用率。

创建多少线程合适?

对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,用于当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。

对于 I/O 密集型的计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:最佳线程数 =1 +(I/O 耗时 / CPU 耗时)。针对多核,**最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]**。具体工程上,我们要估算这个比例,然后做各种不同场景下的压测来验证我们的估计。压测时,我们需要重点关注 CPU、I/O 设备的利用率和性能指标(响应时间、吞吐量)之间的关系。

目的是为了按比例填平。如下图:

image-20201027174142587

总结

设置线程数,把我一个原则,就是将硬件的性能发挥到极致。工程上,我们先进行估算,然后压测时重点关注 CPU、I/O 设备的利用率和性能指标(响应时间、吞吐量)之间的关系即可。IO/CPU 的耗时测试可以用apm(Application Performance Management)工具,如skywalking、zipkin等。

提问

为什么要使用多线程?从优化算法、硬件优化(CPU/IO)、硬件单一利用率、硬件综合利用率回答。

单核主机用多线程有意义吗?有意义,IO密集型。

为什么Redis单线程却很快?Redis把读写操作都放在了CPU和内存,几乎没有I/O。又减少了多线程上下文切换的过程,因此Redis即便是单线程也很快。

线程设置多少合理?计算密集、I/O密集,按比例,单核/多核。

11.Java线程(下):为什么局部变量是线程安全的

多个线程同时访问共享变量的时候,会导致并发问题。

局部变量是线程安全的。因为每个线程都有自己的调用栈,局部变量保存在线程各自的调用栈里面。要弄懂这个,我们需要弄清楚方法的执行。在CPU层面,没有方法的概念,所有的东西都是转换成一条条指令执行。

方法是如何被执行的

方法间的调用过程:

如执行以下代码:

int a = 7;
int[] b = fibonacci(a);
int[] c = b;

image-20201027180703831

局部变量放在哪里

每个方法都有一个栈帧,方法的入参、局部变量、返回地址被存到栈帧里。栈帧存到CPU的堆栈寄存器里,也叫调用栈。当调用方法时,会创建新的栈帧,并压入调用栈;当方法返回时,栈帧弹出。栈帧和方法同生共死

image-20201027181258861

我们也可以联想到,Java为什么吧 new 出来的对象是在堆里,局部变量放到栈里。因为局部变量是和方法同生共死的,一个变量如果想跨越方法的边界,就必须创建在堆里。

调用栈与线程

每个线程都有自己独立的调用栈

如图所示,线程 A、B、C 每个线程都有自己独立的调用栈。

image-20201027181446657

线程封闭

方法里的局部变量,因为不会和其他线程共享,所以没有并发问题。这个解决并发问题的思路就是线程封闭,比较官方的解释是:仅在单线程内访问数据

采用线程封闭技术的案例非常多,例如从数据库连接池里获取的连接 Connection,在 JDBC 规范里并没有要求这个 Connection 必须是线程安全的。数据库连接池通过线程封闭技术,保证一个 Connection 一旦被一个线程获取之后,在这个线程关闭 Connection 之前的这段时间里,不会再分配给其他线程,从而保证了 Connection 不会有并发问题。

ThreadLocal原理?待补充

提问

局部变量是线程安全的吗?为什么?线程->调用栈->栈帧。

线程和调用栈的对应关系?一个调用栈对应几个线程?

线程封闭是啥?举个例子?

12.如何用面向对象思想写好并发程序

如何才能用面向对象思想写好并发程序呢?可以从封装共享变量识别共享变量间的约束条件制定并发访问策略这三个方面下手。

一、封装共享变量

关键是要严格把控共享变量的访问路径。结合面向对象的特点,我们把共享变量作为对象的属性,那对于共享变量的访问路径就是对象的公共方法。利用面向对象思想写并发程序的思路,就是:将共享变量作为对象属性封装在内部,对所有公共方法制定并发访问策略

如下,线程安全的计数器举例:

public class Counter {
  private long value;
  synchronized long get(){
    return value;
  }
  synchronized long addOne(){
    return ++value;
  }
}

此外,对于不会发生变化的共享变量,可以用 final 关键字来修饰。这也可以向其他的同事表明意图,这些变量已经考虑了线程安全问题。

二、识别共享变量间的约束条件

共享变量间约束条件,决定了并发访问策略。例如,库存管理,库存量不能太高,也不能太低。如下:

public class SafeWM {
  // 库存上限
  private final AtomicLong upper =
        new AtomicLong(0);
  // 库存下限
  private final AtomicLong lower =
        new AtomicLong(0);
  // 设置库存上限,原子类是线程安全的,所以这里不用同步
  void setUpper(long v){
    upper.set(v);
  }
  // 设置库存下限,原子类是线程安全的,所以这里不用同步
  void setLower(long v){
    lower.set(v);
  }
  // 省略其他业务代码
}

以上忽略了共享变量之间的约束条件,即库存下限要小于库存上限。加上参数校验能解决问题吗?

public class SafeWM {
  // 库存上限
  private final AtomicLong upper =
        new AtomicLong(0);
  // 库存下限
  private final AtomicLong lower =
        new AtomicLong(0);
  // 设置库存上限
  void setUpper(long v){
    // 检查参数合法性
    if (v < lower.get()) {
      throw new IllegalArgumentException();
    }
    upper.set(v);
  }
  // 设置库存下限
  void setLower(long v){
    // 检查参数合法性
    if (v > upper.get()) {
      throw new IllegalArgumentException();
    }
    lower.set(v);
  }
  // 省略其他业务代码
}

显然并发的时候锁不住,满足不了下限小于上限。所以我们要把锁的粒度再增大。在设计阶段,我们一定要识别出所有共享变量之间的约束条件共享变量之间的约束条件,反映在代码里,基本上都会有 if 语句,一定要特别注意竞态条件

三、制定并发访问策略

制定并发访问策略,从方案上来看,无外乎就是以下“三件事”。后面的第二部分工具类,都是讲的并发访问策略。

  1. 避免共享:主要是利用线程本地存储以及为每个任务分配独立的线程
  2. 不变模式: 即无状态。Java 用得很少,但在其他领域却有着广泛的应用,例如 Actor 模式、CSP 模式以及函数式编程的基础都是不变模式。
  3. 管程及其他同步工具:Java 领域万能的解决方案是管程,但是对于很多特定场景,使用Java 并发包提供的读写锁并发容器等同步工具会更好。

除了这些方案之外,还有一些宏观的原则,有助于我们写出“健壮”的并发程序。主要有三条:

  1. 优先使用成熟的工具类Java SDK 并发包里提供了丰富的工具类,基本上能满足日常需要,基本无需再自造轮子。
  2. 迫不得已时才使用低级的同步原语:低级的同步原语主要指的是 synchronized、Lock、Semaphore 等,这些虽然感觉简单,但实际上并没那么简单,一定要小心使用。
  3. 避免过早优化:安全第一,并发程序首先要保证安全出现性能瓶颈后再优化

总结

利用面向对象思想编写并发程序,一个关键点就是利用面向对象里的封装特性。而对共享变量进行封装,要避免“逸出”,所谓“逸出”简单讲就是共享变量逃逸到对象的外面。

提问

如何封装共享变量?

如何识别共享变量间的约束条件?

并发访问共享变量的策略,有哪些方法?

写好并发程序,有哪些经验或方法论?

13.并发编程理论基础总结串讲

1.总结串讲

起源是一个硬件的核心矛盾:CPU 与内存、I/O 的速度差异,系统软件(操作系统、编译器)在解决这个核心矛盾的同时,引入了可见性、原子性和有序性问题,这三个问题就是很多并发程序的 Bug 之源。这,就是01的内容。

那如何解决这三个问题呢?既然可见性、有序性问题是由缓存及编译优化引起,那么我们按需禁用缓存及编译优化,即可解决可见性、有序性问题,所以有了volatile、synchronized、final,Java内存模型(六/八项Happens-Before规则)约束编译器优化行为。关于原子性问题,本质上是解决中间状态可见性问题,所以我们有了互斥锁方案。在02,我们介绍了 Java 内存模型,以应对可见性和有序性问题;在0304,我们介绍了互斥锁,以解决原子性问题。

互斥锁是解决并发问题的核心工具,但也可能会带来死锁问题。05介绍了死锁的产生原因以及解决方案;同时还引出一个线程间协作的问题,这就引出了06的线程间的协作机制:等待 - 通知。

前六篇文章,我们更多地是站在微观的角度看待并发问题。07则是换一个角度,站在宏观的角度重新审视并发编程相关的概念和理论。原子性问题的核心是互斥,其实本质只是需要我们的多线程要安全、不卡死、性能,其实也就是安全性、活跃性以及性能问题。对应了前文的锁、死锁/活锁/饥饿、细粒度锁/乐观锁等问题。锁还引申出了一把锁锁多个资源的问题。

08介绍了管程,是 Java 并发编程技术的基础。并发编程里两大核心问题——互斥和同步,都是可以由管程来解决的。

至此,并发编程相关的问题,理论上都找到问题所在,并能给出理论上的解决方案了。

091011介绍了线程知识,因为Java并发编程要靠多线程来实现。包括线程的生命周期、如何计算合适的线程数以及线程内部是如何执行的。

12,我们介绍了如何用面向对象思想写好并发程序,在 Java 语言里,面向对象思想能够让并发编程变得更简单。

2.关键词

a.

  • 分工
  • 同步:管程
  • 互斥:管程

b.

  • 可见性:Java内存模型
  • 有序性:Java内存模型
  • 原子性:锁。一把锁锁多个资源。

c.

  • 安全性:互斥、锁
  • 活跃性:死锁、活锁、饥饿
  • 性能:细粒度锁、竞态、读写锁、CopyOnWrite、CAS乐观锁等等

d.

  • 线程生命周期:通用模型、Java模型
  • 创建多少线程:IO/CPU综合利用率,性能、延迟、吞吐量、并发
  • 局部变量:方法、栈桢、调用栈

e.

  • 共享变量封装
  • 共享变量间约束
  • 并发访问策略

之后是一些最佳实践的总结:

3.用锁的最佳实践

略。……. 后续补充

二、并发工具类

14.Lock和Condition(上):隐藏在并发包中的管程

Java SDK 并发包最核心的还是其对管程的实现。

并发编程领域,有两大核心问题,一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。

Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

本篇主要介绍Lock的使用。Java 语言本身提供的 synchronized 也是管程的一种实现,那为什么我们还要“重复”造轮子呢?

再造管程的理由

比起 synchronized:1.可以防止死锁,破坏不可抢占条件。2.管程模型中,可以有多个condition

我们还记得,前篇中提到解决死锁问题时,我们有几种方案。互斥(前提条件)、*占有且等待(一次申请所有)不可抢占(主动放弃)循环等待(资源按序获取)*。其中,一次申请所有和资源按序获取实现起来不够灵活,而1.破坏不可抢占方案synchronized是没办法做到的,而且synchronized申请不到资源后,线程就直接进入阻塞了。2.不满足条件时,将会一直占用资源。但是我们希望做到:

对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。

所以我们重新设计一把互斥锁去支持“破坏不可抢占方案”,那该怎么设计呢?有三种方案,即获取不到资源时,并不进入永久阻塞,而是直接返回、支持超时、支持唤醒:

  1. 非阻塞地获取锁直接不等待,之后可以选择自己放弃锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  2. 支持超时等待时可以自己醒,之后可以选择自己放弃锁。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 能够响应中断等待时可以被唤醒,之后可以选择自己放弃锁。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。

这三种方案可以全面弥补 synchronized 的问题。这三个方案就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法:

// 支持非阻塞获取锁的 API
boolean tryLock();
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit) 
  throws InterruptedException;
// 支持中断的 API
void lockInterruptibly() 
  throws InterruptedException;

如何保证可见性

Java JDK使用Lock要遵守try{} finally{},在finally里释放锁。

我们知道使用synchronized时,可见性由Happens-Before原则保证:synchronized 相关的规则:synchronized 的解锁 Happens-Before 于后续对这个锁的加锁

那么使用Lock时,可见性怎么保证呢?Java SDK 里面锁的实现非常复杂,简单介绍一下:它是利用了 volatile 相关的 Happens-Before 规则。Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会先读state 的值;解锁的时候,也会写 state 的值(简化后的代码如下面所示)。

class SampleLock {
  volatile int state;
  // 加锁
  lock() {
    // 省略代码无数
    state = 1;
  }
  // 解锁
  unlock() {
    // 省略代码无数
    state = 0;
  }
}

以下代码线程T2是可以看到T1对value的操作结果的,用到了3个Happens-Before 原则:

  1. 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
  2. volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
  3. 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。
class X {
  private final Lock rtl =
  new ReentrantLock();
  int value;
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {
      value+=1;
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
}

什么是可重入锁

前面我们创建的锁是ReentrantLock,也就是可重入锁所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁

除了可重入锁,还有可重入函数的概念。所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;这意味着线程安全。(无状态?)

可重入锁举例,如下,在2处是可以再一次获得锁的:

class X {
  private final Lock rtl =
  new ReentrantLock();
  int value;
  public int get() {
    // 获取锁
    rtl.lock();         ②
    try {
      return value;
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {
      value = 1 + get(); ①
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
}

公平锁与非公平锁

可以根据传入的参数构建公平锁或非公平锁,区别是,唤醒时,公平锁是入口等待队列按照排队的顺序被唤醒,非公平锁则没有这种保障。此外非公平锁在线程释放锁之后,如果来了一个线程获取锁,他不必去排队就直接获取,如果获取到,不会入队。获取不到后再进行排队:

// 无参构造函数:默认非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
    sync = fair ? new FairSync() 
                : new NonfairSync();
}

用锁的最佳实践

用锁可以提高性能,但是可能会出现死锁或是安全性问题。大师 Doug Lea《Java 并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践,它们分别是:

  1. 永远只在更新对象的成员变量时加锁
  2. 永远只在访问可变的成员变量时加锁
  3. 永远调用其他对象的方法时加锁

关于第三条,因为其他对象里有可能有慢操作、sleep()、其他类的加锁导致双重加锁导致死锁等。

总结

有了synchronized为什么要造Lock这个轮子?synchronized无法做到不满足条件时主动放弃,线程会一直阻塞占用资源。我们希望可以主动放弃,所以又新增了Lock的管程轮子。重新设置的互斥锁有以下三个思路或功能。支持非阻塞、超时、可中断。此外,Lock还支持多条件变量。

用锁的三个最佳实践。除此之外还有减少锁的持有时间、减小锁的粒度等等。

个人理解,锁的用法,就是在共享变量外面,再封装一层线程安全的操作,就形成了带锁的类。核心就是:共享变量 + 并发访问路径控制(用锁)。

提问

synchronized和Lock有什么区别?有了synchronized为什么要造Lock这个轮子?只有一个条件队列、加锁解锁默认。解决占有且等待问题,以及无限等待不释放资源问题,同时也加大了处理死锁的灵活性。

Lock的可见性是怎么保证的?顺序、volatile、传递性原则。

什么是可重入锁?什么是公平锁?

用锁的最佳实践?

关键字

synchronized、Lock、可见性、可重入锁、公平锁、非公平锁、最佳实践。

15.Lock和Condition(下):Dubbo如何用管程实现异步转同步?

这一节讲Java SDK 并发包里的 Condition,Condition 实现了管程模型里面的条件变量

Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。

Lock&Condition的使用范式

在很多并发场景下,用多个条件变量,可以让我们的程序可读性更好。

以下是用两个条件变量实现入队、出队操作的列子:

public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =
    lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =
    lock.newCondition();
 
  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满
        notFull.await();
      }  
      // 省略入队操作...
      // 入队后, 通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      // 出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

注意到条件变量Condition是通过锁来产生的,然后**await()signalsignalAll都是由Condition来操作**。再然后是注意等待条件满足时的while()范式。以上代码是一个标准的lock使用范式,包含了try{} finally{unlock();}while(条件不满足) {await();}两个标准范式

同步和异步

同步和异步的区别:通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步

Java默认用同步处理。要使用异步,有以下两种方式:

  1. 新建线程调用方法:调用方创建一个子线程,在子线程中执行方法调用,这种调用我们称为异步调用
  2. 方法里新建线程执行逻辑:方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return,这种方法我们一般称为异步方法

Dubbo 源码分析

这段源码是一个Lock&Condition的实践示例,实现的是异步转同步的一个实现,用的范式也是很典型的锁的范式。简单的总结,就是死循环不停的去检测条件是否满足,满足时执行。就是一个“等待-通知”机制。

我们知道,TCP 协议本身是异步的。而我们工作中经常用到的 RPC 调用,比如Dubbo,用起来像是同步的,而它底层的TCP又是异步的。其实是Dubbo做了一个异步转同步的操作。那么怎么实现这种转换呢?下面是简化代码:

// 创建锁与条件变量
private final Lock lock 
    = new ReentrantLock();
private final Condition done 
    = lock.newCondition();
 
// 调用方通过该方法等待结果
Object get(int timeout){
  long start = System.nanoTime();
  lock.lock();
  try {
    while (!isDone()) {
      done.await(timeout);
      long cur=System.nanoTime();
      if (isDone() || 
          cur-start > timeout){
        break;
      }
    }
  } finally {
    lock.unlock();
  }
  if (!isDone()) {
    throw new TimeoutException();
  }
  return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
  return response != null;
}
// RPC 结果返回时调用该方法   
private void doReceived(Response res) {
  lock.lock();
  try {
    response = res;
    if (done != null) {
      done.signalALL();	// 注意这里用到signalAll(),用signal()会导致大量请求超时。
    }
  } finally {
    lock.unlock();
  }
}

总结

Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。Dubbo同步转异步就是一个典型的实践。

Java SDK 并发包里锁和条件变量的实现,可以参照《Java并发编程的艺术》第 5 章《Java 中的锁》

提问

synchronized或lock&condition的使用范式?以入队出队为例

Java锁和条件的实现原理?

怎么实现TCP异步转同步?

16.Semaphore:如何快速实现一个限流器

这一节将信号量,Semaphore,类似生活中的信号灯

信号量由计算机科学家迪杰斯特拉(Dijkstra)于 1965 年提出,直到 1980 年管程被提出之前,信号量一直都是并发编程领域的唯一选择。目前几乎所有支持并发编程的语言都支持信号量机制。

下面现将信号量模型,再讲怎么用,最后实现一个例子:限流器。

信号量模型

信号量模型可以简单概括为:一个计数器,一个等待队列,三个方法。其中计数器和等待队列对外透明,外界通过三个方法访问:init()、down() 和 up()。

image-20201031094502860

三个方法的语义:

  • **init()*:设置计数器的初始值。表示能让几个线程同时进入临界区,互斥则是1*。
  • **down()*:计数器的值减 1(进入临界区*);如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • *up()*:计数器的值加 1(离开临界区*);如果此时计数器的值小于或者等于 0,则唤醒*等待队列中的一个线程,并将其从等待队列中移除。

三个方法都是原子性的。在 Java SDK 里面,信号量模型由 java.util.concurrent.Semaphore 实现,Semaphore 这个类能够保证这三个方法都是原子操作。

*down()、up()*这两个操作历史上最早称为 P 操作V 操作。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 *release()*。

以下是代码化的信号量模型:

class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      // 将当前线程插入等待队列
      // 阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      // 移除等待队列中的某个线程 T
      // 唤醒线程 T
    }
  }
}

如何使用信号量

*在进入临界区之前执行一下 down() 操作(P/acquire()),退出临界区之前执行一下 up() 操作(V/release())*就可以了。以下是Java代码示例:

static int count;
// 初始化信号量
static final Semaphore s 
    = new Semaphore(1);
// 用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

实现一个限流器

Semaphore 可以允许多个线程访问一个临界区。这个用Lock来实现比较麻烦。现实中是有这种需求的,比如各种池化资源,如连接池、对象池、线程池等等。

对象池代码举例:

class ObjPool<T, R> {
  final List<T> pool;
  // 用信号量实现限流器
  final Semaphore sem;
  // 构造函数
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }
  // 利用对象池的对象,调用 func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();	// 注意这里的P操作
    try {
      t = pool.remove(0);
      return func.apply(t);
    } finally {
      pool.add(t);
      sem.release(); // 注意这里的V操作
    }
  }
}
// 创建对象池
ObjPool<Long, String> pool = 
  new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

信号量唤醒时,是随机唤醒等待队列还是按顺序?这样有没有可能造成饥饿?

总结

Java重点还是支持管程模型,管程易用性和工程性更好。例如如果用信号量解决阻塞队列会很麻烦。

提问

什么是PV操作/信号量?模型?

PV操作怎么用?

为什么有了Lock还要Semaphore?Semaphore 可以允许多个线程访问一个临界区。

信号量唤醒时,是随机唤醒等待队列还是按顺序?这样有没有问题?

实现一个限流器?

17.ReadWriteLock:如何快速实现一个完备的缓存

JDK包中,除了管程和信号量,其他工具类类都是为了分场景优化性能,提升易用性

今天我们用读写锁ReadWriteLock来解决读多写少的并发场景,典型的就是缓存。

什么是读写锁

读写锁不是 Java 语言特有的,所有的读写锁都遵守以下三条基本原则:

  1. 允许多个线程同时读共享变量;
  2. 只允许一个线程写共享变量;
  3. 如果一个写线程正在执行写操作,此时禁止读线程共享变量,也禁止其他操作。

读写锁与互斥锁的区别是读写锁允许多个线程同时读共享变量。这样在读多写少的场景下,它的性能就优于互斥锁

快速实现一个缓存

下面我们用ReadWriteLock来快速实现一个通用的缓存工具类。

我们声明了一个 Cache<K, V> 类,缓存的数据保存在 Cache 类内部的 HashMap 里面,用读写锁 ReadWriteLock 来保证 HashMap 线程安全。

ReadWriteLock 是一个接口,**它的实现类是 ReentrantReadWriteLock*。注意先新建锁,然后获得读锁和写锁,并且用到了 try{} finally{}范式*。

class Cache<K,V> {
  final Map<K, V> m = new HashMap<>();
  final ReadWriteLock rwl = new ReentrantReadWriteLock();
  // 读锁
  final Lock r = rwl.readLock();
  // 写锁
  final Lock w = rwl.writeLock();
  // 读缓存
  V get(K key) {
    r.lock();
    try { return m.get(key); }
    finally { r.unlock(); }
  }
  // 写缓存
  V put(String key, Data v) {
    w.lock();
    try { return m.put(key, v); }
    finally { w.unlock(); }
  }
}

缓存初始化示例

缓存都要初始化。我们可以一次加载完成,也可以按需加载。

以下是从数据库按需加载的示例。注意到我们6、7处,我们获取到写锁以后,又查了一次有没有缓存。这是为了防止其他线程已经查到数据了。多线程编程时,我们要时时刻刻想着其他线程同时运行当前代码的情况。

class Cache<K,V> {
  final Map<K, V> m = new HashMap<>();
  final ReadWriteLock rwl = new ReentrantReadWriteLock();
  final Lock r = rwl.readLock();
  final Lock w = rwl.writeLock();
 
  V get(K key) {
    V v = null;
    // 读缓存
    r.lock();         ①
    try {
      v = m.get(key); ②
    } finally{
      r.unlock();     ③
    }
    // 缓存中存在,返回
    if(v != null) {   ④
      return v;
    }  
    // 缓存中不存在,查询数据库
    w.lock();         ⑤
    try {
      // 再次验证
      // 其他线程可能已经查询过数据库
      v = m.get(key); ⑥
      if(v == null){  ⑦
        // 查询数据库
        v= 省略代码无数
        m.put(key, v);
      }
    } finally{
      w.unlock();
    }
    return v; 
  }
}

读写锁的升级和降级

升级即拿到读锁是否还能拿写锁,降级即拿到写锁后是否能再拿读锁。

读写锁不支持锁的升级,支持锁的降级

简单理解:

不支持锁的升级是为了保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其写操作对其他获取到读锁的线程不可见

支持锁降级中读锁的获取也是为了保证数据的可见性,是为了防止自己读数据前数据又被其他线程写掉。如果当前线程直接释放写锁,此刻另一个线程(记作线程T)获取写锁并修改数据,则当前线程无法感知线程T的数据更新。而如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

获取写锁的前提是读锁和写锁均未被占用。获取读锁的前提是没有其他线程占用写锁。

锁的升级,会造成死锁:

// 读缓存
r.lock();         ①
try {
  v = m.get(key); ②
  if (v == null) {
    w.lock();
    try {
      // 再次验证并更新缓存
      // 省略详细代码
    } finally{
      w.unlock();
    }
  }
} finally{
  r.unlock();     ③
}

锁的降级,没有问题:

class CachedData {
  Object data;
  volatile boolean cacheValid;
  final ReadWriteLock rwl = new ReentrantReadWriteLock();
  // 读锁  
  final Lock r = rwl.readLock();
  // 写锁
  final Lock w = rwl.writeLock();
  
  void processCachedData() {
    // 获取读锁
    r.lock();
    if (!cacheValid) {
      // 释放读锁,因为不允许读锁的升级
      r.unlock();
      // 获取写锁
      w.lock();
      try {
        // 再次检查状态  
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        // 释放写锁前,降级为读锁
        // 降级是可以的
        r.lock(); ①
      } finally {
        // 释放写锁
        w.unlock(); 
      }
    }
    // 此处仍然持有读锁
    try {use(data);} 
    finally {r.unlock();}
  }
}

总结

读写锁类似于 ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了 java.util.concurrent.locks.Lock 接口,所以除了支持 lock() 方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。

我们实现的缓存没有支持数据同步,数据同步最简单的方案就是超时机制

读写锁用来解决读多写少的场景,支持降级不支持升级。注意用的时候使用try{} finally{}范式。

为了防止读锁饿死,可以用公平锁

提问

什么是读写锁?用来解决什么问题?和互斥锁什么区别?ReadWriteLock

用读写锁实现一个Cathe缓存工具类

读写锁是否支持锁的升级?锁的降级呢?为什么?

18.StampedLock:有没有比读写锁更快的锁?

也是用于读多写少的场景,不过比读写锁更快。

StampedLock 支持的三种锁模式

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。

而 StampedLock 支持三种模式,分别是:写锁悲观读锁乐观读

写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。相关的示例代码如下

final StampedLock sl = new StampedLock();
  
// 获取 / 释放悲观读锁示意代码
long stamp = sl.readLock();
try {
  // 省略业务相关代码
} finally {
  sl.unlockRead(stamp);
}
 
// 获取 / 释放写锁示意代码
long stamp = sl.writeLock();
try {
  // 省略业务相关代码
} finally {
  sl.unlockWrite(stamp);
}

StampedLock性能好是因为支持乐观读,乐观读是无锁操作。ReadWriteLock在多个线程同时读时,写操作会被阻塞。而StampedLock多个线程同时读时,允许一个线程获取写操作。

原理就是获取一个版本号,看版本号有没有被修改,没有修改则读取的结果有效。有修改则重新读或升级悲观读锁

以下是操作示例,先用tryOptimisticRead()获取一个stamp,然后用sllock.validate(stamp)看下有没有被修改即可。被修改,要么循环乐观读,要么锁升级为悲观读。推荐锁升级方式:

class Point {
  private int x, y;
  final StampedLock sl = new StampedLock();
  // 计算到原点的距离  
  int distanceFromOrigin() {
    // 乐观读
    long stamp = sl.tryOptimisticRead();
    // 读入局部变量,
    // 读的过程数据可能被修改
    int curX = x, curY = y;
    // 判断执行读操作期间,
    // 是否存在写操作,如果存在,
    // 则 sl.validate 返回 false
    if (!sl.validate(stamp)){
      // 升级为悲观读锁
      stamp = sl.readLock();
      try {
        curX = x;
        curY = y;
      } finally {
        // 释放悲观读锁
        sl.unlockRead(stamp);
      }
    }
    return Math.sqrt(
      curX * curX + curY * curY);
  }
}

StampedLock 使用注意事项

  • StampedLock 的功能仅仅是 ReadWriteLock 的子集,简单场景性能更高,可以代替ReadWriteLock 。
  • StampedLock 不支持重入
  • StampedLock 的悲观读锁、写锁都不支持条件变量

  • **使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()**。

如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。例如下面的代码中,线程 T1 获取写锁之后将自己阻塞,线程 T2 尝试获取悲观读锁,也会阻塞;如果此时调用线程 T2 的 interrupt() 方法来中断线程 T2 的话,你会发现线程 T2 所在 CPU 会飙升到 100%:

final StampedLock lock = new StampedLock();
Thread T1 = new Thread(()->{
  // 获取写锁
  lock.writeLock();
  // 永远阻塞在此处,不释放写锁
  LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
  // 阻塞在悲观读锁
  lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();

总结

StampedLock就是用到了版本管理的读写锁,与读写锁相比,多了乐观锁,少了可重入、悲观读写锁的条件变量、乐观锁不能用中断。以下是简化的Java官方模板,使用时建议用以下模板:

StampedLock 读模板:

final StampedLock sl = new StampedLock();
 
// 乐观读
long stamp = 
  sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
  // 升级为悲观读锁
  stamp = sl.readLock();
  try {
    // 读入方法局部变量
    .....
  } finally {
    // 释放悲观读锁
    sl.unlockRead(stamp);
  }
}
// 使用方法局部变量执行业务操作
......

StampedLock 写模板:

long stamp = sl.writeLock();
try {
  // 写共享变量
  ......
} finally {
  sl.unlockWrite(stamp);
}

提问

StampedLock和ReadWriteLock的区别?

StampedLock原理?

使用StampedLock要注意啥?(和ReadWriteLock的区别,interruput导致cpu飙升问题)

19.CountDownLatch和CyclicBarrier:如何让多线程步调一致?

这两个主要用来做线程间的通信。实现类似 threadObj.join()的功能。因为用线程池的时候是没有join()方法的。

下面是一个对账的场景:T1线程查订单。T2线程查派送单。最后主线程或T3线程执行检查。检查的前提是订单和派送单要都查到。并且查订单和查派送单步调要一致,因为两者是一一对应的。

对账:单线程实现

单线程,缺点是串行,效率低。

while(存在未对账订单){
    // 查询未对账订单
    pos = getPOrders();
    // 查询派送单
    dos = getDOrders();
    // 执行对账操作
    diff = check(pos, dos);
    // 差异写入差异库
    save(diff);
} 

对账:原生管程实现

用原生管程的多线程同时查订单和派送单,用join()方法通信。不过缺点是每次都要新建线程,很耗资源。

while(存在未对账订单){
    Thread T1 = new Thread(() -> {
        // 查询未对账订单
        pos = getPOrders();
    });
    T1.start();

    Thread T2 = new Thread(() -> {
        // 查询派送单
        dos = getDOrders();
    });
    T2.start();

    T1.join();
    T2.join();

    // 执行对账操作
    diff = check(pos, dos);
    // 差异写入差异库
    save(diff);
} 

对账:CountDownLatch实现

改用线程池实现,实现了线程的重复利用。不过此时拿不到join()方法了,所以用到了CountDownLatch进行通信。

线程池我们后面还会讲。CountDownLatch其实就是一个计数器,计数器减到0以后才继续往下执行。latch.countDown()方法来进行计数器减1操作,使用latch.await()方法进行等待。

Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
    CountDownLatch latch = new CountDownLatch(2);
    executor.exucute(() -> {
        // 查询未对账订单
        pos = getPOrders();
        latch.countDown();
    });
    
    executor.execute(() -> {
        // 查询派送单
        dos = getDOrders();
        latch.countDown();  
    });

    latch.await();
    
    // 执行对账操作
    diff = check(pos, dos);
    // 差异写入差异库
    save(diff);
} 

对账:CyclicBarrier实现

CyclicBarrier就是可以重复使用的、会自动复位的计数器,并且提供了计数为0后的回调。用 barrier.await() 来将计数器减 1(同时等待计数器变成 0)。所有线程调用barrier.await()后,计数器减到0。此时会自动调用 barrier 的回调函数来执行剩余操作。回调执行完后唤醒等待的线程,同时计数器自动重置,其他线程又可以执行下一条语句并把计数器减一,如此循环。

比起CountDownLatch的等大家执行完一起往前冲,CyclicBarrier更像是一个同步器,等一堆线程执行完之后,某个线程可以继续执行其他操作,同时这一堆线程又可以继续循环执行到闭锁处。

image-20201111205305179

image-20201111205409177

实现:

我们用两个队列,一个订单队列,一个派送单队列,两个队列一一对应,每次从两个队列分别取出一个元素,进行检查。我们注意到回调函数只用了一个线程来执行,这样做是为了

// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池 
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
  new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
  
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p, d);
  // 差异写入差异库
  save(diff);
}
  
void checkAll(){
  // 循环查询订单库
  Thread T1 = new Thread(()->{
    while(存在未对账订单){
      // 查询订单库
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();  
  // 循环查询运单库
  Thread T2 = new Thread(()->{
    while(存在未对账订单){
      // 查询运单库
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}

总结

CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。

CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。

除此之外,CyclicBarrier 还可以设置回调函数。

提问

CountDownLatch用来做什么的?怎么用?代码举例?

CyclicBarrier用来做什么的?怎么用?代码举例?

CountDownLatch和CyclicBarrier的区别?

20.并发容器:都有哪些“坑”需要我们填?

Java 1.5 版本之后对并发容器进行了很多优化,提升了性能。Java 1.5 之前的线程安全容器基本都是用同步包装的,我们一般叫同步容器。1.5之后的线程安全容器采用了其他方案,我们一般叫做并发容器

Java 中的容器主要可以分为四个大类,分别是 List、Map、Set 和 Queue

同步容器及其注意事项

同步容器的原理其实就是把容器所有的方法都加上synchronized关键字。

Java也提供了包装工具类来进行转换:

List list = Collections.synchronizedList(new ArrayList());
Set set = Collections.synchronizedSet(new HashSet());
Map map = Collections.synchronizedMap(new HashMap());

需要注意的是,组合操作需要注意竞态条件问题,即便每个操作都能保证原子性,也并不能保证组合操作的原子性

典型的例子就是迭代器。用迭代器遍历容器,以下是组合操作,不具备原子性。调用 foo()方法有并发问题。

List list = Collections.synchronizedList(new ArrayList());
Iterator i = list.iterator(); 
while (i.hasNext())
  foo(i.next());

正确的做法是,要把list锁起来再操作。因为 Collections 内部的包装类,公共方法锁的是对象的 this。

List list = Collections.synchronizedList(new ArrayList());
synchronized (list) {  
  Iterator i = list.iterator(); 
  while (i.hasNext())
    foo(i.next());
}    

并发容器容器及其注意事项

同步容器所有方法都用synchronized 来保证互斥,串行度太高了。所以1.5后使用了并发容器。

并发容器也是包括List、Map、Set 和 Queue四大类。下图基本包括了所有常用的并发容器:

image-20201111225045079

下面一一简单介绍,限于篇幅,只提关键点:

(1) List

List接口只有一个实现类,CopyOnWriteArrayList。CopyOnWrite,就是写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁

实现原理:

CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。

image-20201111225554402

如果在遍历 array 的同时,还有一个写操作,那么CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。CopyOnWriteArrayList写操作是互斥的。

image-20201111225646079

要注意的坑:

  • CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。因为写入的新元素并不能立刻被遍历到。
  • CopyOnWriteArrayList 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的。

(2) Map

Map 接口的两个实现是 ConcurrentHashMapConcurrentSkipListMap。主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的

使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空。具体比较如下图:

image-20201111230005701

ConcurrentSkipListMap 里面的 SkipList 就是跳表。跳表插入、删除、查询操作平均的时间复杂度是 O(log n),理论上和并发线程数没有关系,所以在并发度非常高的情况下,如果对 ConcurrentHashMap 的性能还不满意,可以尝试一下 ConcurrentSkipListMap。跳表就跟字典的索引一样,通过这个索引既能快速定位数据,也能隔离并发(可以并发查看不同页上的字)。

(3) Set

Set 接口的两个实现是CopyOnWriteArraySetConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的。

(4)Queue

Java的Queue比较复杂。可以用以下两个维度来分类。

一个维度是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。

Java 并发包里阻塞队列都用 Blocking关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识

这两个维度组合后,可以将 Queue 细分为四大类:

单端阻塞队列

内部一般会持有一个队列,常用的以下6个:

  • ArrayBlockingQueue:队列是数组。
  • LinkedBlockingQueue:队列是链表。
  • SynchronousQueue:不持有队列,此时生产者线程的入队操作必须等待消费者线程的出队操作。
  • LinkedTransferQueue:融合 LinkedBlockingQueue 和 SynchronousQueue 。性能比 LinkedBlockingQueue 更好。
  • PriorityBlockingQueue :支持按照优先级出队。
  • DelayQueue :支持延时出队。

image-20201111231150904

双端阻塞队列:实现是LinkedBlockingDeque

image-20201111231240266

单端非阻塞队列:实现是 ConcurrentLinkedQueue

双端非阻塞队列:实现是 ConcurrentLinkedDeque

另外,使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患

总结

Java 并发容器不单要清楚每种容器的特性,最重要的是能选对容器。至于用法,API的说明都很清楚。

在选对容器时,甚至根本不会触发Java 容器的快速失败机制(Fail-Fast)

扩展:Java7中的HashMap在执行put操作时会涉及到扩容,由于扩容时链表并发操作会造成链表成环,所以可能导致cpu飙升100%。

提问

怎样把一个线程不安全的容器变成线程安全?

常用的同步容器有哪些?四大类,工具类…

同步容器和并发容器区别?

常用的并发容器有哪些?各实现原理?四大类…

哪些队列是支持有界的?

什么是Java的快速失败机制?如何避免?

21.原子类:无锁工具类的典范

解决简单的可见性和原子性问题,除了使用volatilesynchronized互斥锁方案外,还可以使用无锁方案

JUC将这些无锁方案进行提炼后,实现了一系列的原子类

下面是用原子类实现线程安全的long型count++示例:

public class Test {
  AtomicLong count = 
    new AtomicLong(0);
  void add10K() {
    int idx = 0;
    while(idx++ < 10000) {
      count.getAndIncrement();
    }
  }
}

无锁方案相对互斥锁方案,最大的好处就是性能无需加解锁操作、也没有拿不到锁时的阻塞

无锁方案的实现原理

就是CPU的硬件支持。CPU提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)来解决并发问题。

CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B(即老值快照) 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。作为一条 CPU 指令,CAS 指令本身是能够保证原子性的

用CAS解决问题时,遇到条件不满足,即当前值与期望值不相等,一般会进行自旋

以下是带自旋的CAS指令的代码模拟:

class SimulatedCAS{
  volatile int count;
  // 实现 count+=1
  addOne(){
    do {
      newValue = count+1; //①
    }while(count !=
      cas(count,newValue) //②
  }
  // 模拟实现 CAS,仅用来帮助理解
  synchronized int cas(
    int expect, int newValue){
    // 读目前 count 的值
    int curValue = count;
    // 比较目前 count 值是否 == 期望值
    if(curValue == expect){
      // 如果是,则更新 count 的值
      count= newValue;
    }
    // 返回写入前的值
    return curValue;
  }
}

使用CAS时还会遇到ABA问题。即另一个线程把值改为其他值又改回来,此时值和期望值还是相等的,你就会以为值没有被其他线程动过。

可能大多数情况下我们并不关心 ABA 问题,例如数值的原子递增。但有些情况下要关心:例如原子化的更新对象,两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化了。所以在使用 CAS 方案的时候,一定要先 check 一下。

Java如何实现CAS

AtomicLonggetAndIncrement()方法为例,getAndIncrement()转调了unsafe.getAndAddLong(this, valueOffset, 1L)。其中this 和 valueOffset 两个参数可以唯一确定共享变量的内存地址。

unsafe.getAndAddLong(this, valueOffset, 1L)方法的实现是,先读取内存值,然后调用native boolean compareAndSwapLong(Object o, long offset, long expected, long x)更新值。更新失败则自旋,直到更新成功。源码如下:

public final long getAndAddLong(
  Object o, long offset, long delta){
  long v;
  do {
    // 读取内存中的值
    v = getLongVolatile(o, offset);
  } while (!compareAndSwapLong(
      o, offset, v, v + delta));
  return v;
}
// 原子性地将变量更新为 x
// 条件是内存中的值等于 expected
// 更新成功则返回 true
native boolean compareAndSwapLong(
  Object o, long offset, 
  long expected,
  long x);

Java 提供的原子类里面 CAS一般被实现为 compareAndSet(),compareAndSet() 的语义和 CAS 指令的语义的差别仅仅是返回值不同而已,compareAndSet() 里面如果更新成功,则会返回 true,否则返回 false。CAS的使用范式如下:

do {
  // 获取当前值
  oldV = xxxx;
  // 根据当前值计算新值
  newV = ...oldV...
}while(!compareAndSet(oldV,newV);

原子类概览

JUC的原子类可分为五类:

  • 原子化的基本数据类型
  • 原子化的对象引用类型
  • 原子化数组
  • 原子化对象属性更新器
  • 原子化的累加器

它们提供的方法是类似的。以下是概览图:

image-20201112110154330

下面我们来具体讲讲:

(1)原子化基本数据类型

相关实现有 AtomicBooleanAtomicIntegerAtomicLong,提供的方法主要为以下,细节可参考源码:

getAndIncrement() // 原子化 i++
getAndDecrement() // 原子化的 i--
incrementAndGet() // 原子化的 ++i
decrementAndGet() // 原子化的 --i

// 当前值 +=delta,返回 += 前的值
getAndAdd(delta) 
// 当前值 +=delta,返回 += 后的值
addAndGet(delta)
    
//CAS 操作,返回是否成功
compareAndSet(expect, update)

// 以下四个方法
// 新值可以通过传入 func 函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)

(2)原子化对象引用类型

相关实现有 AtomicReferenceAtomicStampedReferenceAtomicMarkableReference,利用它们可以实现对象引用的原子化更新。(因为更新对象,对象引用是不更新的,无法记录更新,所以用该原子类对对象进行包装,再加一个版本维度进行记录?)提供的方法和原子化基本类型差不多。

对象引用的更新需要重点关注 ABA 问题AtomicStampedReferenceAtomicMarkableReference 这两个原子类可以解决 ABA 问题。思路其实很简单,增加一个版本号维度就可以了。每次执行 CAS 操作,附加再更新一个版本号,只要保证版本号是递增的就OK。

AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:

boolean compareAndSet(
  V expectedReference,
  V newReference,
  int expectedStamp,
  int newStamp) 

AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,表示引用变量是否被更改过,方法签名如下:

boolean compareAndSet(
  V expectedReference,
  V newReference,
  boolean expectedMark,
  boolean newMark)

疑问:AtomicMarkableReference的标记位是否也有ABA的问题?个人猜想,运用场景可能是仅仅进行一次更新的情况。

待落实原子化对象的运用场景

(3)原子化数组

相关实现有 AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray,他们可以原子化地更新数组里面的每一个元素。这些类提供的方法和原子化的基本数据类型的区别仅仅是:每个方法多了一个数组的索引参数,也不再赘述。

(4)原子化对象属性更新器

相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的,创建更新器的方法如下:

public static <U>
AtomicXXXFieldUpdater<U> 
newUpdater(Class<U> tclass, 
  String fieldName)

需要注意的是,对象属性必须是 volatile 类型的,只有这样才能保证可见性;如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常。

上面的参数没有涉及对象,而我们要关联更新的对象,这个对象在哪里关联呢?答案是通过方法参数传入:

boolean compareAndSet(
  T obj, 
  int expect, 
  int update)

就是多了一个对象参数传入,其他和原子化的基本数据类型一致,不在赘述。

(5)原子化的累加器

DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好。

总结

无锁方案性能好、无死锁(但可能饥饿和活锁)。

Java 提供的原子类能够解决一些简单的原子性问题,但上面我们所有原子类的方法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁方案

提问

JUC使用无锁方案的示例?

无锁方案的原理?

什么是CAS?CAS的原理?CAS能够保证原子性吗?CAS条件不满足时怎么处理?

什么是ABA问题?什么时候需要关心ABA问题?怎么处理?

Java如何实现CAS?调用了哪些方法底层怎么实现?使用CAS的范式?

常用的原子类有哪些?各怎么实现?

22.Executor与线程池:如何创建正确的线程池?

创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,所以线程是一个重量级的对象,应该避免频繁创建和销毁

我们的方案就是线程池

线程池和一般意义上的池化资源与所不同。常见的池化资源,一般是要资源的时候就调用 acquire() 方法来申请资源,用完之后就调用 release() 释放资源。如下:

class XXXPool{
  // 获取池化资源
  XXX acquire() {
  }
  // 释放池化资源
  void release(XXX x){
  }
}  

而线程池是一种“生产者 - 消费者模式”。

线程池是一种生产者 - 消费者模式

目前业界线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。

以下是一个典型的简单线程池设计示意,我们可以用它来了解线程池原理:

// 简化的线程池,仅用来说明工作原理
class MyThreadPool{
  // 利用阻塞队列实现生产者 - 消费者模式
  BlockingQueue<Runnable> workQueue;
  // 保存内部工作线程
  List<WorkerThread> threads = new ArrayList<>();
  // 构造方法
  MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
    this.workQueue = workQueue;
    // 创建工作线程
    for(int idx=0; idx<poolSize; idx++){
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // 提交任务
  void execute(Runnable command){
    workQueue.put(command);
  }
  // 工作线程负责消费任务,并执行任务
  class WorkerThread extends Thread{
    public void run() {
      // 循环取任务并执行
      while(true){ ①
        Runnable task = workQueue.take();
        task.run();
      } 
    }
  }  
}
 
/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = 
  new LinkedBlockingQueue<>(2);
// 创建线程池  
MyThreadPool pool = new MyThreadPool(
  10, workQueue);
// 提交任务  
pool.execute(()->{
    System.out.println("hello");
});

原理概述:在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,

如何使用 Java 中的线程池

Java提供的线程池比上面的更复杂。Java 提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor,看名字也知道它更强调的是 Executor,而不是一般的池化资源。

ThreadPoolExecutor 的完整构造函数如下,总共7个参数:

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler) 

包括:核心线程数,最大线程数,超时时间,线程工厂,超时时间,阻塞队列,拒绝策略。

  • corePoolSize:核心线程数,即最小线程数。
  • maximumPoolSize:最大线程数。
  • keepAliveTime & unit:超时时间及单位。即线程空闲多久被回收。
  • workQueue:同前文的工作队列。
  • threadFactory:线程工厂,自定义如何创建线程,例如给他们取名字。
  • handler:拒绝策略。当所有线程都有活,且工作队列已满(有界工作队列),此时提交任务,线程池如何拒绝。总共4种:
    • CallerRunsPolicy:提交任务的线程自己去执行该任务。
    • AbortPolicy:默认的拒绝策略,直接抛异常 throws RejectedExecutionException。
    • DiscardPolicy:直接丢弃任务,不抛异常。
    • DiscardOldestPolicy:丢弃最老的任务,就是把最早进入工作队列的任务丢弃,再把新任务加入到工作队列。

Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时。

使用线程池要注意什么

ThreadPoolExecutor 的构造函数太复杂,JUC提供了 Executors快速创建线程池。不过不建议使用 Executors 了,最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。

try {
  // 业务逻辑
} catch (RuntimeException x) {
  // 按需处理
} catch (Throwable x) {
  // 按需处理
} 

总结

线程池的原理是一个任务队列(一般用阻塞队列)加一组执行线程

很多大厂的编码规范都要求必须通过线程池来管理线程。线程池和普通的池化资源有很大不同,线程池实际上是生产者 - 消费者模式的一种实现。线程池是消费者,线程池的使用者是生产者

创建线程池设置合适的线程数非常重要,可参考《10.Java 线程(中):创建多少线程才是合适的?》的内容。另外《Java 并发编程实战》的第 7 章《取消与关闭》的 7.3 节“处理非正常的线程终止” 详细介绍了异常处理的方案,第 8 章《线程池的使用》对线程池的使用也有更深入的介绍。

提问

为什么要用线程池?

线程池底层原理?怎么实现的?

Java线程池的几个关键参数?构造函数的参数有哪些?

实际使用线程池要注意什么?

线程池怎么处理异常?

23.Future:如何用多线程实现最优的“烧水泡茶”程序?

前文使用线程池时,讲到任务执行直接提交ThreadPoolExecutorexecute()即可。那么如果我们想要获取任务执行的结果怎么办呢?这一节就来解决这个问题。

如何获取任务执行结果

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法1 个 FutureTask 工具类来支持获得任务执行结果的需求。

3 个方法的方法签名如下:

// 提交 Runnable 任务
Future<?> submit(Runnable task);
// 提交 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 任务及结果引用  
<T> Future<T> submit(Runnable task, T result);

它们的返回值都是 Future 接口,Future 接口有 5 个方法,分别是取消任务 *cancel()*、判断任务是否已取消 *isCancelled()*、判断任务是否已结束 isDone()以及2个获得任务执行结果的get()get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消  
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

ThreadPoolExecutor 的 3 个 submit() 方法之间的区别在于方法参数不同:

  • 提交 Runnable 任务 submit(Runnable task) :Runnable 接口的 run() 方法没有返回值,该方法返回的 Future对象仅用来判断任务是否已结束。类似于Thread.join()。

  • 提交 Callable 任务 submit(Callable<T> task):Callable接口的 call() 方法有返回值,该方法返回的 Future对象可通过get()方法来获取执行结果。

  • 提交 Runnable 任务及结果引用 submit(Runnable task, T result):假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。以下是经典用法,result从runnable的构造函数传入,又从future.get()返回。它相当于主线程和子线程之间的桥梁

    ExecutorService executor = Executors.newFixedThreadPool(1);
    // 创建 Result 对象 r
    Result r = new Result();
    r.setAAA(a);
    // 提交任务
    Future<Result> future = executor.submit(new Task(r), r);  
    Result fr = future.get();
    // 下面等式成立
    fr === r;
    fr.getAAA() === a;
    fr.getXXX() === x;
     
    class Task implements Runnable{
      Result r;
      // 通过构造函数传入 result
      Task(Result r){
        this.r = r;
      }
      void run() {
        // 可以操作 result
        a = r.getAAA();
        r.setXXX(x);
      }
    }
    

下面介绍FutureTask 工具类。该类有两个构造函数,参数和前面介绍的 submit() 方法类似,不再赘述。

FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);

FutureTask 实现了 RunnableFuture 接口。所以可以作为runnable交给ThreadPoolExecutor或Thread执行,也可以用来获得线程池执行任务的返回结果。

提交给线程池执行示例:

// 创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交 FutureTask 
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();

提交给Thread执行示例,利用 FutureTask 对象可以很容易获取子线程的执行结果

// 创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();

使用实例:“烧水泡茶”程序

image-20201125110015197

需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。

// 创建任务 T2 的 FutureTask
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
// 创建任务 T1 的 FutureTask
FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));	// 注意这里,构造方法传入了ft2
// 线程 T1 执行任务 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程 T2 执行任务 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程 T1 执行结果
System.out.println(ft1.get());
 
// T1Task 需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
  FutureTask<String> ft2;
  // T1 任务需要 T2 任务的 FutureTask
  T1Task(FutureTask<String> ft2){
    this.ft2 = ft2;
  }
  @Override
  String call() throws Exception {
    System.out.println("T1: 洗水壶...");
    TimeUnit.SECONDS.sleep(1);
    
    System.out.println("T1: 烧开水...");
    TimeUnit.SECONDS.sleep(15);
    // 获取 T2 线程的茶叶  
    String tf = ft2.get();
    System.out.println("T1: 拿到茶叶:"+tf);
 
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  }
}

// T2Task 需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
  @Override
  String call() throws Exception {
    System.out.println("T2: 洗茶壶...");
    TimeUnit.SECONDS.sleep(1);
 
    System.out.println("T2: 洗茶杯...");
    TimeUnit.SECONDS.sleep(2);
 
    System.out.println("T2: 拿茶叶...");
    TimeUnit.SECONDS.sleep(1);
    return " 龙井 ";
  }
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

总结

本节讲了线程池如何获取返回值。主要包括:ThreadPoolExecutor的几个submit()方法,Future接口,FutureTask工具类(实现了 RunnableFuture 接口)。

利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。实际编程时,用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好。

提问

ThreadPoolExecutorexecute()submit()什么区别?

线程池执行时如何获取任务执行结果?

Future的是什么?有哪些方法?怎么用?

ThreadPoolExecutor 有几个 submit()方法?有什么区别?怎么用?

FutureTask 工具类如何使用?

实现一个烧水泡茶程序?模拟两线程之间的配合?用Thread方式,再用线程池方式实现。

做一个询价应用,这个应用需要从三个电商询价,然后保存在自己的数据库里?

24.CompletableFuture:异步编程没那么难

概述

性能优化是大厂的核心需求,手段就是使用多线程异步化,减少程序串行。

为了更直观的使用异步编程,更贴近业务而非线程管理,Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。

CompletableFuture 类实现了 Future 接口和 CompletionStage 接口

CompletableFuture 的核心优势

简单来说,就是更贴近业务,屏蔽了繁琐的线程维护。

1.无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
2.语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
3.代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

使用举例:

我们再来实现一遍烧水泡茶程序:

image-20201125163741344

// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1: 洗水壶...");
  sleep(1, TimeUnit.SECONDS);
 
  System.out.println("T1: 烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: 洗茶壶...");
  sleep(1, TimeUnit.SECONDS);
 
  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);
 
  System.out.println("T2: 拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: 拿到茶叶:" + tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  });
// 等待任务 3 执行结果
System.out.println(f3.join());
 
void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

创建 CompletableFuture 对象

主要通过CompletableFuture 类自己的4个静态方法来创建对象,主要区别是两个维度,get()方法是否有返回值,是否用公用的forkJoin线程池。

// 使用默认的forkJoin线程池
static CompletableFuture<Void> runAsync(Runnable runnable)	      // Runnable 接口的 run() 方法没有返回值
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) // Supplier 接口的 get() 方法有返回值
    
// 也可以指定线程池  
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)  

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。

如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法。那么有两个问题:

  • 一个是异步操作什么时候结束
  • 另一个是如何获取异步操作的执行结果

因为 CompletableFuture 类实现了 Future 接口和 CompletionStage 接口

  • 线程间的分工协作可由CompletionStage 接口来完成。
  • 获取结果则可以通过 Future 接口来解决。

如何理解 CompletionStage 接口

CompletionStage 就是用来解决线程间的分工协作问题的。各线程的任务是有时序关系的,有串行关系、并行关系、汇聚关系等。CompletionStage 接口可以清晰地描述任务之间的这种时序关系。此外,它也进行了异常处理

下文的内容:

CompletionStage 接口如何描述串行关系AND 聚合关系OR 聚合关系以及异常处理其实总体就是根据两个维度分类:

  • 是否异步执行(async)
  • *执行的函数是否有入参及返回值(Funtion、Consumer、Runnable)*。

1. 描述串行关系

主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口,几个系列方法的命名其实就是根据参数的执行方法名来命名的。参数分为三种,Funtion、Consumer、Runnable,分别表示有入参有返回有入参无返回无入参无返回三种情况。

*thenApply 系列方法对应R function.apply(t)*。参数 function 的类型是接口 Function<T, R>,T是输入,R是返回值。该接口的 R apply(T t)方法与CompletionStage 相关,该方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>

*thenAccept 系列方法对应void consumer. accept(T t)*。参数 consumer 的类型是接口Consumer<T>,T是输入,无返回值。该接口的 void accept(T t)方法与CompletionStage 相关,该方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>

*thenRun 系列方法对应void runnable. run()*。参数 runnable 的类型是接口 Runnable,无输入,无返回值。所以 thenRun 系列方法返回的也是CompletionStage<Void>

thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。

这些方法里面 Async 代表的是异步执行 function、consumer 或者 runnable。

CompletionStage<R> thenApply(function);
CompletionStage<R> thenApplyAsync(function);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(runnable);
CompletionStage<Void> thenRunAsync(runnable);
CompletionStage<R> thenCompose(function);
CompletionStage<R> thenComposeAsync(function);

以下是 thenApply() 方法的示例代码:

先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " QQ")  //②
  .thenApply(String::toUpperCase);//③
 
System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ

2. 描述 AND 汇聚关系

描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口。区别也是function、consumer、runnable 这三个核心参数不同。使用可以参考上面烧水泡茶的实现程序,不再赘述。

CompletionStage<R> thenCombine(other, function);
CompletionStage<R> thenCombineAsync(other, function);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, runnable);
CompletionStage<Void> runAfterBothAsync(other, runnable);

3. 描述 OR 汇聚关系

描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,区别也是function、consumer、runnable 这三个核心参数不同。

CompletionStage applyToEither(other, function);
CompletionStage applyToEitherAsync(other, function);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, runnable);
CompletionStage runAfterEitherAsync(other, runnable);

使用示例:

CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});
 
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});
 
CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);
 
System.out.println(f3.join());

4. 异常处理

function、consumer、runnable 都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常

以下是处理异常的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

exceptionally() 的使用非常类似于 try{}catch{}中的 catch{}。

whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{}。无论是否发生异常都会执行fn。

whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

使用示例:

CompletableFuture<Integer> 
  f0 = CompletableFuture
    .supplyAsync(()->7/0))
    .thenApply(r->r*10)
    .exceptionally(e->0);
System.out.println(f0.join());

总结

本节主要阐述了CompletableFuture异步编程的用法,包括怎么新建对象(静态方法),怎么获取返回结果(future接口),怎么描述任务间的分工时序关系及异常处理(CompletionStage接口)等

近几年,伴随着ReactiveX的发展(Java 语言的实现版本是 RxJava),回调地狱已经被完美解决,异步编程已经慢慢开始成熟,Java 语言也开始官方支持异步编程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经完全工业化。

CompletableFuture 已经能够满足简单的异步编程需求,此外可以关注 RxJava 这个项目,利用 RxJava,能在 Java 1.6 版本就使用异步编程。

提问

为什么要使用CompletableFuture?

CompletableFuture怎么新建对象?几种方法有哪些区别?使用时要注意什么?

CompletableFuture怎么获取操作执行结果?

CompletionStage 接口和CompletableFuture什么关系?是用来干啥的?

CompletionStage 接口有哪几大类方法?有什么区别?

CompletionStage 接口如何处理异常?

25.CompletionService:如何批量执行异步任务?

概述

前面讲到,用异步去查询三个商家的价格,最后存到数据库。这种写法有个小缺陷,如果S1耗时很长,那么即是S2耗时很短,也要等S1执行完才能执行。

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商 S1 询价
Future<Integer> f1 = 
  executor.submit(()->getPriceByS1());
// 异步向电商 S2 询价
Future<Integer> f2 = 
  executor.submit(()->getPriceByS2());
// 异步向电商 S3 询价
Future<Integer> f3 = 
  executor.submit(()->getPriceByS3());
    
// 获取电商 S1 报价并保存
r=f1.get();
executor.execute(()->save(r));
  
// 获取电商 S2 报价并保存
r=f2.get();
executor.execute(()->save(r));
  
// 获取电商 S3 报价并保存  
r=f3.get();
executor.execute(()->save(r));

针对这种一堆任务一起异步,想要哪个先返回结果哪个先执行后续操作。我们用到了阻塞队列。这样就保证了先获取到报价的先保存到数据库。

// 创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
// 电商 S1 报价异步进入阻塞队列  
executor.execute(()-> bq.put(f1.get()));
// 电商 S2 报价异步进入阻塞队列  
executor.execute(()-> bq.put(f2.get()));
// 电商 S3 报价异步进入阻塞队列  
executor.execute(()-> bq.put(f3.get()));
// 异步保存所有报价  
for (int i=0; i<3; i++) {
  Integer r = bq.take();
  executor.execute(()->save(r));
}  

利用 CompletionService 实现询价系统

对应上面的阻塞队列,Java给出了CompletionService接口。它的本质就是内部维护一个阻塞队列,外加一个线程池。用来解决批量任务异步时,想要先返回的任务先执行后续操作的问题。

CompletionService 的实现原理是内部维护一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中, 不过CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。

如何创建 CompletionService

CompletionService 接口的实现类是ExecutorCompletionService,这个实现类的构造方法有两个:

  • ExecutorCompletionService(Executor executor)
  • ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

两个构造方法都传入一个线程池。阻塞队列参数,不传则默认使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到这个阻塞队列中。

以下是执行代码示例。使用无界的 LinkedBlockingQueue,submit() 方法提交了三个询价操作并异步执行,最后通过 take() 方法获取一个 Future 对象,调用 Future 对象的 get() 方法返回询价操作的执行结果。

// 创建线程池
ExecutorService executor = 
  Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
  Integer r = cs.take().get();
  executor.execute(()->save(r));
}

CompletionService 接口说明

该接口共有5个方法:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

两个submit方法,一个入参是Callable<V> task,另外一个入参是Runnable taskV result,result参数也作为结果返回,和前面讲过的一样。

其余的3个方法是和阻塞队列相关的:take()、poll() 都是从阻塞队列中获取并移除一个元素;如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。poll()方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。

利用 CompletionService 实现 Dubbo 中的 Forking Cluster

Dubbo 中有一种叫做Forking 的集群模式,该模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。例如同时调用3个地图商的API以保证高可用:

geocoder(addr) {
  // 并行执行以下 3 个查询服务, 
  r1=geocoderByS1(addr);
  r2=geocoderByS2(addr);
  r3=geocoderByS3(addr);
  // 只要 r1,r2,r3 有一个返回
  // 则返回
  return r1|r2|r3;
}

实现:创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future<Integer>类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,把这些 Future 对象保存在列表 futures 中。通过调用 cs.take().get(),就能拿到最快返回的任务执行结果:

// 创建线程池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures = new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures 
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
  // 只要有一个成功返回,则 break
  for (int i = 0; i < 3; ++i) {
    r = cs.take().get();
    // 简单地通过判空来检查是否成功返回
    if (r != null) {
      break;
    }
  }
} finally {
  // 取消所有任务
  for(Future<Integer> f : futures)
    f.cancel(true);
}
// 返回结果
return r;

总结

需要批量提交异步任务的时候建议使用 CompletionService。CompletionService 融合了线程池 Executor 和阻塞队列 BlockingQueue ,让批量异步任务的管理更简单。

CompletionService 能够让异步任务先执行完的先进入阻塞队列,利用这个特性,可以谁先执行完谁进行后续操作,避免无谓的等待,也可以实现 Forking Cluster 这样的需求。

CompletionService 的实现类 ExecutorCompletionService,需要自己创建线程池,看上去有些啰嗦,好处是你可以让多个 ExecutorCompletionService 的线程池隔离,避免几个特别耗时的任务拖垮整个应用。

提问

如何解决批量异步任务谁先返回谁先执行后续操作?

CompletionService接口的实现原理?实现类?实现类两个构造方法的区别?

CompletionService接口有哪些方法?

如何实现调用多个查询服务,只要有一个返回整个服务就可以返回?

ExecutorCompletionService有必要自己提供线程池吗?

26.Fork/Join:单机版的MapReduce

概述

线程池、Future、CompletableFuture 和 CompletionService这些工具类,都是属于多线程“互斥、同步、分工”三大类操作的“分工”。它们让我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间如何协作的细节上(比如线程之间如何实现等待、通知等)。

对于简单的并行任务,可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。

这些工具类为我们提供了并行、聚合、批量并行这三种任务模型的解决方案。基本上能够覆盖日常工作中的并发场景,不过还剩下一个“分治”的场景未覆盖,这个则由我们今天要提到的Fork/Join框架来解决。

分治,简单来说,就是指把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解的思想,例如归并排序、快速排序都属于分治算法,二分法查找、大数据领域知名的计算框架 MapReduce。

分治任务模型

分治任务模型可分为两个阶段:一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图:

image-20201126145950209

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

Fork/Join 的使用

该框架是Java用来支持分治的框架。

这个计算框架里的Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。

ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。

ForkJoinTask 有两个子类——RecursiveActionRecursiveTask,它们都是用递归的方式来处理分治任务的。它们都定义了抽象方法 *compute()*。区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要定义子类去扩展。

以下是用 Fork/Join 计算斐波那契数列的示例:

先创建一个分治任务线程池以及计算斐波那契数列的分治任务。之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。

static void main(String[] args){
  // 创建分治任务线程池  
  ForkJoinPool fjp = new ForkJoinPool(4);
  // 创建分治任务
  Fibonacci fib = new Fibonacci(30);   
  // 启动分治任务  
  Integer result = fjp.invoke(fib);
  // 输出结果  
  System.out.println(result);
}
// 递归任务
static class Fibonacci extends RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
    
  protected Integer compute(){
    if (n <= 1) return n;
    Fibonacci f1 = new Fibonacci(n - 1);
    // 创建子任务  
    f1.fork();
    Fibonacci f2 = new Fibonacci(n - 2);
    // 等待子任务结果,并合并结果  
    return f2.compute() + f1.join();
  }
}

ForkJoinPool 工作原理

通过前文我们知道,ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个任务队列。

ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能,ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中

如果工作线程对应的任务队列空了,ForkJoinPool 支持一种叫做“任务窃取”的机制,它可以“窃取”其他工作任务队列里的任务。如下图,T2可以窃取T1的任务。

ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。

image-20201126151423347

模拟 MapReduce 统计单词数量

以下是用Fork/Join统计单词数量的示例:

先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果。

下面用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())。

static void main(String[] args){
  String[] fc = {"hello world",
          "hello me",
          "hello fork",
          "hello join",
          "fork join in world"};
  // 创建 ForkJoin 线程池    
  ForkJoinPool fjp = new ForkJoinPool(3);
  // 创建任务    
  MR mr = new MR(fc, 0, fc.length);  
  // 启动任务    
  Map<String, Long> result = fjp.invoke(mr);
  // 输出结果    
  result.forEach((k, v)->
    System.out.println(k+":"+v));
}
//MR 模拟类
static class MR extends RecursiveTask<Map<String, Long>> {
  private String[] fc;
  private int start, end;
  // 构造函数
  MR(String[] fc, int fr, int to){
    this.fc = fc;
    this.start = fr;
    this.end = to;
  }
  @Override protected 
  Map<String, Long> compute(){
    if (end - start == 1) {
      return calc(fc[start]);
    } else {
      int mid = (start + end) / 2;
      MR mr1 = new MR(fc, start, mid);
      mr1.fork();
      MR mr2 = new MR(fc, mid, end);
      // 计算子任务,并返回合并的结果    
      return merge(mr2.compute(), mr1.join());
    }
  }
  // 合并结果
  private Map<String, Long> merge(
      Map<String, Long> r1, 
      Map<String, Long> r2) {
    Map<String, Long> result = new HashMap<>();
    result.putAll(r1);
    // 合并结果
    r2.forEach((k, v) -> {
      Long c = result.get(k);
      if (c != null)
        result.put(k, c+v);
      else 
        result.put(k, v);
    });
    return result;
  }
  // 统计单词数量
  private Map<String, Long> calc(String line) {
    Map<String, Long> result = new HashMap<>();
    // 分割单词    
    String [] words = line.split("\\s+");
    // 统计单词数量    
    for (String w : words) {
      Long v = result.get(w);
      if (v != null) 
        result.put(w, v+1);
      else
        result.put(w, 1L);
    }
    return result;
  }
}

注意事项

不要用两次fork()。

用两次fork()在join的时候,需要用这样的顺序:a.fork(); b.fork(); b.join(); a.join();这个要求在JDK官方文档里有说明。

如果是一不小心写成a.fork(); b.fork(); a.join(); b.join();就会有廖雪峰老师说的问题。

建议还是用fork()+compute(),这种方式的执行过程普通人还是能理解的,fork()+fork()内部做了很多优化,不好理解。

廖雪峰老师的例子理解fork()+compute()很到位。

总结

Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。

Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 内部有多个双端任务队列,支持任务窃取机制。

Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数。计算密集型没问题。如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务

如果对 ForkJoinPool 详细的实现细节感兴趣,可参考Doug Lea 的论文

提问

Fork/Join框架用来解决什么问题?

分治算法怎么执行?任务模型?

使用Fork/Join计算斐波那契数列?

ForkJoinPool 工作原理?几个任务队列?窃取机制?双端队列?

用Fork/Join统计单词数量?

使用Fork/Join要注意什么?

对于一个 CPU 密集型计算程序,在单核 CPU 上,使用 Fork/Join 并行计算框架是否能够提高性能呢?

27.并发工具类使用注意事项

1.使用while(true) 时要格外注意终止条件

2.尽量使用signalAll(),比signal()更安全

3.Semaphore 需要锁中锁

Semaphore 允许多个线程访问一个临界区,这也是一把双刃剑,当多个线程进入临界区时,如果需要访问共享变量就会存在并发问题,所以必须加锁,也就是说 Semaphore 需要锁中锁。

4.锁的申请和释放要成对出现

成对出现是重点,例如锁升级后,try{}finally{}的finally{}中解锁的要是升级后的锁。

5.回调总要关心执行线程是谁

当遇到回调函数的时候,你应该本能地问自己:执行回调函数的线程是哪一个?这个在多线程场景下非常重要。因为不同线程 ThreadLocal 里的数据是不同的,有些框架比如 Spring 就用 ThreadLocal 来管理事务,如果不清楚回调函数用的是哪个线程,很可能会导致错误的事务管理,并最终导致数据不一致。当看到回调函数的时候,一定问一问执行回调函数的线程是谁

6. 共享线程池:有福同享就要有难同当

用共享线程池优点是能快速实现,缺点就是当有阻塞式 I/O 时,可能导致所有的 ForkJoinPool 线程都阻塞,进而影响整个系统的性能。所以更倾向于推荐隔离的方案。

7.线上问题定位的利器:线程栈 dump

定位线上并发问题,方案很简单,就是通过查看线程栈来定位问题。重点是查看线程状态,分析线程进入该状态的原因是否合理。参考Java线程的生命周期即可。

为了便于分析定位线程问题,你需要给线程赋予一个有意义的名字,对于线程池可以通过自定义 ThreadFactory 来给线程池中的线程赋予有意义的名字,也可以在执行 run() 方法时通过Thread.currentThread().setName();来给线程赋予一个更贴近业务的名字。

本部分总结

串讲:

14解决互斥问题、15解决同步问题、16信号量用来解决多个线程同时访问临界区问题(比如池),17用细粒度锁-读写锁来解决读多写少的场景。18用细粒度锁-读写乐观锁来解决简单的读多写少场景。19讲了用于线程间通信的两个类,用来取代使用线程池时的join()方法。20讲了并发容器的用法。21讲了无锁操作原理及它的实现原子类。22讲了线程池原理、用法(execute()方法)及使用注意事项。23讲了使用线程池时提交任务后如何用Future获取执行返回值(submit()方法)。24讲了Java1.8异步编程的用法,CompletableFuture即是异步执行器也是结果,所以可以链式编程。25讲了如何批量提交异步任务,谁先有结果谁先执行不用相互等待。核心就是CompletionService,它的实质就是整合了线程池的阻塞队列。26前篇讲了并行、聚合、批量任务的处理。本节则讲了最后一种情况,分治:Fork/Join 计算框架,包括分治线程池ForkJoinPool和分治任务 ForkJoinTask。

22-26总结:都是从任务的视角,来讲分工。22-25:对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。22-25覆盖了并行、聚合、批量。26则讲Java分治框架Fork/Join的用法。

Java几种任务函数Runnable、Callable、Consumer、Fucntion,分别是无参无返回、无参有返回、有参无返回、有参有返回。*

总结所有锁的常用API:

三、并发设计模式

略。

四、案例分析

略。

五、其他并发模型

42.Actor模型:面向对象原生的并发模型

Hello Actor (Akka)

在 Actor 模型中,所有的计算都是在 Actor 中执行的。在面向对象编程里面,一切都是对象;在 Actor 模型里,一切都是 Actor,并且 Actor 之间是完全隔离的,不会共享任何变量。

并发问题的根源就在于共享变量,而 Actor 模型中 Actor 之间不共享变量,那用 Actor 模型解决并发问题,一定是相当顺手。的确是这样,所以很多人就把 Actor 模型定义为一种并发计算模型

但是 Java 语言本身并不支持 Actor 模型,想在 Java 语言里使用 Actor 模型,需要借助类库,如Akka

以下是使用Akka的示例代码:

我们首先创建了一个 ActorSystem(Actor 不能脱离 ActorSystem 存在);之后创建了一个 HelloActor,Akka 中创建 Actor 并不是 new 一个对象出来,而是通过调用 system.actorOf() 方法创建的,该方法返回的是 ActorRef,而不是 HelloActor;最后通过调用 ActorRef 的 tell() 方法给 HelloActor 发送了一条消息 “Actor” 。

// 该 Actor 当收到消息 message 后,
// 会打印 Hello message
static class HelloActor extends UntypedActor {
  @Override
  public void onReceive(Object message) {
    System.out.println("Hello " + message);
  }
}
 
public static void main(String[] args) {
  // 创建 Actor 系统
  ActorSystem system = ActorSystem.create("HelloSystem");
  // 创建 HelloActor
  ActorRef helloActor = system.actorOf(Props.create(HelloActor.class));
  // 发送消息给 HelloActor
  helloActor.tell("Actor", ActorRef.noSender());
}

Actor 模型和面向对象编程契合度非常高,完全可以用 Actor 类比面向对象编程里面的对象,而且 Actor 之间的通信方式完美地遵守了消息机制,而不是通过对象方法来实现对象之间的通信。

消息和对象方法的区别

Actor 中的消息机制,可以类比这现实世界里的写信。Actor 内部有一个邮箱(Mailbox),接收到的消息都是先放到邮箱里,如果邮箱里有积压的消息,那么新收到的消息就不会马上得到处理,也正是因为 Actor 使用单线程处理消息,所以不会出现并发问题。你可以把 Actor 内部的工作模式想象成只有一个消费者线程的生产者 - 消费者模式

在 Actor 模型里,发送消息仅仅是把消息发出去而已,接收消息的 Actor 在接收到消息后,也不一定会立即处理,也就是说Actor 中的消息机制完全是异步的。而调用对象方法,实际上是同步的,对象方法 return 之前,调用方会一直等待。

除此之外,调用对象方法,需要持有对象的引用,所有的对象必须在同一个进程中。而在 Actor 中发送消息,类似于现实中的写信,只需要知道对方的地址就可以,发送消息和接收消息的 Actor 可以不在一个进程中,也可以不在同一台机器上。因此,Actor 模型不但适用于并发计算,还适用于分布式计算。

Actor 的规范化定义

Actor 是一种基础的计算单元,具体来讲包括三部分能力,分别是:

  1. 处理能力,处理接收到的消息。
  2. 存储能力,Actor 可以存储自己的内部状态,并且内部状态在不同 Actor 之间是绝对隔离的。
  3. 通信能力,Actor 可以和其他 Actor 之间通信。

当一个 Actor 接收的一条消息之后,这个 Actor 可以做以下三件事:

  1. 创建更多的 Actor;
  2. 发消息给其他 Actor;
  3. 确定如何处理下一条消息。

总结

在 Java 领域,除了可以使用 Akka 来支持 Actor 模型外,还可以使用 Vert.x,不过相对来说 Vert.x 更像是 Actor 模型的隐式实现,对应关系不像 Akka 那样明显,不过本质上也是一种 Actor 模型。

Actor 可以创建新的 Actor,这些 Actor 最终会呈现出一个树状结构,非常像现实世界里的组织结构,所以利用 Actor 模型来对程序进行建模,和现实世界的匹配度非常高。Actor 模型和现实世界一样都是异步模型,理论上不保证消息百分百送达,也不保证消息送达的顺序和发送的顺序是一致的,甚至无法保证消息会被百分百处理,这个是目前厂商在解决的难点。

43.软件事务内存:借鉴数据库的并发经验

软件事务内存(Software Transactional Memory,简称 STM)

数据库事务发展了几十年了,目前被广泛使用的是MVCC(全称是 Multi-Version Concurrency Control),也就是多版本并发控制。

MVCC 可以简单地理解为数据库事务在开启的时候,会给数据库打一个快照,以后所有的读写都是基于这个快照的。当提交事务的时候,如果所有读写过的数据在该事务执行期间没有发生过变化,那么就可以提交;如果发生了变化,说明该事务和有其他事务读写的数据冲突了,这个时候是不可以提交的。

STM 借鉴的是数据库的经验,数据库虽然复杂,但仅仅存储数据,而编程语言除了有共享变量之外,还会执行各种 I/O 操作,很显然 I/O 操作是很难支持回滚的。所以,STM 也不是万能的。目前支持 STM 的编程语言主要是函数式语言,函数式语言里的数据天生具备不可变性,利用这种不可变性实现 STM 相对来说更简单。

示例:

class Account {
  // 余额
  private TxnRef<Integer> balance;
  // 构造方法
  public Account(int balance) {
    this.balance = new TxnRef<Integer>(balance);
  }
  // 转账操作
  public void transfer(Account target, int amt){
    STM.atomic((txn)->{
      Integer from = balance.getValue(txn);
      balance.setValue(from-amt, txn);
      Integer to = target.balance.getValue(txn);
      target.balance.setValue(to+amt, txn);
    });
  }
}

44.协程:更轻量级的线程

线程太重,因此出了线程池等一堆复杂的工具类。为了更好的使用并发,Golang提供了更轻便的协程。

我们可以把协程简单地理解为一种轻量级的线程。从操作系统的角度来看,线程是在内核态中调度的,而协程是在用户态调度的,所以相对于线程来说,协程切换的成本更低。

协程也有自己的栈,但是相比线程栈要更小,典型的线程栈大小差不多有 1M,而协程栈的大小往往只有几 K 或者几十 K。

Golang 中的协程

要让 hello() 方法在一个新的协程中执行,只需要go hello("World") 这一行代码就搞定:

import (
    "fmt"
    "time"
)
func hello(msg string) {
    fmt.Println("Hello " + msg)
}
func main() {
    // 在新的协程中执行 hello 方法
    go hello("World")
    fmt.Println("Run in main")
    // 等待 100 毫秒让协程执行结束
    time.Sleep(100 * time.Millisecond)
}

协程是为了解决易用性问题,摒弃复杂的线程池。Java OpenSDK 中 Loom 项目的目标就是支持协程,未来Java也将支持协程。

45.CSP模型:Golang的主力队员

Golang 提供了两种不同的方案解决线程/协程间的协作问题:

  • 一种方案支持协程之间以共享内存的方式通信,Golang 提供了管程和原子类来对协程进行同步控制,这个方案与 Java 语言类似;
  • 另一种方案支持协程之间以消息传递(Message-Passing)的方式通信,本质上是要避免共享,Golang 的这个方案是基于CSP(Communicating Sequential Processes)模型实现的。Golang 比较推荐的方案是后者。

什么是 CSP 模型

前文中介绍了 Actor 模型,Actor 模型中 Actor 之间就是不能共享内存的,彼此之间通信只能依靠消息传递的方式。Golang 实现的 CSP 模型和 Actor 模型看上去非常相似,Golang 程序员中有句格言:“不要以共享内存方式通信,要以通信方式共享内存(Don’t communicate by sharing memory, share memory by communicating)。”

Golang 中协程之间通信推荐的是使用 channel,channel 你可以形象地理解为现实世界里的管道。另外,calc() 方法的返回值是一个只能接收数据的 channel ch,它创建的子协程会把计算结果发送到这个 ch 中,而主协程也会将这个计算结果通过 ch 读取出来。以下是4个协程累加的示例:

import (
    "fmt"
    "time"
)
 
func main() {
    // 变量声明
    var result, i uint64
    // 单个协程执行累加操作
    start := time.Now()
    for i = 1; i <= 10000000000; i++ {
        result += i
    }
    // 统计计算耗时
    elapsed := time.Since(start)
    fmt.Printf(" 执行消耗的时间为:", elapsed)
    fmt.Println(", result:", result)
 
    // 4 个协程共同执行累加操作
    start = time.Now()
    ch1 := calc(1, 2500000000)
    ch2 := calc(2500000001, 5000000000)
    ch3 := calc(5000000001, 7500000000)
    ch4 := calc(7500000001, 10000000000)
    // 汇总 4 个协程的累加结果
    result = <-ch1 + <-ch2 + <-ch3 + <-ch4
    // 统计计算耗时
    elapsed = time.Since(start)
    fmt.Printf(" 执行消耗的时间为:", elapsed)
    fmt.Println(", result:", result)
}
// 在协程中异步执行累加操作,累加结果通过 channel 传递
func calc(from uint64, to uint64) <-chan uint64 {
    // channel 用于协程间的通信
    ch := make(chan uint64)
    // 在协程中执行累加操作
    go func() {
        result := from
        for i := from + 1; i <= to; i++ {
            result += i
        }
        // 将结果写入 channel
        ch <- result
    }()
    // 返回结果是用于通信的 channel
    return ch
}

CSP 模型与生产者 - 消费者模式

可以简单地把 Golang 实现的 CSP 模型类比为生产者 - 消费者模式,而 channel 可以类比为生产者 - 消费者模式中的阻塞队列。不过,需要注意的是 Golang 中 channel 的容量可以是 0,容量为 0 的 channel 在 Golang 中被称为无缓冲的 channel,容量大于 0 的则被称为有缓冲的 channel

无缓冲的 channel 类似于 Java 中提供的 SynchronousQueue,主要用途是在两个协程之间做数据交换。比如上面累加器的示例代码中,calc() 方法内部创建的 channel 就是无缓冲的 channel。

而创建一个有缓冲的 channel 也很简单,在下面的示例代码中,我们创建了一个容量为 4 的 channel,同时创建了 4 个协程作为生产者、4 个协程作为消费者。

// 创建一个容量为 4 的 channel 
ch := make(chan int, 4)
// 创建 4 个协程,作为生产者
for i := 0; i < 4; i++ {
    go func() {
        ch <- 7
    }()
}
// 创建 4 个协程,作为消费者
for i := 0; i < 4; i++ {
    go func() {
        o := <-ch
        fmt.Println("received:", o)
    }()
}

Golang 中的 channel 是语言层面支持的,所以可以使用一个左向箭头(<-)来完成向 channel 发送数据和读取数据的任务,使用上还是比较简单的。Golang 中的 channel 是支持双向传输的,所谓双向传输,指的是一个协程既可以通过它发送数据,也可以通过它接收数据。

不仅如此,Golang 中还可以将一个双向的 channel 变成一个单向的 channel,在累加器的例子中,calc() 方法中创建了一个双向 channel,但是返回的就是一个只能接收数据的单向 channel,所以主协程中只能通过它接收数据,而不能通过它发送数据,如果试图通过它发送数据,编译器会提示错误。对比之下,双向变单向的功能,如果以 SDK 方式实现,还是很困难的。

CSP 模型与 Actor 模型的区别

第一个最明显的区别就是:Actor 模型中没有 channel。Actor 模型中的 mailbox 对于程序员来说是“透明”的,mailbox 明确归属于一个特定的 Actor,是 Actor 模型中的内部机制;而且 Actor 之间是可以直接通信的,不需要通信中介。但 CSP 模型中的 channel 就不一样了,它对于程序员来说是“可见”的,是通信的中介,传递的消息都是直接发送到 channel 中的。

第二个区别是:Actor 模型中发送消息是非阻塞的,而 CSP 模型中是阻塞的。Golang 实现的 CSP 模型,channel 是一个阻塞队列,当阻塞队列已满的时候,向 channel 中发送数据,会导致发送消息的协程阻塞。

第三个区别则是关于消息送达的:Actor 模型理论上不保证消息百分百送达,而在 Golang 实现的CSP 模型中,是能保证消息百分百送达的。不过这种百分百送达也是有代价的,那就是有可能会导致死锁

比如,下面这段代码就存在死锁问题,在主协程中,我们创建了一个无缓冲的 channel ch,然后从 ch 中接收数据,此时主协程阻塞,main() 方法中的主协程阻塞,整个应用就阻塞了。这就是 Golang 中最简单的一种死锁。

func main() {
    // 创建一个无缓冲的 channel  
    ch := make(chan int)
    // 主协程会阻塞在此处,发生死锁
    <- ch 
}

Java 领域可以借助第三方的类库JCSP来支持 CSP 模型,不过JCSP 并没有经过广泛的生产环境检验,所以并不建议在生产环境中使用。

CSP 模型是托尼·霍尔(Tony Hoare)在 1978 年提出的,霍尔在并发领域还有一项重要成就,那就是提出了霍尔管程模型,Java 领域解决并发问题的理论基础就是它。


转载请注明来源