零、绪论
Java里的并发编程Api,如 synchronized、wait()/notify(),是对管程模型的实现。解决并发的模型有信号量和管程,Java语言描述管程模式更加方便,所以用管程实现。
其实并发编程可以总结为三个核心问题:分工、同步、互斥。
- 分工指的是如何高效地拆解任务并分配给线程。即如何分工。
- 同步指的是线程之间如何协作、通信。即如何合作。
- 互斥则是保证同一时刻只允许一个线程访问共享资源。即如何解决冲突。
1.分工
Java SDK 并发包里的 Executor、Fork/Join、Future 本质上都是一种分工方法。除此之外,并发编程领域还总结了一些设计模式,基本上都是和分工方法相关的,例如生产者 - 消费者、Thread-Per-Message、Worker Thread 模式等都是用来指导你如何分工的。
2.同步
在并发编程领域里的同步,主要指的就是线程间的协作。工作中遇到的线程协作问题,基本上都可以描述为这样的一个问题:当某个条件不满足时,线程需要等待,当某个条件满足时,线程需要被唤醒执行。
协作一般是和分工相关的。Java SDK 并发包里的 Executor、Fork/Join、Future 本质上都是分工方法,但同时也能解决线程协作的问题。例如,用 Future 可以发起一个异步调用,当主线程通过 get() 方法取结果时,主线程就会等待,当异步执行的结果返回时,get() 方法就自动返回了。主线程和异步线程之间的协作,Future 工具类已经帮我们解决了。除此之外,Java SDK 里提供的 CountDownLatch、CyclicBarrier、Phaser、Exchanger 也都是用来解决线程协作问题的。
在 Java 并发编程领域,解决协作问题的核心技术是管程,上面提到的所有线程协作技术底层都是利用管程解决的。管程是一种解决并发问题的通用模型,除了能解决线程协作问题,还能解决下面我们将要介绍的互斥问题。可以这么说,管程是解决并发问题的万能钥匙。
3.互斥
分工、同步主要强调的是性能,但并发程序里还有一部分是关于正确性的,用专业术语叫“线程安全”。
并发程序里,当多个线程同时访问同一个共享变量的时候,结果是不确定的。不确定,则意味着可能正确,也可能错误,事先是不知道的。而导致不确定的主要源头是可见性问题、有序性问题和原子性问题,为了解决这三个问题,Java 语言引入了内存模型,内存模型提供了一系列的规则,利用这些规则,我们可以避免可见性问题、有序性问题,但是还不足以完全解决线程安全问题。解决线程安全问题的核心方案还是互斥。
所谓互斥,指的是同一时刻,只允许一个线程访问共享变量。
实现互斥的核心技术就是锁,Java 语言里 synchronized、SDK 里的各种 Lock 都能解决互斥问题。虽说锁解决了安全性问题,但同时也带来了性能问题,那如何保证安全性的同时又尽量提高性能呢?可以分场景优化,Java SDK 里提供的 ReadWriteLock、StampedLock 就可以优化读多写少场景下锁的性能。还可以使用无锁的数据结构,例如 Java SDK 里提供的原子类都是基于无锁技术实现的。
除此之外,还有一些其他的方案,原理是不共享变量或者变量只允许读。这方面,Java 提供了Thread Local 和 final关键字,还有一种 Copy-on-write的模式。
使用锁除了要注意性能问题外,还需要注意死锁问题。
总结:分工、同步、互斥。分工直接分配即可,没有要解决的问题。同步需要解决线程协作问题,一般是条件不满足时线程等待,条件满足时唤醒线程。互斥需要解决线程安全问题,即多个线程访问同一共享变量时结果是不确定的。管程用来解决以上同步和互斥遇到的问题。线程安全问题的源头是可见性问题、有序性问题、原子性问题,它们分别由 CPU /缓存、编译优化、操作系统(后续完善)引起。其中,可见性问题、有序性问题引入了Java内存模型来解决。原子性问题则引入锁来解决,可以用有锁或无锁方式。有锁时为了提供性能需要对锁进行各种优化,包括各种粒度的锁等。无锁方式有ThreadLocal、final,以及Copy-on-write。
以下是并发编程总体思维导图:
一、并发理论基础
1.可见性、原子性和有序性问题:并发编程Bug的源头
CPU、内存、I/O 设备的技术不断在快速迭代,但有一个核心矛盾一直存在,就是这三者的速度差异。为了合理利用 CPU 的高性能,平衡这三者的速度差异,计算机体系机构、操作系统、编译程序都做出了贡献,主要体现为:
- CPU 增加了缓存,以均衡CPU与内存的速度差异;
- 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;
- 编译程序优化指令执行次序,使得从而更加合理地利用缓存。
同时以上三个优化也导致了以下三个问题:
- 缓存导致了可见性问题。CPU自己的缓存和内存不同步。
- 线程切换导致了原子性问题。
- 编译优化导致了有序性问题。
1.源头一:缓存导致的可见性问题
下图一目了然,线程A和线程B,在自己CPU中的变量V对其他CPU是不可见的,只有在变量V同步到内存中后,才可以看到。这就是可见性问题。
2.源头二:线程切换带来的原子性问题
CPU只保证CPU执行指令的原子性,而高级语言的一条指令是有可能对应CPU的多条指令的。
早期的操作系统基于进程来调度 CPU,不同进程间是不共享内存空间的,所以进程要做任务切换就要切换内存映射地址,而一个进程创建的所有线程,都是共享一个内存空间的,所以线程做任务切换成本就很低了。现代的操作系统都基于更轻量的线程来调度,现在我们提到的“任务切换”都是指“线程切换”。
当同一个进程创建的线程,同时操作一个共享变量时,就会发现线程A对共享变量的操作还没完,共享变量就又被B操作了,就破坏了原子性。因为CPU只保证CPU一条指令的原子性,不保证高级语言一条指令的原子性。而高级语言的一条指令是有可能对应CPU的多条指令的。
举例1:线程 A 和线程 B 按照下图的序列执行,那么我们会发现两个线程都执行了 count+=1 的操作,但是得到的结果不是我们期望的 2,而是 1。
举例2:在 32 位的机器上对 long 型变量进行加减操作存在并发隐患。
long类型64位,所以在32位的机器上,对long类型的数据操作通常需要多条指令组合出来,无法保证原子性,所以并发的时候会出问题
3.源头三:编译优化带来的有序性问题
编译器为了优化性能,有时候会改变程序中语句的先后顺序,例如程序中:“a=6;b=7;”编译器优化后可能变成“b=7;a=6;”。
提问
1.是什么导致了并发编程的问题?最本质的原因是什么?
cpu与内存差异 -> 缓存 -> 可见性,cpu与I/O差异 -> 进程线程分时复用 -> 线程切换 -> 原子性,更合理利用缓存 -> 编译优化 -> 有序性。
2.Java内存模型:看Java如何解决可见性和有序性问题
可见性、原子性、有序性是共性问题,所有编程语言都会遇到。
在Java语言中,用Java内存模型解决了可见性问题和有序性问题。
Java 内存模型
既然导致可见性的原因是缓存,导致有序性的原因是编译优化,那解决可见性、有序性最直接的办法就是禁用缓存和编译优化,不过这样程序的性能就很低了。k
合理的方案应该是按需禁用缓存以及编译优化。那么,如何做到“按需禁用”呢?肯定是按照程序员的需求。
所以Java程序员提供了按需禁用缓存和编译优化的方法,即Java 内存模型。
Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法。具体来说,这些方法包括 volatile、synchronized 和 final 三个关键字,以及六项 Happens-Before 规则。这也是本节的重点内容。
使用 volatile 的困惑
volatile 关键字在 C 语言里就有,它最原始的意义就是禁用 CPU 缓存,它表达的是:告诉编译器,对这个变量的读写,不能使用 CPU 缓存,必须从内存中读取或者写入。
在Java中,表达的是类似的含义,不过不是禁用缓存,有差异。看如下代码:
以下代码,直觉上看,应该是 42,那实际应该是多少呢?这个要看 Java 的版本,如果在低于 1.5 版本上运行,x 可能是 42,也有可能是 0;如果在 1.5 以上的版本上运行,x 就是等于 42。因为1.5引入了一个Happens-Before 规则对这个进行了优化。
// 以下代码来源于【参考 1】
class VolatileExample {
int x = 0;
volatile boolean v = false;
public void writer() {
x = 42;
v = true;
}
public void reader() {
if (v == true) {
// 这里 x 会是多少呢?
}
}
}
Happens-Before 规则
Happens-Before 并不是说前面一个操作发生在后续操作的前面,它真正要表达的是:前面一个操作的结果对后续操作是可见的。
比较正式的说法是:Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则。
以下是和程序员相关的六条规则:
- 顺序性:同一个线程中,前面的结果对后面可见。在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。
- 传递性:不同线程间,Happens-Before具有传递性。如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。这一条很重要,意味着如果线程 B 读到了“v=true”,那么线程 A 设置的“x=42”对线程 B 是可见的。这就是 1.5 版本对 volatile 语义的增强,这个增强意义重大,1.5 版本的并发工具包(java.util.concurrent)就是靠 volatile 语义来搞定可见性的。
- volatile 变量规则:不同线程间,volatile 的写对读可见。对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。
- 管程中锁的规则:不同线程间,同一个锁解锁操作对后续的加锁可见。即对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。管程是一种通用的同步原语,在 Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现。管程中的锁在 Java 里是隐式实现的,加锁以及释放锁都是编译器帮我们实现的。
- 线程 start() 规则:这条是关于线程启动的。执行start()之前的共享变量结果对被start()的线程可见。它是指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
- 线程 join() 规则:这条是关于线程等待的。join()线程的结果对主线程的后续操作可见。它是指主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作。
final关键字
volatile 为的是禁用缓存以及编译优化。有没有办法告诉编译器优化得更好一点呢?这个可以有,就是final 关键字。final 修饰变量时,初衷是告诉编译器:这个变量生而不变,可以可劲儿优化。
总结
Happens-Before 规则最初是在一篇叫做Time, Clocks, and the Ordering of Events in a Distributed System的论文中提出来的,在这篇论文中,Happens-Before 的语义是一种因果关系。在现实世界里,如果 A 事件是导致 B 事件的起因,那么 A 事件一定是先于(Happens-Before)B 事件发生的,这个就是 Happens-Before 语义的现实理解。
在 Java 语言里面,Happens-Before 的语义本质上是一种可见性,A Happens-Before B 意味着 A 事件对 B 事件来说是可见的,无论 A 事件和 B 事件是否发生在同一个线程里。例如 A 事件发生在线程 1 上,B 事件发生在线程 2 上,Happens-Before 规则保证线程 2 上也能看到 A 事件的发生。
Java 内存模型主要分为两部分,一部分面向编写并发程序的应用开发人员,另一部分是面向 JVM 的实现人员的。面向JVM的规则还有两个:
- 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。
- 对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。
提问
Java内存模型是什么?用来解决什么问题?按需禁用缓存及编译优化,解决可见性、有序性。
Happens-Before 规则?6+2个。强调的是可见性。重点理解传递性。
3.互斥锁(上):解决原子性问题
本节思路:如何用一把锁保护一个资源。
如何解决原子性问题
原子性问题的源头是线程切换,而操作系统做线程切换是依赖 CPU 中断的,所以禁止 CPU 发生中断就能够禁止线程切换。
也就是说,可以通过禁止CPU中断来避免原子性问题。但这只适合早起单核CPU的情况。
“同一时刻只有一个线程执行”这个条件非常重要,我们称之为互斥。如果我们能够保证对共享变量的修改是互斥的,那么,无论是单核 CPU 还是多核 CPU,就都能保证原子性了。
锁模型
我们想到了用锁来保证互斥:
注意上图中,我们提前创建了保护资源R的锁,并且这个锁和受保护资源间是有关联的。这个关联关系非常重要。很多并发 Bug 的出现都是因为把它忽略了,然后就出现了类似锁自家门来保护他家资产的事情。
Java 语言提供的锁技术:synchronized
典型用法如下:
class X {
// 修饰非静态方法
synchronized void foo() {
// 临界区
}
// 修饰静态方法
synchronized static void bar() {
// 临界区
}
// 修饰代码块
Object obj = new Object();
void baz() {
synchronized(obj) {
// 临界区
}
}
}
我们注意到代码和上述的锁模型有所不同。因为Java做了一点小改动:
- 无需手动加锁解锁:Java 编译器会在 synchronized 修饰的方法或代码块前后自动加上加锁 lock() 和解锁 unlock()。这样就保证他们是成对出现的。
- 当修饰静态方法的时候,默认锁定当前类的 Class 对象,在上面的例子中就是 Class X;
- 当修饰非静态方法的时候,默认锁定当前实例对象 this。
使用举例:
“管程中锁的规则:对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。”保证了不同线程就行addOne()不会有问题。
class SafeCalc {
long value = 0L;
synchronized long get() {
return value;
}
synchronized void addOne() {
value += 1;
}
}
以上代码用我们之前的模型表示:
锁和受保护资源的关系
我们前面提到,受保护资源和锁之间的关联关系非常重要,他们的关系是怎样的呢?一个合理的关系是:受保护资源和锁之间的关联关系是 N:1 的关系。可以一把锁保护多个资源,但不能多把锁保护一个资源。这里把锁看作是访问资源的途径会更好理解一点。
现实世界里,我们可以用多把锁来保护同一个资源,但在并发领域不行,会有并发问题。不过我们可以反过来,用一把锁保护多个资源。以下是举例,我们稍稍改了一下上面的代码,把addOne()
方法改为静态锁,此时value变量同时被静态对象和对象两个锁保护,就会触发并发的数据错误,访问不唯一了,两个线程可以分别用两把锁分别访问共享变量,互斥失效:
class SafeCalc {
static long value = 0L;
synchronized long get() {
return value;
}
synchronized static void addOne() {
value += 1;
}
}
情况相当于下图,显然互斥失效:
总结
临界区的代码是操作受保护资源的路径,类似于球场的入口,入口一定要检票,也就是要加锁,但不是随便一把锁都能有效。所以必须深入分析锁定的对象和受保护资源的关系,综合考虑受保护资源的访问路径,多方面考量才能用好互斥锁
synchronized 是 Java 在语言层面提供的互斥原语,其实 Java 里面还有很多其他类型的锁,但作为互斥锁,原理都是相通的:锁,一定有一个要锁定的对象,至于这个锁定的对象要保护的资源以及在哪里加锁 / 解锁,就属于设计层面的事情了。
提问
如何解决原子性问题?互斥锁。互斥即独占共享资源。
synchronized的本质是什么?管程。
一把锁可以保护多个资源吗?一个资源可以用多把锁保护吗?可。不可。锁这里理解为访问路径更直观。
4.互斥锁(下):如何用一把锁保护多个资源?
本节思路:如何用一把锁保护多个资源。
上一节中我们已经知道受保护资源和锁之间合理的关联关系应该是 N:1 的关系,也重点强调了“不能用多把锁来保护一个资源”。本次我们来看下如何保护多个资源。
当我们要保护多个资源时,首先要区分这些资源是否存在关联关系。
保护没有关联关系的多个资源
这种情况,不同的资源用不同的锁保护,各自管各自的,很简单。
以下是分别保护账户余额和保护密码,用了两把锁:
class Account {
// 锁:保护账户余额
private final Object balLock
= new Object();
// 账户余额
private Integer balance;
// 锁:保护账户密码
private final Object pwLock
= new Object();
// 账户密码
private String password;
// 取款
void withdraw(Integer amt) {
synchronized(balLock) {
if (this.balance > amt){
this.balance -= amt;
}
}
}
// 查看余额
Integer getBalance() {
synchronized(balLock) {
return balance;
}
}
// 更改密码
void updatePassword(String pw){
synchronized(pwLock) {
this.password = pw;
}
}
// 查看密码
String getPassword() {
synchronized(pwLock) {
return password;
}
}
}
当然,我们也可以用一把互斥锁来保护多个资源,例如我们可以用 this 这一把锁来管理账户类里所有的资源:账户余额和用户密码。具体实现很简单,示例程序中所有的方法都增加同步关键字 synchronized 就可以了。
但是用一把锁性能太差了。我们用两把锁,取款和修改密码是可以并行的。用不同的锁对受保护资源进行精细化管理,能够提升性能。这种锁还有个名字,叫细粒度锁。
保护有关联关系的多个资源
这个要具体分析。
例如银行业务里面的转账操作,账户 A 减少 100 元,账户 B 增加 100 元。这两个账户就是有关联关系的。代码描述如下:
class Account {
private int balance;
// 转账
void transfer(
Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
像下面这样,直接加synchronized
用this一把锁来保护,肯定是错的。因为锁没覆盖到所有临界区。临界区内有别人的账户,用自己的this是锁不住别人家的账户的,相当于用自家的锁来保护别人家的资产,别人的账户随时有可能被更改:
class Account {
private int balance;
// 转账
synchronized void transfer(
Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;// 注意这里,这个账户是从形参传进来的。这里不能访问,但在别处随时能访问。锁没覆盖到所有临界区。
}
}
}
this 这把锁可以保护自己的余额 this.balance,却保护不了别人的余额 target.balance:
使用锁的正确姿势
那么以上问题怎么解决呢?
很简单,只要我们的锁能覆盖所有受保护资源就可以了。所以我们直接用了类级别锁,它的class对象是所有线程共享的:
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
synchronized(Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
上面的锁太粗了,下一节会讲优化。
总结
如何保护多个资源,关键是要分析多个资源之间的关系。如果资源之间没有关系,每个资源一把锁就可以了。如果资源之间有关联关系,就要选择一个粒度更大的锁,这个锁应该能够覆盖所有相关的资源。除此之外,还要梳理出有哪些访问路径,所有的访问路径都要设置合适的锁。
“原子性”的本质是什么?其实不是不可分割,不可分割只是外在表现,其本质是多个资源间有一致性的要求,操作的中间状态对外不可见。所以解决原子性问题,是要保证中间状态对外不可见。
提问
锁如何保护多个资源?资源间无关联时?有关联时?无关联用细粒度锁。有关联用覆盖所有资源的大锁或细粒度锁,使用时注意死锁。
什么是细粒度锁?就是用多个锁细化管理的锁。
原子性的本质是什么?中间状态对外不可见。
5.一不小心就死锁了,怎么办?
细粒度锁
如何解决上一小节中,用类对象锁,效率过低的问题?
我们可以用细粒度锁,即用两把锁,分别锁住转入账户和转出账户。只有两把锁都拿到的时候,才执行操作。
代码如下:
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
// 锁定转出账户
synchronized(this) {
// 锁定转入账户
synchronized(target) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
}
但是此时会有死锁问题:两个线程都各自拿了一把锁,然后等对方释放。
死锁
死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
上述死锁的情况,用资源分配图表示如下:
如何预防死锁
死锁发生时再去解决,我们手段很少,基本只能重启。所以我们的思路改为如何预防死锁:
Coffman总结了发生死锁的四个条件:
- 互斥,共享资源 X 和 Y 只能被一个线程占用;规则:所有的盘子只能一个人用。
- 占有且等待,即占有且等待。线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;规则:拿了一个盘子就一定要等着,直到拿到所有的剩下盘子。即拿了就不放。
- 不可抢占,其他线程不能强行抢占线程 T1 占有的资源;规则:别人拿了就不能抢。
- 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。情景:相互在等待对方的盘子。
也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。简单来说,就是资源是互斥的,拿到一部分资源后不能放弃,也不能去抢,还相互等着。
我们一一来分析:
互斥条件肯定没法破坏,因为我们用锁就是为了互斥。
- 对于“占有且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。即一次拿完,那占有后就无需等待。
- 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
- 对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。不给循环等待的机会。
以下是示例代码,场景是转账,必须同时获得转入账户和转出账户的锁。
1.破坏占用且等待条件
即一次性申请所有资源。我们需要一个角色(Java 里面的类)来管理临界区,我们就把这个角色定为 Allocator。它有两个重要功能,分别是:同时申请资源 apply() 和同时释放资源 free()。账户 Account 类里面持有一个 Allocator 的单例(必须是单例,只能由一个人来分配资源)。当账户 Account 在执行转账操作的时候,首先向 Allocator 同时申请转出账户和转入账户这两个资源,成功后再锁定这两个资源;当转账操作执行完,释放锁之后,我们需通知 Allocator 同时释放转出账户和转入账户这两个资源。具体的代码实现如下。
class Allocator {
private List<Object> als =
new ArrayList<>();
// 一次性申请所有资源
synchronized boolean apply(
Object from, Object to){
if(als.contains(from) ||
als.contains(to)){
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
}
}
class Account {
// actr 应该为单例
private Allocator actr;
private int balance;
// 转账
void transfer(Account target, int amt){
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))
;
try{
// 锁定转出账户
synchronized(this){
// 锁定转入账户
synchronized(target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
} finally {
actr.free(this, target)
}
}
}
2.破坏不可抢占条件
破坏不可抢占条件看上去很简单,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
Java 在语言层次没有解决这个问题,不过在 SDK 层面还是解决了的,java.util.concurrent 这个包下面提供的 Lock 可以解决这个问题,这个之后讲。
3.破坏循环等待条件
破坏这个条件,需要对多个资源进行排序,然后按序申请资源。
我们假设每个账户都有不同的属性 id,这个 id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请即可。
class Account {
private int id;
private int balance;
// 转账
void transfer(Account target, int amt){
Account left = this // ① 将资源排序,按序申请
Account right = target; // ②
if (this.id > target.id) {// ③
left = target; // ④
right = this; // ⑤
} // ⑥
// 锁定序号小的账户
synchronized(left){
// 锁定序号大的账户
synchronized(right){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}
}
总结
我们这一节主要讲了用细粒度锁来锁定多个资源时,要注意死锁的问题。这个就需要你能把它强化为一个思维定势,遇到这种场景,马上想到可能存在死锁问题。当你知道风险之后,才有机会谈如何预防和避免,因此,识别出风险很重要。
预防死锁主要是破坏三个条件中的一个,有了这个思路后,实现就简单了。注意到上面破坏占用且等待条件时我们用了死循环 while(!actr.apply(this, target));
方法,这里性能占用还是挺高的。我们后一节会尝试使用“等待-通知”的线程间通讯来解决,后续将。
我们在选择具体方案的时候,还需要评估一下操作成本,从中选择一个成本最低的方案。
提问
死锁由什么引起?死锁同时满足的四个条件?分别如何预防?
使用细粒度锁时,一个操作要一次同时获取多个锁时才能操作。多个线程同时获取这些锁时就引起了竞争。竞争就导致了死锁。解决死锁太麻烦,预防是最好的。
6.用“等待-通知”机制优化循环等待
用来避免用死循环的方式等待,提高程序性能。
上例中,如果线程要求的条件(转出账本和转入账本同在文件架上)不满足,则线程阻塞自己,进入等待状态;当线程要求的条件(转出账本和转入账本同在文件架上)满足后,通知等待的线程重新执行。其中,使用线程阻塞的方式就能避免循环等待消耗 CPU 的问题。
拿就医流程对比,我们可以得到一个完整的等待 - 通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁。
用 synchronized 实现等待 - 通知机制
在 Java 语言里,等待 - 通知机制可以有多种实现方式,比如 Java 语言内置的 synchronized 配合 wait()、notify()、notifyAll() 这三个方法就能轻松实现。
在下面这个图里,左边有一个等待队列,同一时刻,只允许一个线程进入 synchronized 保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。wait()方法工作原理图:
当调用 wait() 方法后,当前线程就会被阻塞,并且进入到右边的等待队列中,这个等待队列也是互斥锁的等待队列。 线程在进入等待队列的同时,会释放持有的互斥锁。
当线程要求的条件满足时,调用 notify() 和 notifyAll() 方法。会通知等待队列(互斥锁的等待队列)中的线程,告诉它条件曾经满足过。因为notify() 只能保证在通知时间点,条件是满足的。这一点你要格外注意。
此外,被通知的线程要想重新执行,仍然需要获取到互斥锁。
我们一直强调 wait()、notify()、notifyAll() 方法操作的等待队列是互斥锁的等待队列,如果 synchronized 锁定的是 target,那么对应的一定是 target.wait()、target.notify()、target.notifyAll() 。而且 wait()、notify()、notifyAll() 这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现 wait()、notify()、notifyAll() 都是在 synchronized{}内部被调用的。如果在 synchronized{}外部调用,或者锁定的 this,而用 target.wait() 调用的话,JVM 会抛出一个运行时异常:java.lang.IllegalMonitorStateException
。
使用代码范式
我们使用等待-通知机制的范式要特别注意,建议用以下经典的范式,因为线程被唤醒时,只是表示条件曾经满足过,所以要再次检验条件满不满足:
while(条件不满足) {
wait();
}
上一节中,我们的转账时,我们委托单例,一次性拿到两个账本的锁的代码示例如下:
class Allocator {
private List<Object> als;
// 一次性申请所有资源
synchronized void apply(
Object from, Object to){
// 经典写法
while(als.contains(from) ||
als.contains(to)){
try{
wait();
}catch(Exception e){
}
}
als.add(from);
als.add(to);
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
notifyAll();
}
}
尽量使用notifyAll()
notify() 是会随机地通知等待队列中的一个线程,而 notifyAll() 会通知等待队列中的所有线程。使用 notify() 的风险在于可能导致某些线程永远不会被通知到。
例如,线程1申请到A锁,线程2申请到B锁,线程3等待A锁释放,线程4等待B锁释放。如果线程1用完并释放A资源,选择通知线程4,那么此时线程4继续等待。线程3没有机会再被唤醒,会一直等待下去,因为已经没有机会再被通知了。
提问
为什么需要等待-通知机制?避免等待互斥资源时,cpu循环消耗。
等待-通知机制原理及流程?
wait()和sleep()区别?
wait()实现的典型范式?为什么要这样做?
notify()和notifyAll()有什么区别?推荐用哪个?为什么?
7.安全性、活跃性以及性能问题
这一小节是对前面6小节的一个整理归纳。
并发编程中我们需要注意的问题有很多,很庆幸前人已经帮我们总结过了,主要有三个方面,分别是:安全性问题、活跃性问题和性能问题。
安全性问题
那什么是线程安全呢?其实本质上就是正确性,而正确性的含义就是程序按照我们期望的执行,
那如何保证线程安全呢?避免出现原子性问题、可见性问题和有序性问题即可。
那是不是所有的代码都要分析这三个问题呢?当然不是,其实只有一种情况需要:存在共享数据并且该数据会发生变化,通俗地讲就是有多个线程会同时读写同一数据。
所以基于上诉原因,做到不共享数据或者数据状态不发生变化也可以保证线程安全。例如线程本地存储(Thread Local Storage,TLS)、不变模式等等。
但是,现实生活中,必须共享会发生变化的数据。当多个线程同时访问同一数据,并且至少有一个线程会写这个数据时,我们不采取措施,就会发生并发Bug。也叫发生了数据竞争。例如下例,多个线程同时调用add10K():
public class Test {
private long count = 0;
void add10K() {
int idx = 0;
while(idx++ < 10000) {
count += 1;
}
}
}
是否加锁就能避免并发问题呢?并不是。以下get()和set()加了锁,但是add10K还是有问题:
public class Test {
private long count = 0;
synchronized long get(){
return count;
}
synchronized void set(long v){
count = v;
}
void add10K() {
int idx = 0;
while(idx++ < 10000) {
set(get()+1)
}
}
}
以上,假设 count=0,当两个线程同时执行 get() 方法时,get() 方法会返回相同的值 0,两个线程执行 get()+1 操作,结果都是 1,之后两个线程再将结果 1 写入了内存。你本来期望的是 2,而结果却是 1。
这种问题,有个官方的称呼,叫竞态条件(Race Condition)。所谓竞态条件,指的是程序的执行结果依赖线程执行的顺序。而线程的执行顺序是不确定的,如果程序存在竞态条件问题,那就意味着程序执行的结果是不确定的,而执行结果不确定这可是个大 Bug。例如上面的例子,如果两个线程完全同时执行,那么结果是 1;如果两个线程是前后执行,那么结果就是 2。
可以按照下面这样来理解竞态条件。在并发场景中,程序的执行依赖于某个状态变量,也就是类似于下面这样:
if (状态变量 满足 执行条件) {
执行操作
}
在A线程执行操作时,B线程改变了他的前置状态变量条件,导致该执行操作错误,因为前提条件被修改了。很多时候这个前置条件时隐式的,例如前面 addOne 的例子中,set(get()+1) 这个复合操作,其实就隐式依赖 get() 的结果。
面对数据竞争和竞态条件问题,如何保证线程安全?我们可以用互斥。互斥的方案有很多,CPU 提供了相关的互斥指令,操作系统、编程语言也会提供相关的 API。从逻辑上来看,我们可以统一归为:锁。
活跃性问题
所谓活跃性问题,指的是某个操作无法执行下去。包括“死锁”、“活锁”和“饥饿”
- 死锁:线程相互等待,表现形式是线程永久“阻塞”。解决方式:破坏4个死锁条件中的三个中的一个。
- 活锁:线程没阻塞,但是还是执行不下去。即两者同时互相谦让。解决方式:等待随机时间。Raft 这样知名的分布式一致性算法中也用到了它
- 饥饿:指线程因无法访问所需资源而无法执行下去的情况。例如有些线程优先级过小,一直拿不到资源。解决方式:一是保证资源充足,二是公平地分配资源,三就是避免持有锁的线程长时间执行。第二种使用相对较多。一和三实际中都难以避免。那如何公平地分配资源呢?在并发编程里,主要是使用公平锁。所谓公平锁,是一种先来后到的方案,线程的等待是有顺序的,排在等待队列前面的线程会优先获得资源。
性能问题
“锁”的过度使用可能导致串行化的范围过大,这样性能反而低了,违反了我们用多线程提升性能的初衷。
1.串行对性能的影响:
有个阿姆达尔(Amdahl)定律,代表了处理器并行运算之后效率提升的能力,它正好可以解决这个问题,具体公式如下:
公式里的 n 可以理解为 CPU 的核数,p 可以理解为并行百分比,那(1-p)就是串行百分比了,也就是我们假设的 5%。我们再假设 CPU 的核数(也就是 n)无穷大,那加速比 S 的极限就是 20。也就是说,如果我们的串行率是 5%,那么我们无论采用什么技术,最高也就只能提高 20 倍的性能。
2.性能问题如何解决
所以使用锁的时候一定要关注对性能的影响。 那怎么才能避免锁带来的性能问题呢?这个问题很复杂,Java SDK 并发包里之所以有那么多东西,有很大一部分原因就是要提升在某个特定领域的性能。
不过从方案层面,我们可以这样来解决这个问题:
- 使用无锁的算法和数据结构:例如线程本地存储 (Thread Local Storage, TLS)、写入时复制 (Copy-on-write)、乐观锁等;Java 并发包里面的原子类也是一种无锁的数据结构;Disruptor 则是一个无锁的内存队列…
- 减少锁持有的时间:例如使用细粒度的锁,一个典型的例子就是 Java 并发包里的 ConcurrentHashMap,它使用了所谓分段锁的技术(这个技术后面我们会详细介绍);还可以使用读写锁,也就是读是无锁的,只有写的时候才会互斥。
3.性能的指标
指标有很多,有三个指标非常重要,就是:吞吐量、延迟和并发量。
- 吞吐量:指的是单位时间内能处理的请求数量。单位“个/秒”。吞吐量越高,说明性能越好。
- 延迟:指的是从发出请求到收到响应的时间。单位“秒”。延迟越小,说明性能越好。
- 并发量:指的是能同时处理的请求数量。单位:“个”。一般来说随着并发量的增加、延迟也会增加(因为排队的人多了?)。所以延迟这个指标,一般都会是基于并发量来说的。例如并发量是 1000 的时候,延迟是 50 毫秒。
总结
并发微观上涉及到原子性问题、可见性问题和有序性问题,宏观则表现为安全性、活跃性以及性能问题。
我们在设计并发程序的时候,主要是从宏观出发,也就是要重点关注它的安全性、活跃性以及性能。安全性方面要注意数据竞争和竞态条件,活跃性方面需要注意死锁、活锁、饥饿等问题,性能方面使用无锁或减少锁持有时间。具体问题具体分析。
安全性、活跃性以及性能问题:线程要安全、不卡死、性能。
提问
如何保证线程安全?避免出现原子性/可见性/有序性问题。非数据竞争(线程本地数据/不变模式),数据竞争->竞态条件->互斥->锁。
活跃性问题是啥?有哪些?如何解决?某个操作无法执行。死锁,活锁,饥饿…
如何计算串行对性能的影响?串行度为5%,性能最多提高所少?阿姆达尔(Amdahl)定律
如何提升性能?无锁、减少锁持有时间
性能指标有哪些?分别什么意思?吞吐量、并发量、延迟…
8.管程:并发编程的万能钥匙
管程是一把解决并发问题的万能钥匙。
Java 语言在 1.5 之前,提供的唯一的并发原语就是管程,而且 1.5 之后提供的 SDK 并发包,也是以管程技术为基础的。除此之外,C/C++、C# 等高级语言也都支持管程。
什么是管程
Java 采用了管程技术,因为管程模型和面向对象高度契合。synchronized
关键字及 wait()
、notify()
、notifyAll()
这三个方法都是管程的组成部分。而管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。但是管程更容易使用,所以 Java 选择了管程。
管程,对应的英文是 Monitor,很多 Java 领域的同学都喜欢将其翻译成“监视器”。
所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。翻译为 Java 领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的。
管程有三个模型:Hasen 模型、Hoare 模型、MESA 模型。
Java用的是MESA,我们以MESA为例来讲讲。
MESA 模型
如何解决线程间互斥问题?
管程解决互斥问题的思路很简单,就是将共享变量及其对共享变量的操作统一封装起来。
以下是对一个队列操作的例子,入队和出队保持互斥,只允许一个线程进入管程:
如何解决线程间同步问题?
类似就医流程,用了两个队列:
- 入口等待队列:当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。
- 条件变量等待队列:管程里引入了条件变量的概念,而且每个条件变量都对应有一个等待队列。线程进入管程后,条件不满足时进入等待队列等待。条件满足后接到notifyAll(),重新进入入口等待队列。(因为可能会被其他线程敲定,所以只是表示条件曾经满足过,要重新到入口等待队列排队)。
以下是对一个队列操作的举例:
有一个共享变量,就是队列。有两个条件变量,队列不为满或队列不为空,以此作为入队和出队的前提。
// 对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await();。
// 对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await();。
// 如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列。
// 如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列。
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
// 出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
synchronized
以及wait()
、notify()
、notifyAll()
就相当于是只有一个条件变量的MESA模型。
wait() 的编程范式
对于 MESA 管程来说,有一个编程范式,就是需要在一个 while 循环里面调用 wait()。这个是 MESA 管程特有的。
while(条件不满足) {
wait();
}
notify() 何时可以使用
前面说过,**尽量使用 notifyAll()**。如果要用notify(),需要具备以下条件:
- 所有等待线程拥有相同的等待条件;
- 所有等待线程被唤醒后,执行相同的操作;
- 只需要唤醒一个线程。
也就说,所有线程是等价的(等待条件、执行操作一样),且只需要唤醒一个线程。
三种管程的区别
最主要的区别就是如何唤醒的区别。假如线程B正在条件变量等待队列里面等待,线程A正在执行。此时线程B执行的条件满足了,线程A如何通知线程B,即如何给线程B notify()
? 此时线程A和线程B如何协作:
- Hasen 模型:线程A比较自私。线程A自己执行完再通知线程B。即
notify()
放代码最后一行。此时可以保证一个线程执行。 - Hoare 模型:线程A是老好人。线程A立即通知线程B,自己阻塞,线程B立刻执行。线程B执行完后唤醒线程A。此时可以保证一个线程执行。但比Hasen模型多一次阻塞唤醒。
- MESA 模型:线程A比较灵活。线程A通知线程B,然后线程A继续自己玩自己的。线程B从条件变量等待队列进入到入口等待队列,可以重新排队啦。此时可以保证一个线程执行。好处是编程灵活,不用一定要把notify()放最后一行。缺点是线程B重新执行时,条件就不一定满足了,所以要用前面的while()范式循环检测条件变量。另外MESA多了notifyAll()的便利,但是缺点时前两个模式线程都会被执行到,而MESA由于一直等待,不一定会立刻被执行到。所以多了带超时时间的wait()。
总结
并发编程里两大核心问题——互斥和同步,都可以由管程来帮你解决。学好管程,理论上所有的并发问题你都可以解决,并且很多并发工具类底层都是管程实现的。
Java 内置的管程方案(synchronized)参考了 MESA 模型,并进行了精简。只使用一个条件变量,并自动加锁解锁。而JUC包的锁提供了多个条件变量,且要自己加锁解锁。
Java管程示意:
提问
MESA管程模型是怎样的?分别如何解决互斥、同步问题?只允许一个线程进入管程。
三种管程模型有什么差别?唤醒时如何唤醒。Hasen(哈森)执行完再notify()、Hoare(霍尔)阻塞自己等唤醒、MESA(弥撒)直接踢入入口等待队列。
synchronized的和管程什么关系?内置的精简MESA管程。只有一个条件变量。且自动加锁解锁。
synchronized和reentrantLock有什么区别和联系?都是MESA管程,前者一个变量、自动加解锁。
9.Java线程(上):Java线程的生命周期
线程是操作系统里的一个概念,Java、C# 等开发语言对其进行了封装。Java 语言里的线程本质上就是操作系统的线程,它们是一一对应的。
在操作系统层面,线程也有“生老病死”。我们先了解一下通用的线程生命周期模型,这部分内容也适用于很多其他编程语言;然后再详细地学习 Java 中线程的生命周期。
通用的线程生命周期
通用的线程生命周期基本上可以用五态模型”来描述。分别是:初始状态、可运行状态、运行状态、休眠状态和终止状态。
以下分别进行解析:
- 初始状态。线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
- 可运行状态,线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
- 运行状态。线程正在被CPU执行。有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,此时线程的状态就转换成了运行状态。
- 休眠状态:线程等待被唤醒。运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
- 终止状态:线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
这五种状态在不同编程语言里会有简化合并。例如,C 语言的 POSIX Threads 规范,就把初始状态和可运行状态合并了;Java 语言里则把可运行状态和运行状态合并了,这两个状态在操作系统调度层面有用,而 JVM 层面不关心这两个状态,因为 JVM 把线程调度交给操作系统处理了。
除了简化合并,这五种状态也有可能被细化,比如,Java 语言里就细化了休眠状态(这个下面我们会详细讲解)。
Java中线程的生命周期
Java 语言中线程共有六种状态,分别是:
- NEW:初始化
- RUNNABLE:可运行/运行
- BLOCKED:阻塞
- WAITING:无限等待
- TIMED_WAITING:有限等待
- TERMINATED:终止
Java把通用线程的运行/可运行合并为RUNNABLE(因为线程丢给OS去玩,JVM不管),把休眠拆解为BLOCKED、WAITING、TIMED_WAITING。在操作系统层面,Java 线程中的 BLOCKED、WAITING、TIMED_WAITING 是一种状态,即前面我们提到的休眠状态。也就是说只要 Java 线程处于这三种状态之一,那么这个线程就永远没有 CPU 的使用权。
对比上图通用生命周期,Java生命周期简化如下:
休眠被拆成了BLOCKED、WAITING、TIMED_WAITING,下面讨论这几个状态如何转换:
1.RUNNABLE 与 BLOCKED 的状态转换
只有一种场景会触发这种转换:线程等待 synchronized 的隐式锁。等待隐式锁时 RUNNABLE -> BLOCKED,获得隐式锁时 BLOCKED -> RUNNABLE。
线程调用阻塞式 API 时,Java 线程的状态会依然保持 RUNNABLE 状态。因为JVM 层面并不关心操作系统调度相关的状态。在操作系统层面,线程是会转换到休眠状态的。
在 JVM 看来,等待 CPU 使用权(操作系统层面此时处于可执行状态)与等待 I/O(操作系统层面此时处于休眠状态)没有区别,都是在等待某个资源,所以都归入了 RUNNABLE 状态。
2.RUNNABLE 与 WAITING 的状态转换
有三种场景会触发这种转换:
- 调用无参的 lockObject.wait() 方法(获得 synchronized 隐式锁的线程)。
- 调用无参的 thread.join() 方法。
- 调用无参的 LockSupport.park() 方法。
Java 并发包中的锁,都是基于LockSupport 实现的。调用 LockSupport.park() 方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。调用 LockSupport.unpark(Thread thread) 可唤醒目标线程,目标线程的状态又会从 WAITING 状态转换到 RUNNABLE。
3.RUNNABLE 与 TIMED_WAITING 的状态转换
有五种场景会触发这种转换:
- 调用带超时参数的 object.wait(long timeout) 方法(获得 synchronized 隐式锁的线程);
- 调用带超时参数的 thread.join(long millis) 方法;
- 调用带超时参数的 Thread.sleep(long millis) 方法;
- 调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
- 调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
TIMED_WAITING 和 WAITING 状态的区别,仅仅是触发条件多了超时参数。
4.从 NEW 到 RUNNABLE 状态
调用线程对象的 threadObj.start() 方法即可。
扩展:
创建线程对象的方法:
1.方法一:继承 Thread 对象,重写 run() 方法
// 自定义线程对象
class MyThread extends Thread {
public void run() {
// 线程需要执行的代码
......
}
}
// 创建线程对象
MyThread myThread = new MyThread();
2.方法二:实现 Runnable 接口,重写 run() 方法,并将该实现类作为创建 Thread 对象的参数。
// 实现 Runnable 接口
class Runner implements Runnable {
@Override
public void run() {
// 线程需要执行的代码
......
}
}
// 创建线程对象
Thread thread = new Thread(new Runner());
以上两种方法其实是一种方法。看thread::run()的源码:
public class Thread implements Runnable {
@Override
public void run() {
if (target != null) {
target.run();
}
}
}
从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法即可:
MyThread myThread = new MyThread();
// 从 NEW 状态转换到 RUNNABLE 状态
myThread.start();
5.从 RUNNABLE 到 TERMINATED 状态
以下两种情况,会导致线程终止:
- run() 方法执行完或抛异常导致线程终止。
- 用thread.interrupt()方法停止线程。另外有一个
thread.stop()
方法可以直接停止线程,但是已经被标记为@Deprecated
。
interrupt()方法
thread::stop()
和thread::interrupt()
的区别:
- stop()方法直接把线程干掉。如果用的是ReentrantLock,根本没有unlock的机会。
- interrupt() 方法仅仅是通知线程停止,线程可以选择执行,也可以选择无视。
调用thread::interrupt()
可以通知线程终止。线程怎么收到interrupt()
通知的呢?一种是异常,另一种是主动检测。
线程在等待时,被打断,切换到RUNNABLE并抛出异常:当线程 A 处于 WAITING、TIMED_WAITING 状态时,如果其他线程调用线程 A 的 interrupt() 方法,会使线程 A 返回到 RUNNABLE 状态,同时线程 A 的代码会触发 InterruptedException 异常。上面我们提到转换到 WAITING、TIMED_WAITING 状态的触发条件,都是调用了类似 wait()、join()、sleep() 这样的方法,我们看这些方法的签名,发现都会 throws InterruptedException 这个异常。这个异常的触发条件就是:其他线程调用了该线程的 interrupt() 方法。
线程在运行且被阻塞时,被打断,抛出异常或直接返回。当线程 A 处于 RUNNABLE 状态时,并且阻塞在 java.nio.channels.InterruptibleChannel 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 会触发 java.nio.channels.ClosedByInterruptException 这个异常;而阻塞在 java.nio.channels.Selector 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 的 java.nio.channels.Selector 会立即返回。
线程在运行时,被打断,需要调用 isInterrupted() 方法主动检测它自己的打断状态。上面这两种情况属于被中断的线程通过异常的方式获得了通知。还有一种是主动检测,如果线程处于 RUNNABLE 状态,并且没有阻塞在某个 I/O 操作上,例如中断计算圆周率的线程 A,这时就得依赖线程 A 主动检测中断状态了。如果其他线程调用线程 A 的 interrupt() 方法,那么线程 A 可以通过 isInterrupted() 方法,检测是不是自己被中断了。isInterrupted()是一个Native方法。
注意:在触发 InterruptedException 异常的同时,JVM 会同时把线程的中断标志位清除,所以这个时候th.isInterrupted()
返回的是 false。
总结
多线程程序很难调试,出了 Bug 基本上都是靠日志,靠线程 dump 来跟踪问题,分析线程 dump 的一个基本功就是分析线程状态,大部分的死锁、饥饿、活锁问题都需要跟踪分析线程的状态。
可以通过 jstack
命令或者Java VisualVM
这个可视化工具将 JVM 所有的线程栈信息导出来,完整的线程栈信息不仅包括线程的当前状态、调用栈,还包括了锁的信息。
提问
通用的线程生命周期有哪些?怎么转换?
Java线程生命周期有哪些?和通用的生命周期有什么区别?调用阻塞API时,如IO,JVM和OS层面的线程状态怎么转换?
Java线程生命周期怎么转换?
stop()和interrupt()有什么区别?
被 interrupt 的线程,怎么收到通知?休眠、运行且阻塞Api、运行
如何通过日志及工具诊断多线程运行时Bug?
10.Java线程(中):创建多少线程合适?
要解决这个问题,首先要分析以下两个问题:
- 为什么要使用多线程?
- 多线程的应用场景有哪些?
为什么要使用多线程?
用多线程是为了提高性能。
性能有两个核心指标:一个是时间维度的延迟,另外一个是空间维度的吞吐量。延迟指的是发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行得越快,性能也就越好。 吞吐量指的是在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。这两个指标内部有一定的联系(同等条件下,延迟越短,吞吐量越大)。
我们所谓提升性能,从度量的角度,主要是降低延迟,提高吞吐量。这也是我们使用多线程的主要目的。
我们如何降低延迟,提高吞吐量?我们从多线程的应用场景说起。
多线程的应用场景
要想“降低延迟,提高吞吐量”,基本上有两个方向,软件方向是优化算法,硬件方向是将硬件的性能发挥到极致。前者属于算法范畴,后者和并发编程息息相关。那计算机主要有哪些硬件呢?主要是两类:一个是 I/O,一个是 CPU。简言之,在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率。
操作系统层面已经解决了单一的硬件设备的利用率问题,而我们的并发程序,往往需要 CPU 和 I/O 设备相互配合工作,也就是说,我们需要解决 CPU 和 I/O 设备综合利用率的问题。综合利用率的问题,操作系统虽然没有办法完美解决,但是却给我们提供了方案,那就是:多线程。
以下是多线程提升综合利用率的示例,多线程将综合利用率从50%提升至100%:
以上,单位时间处理的请求数量翻了一番,也就是说吞吐量提高了 1 倍。此时可以逆向思维一下,如果 CPU 和 I/O 设备的利用率都很低,那么可以尝试通过增加线程来提高吞吐量。
另外,单核时代,如果是计算密集型,多线程反而会增加线程切换,降低利用率。而多核时代,多线程可以提高cpu核的利用率。
创建多少线程合适?
对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,用于当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
对于 I/O 密集型的计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:最佳线程数 =1 +(I/O 耗时 / CPU 耗时)。针对多核,**最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]**。具体工程上,我们要估算这个比例,然后做各种不同场景下的压测来验证我们的估计。压测时,我们需要重点关注 CPU、I/O 设备的利用率和性能指标(响应时间、吞吐量)之间的关系。
目的是为了按比例填平。如下图:
总结
设置线程数,把我一个原则,就是将硬件的性能发挥到极致。工程上,我们先进行估算,然后压测时重点关注 CPU、I/O 设备的利用率和性能指标(响应时间、吞吐量)之间的关系即可。IO/CPU 的耗时测试可以用apm(Application Performance Management)工具,如skywalking、zipkin等。
提问
为什么要使用多线程?从优化算法、硬件优化(CPU/IO)、硬件单一利用率、硬件综合利用率回答。
单核主机用多线程有意义吗?有意义,IO密集型。
为什么Redis单线程却很快?Redis把读写操作都放在了CPU和内存,几乎没有I/O。又减少了多线程上下文切换的过程,因此Redis即便是单线程也很快。
线程设置多少合理?计算密集、I/O密集,按比例,单核/多核。
11.Java线程(下):为什么局部变量是线程安全的
多个线程同时访问共享变量的时候,会导致并发问题。
局部变量是线程安全的。因为每个线程都有自己的调用栈,局部变量保存在线程各自的调用栈里面。要弄懂这个,我们需要弄清楚方法的执行。在CPU层面,没有方法的概念,所有的东西都是转换成一条条指令执行。
方法是如何被执行的
方法间的调用过程:
如执行以下代码:
int a = 7;
int[] b = fibonacci(a);
int[] c = b;
局部变量放在哪里
每个方法都有一个栈帧,方法的入参、局部变量、返回地址被存到栈帧里。栈帧存到CPU的堆栈寄存器里,也叫调用栈。当调用方法时,会创建新的栈帧,并压入调用栈;当方法返回时,栈帧弹出。栈帧和方法同生共死。
我们也可以联想到,Java为什么吧 new 出来的对象是在堆里,局部变量放到栈里。因为局部变量是和方法同生共死的,一个变量如果想跨越方法的边界,就必须创建在堆里。
调用栈与线程
每个线程都有自己独立的调用栈。
如图所示,线程 A、B、C 每个线程都有自己独立的调用栈。
线程封闭
方法里的局部变量,因为不会和其他线程共享,所以没有并发问题。这个解决并发问题的思路就是线程封闭,比较官方的解释是:仅在单线程内访问数据。
采用线程封闭技术的案例非常多,例如从数据库连接池里获取的连接 Connection,在 JDBC 规范里并没有要求这个 Connection 必须是线程安全的。数据库连接池通过线程封闭技术,保证一个 Connection 一旦被一个线程获取之后,在这个线程关闭 Connection 之前的这段时间里,不会再分配给其他线程,从而保证了 Connection 不会有并发问题。
ThreadLocal原理?待补充
提问
局部变量是线程安全的吗?为什么?线程->调用栈->栈帧。
线程和调用栈的对应关系?一个调用栈对应几个线程?
线程封闭是啥?举个例子?
12.如何用面向对象思想写好并发程序
如何才能用面向对象思想写好并发程序呢?可以从封装共享变量、识别共享变量间的约束条件和制定并发访问策略这三个方面下手。
一、封装共享变量
关键是要严格把控共享变量的访问路径。结合面向对象的特点,我们把共享变量作为对象的属性,那对于共享变量的访问路径就是对象的公共方法。利用面向对象思想写并发程序的思路,就是:将共享变量作为对象属性封装在内部,对所有公共方法制定并发访问策略。
如下,线程安全的计数器举例:
public class Counter {
private long value;
synchronized long get(){
return value;
}
synchronized long addOne(){
return ++value;
}
}
此外,对于不会发生变化的共享变量,可以用 final 关键字来修饰。这也可以向其他的同事表明意图,这些变量已经考虑了线程安全问题。
二、识别共享变量间的约束条件
共享变量间约束条件,决定了并发访问策略。例如,库存管理,库存量不能太高,也不能太低。如下:
public class SafeWM {
// 库存上限
private final AtomicLong upper =
new AtomicLong(0);
// 库存下限
private final AtomicLong lower =
new AtomicLong(0);
// 设置库存上限,原子类是线程安全的,所以这里不用同步
void setUpper(long v){
upper.set(v);
}
// 设置库存下限,原子类是线程安全的,所以这里不用同步
void setLower(long v){
lower.set(v);
}
// 省略其他业务代码
}
以上忽略了共享变量之间的约束条件,即库存下限要小于库存上限。加上参数校验能解决问题吗?
public class SafeWM {
// 库存上限
private final AtomicLong upper =
new AtomicLong(0);
// 库存下限
private final AtomicLong lower =
new AtomicLong(0);
// 设置库存上限
void setUpper(long v){
// 检查参数合法性
if (v < lower.get()) {
throw new IllegalArgumentException();
}
upper.set(v);
}
// 设置库存下限
void setLower(long v){
// 检查参数合法性
if (v > upper.get()) {
throw new IllegalArgumentException();
}
lower.set(v);
}
// 省略其他业务代码
}
显然并发的时候锁不住,满足不了下限小于上限。所以我们要把锁的粒度再增大。在设计阶段,我们一定要识别出所有共享变量之间的约束条件,共享变量之间的约束条件,反映在代码里,基本上都会有 if 语句,一定要特别注意竞态条件。
三、制定并发访问策略
制定并发访问策略,从方案上来看,无外乎就是以下“三件事”。后面的第二部分工具类,都是讲的并发访问策略。
- 避免共享:主要是利用线程本地存储以及为每个任务分配独立的线程。
- 不变模式: 即无状态。Java 用得很少,但在其他领域却有着广泛的应用,例如 Actor 模式、CSP 模式以及函数式编程的基础都是不变模式。
- 管程及其他同步工具:Java 领域万能的解决方案是管程,但是对于很多特定场景,使用Java 并发包提供的读写锁、并发容器等同步工具会更好。
除了这些方案之外,还有一些宏观的原则,有助于我们写出“健壮”的并发程序。主要有三条:
- 优先使用成熟的工具类:Java SDK 并发包里提供了丰富的工具类,基本上能满足日常需要,基本无需再自造轮子。
- 迫不得已时才使用低级的同步原语:低级的同步原语主要指的是 synchronized、Lock、Semaphore 等,这些虽然感觉简单,但实际上并没那么简单,一定要小心使用。
- 避免过早优化:安全第一,并发程序首先要保证安全,出现性能瓶颈后再优化。
总结
利用面向对象思想编写并发程序,一个关键点就是利用面向对象里的封装特性。而对共享变量进行封装,要避免“逸出”,所谓“逸出”简单讲就是共享变量逃逸到对象的外面。
提问
如何封装共享变量?
如何识别共享变量间的约束条件?
并发访问共享变量的策略,有哪些方法?
写好并发程序,有哪些经验或方法论?
13.并发编程理论基础总结串讲
1.总结串讲
起源是一个硬件的核心矛盾:CPU 与内存、I/O 的速度差异,系统软件(操作系统、编译器)在解决这个核心矛盾的同时,引入了可见性、原子性和有序性问题,这三个问题就是很多并发程序的 Bug 之源。这,就是01的内容。
那如何解决这三个问题呢?既然可见性、有序性问题是由缓存及编译优化引起,那么我们按需禁用缓存及编译优化,即可解决可见性、有序性问题,所以有了volatile、synchronized、final,Java内存模型(六/八项Happens-Before规则)约束编译器优化行为。关于原子性问题,本质上是解决中间状态可见性问题,所以我们有了互斥锁方案。在02,我们介绍了 Java 内存模型,以应对可见性和有序性问题;在03,04,我们介绍了互斥锁,以解决原子性问题。
互斥锁是解决并发问题的核心工具,但也可能会带来死锁问题。05介绍了死锁的产生原因以及解决方案;同时还引出一个线程间协作的问题,这就引出了06的线程间的协作机制:等待 - 通知。
前六篇文章,我们更多地是站在微观的角度看待并发问题。07则是换一个角度,站在宏观的角度重新审视并发编程相关的概念和理论。原子性问题的核心是互斥,其实本质只是需要我们的多线程要安全、不卡死、性能,其实也就是安全性、活跃性以及性能问题。对应了前文的锁、死锁/活锁/饥饿、细粒度锁/乐观锁等问题。锁还引申出了一把锁锁多个资源的问题。
08介绍了管程,是 Java 并发编程技术的基础。并发编程里两大核心问题——互斥和同步,都是可以由管程来解决的。
至此,并发编程相关的问题,理论上都找到问题所在,并能给出理论上的解决方案了。
09、10、11介绍了线程知识,因为Java并发编程要靠多线程来实现。包括线程的生命周期、如何计算合适的线程数以及线程内部是如何执行的。
在12,我们介绍了如何用面向对象思想写好并发程序,在 Java 语言里,面向对象思想能够让并发编程变得更简单。
2.关键词
a.
- 分工
- 同步:管程
- 互斥:管程
b.
- 可见性:Java内存模型
- 有序性:Java内存模型
- 原子性:锁。一把锁锁多个资源。
c.
- 安全性:互斥、锁
- 活跃性:死锁、活锁、饥饿
- 性能:细粒度锁、竞态、读写锁、CopyOnWrite、CAS乐观锁等等
d.
- 线程生命周期:通用模型、Java模型
- 创建多少线程:IO/CPU综合利用率,性能、延迟、吞吐量、并发
- 局部变量:方法、栈桢、调用栈
e.
- 共享变量封装
- 共享变量间约束
- 并发访问策略
之后是一些最佳实践的总结:
3.用锁的最佳实践
略。……. 后续补充
二、并发工具类
14.Lock和Condition(上):隐藏在并发包中的管程
Java SDK 并发包最核心的还是其对管程的实现。
并发编程领域,有两大核心问题,一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。
Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
本篇主要介绍Lock的使用。Java 语言本身提供的 synchronized 也是管程的一种实现,那为什么我们还要“重复”造轮子呢?
再造管程的理由
比起 synchronized:1.可以防止死锁,破坏不可抢占条件。2.管程模型中,可以有多个condition。
我们还记得,前篇中提到解决死锁问题时,我们有几种方案。互斥(前提条件)、*占有且等待(一次申请所有)、不可抢占(主动放弃)、循环等待(资源按序获取)*。其中,一次申请所有和资源按序获取实现起来不够灵活,而1.破坏不可抢占方案,synchronized是没办法做到的,而且synchronized申请不到资源后,线程就直接进入阻塞了。2.不满足条件时,将会一直占用资源。但是我们希望做到:
对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
所以我们重新设计一把互斥锁去支持“破坏不可抢占方案”,那该怎么设计呢?有三种方案,即获取不到资源时,并不进入永久阻塞,而是直接返回、支持超时、支持唤醒:
- 非阻塞地获取锁。直接不等待,之后可以选择自己放弃锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
- 支持超时。等待时可以自己醒,之后可以选择自己放弃锁。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
- 能够响应中断。等待时可以被唤醒,之后可以选择自己放弃锁。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
这三种方案可以全面弥补 synchronized 的问题。这三个方案就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法:
// 支持非阻塞获取锁的 API
boolean tryLock();
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持中断的 API
void lockInterruptibly()
throws InterruptedException;
如何保证可见性
Java JDK使用Lock
要遵守try{} finally{}
,在finally里释放锁。
我们知道使用synchronized
时,可见性由Happens-Before
原则保证:synchronized 相关的规则:synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。
那么使用Lock
时,可见性怎么保证呢?Java SDK 里面锁的实现非常复杂,简单介绍一下:它是利用了 volatile 相关的 Happens-Before 规则。Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会先读state 的值;解锁的时候,也会写 state 的值(简化后的代码如下面所示)。
class SampleLock {
volatile int state;
// 加锁
lock() {
// 省略代码无数
state = 1;
}
// 解锁
unlock() {
// 省略代码无数
state = 0;
}
}
以下代码线程T2是可以看到T1对value的操作结果的,用到了3个Happens-Before 原则:
- 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
- volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
- 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
什么是可重入锁
前面我们创建的锁是ReentrantLock
,也就是可重入锁。所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。
除了可重入锁,还有可重入函数的概念。所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;这意味着线程安全。(无状态?)
可重入锁举例,如下,在2处是可以再一次获得锁的:
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
公平锁与非公平锁
可以根据传入的参数构建公平锁或非公平锁,区别是,唤醒时,公平锁是入口等待队列按照排队的顺序被唤醒,非公平锁则没有这种保障。此外非公平锁在线程释放锁之后,如果来了一个线程获取锁,他不必去排队就直接获取,如果获取到,不会入队。获取不到后再进行排队:
// 无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}
用锁的最佳实践
用锁可以提高性能,但是可能会出现死锁或是安全性问题。大师 Doug Lea《Java 并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践,它们分别是:
- 永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
关于第三条,因为其他对象里有可能有慢操作、sleep()、其他类的加锁导致双重加锁导致死锁等。
总结
有了synchronized
为什么要造Lock
这个轮子?synchronized无法做到不满足条件时主动放弃,线程会一直阻塞占用资源。我们希望可以主动放弃,所以又新增了Lock的管程轮子。重新设置的互斥锁有以下三个思路或功能。支持非阻塞、超时、可中断。此外,Lock还支持多条件变量。
用锁的三个最佳实践。除此之外还有减少锁的持有时间、减小锁的粒度等等。
个人理解,锁的用法,就是在共享变量外面,再封装一层线程安全的操作,就形成了带锁的类。核心就是:共享变量 + 并发访问路径控制(用锁)。
提问
synchronized和Lock有什么区别?有了synchronized
为什么要造Lock
这个轮子?只有一个条件队列、加锁解锁默认。解决占有且等待问题,以及无限等待不释放资源问题,同时也加大了处理死锁的灵活性。
Lock的可见性是怎么保证的?顺序、volatile
、传递性原则。
什么是可重入锁?什么是公平锁?
用锁的最佳实践?
关键字
synchronized、Lock、可见性、可重入锁、公平锁、非公平锁、最佳实践。
15.Lock和Condition(下):Dubbo如何用管程实现异步转同步?
这一节讲Java SDK 并发包里的 Condition,Condition 实现了管程模型里面的条件变量。
Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。
Lock&Condition的使用范式
在很多并发场景下,用多个条件变量,可以让我们的程序可读性更好。
以下是用两个条件变量实现入队、出队操作的列子:
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
// 出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
注意到条件变量Condition是通过锁来产生的,然后**await()
、signal
、signalAll
都是由Condition来操作**。再然后是注意等待条件满足时的while()范式。以上代码是一个标准的lock使用范式,包含了try{} finally{unlock();}
和 while(条件不满足) {await();}
两个标准范式。
同步和异步
同步和异步的区别:通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
Java默认用同步处理。要使用异步,有以下两种方式:
- 新建线程调用方法:调用方创建一个子线程,在子线程中执行方法调用,这种调用我们称为异步调用;
- 方法里新建线程执行逻辑:方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return,这种方法我们一般称为异步方法。
Dubbo 源码分析
这段源码是一个Lock&Condition的实践示例,实现的是异步转同步的一个实现,用的范式也是很典型的锁的范式。简单的总结,就是死循环不停的去检测条件是否满足,满足时执行。就是一个“等待-通知”机制。
我们知道,TCP 协议本身是异步的。而我们工作中经常用到的 RPC 调用,比如Dubbo,用起来像是同步的,而它底层的TCP又是异步的。其实是Dubbo做了一个异步转同步的操作。那么怎么实现这种转换呢?下面是简化代码:
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();
// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC 结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signalALL(); // 注意这里用到signalAll(),用signal()会导致大量请求超时。
}
} finally {
lock.unlock();
}
}
总结
Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。Dubbo同步转异步就是一个典型的实践。
Java SDK 并发包里锁和条件变量的实现,可以参照《Java并发编程的艺术》第 5 章《Java 中的锁》。
提问
synchronized或lock&condition的使用范式?以入队出队为例
Java锁和条件的实现原理?
怎么实现TCP异步转同步?
16.Semaphore:如何快速实现一个限流器
这一节将信号量,Semaphore,类似生活中的信号灯。
信号量由计算机科学家迪杰斯特拉(Dijkstra)于 1965 年提出,直到 1980 年管程被提出之前,信号量一直都是并发编程领域的唯一选择。目前几乎所有支持并发编程的语言都支持信号量机制。
下面现将信号量模型,再讲怎么用,最后实现一个例子:限流器。
信号量模型
信号量模型可以简单概括为:一个计数器,一个等待队列,三个方法。其中计数器和等待队列对外透明,外界通过三个方法访问:init()、down() 和 up()。
三个方法的语义:
- **init()*:设置计数器的初始值。表示能让几个线程同时进入临界区,互斥则是1*。
- **down()*:计数器的值减 1(进入临界区*);如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
- *up()*:计数器的值加 1(离开临界区*);如果此时计数器的值小于或者等于 0,则唤醒*等待队列中的一个线程,并将其从等待队列中移除。
三个方法都是原子性的。在 Java SDK 里面,信号量模型由 java.util.concurrent.Semaphore
实现,Semaphore 这个类能够保证这三个方法都是原子操作。
*down()、up()*这两个操作历史上最早称为 P 操作和 V 操作。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 *release()*。
以下是代码化的信号量模型:
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
// 将当前线程插入等待队列
// 阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
// 移除等待队列中的某个线程 T
// 唤醒线程 T
}
}
}
如何使用信号量
*在进入临界区之前执行一下 down() 操作(P/acquire()),退出临界区之前执行一下 up() 操作(V/release())*就可以了。以下是Java代码示例:
static int count;
// 初始化信号量
static final Semaphore s
= new Semaphore(1);
// 用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}
实现一个限流器
Semaphore 可以允许多个线程访问一个临界区。这个用Lock来实现比较麻烦。现实中是有这种需求的,比如各种池化资源,如连接池、对象池、线程池等等。
对象池代码举例:
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire(); // 注意这里的P操作
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release(); // 注意这里的V操作
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
信号量唤醒时,是随机唤醒等待队列还是按顺序?这样有没有可能造成饥饿?
总结
Java重点还是支持管程模型,管程易用性和工程性更好。例如如果用信号量解决阻塞队列会很麻烦。
提问
什么是PV操作/信号量?模型?
PV操作怎么用?
为什么有了Lock还要Semaphore?Semaphore 可以允许多个线程访问一个临界区。
信号量唤醒时,是随机唤醒等待队列还是按顺序?这样有没有问题?
实现一个限流器?
17.ReadWriteLock:如何快速实现一个完备的缓存
JDK包中,除了管程和信号量,其他工具类类都是为了分场景优化性能,提升易用性。
今天我们用读写锁ReadWriteLock
来解决读多写少的并发场景,典型的就是缓存。
什么是读写锁
读写锁不是 Java 语言特有的,所有的读写锁都遵守以下三条基本原则:
- 允许多个线程同时读共享变量;
- 只允许一个线程写共享变量;
- 如果一个写线程正在执行写操作,此时禁止读线程读共享变量,也禁止其他写操作。
读写锁与互斥锁的区别是读写锁允许多个线程同时读共享变量。这样在读多写少的场景下,它的性能就优于互斥锁。
快速实现一个缓存
下面我们用ReadWriteLock
来快速实现一个通用的缓存工具类。
我们声明了一个 Cache<K, V> 类,缓存的数据保存在 Cache 类内部的 HashMap 里面,用读写锁 ReadWriteLock 来保证 HashMap 线程安全。
ReadWriteLock
是一个接口,**它的实现类是 ReentrantReadWriteLock
*。注意先新建锁,然后获得读锁和写锁,并且用到了 try{} finally{}
范式*。
class Cache<K,V> {
final Map<K, V> m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(String key, Data v) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}
缓存初始化示例
缓存都要初始化。我们可以一次加载完成,也可以按需加载。
以下是从数据库按需加载的示例。注意到我们6、7处,我们获取到写锁以后,又查了一次有没有缓存。这是为了防止其他线程已经查到数据了。多线程编程时,我们要时时刻刻想着其他线程同时运行当前代码的情况。
class Cache<K,V> {
final Map<K, V> m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
V get(K key) {
V v = null;
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
// 缓存中存在,返回
if(v != null) { ④
return v;
}
// 缓存中不存在,查询数据库
w.lock(); ⑤
try {
// 再次验证
// 其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
// 查询数据库
v= 省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}
读写锁的升级和降级
升级即拿到读锁是否还能拿写锁,降级即拿到写锁后是否能再拿读锁。
读写锁不支持锁的升级,支持锁的降级。
简单理解:
不支持锁的升级是为了保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其写操作对其他获取到读锁的线程不可见。
支持锁降级中读锁的获取也是为了保证数据的可见性,是为了防止自己读数据前数据又被其他线程写掉。如果当前线程直接释放写锁,此刻另一个线程(记作线程T)获取写锁并修改数据,则当前线程无法感知线程T的数据更新。而如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
获取写锁的前提是读锁和写锁均未被占用。获取读锁的前提是没有其他线程占用写锁。
锁的升级,会造成死锁:
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
if (v == null) {
w.lock();
try {
// 再次验证并更新缓存
// 省略详细代码
} finally{
w.unlock();
}
}
} finally{
r.unlock(); ③
}
锁的降级,没有问题:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}
总结
读写锁类似于 ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了 java.util.concurrent.locks.Lock 接口,所以除了支持 lock() 方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。
我们实现的缓存没有支持数据同步,数据同步最简单的方案就是超时机制。
读写锁用来解决读多写少的场景,支持降级不支持升级。注意用的时候使用try{} finally{}
范式。
为了防止读锁饿死,可以用公平锁。
提问
什么是读写锁?用来解决什么问题?和互斥锁什么区别?ReadWriteLock
用读写锁实现一个Cathe缓存工具类
读写锁是否支持锁的升级?锁的降级呢?为什么?
18.StampedLock:有没有比读写锁更快的锁?
也是用于读多写少的场景,不过比读写锁更快。
StampedLock 支持的三种锁模式
ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。
而 StampedLock 支持三种模式,分别是:写锁、悲观读锁和乐观读。
写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。相关的示例代码如下。
final StampedLock sl = new StampedLock();
// 获取 / 释放悲观读锁示意代码
long stamp = sl.readLock();
try {
// 省略业务相关代码
} finally {
sl.unlockRead(stamp);
}
// 获取 / 释放写锁示意代码
long stamp = sl.writeLock();
try {
// 省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}
StampedLock性能好是因为支持乐观读,乐观读是无锁操作。ReadWriteLock在多个线程同时读时,写操作会被阻塞。而StampedLock多个线程同时读时,允许一个线程获取写操作。
原理就是获取一个版本号,看版本号有没有被修改,没有修改则读取的结果有效。有修改则重新读或升级悲观读锁。
以下是操作示例,先用tryOptimisticRead()
获取一个stamp,然后用sllock.validate(stamp)
看下有没有被修改即可。被修改,要么循环乐观读,要么锁升级为悲观读。推荐锁升级方式:
class Point {
private int x, y;
final StampedLock sl = new StampedLock();
// 计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
// 判断执行读操作期间,
// 是否存在写操作,如果存在,
// 则 sl.validate 返回 false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(
curX * curX + curY * curY);
}
}
StampedLock 使用注意事项
- StampedLock 的功能仅仅是 ReadWriteLock 的子集,简单场景性能更高,可以代替ReadWriteLock 。
- StampedLock 不支持重入。
StampedLock 的悲观读锁、写锁都不支持条件变量。
**使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()**。
如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。例如下面的代码中,线程 T1 获取写锁之后将自己阻塞,线程 T2 尝试获取悲观读锁,也会阻塞;如果此时调用线程 T2 的 interrupt() 方法来中断线程 T2 的话,你会发现线程 T2 所在 CPU 会飙升到 100%:
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(()->{
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();
总结
StampedLock就是用到了版本管理的读写锁,与读写锁相比,多了乐观锁,少了可重入、悲观读写锁的条件变量、乐观锁不能用中断。以下是简化的Java官方模板,使用时建议用以下模板:
StampedLock 读模板:
final StampedLock sl = new StampedLock();
// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
......
StampedLock 写模板:
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}
提问
StampedLock和ReadWriteLock的区别?
StampedLock原理?
使用StampedLock要注意啥?(和ReadWriteLock的区别,interruput导致cpu飙升问题)
19.CountDownLatch和CyclicBarrier:如何让多线程步调一致?
这两个主要用来做线程间的通信。实现类似 threadObj.join()的功能。因为用线程池的时候是没有join()方法的。
下面是一个对账的场景:T1线程查订单。T2线程查派送单。最后主线程或T3线程执行检查。检查的前提是订单和派送单要都查到。并且查订单和查派送单步调要一致,因为两者是一一对应的。
对账:单线程实现
单线程,缺点是串行,效率低。
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
对账:原生管程实现
用原生管程的多线程同时查订单和派送单,用join()方法通信。不过缺点是每次都要新建线程,很耗资源。
while(存在未对账订单){
Thread T1 = new Thread(() -> {
// 查询未对账订单
pos = getPOrders();
});
T1.start();
Thread T2 = new Thread(() -> {
// 查询派送单
dos = getDOrders();
});
T2.start();
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
对账:CountDownLatch实现
改用线程池实现,实现了线程的重复利用。不过此时拿不到join()方法了,所以用到了CountDownLatch进行通信。
线程池我们后面还会讲。CountDownLatch其实就是一个计数器,计数器减到0以后才继续往下执行。用latch.countDown()
方法来进行计数器减1操作,使用latch.await()
方法进行等待。
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
CountDownLatch latch = new CountDownLatch(2);
executor.exucute(() -> {
// 查询未对账订单
pos = getPOrders();
latch.countDown();
});
executor.execute(() -> {
// 查询派送单
dos = getDOrders();
latch.countDown();
});
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
对账:CyclicBarrier实现
CyclicBarrier
就是可以重复使用的、会自动复位的计数器,并且提供了计数为0后的回调。用 barrier.await()
来将计数器减 1(同时等待计数器变成 0)。所有线程调用barrier.await()
后,计数器减到0。此时会自动调用 barrier 的回调函数来执行剩余操作。回调执行完后唤醒等待的线程,同时计数器自动重置,其他线程又可以执行下一条语句并把计数器减一,如此循环。
比起CountDownLatch
的等大家执行完一起往前冲,CyclicBarrier
更像是一个同步器,等一堆线程执行完之后,某个线程可以继续执行其他操作,同时这一堆线程又可以继续循环执行到闭锁处。
实现:
我们用两个队列,一个订单队列,一个派送单队列,两个队列一一对应,每次从两个队列分别取出一个元素,进行检查。我们注意到回调函数只用了一个线程来执行,这样做是为了
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
总结
CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。
CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。
除此之外,CyclicBarrier 还可以设置回调函数。
提问
CountDownLatch用来做什么的?怎么用?代码举例?
CyclicBarrier用来做什么的?怎么用?代码举例?
CountDownLatch和CyclicBarrier的区别?
20.并发容器:都有哪些“坑”需要我们填?
Java 1.5 版本之后对并发容器进行了很多优化,提升了性能。Java 1.5 之前的线程安全容器基本都是用同步包装的,我们一般叫同步容器。1.5之后的线程安全容器采用了其他方案,我们一般叫做并发容器
Java 中的容器主要可以分为四个大类,分别是 List、Map、Set 和 Queue。
同步容器及其注意事项
同步容器的原理其实就是把容器所有的方法都加上synchronized
关键字。
Java也提供了包装工具类来进行转换:
List list = Collections.synchronizedList(new ArrayList());
Set set = Collections.synchronizedSet(new HashSet());
Map map = Collections.synchronizedMap(new HashMap());
需要注意的是,组合操作需要注意竞态条件问题,即便每个操作都能保证原子性,也并不能保证组合操作的原子性。
典型的例子就是迭代器。用迭代器遍历容器,以下是组合操作,不具备原子性。调用 foo()方法有并发问题。
List list = Collections.synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
正确的做法是,要把list锁起来再操作。因为 Collections 内部的包装类,公共方法锁的是对象的 this。
List list = Collections.synchronizedList(new ArrayList());
synchronized (list) {
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
}
并发容器容器及其注意事项
同步容器所有方法都用synchronized 来保证互斥,串行度太高了。所以1.5后使用了并发容器。
并发容器也是包括List、Map、Set 和 Queue四大类。下图基本包括了所有常用的并发容器:
下面一一简单介绍,限于篇幅,只提关键点:
(1) List
List接口只有一个实现类,CopyOnWriteArrayList。CopyOnWrite,就是写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。
实现原理:
CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。
如果在遍历 array 的同时,还有一个写操作,那么CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。CopyOnWriteArrayList写操作是互斥的。
要注意的坑:
- CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。因为写入的新元素并不能立刻被遍历到。
- CopyOnWriteArrayList 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的。
(2) Map
Map 接口的两个实现是 ConcurrentHashMap和 ConcurrentSkipListMap。主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。
使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空。具体比较如下图:
ConcurrentSkipListMap 里面的 SkipList 就是跳表。跳表插入、删除、查询操作平均的时间复杂度是 O(log n),理论上和并发线程数没有关系,所以在并发度非常高的情况下,如果对 ConcurrentHashMap 的性能还不满意,可以尝试一下 ConcurrentSkipListMap。跳表就跟字典的索引一样,通过这个索引既能快速定位数据,也能隔离并发(可以并发查看不同页上的字)。
(3) Set
Set 接口的两个实现是CopyOnWriteArraySet和ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的。
(4)Queue
Java的Queue比较复杂。可以用以下两个维度来分类。
一个维度是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。
Java 并发包里阻塞队列都用 Blocking关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识。
这两个维度组合后,可以将 Queue 细分为四大类:
单端阻塞队列:
内部一般会持有一个队列,常用的以下6个:
- ArrayBlockingQueue:队列是数组。
- LinkedBlockingQueue:队列是链表。
- SynchronousQueue:不持有队列,此时生产者线程的入队操作必须等待消费者线程的出队操作。
- LinkedTransferQueue:融合 LinkedBlockingQueue 和 SynchronousQueue 。性能比 LinkedBlockingQueue 更好。
- PriorityBlockingQueue :支持按照优先级出队。
- DelayQueue :支持延时出队。
双端阻塞队列:实现是LinkedBlockingDeque。
单端非阻塞队列:实现是 ConcurrentLinkedQueue。
双端非阻塞队列:实现是 ConcurrentLinkedDeque。
另外,使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。
总结
Java 并发容器不单要清楚每种容器的特性,最重要的是能选对容器。至于用法,API的说明都很清楚。
在选对容器时,甚至根本不会触发Java 容器的快速失败机制(Fail-Fast)。
扩展:Java7中的HashMap在执行put操作时会涉及到扩容,由于扩容时链表并发操作会造成链表成环,所以可能导致cpu飙升100%。
提问
怎样把一个线程不安全的容器变成线程安全?
常用的同步容器有哪些?四大类,工具类…
同步容器和并发容器区别?
常用的并发容器有哪些?各实现原理?四大类…
哪些队列是支持有界的?
什么是Java的快速失败机制?如何避免?
21.原子类:无锁工具类的典范
解决简单的可见性和原子性问题,除了使用volatile
和synchronized
互斥锁方案外,还可以使用无锁方案。
JUC将这些无锁方案进行提炼后,实现了一系列的原子类。
下面是用原子类实现线程安全的long型count++示例:
public class Test {
AtomicLong count =
new AtomicLong(0);
void add10K() {
int idx = 0;
while(idx++ < 10000) {
count.getAndIncrement();
}
}
}
无锁方案相对互斥锁方案,最大的好处就是性能。无需加解锁操作、也没有拿不到锁时的阻塞。
无锁方案的实现原理
就是CPU的硬件支持。CPU提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)来解决并发问题。
CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B(即老值快照) 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。作为一条 CPU 指令,CAS 指令本身是能够保证原子性的。
用CAS解决问题时,遇到条件不满足,即当前值与期望值不相等,一般会进行自旋。
以下是带自旋的CAS指令的代码模拟:
class SimulatedCAS{
volatile int count;
// 实现 count+=1
addOne(){
do {
newValue = count+1; //①
}while(count !=
cas(count,newValue) //②
}
// 模拟实现 CAS,仅用来帮助理解
synchronized int cas(
int expect, int newValue){
// 读目前 count 的值
int curValue = count;
// 比较目前 count 值是否 == 期望值
if(curValue == expect){
// 如果是,则更新 count 的值
count= newValue;
}
// 返回写入前的值
return curValue;
}
}
使用CAS时还会遇到ABA问题。即另一个线程把值改为其他值又改回来,此时值和期望值还是相等的,你就会以为值没有被其他线程动过。
可能大多数情况下我们并不关心 ABA 问题,例如数值的原子递增。但有些情况下要关心:例如原子化的更新对象,两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化了。所以在使用 CAS 方案的时候,一定要先 check 一下。
Java如何实现CAS
以AtomicLong
的 getAndIncrement()
方法为例,getAndIncrement()
转调了unsafe.getAndAddLong(this, valueOffset, 1L)
。其中this 和 valueOffset 两个参数可以唯一确定共享变量的内存地址。
unsafe.getAndAddLong(this, valueOffset, 1L)
方法的实现是,先读取内存值,然后调用native boolean compareAndSwapLong(Object o, long offset, long expected, long x)
更新值。更新失败则自旋,直到更新成功。源码如下:
public final long getAndAddLong(
Object o, long offset, long delta){
long v;
do {
// 读取内存中的值
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(
o, offset, v, v + delta));
return v;
}
// 原子性地将变量更新为 x
// 条件是内存中的值等于 expected
// 更新成功则返回 true
native boolean compareAndSwapLong(
Object o, long offset,
long expected,
long x);
Java 提供的原子类里面 CAS一般被实现为 compareAndSet(),compareAndSet() 的语义和 CAS 指令的语义的差别仅仅是返回值不同而已,compareAndSet() 里面如果更新成功,则会返回 true,否则返回 false。CAS的使用范式如下:
do {
// 获取当前值
oldV = xxxx;
// 根据当前值计算新值
newV = ...oldV...
}while(!compareAndSet(oldV,newV);
原子类概览
JUC的原子类可分为五类:
- 原子化的基本数据类型
- 原子化的对象引用类型
- 原子化数组
- 原子化对象属性更新器
- 原子化的累加器
它们提供的方法是类似的。以下是概览图:
下面我们来具体讲讲:
(1)原子化基本数据类型
相关实现有 AtomicBoolean、AtomicInteger 和 AtomicLong,提供的方法主要为以下,细节可参考源码:
getAndIncrement() // 原子化 i++
getAndDecrement() // 原子化的 i--
incrementAndGet() // 原子化的 ++i
decrementAndGet() // 原子化的 --i
// 当前值 +=delta,返回 += 前的值
getAndAdd(delta)
// 当前值 +=delta,返回 += 后的值
addAndGet(delta)
//CAS 操作,返回是否成功
compareAndSet(expect, update)
// 以下四个方法
// 新值可以通过传入 func 函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)
(2)原子化对象引用类型
相关实现有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。(因为更新对象,对象引用是不更新的,无法记录更新,所以用该原子类对对象进行包装,再加一个版本维度进行记录?)提供的方法和原子化基本类型差不多。
对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。思路其实很简单,增加一个版本号维度就可以了。每次执行 CAS 操作,附加再更新一个版本号,只要保证版本号是递增的就OK。
AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:
boolean compareAndSet(
V expectedReference,
V newReference,
int expectedStamp,
int newStamp)
AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,表示引用变量是否被更改过,方法签名如下:
boolean compareAndSet(
V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)
疑问:AtomicMarkableReference的标记位是否也有ABA的问题?个人猜想,运用场景可能是仅仅进行一次更新的情况。
待落实:原子化对象的运用场景?
(3)原子化数组
相关实现有 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,他们可以原子化地更新数组里面的每一个元素。这些类提供的方法和原子化的基本数据类型的区别仅仅是:每个方法多了一个数组的索引参数,也不再赘述。
(4)原子化对象属性更新器
相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的,创建更新器的方法如下:
public static <U>
AtomicXXXFieldUpdater<U>
newUpdater(Class<U> tclass,
String fieldName)
需要注意的是,对象属性必须是 volatile 类型的,只有这样才能保证可见性;如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常。
上面的参数没有涉及对象,而我们要关联更新的对象,这个对象在哪里关联呢?答案是通过方法参数传入:
boolean compareAndSet(
T obj,
int expect,
int update)
就是多了一个对象参数传入,其他和原子化的基本数据类型一致,不在赘述。
(5)原子化的累加器
DoubleAccumulator
、DoubleAdder
、LongAccumulator
和 LongAdder
,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好。
总结
无锁方案性能好、无死锁(但可能饥饿和活锁)。
Java 提供的原子类能够解决一些简单的原子性问题,但上面我们所有原子类的方法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁方案。
提问
JUC使用无锁方案的示例?
无锁方案的原理?
什么是CAS?CAS的原理?CAS能够保证原子性吗?CAS条件不满足时怎么处理?
什么是ABA问题?什么时候需要关心ABA问题?怎么处理?
Java如何实现CAS?调用了哪些方法底层怎么实现?使用CAS的范式?
常用的原子类有哪些?各怎么实现?
22.Executor与线程池:如何创建正确的线程池?
创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,所以线程是一个重量级的对象,应该避免频繁创建和销毁。
我们的方案就是线程池。
线程池和一般意义上的池化资源与所不同。常见的池化资源,一般是要资源的时候就调用 acquire() 方法来申请资源,用完之后就调用 release() 释放资源。如下:
class XXXPool{
// 获取池化资源
XXX acquire() {
}
// 释放池化资源
void release(XXX x){
}
}
而线程池是一种“生产者 - 消费者模式”。
线程池是一种生产者 - 消费者模式
目前业界线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。
以下是一个典型的简单线程池设计示意,我们可以用它来了解线程池原理:
// 简化的线程池,仅用来说明工作原理
class MyThreadPool{
// 利用阻塞队列实现生产者 - 消费者模式
BlockingQueue<Runnable> workQueue;
// 保存内部工作线程
List<WorkerThread> threads = new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
// 创建工作线程
for(int idx=0; idx<poolSize; idx++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command){
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread{
public void run() {
// 循环取任务并执行
while(true){ ①
Runnable task = workQueue.take();
task.run();
}
}
}
}
/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(
10, workQueue);
// 提交任务
pool.execute(()->{
System.out.println("hello");
});
原理概述:在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,
如何使用 Java 中的线程池
Java提供的线程池比上面的更复杂。Java 提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor,看名字也知道它更强调的是 Executor,而不是一般的池化资源。
ThreadPoolExecutor 的完整构造函数如下,总共7个参数:
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
包括:核心线程数,最大线程数,超时时间,线程工厂,超时时间,阻塞队列,拒绝策略。
- corePoolSize:核心线程数,即最小线程数。
- maximumPoolSize:最大线程数。
- keepAliveTime & unit:超时时间及单位。即线程空闲多久被回收。
- workQueue:同前文的工作队列。
- threadFactory:线程工厂,自定义如何创建线程,例如给他们取名字。
- handler:拒绝策略。当所有线程都有活,且工作队列已满(有界工作队列),此时提交任务,线程池如何拒绝。总共4种:
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,直接抛异常 throws RejectedExecutionException。
- DiscardPolicy:直接丢弃任务,不抛异常。
- DiscardOldestPolicy:丢弃最老的任务,就是把最早进入工作队列的任务丢弃,再把新任务加入到工作队列。
Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时。
使用线程池要注意什么
ThreadPoolExecutor 的构造函数太复杂,JUC提供了 Executors快速创建线程池。不过不建议使用 Executors 了,最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。
try {
// 业务逻辑
} catch (RuntimeException x) {
// 按需处理
} catch (Throwable x) {
// 按需处理
}
总结
线程池的原理是一个任务队列(一般用阻塞队列)加一组执行线程。
很多大厂的编码规范都要求必须通过线程池来管理线程。线程池和普通的池化资源有很大不同,线程池实际上是生产者 - 消费者模式的一种实现。线程池是消费者,线程池的使用者是生产者。
创建线程池设置合适的线程数非常重要,可参考《10.Java 线程(中):创建多少线程才是合适的?》的内容。另外《Java 并发编程实战》的第 7 章《取消与关闭》的 7.3 节“处理非正常的线程终止” 详细介绍了异常处理的方案,第 8 章《线程池的使用》对线程池的使用也有更深入的介绍。
提问
为什么要用线程池?
线程池底层原理?怎么实现的?
Java线程池的几个关键参数?构造函数的参数有哪些?
实际使用线程池要注意什么?
线程池怎么处理异常?
23.Future:如何用多线程实现最优的“烧水泡茶”程序?
前文使用线程池时,讲到任务执行直接提交ThreadPoolExecutor
的execute()
即可。那么如果我们想要获取任务执行的结果怎么办呢?这一节就来解决这个问题。
如何获取任务执行结果
Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。
3 个方法的方法签名如下:
// 提交 Runnable 任务
Future<?> submit(Runnable task);
// 提交 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 任务及结果引用
<T> Future<T> submit(Runnable task, T result);
它们的返回值都是 Future 接口,Future 接口有 5 个方法,分别是取消任务 *cancel()*、判断任务是否已取消 *isCancelled()*、判断任务是否已结束 isDone()以及2个获得任务执行结果的get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
ThreadPoolExecutor 的 3 个 submit() 方法之间的区别在于方法参数不同:
提交 Runnable 任务
submit(Runnable task)
:Runnable 接口的 run() 方法没有返回值,该方法返回的 Future对象仅用来判断任务是否已结束。类似于Thread.join()。提交 Callable 任务
submit(Callable<T> task)
:Callable接口的 call() 方法有返回值,该方法返回的 Future对象可通过get()方法来获取执行结果。提交 Runnable 任务及结果引用
submit(Runnable task, T result)
:假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。以下是经典用法,result从runnable的构造函数传入,又从future.get()返回。它相当于主线程和子线程之间的桥梁:ExecutorService executor = Executors.newFixedThreadPool(1); // 创建 Result 对象 r Result r = new Result(); r.setAAA(a); // 提交任务 Future<Result> future = executor.submit(new Task(r), r); Result fr = future.get(); // 下面等式成立 fr === r; fr.getAAA() === a; fr.getXXX() === x; class Task implements Runnable{ Result r; // 通过构造函数传入 result Task(Result r){ this.r = r; } void run() { // 可以操作 result a = r.getAAA(); r.setXXX(x); } }
下面介绍FutureTask 工具类。该类有两个构造函数,参数和前面介绍的 submit() 方法类似,不再赘述。
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
FutureTask 实现了 Runnable
和 Future
接口。所以可以作为runnable交给ThreadPoolExecutor或Thread执行,也可以用来获得线程池执行任务的返回结果。
提交给线程池执行示例:
// 创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交 FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
提交给Thread执行示例,利用 FutureTask 对象可以很容易获取子线程的执行结果:
// 创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();
使用实例:“烧水泡茶”程序
需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。
// 创建任务 T2 的 FutureTask
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
// 创建任务 T1 的 FutureTask
FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2)); // 注意这里,构造方法传入了ft2
// 线程 T1 执行任务 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程 T2 执行任务 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程 T1 执行结果
System.out.println(ft1.get());
// T1Task 需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
FutureTask<String> ft2;
// T1 任务需要 T2 任务的 FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1: 洗水壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1: 烧开水...");
TimeUnit.SECONDS.sleep(15);
// 获取 T2 线程的茶叶
String tf = ft2.get();
System.out.println("T1: 拿到茶叶:"+tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
}
}
// T2Task 需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2: 洗茶壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2: 洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2: 拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return " 龙井 ";
}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井
总结
本节讲了线程池如何获取返回值。主要包括:ThreadPoolExecutor的几个submit()方法,Future接口,FutureTask工具类(实现了 Runnable
和 Future
接口)。
利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。实际编程时,用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好。
提问
ThreadPoolExecutor
的execute()
和submit()
什么区别?
线程池执行时如何获取任务执行结果?
Future的是什么?有哪些方法?怎么用?
ThreadPoolExecutor 有几个 submit()方法?有什么区别?怎么用?
FutureTask 工具类如何使用?
实现一个烧水泡茶程序?模拟两线程之间的配合?用Thread方式,再用线程池方式实现。
做一个询价应用,这个应用需要从三个电商询价,然后保存在自己的数据库里?
24.CompletableFuture:异步编程没那么难
概述
性能优化是大厂的核心需求,手段就是使用多线程异步化,减少程序串行。
为了更直观的使用异步编程,更贴近业务而非线程管理,Java 在 1.8 版本提供了 CompletableFuture
来支持异步编程。
CompletableFuture 类实现了 Future
接口和 CompletionStage
接口。
CompletableFuture 的核心优势
简单来说,就是更贴近业务,屏蔽了繁琐的线程维护。
1.无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
2.语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
3.代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
使用举例:
我们再来实现一遍烧水泡茶程序:
// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(()->{
System.out.println("T1: 洗水壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1: 烧开水...");
sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
System.out.println("T2: 洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2: 洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2: 拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 =
f1.thenCombine(f2, (__, tf)->{
System.out.println("T1: 拿到茶叶:" + tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
});
// 等待任务 3 执行结果
System.out.println(f3.join());
void sleep(int t, TimeUnit u) {
try {
u.sleep(t);
}catch(InterruptedException e){}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井
创建 CompletableFuture 对象
主要通过CompletableFuture
类自己的4个静态方法来创建对象,主要区别是两个维度,get()方法是否有返回值,是否用公用的forkJoin线程池。
// 使用默认的forkJoin线程池
static CompletableFuture<Void> runAsync(Runnable runnable) // Runnable 接口的 run() 方法没有返回值
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) // Supplier 接口的 get() 方法有返回值
// 也可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
默认情况下 CompletableFuture 会使用公共的 ForkJoinPool
线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。
如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法。那么有两个问题:
- 一个是异步操作什么时候结束。
- 另一个是如何获取异步操作的执行结果。
因为 CompletableFuture 类实现了 Future
接口和 CompletionStage
接口:
- 线程间的分工协作可由CompletionStage 接口来完成。
- 获取结果则可以通过 Future 接口来解决。
如何理解 CompletionStage 接口
CompletionStage 就是用来解决线程间的分工协作问题的。各线程的任务是有时序关系的,有串行关系、并行关系、汇聚关系等。CompletionStage 接口可以清晰地描述任务之间的这种时序关系。此外,它也进行了异常处理。
下文的内容:
CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。其实总体就是根据两个维度分类:
- 是否异步执行(async)
- *执行的函数是否有入参及返回值(Funtion、Consumer、Runnable)*。
1. 描述串行关系
主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口,几个系列方法的命名其实就是根据参数的执行方法名来命名的。参数分为三种,Funtion、Consumer、Runnable,分别表示有入参有返回、有入参无返回、无入参无返回三种情况。
*thenApply 系列方法对应R function.apply(t)
*。参数 function 的类型是接口 Function<T, R>
,T是输入,R是返回值。该接口的 R apply(T t)
方法与CompletionStage 相关,该方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>
。
*thenAccept 系列方法对应void consumer. accept(T t)
*。参数 consumer 的类型是接口Consumer<T>
,T是输入,无返回值。该接口的 void accept(T t)
方法与CompletionStage 相关,该方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>
。
*thenRun 系列方法对应void runnable. run()
*。参数 runnable 的类型是接口 Runnable
,无输入,无返回值。所以 thenRun 系列方法返回的也是CompletionStage<Void>
。
thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。
这些方法里面 Async 代表的是异步执行 function、consumer 或者 runnable。
CompletionStage<R> thenApply(function);
CompletionStage<R> thenApplyAsync(function);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(runnable);
CompletionStage<Void> thenRunAsync(runnable);
CompletionStage<R> thenCompose(function);
CompletionStage<R> thenComposeAsync(function);
以下是 thenApply() 方法的示例代码:
先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。
CompletableFuture<String> f0 =
CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ
2. 描述 AND 汇聚关系
描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口。区别也是function、consumer、runnable 这三个核心参数不同。使用可以参考上面烧水泡茶的实现程序,不再赘述。
CompletionStage<R> thenCombine(other, function);
CompletionStage<R> thenCombineAsync(other, function);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, runnable);
CompletionStage<Void> runAfterBothAsync(other, runnable);
3. 描述 OR 汇聚关系
描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,区别也是function、consumer、runnable 这三个核心参数不同。
CompletionStage applyToEither(other, function);
CompletionStage applyToEitherAsync(other, function);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, runnable);
CompletionStage runAfterEitherAsync(other, runnable);
使用示例:
CompletableFuture<String> f1 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f3 =
f1.applyToEither(f2,s -> s);
System.out.println(f3.join());
4. 异常处理
function、consumer、runnable 都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。
以下是处理异常的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
exceptionally() 的使用非常类似于 try{}catch{}中的 catch{}。
whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{}。无论是否发生异常都会执行fn。
whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
使用示例:
CompletableFuture<Integer>
f0 = CompletableFuture
.supplyAsync(()->7/0))
.thenApply(r->r*10)
.exceptionally(e->0);
System.out.println(f0.join());
总结
本节主要阐述了CompletableFuture异步编程的用法,包括怎么新建对象(静态方法),怎么获取返回结果(future接口),怎么描述任务间的分工时序关系及异常处理(CompletionStage接口)等。
近几年,伴随着ReactiveX的发展(Java 语言的实现版本是 RxJava),回调地狱已经被完美解决,异步编程已经慢慢开始成熟,Java 语言也开始官方支持异步编程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经完全工业化。
CompletableFuture 已经能够满足简单的异步编程需求,此外可以关注 RxJava 这个项目,利用 RxJava,能在 Java 1.6 版本就使用异步编程。
提问
为什么要使用CompletableFuture?
CompletableFuture怎么新建对象?几种方法有哪些区别?使用时要注意什么?
CompletableFuture怎么获取操作执行结果?
CompletionStage 接口和CompletableFuture什么关系?是用来干啥的?
CompletionStage 接口有哪几大类方法?有什么区别?
CompletionStage 接口如何处理异常?
25.CompletionService:如何批量执行异步任务?
概述
前面讲到,用异步去查询三个商家的价格,最后存到数据库。这种写法有个小缺陷,如果S1耗时很长,那么即是S2耗时很短,也要等S1执行完才能执行。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商 S1 询价
Future<Integer> f1 =
executor.submit(()->getPriceByS1());
// 异步向电商 S2 询价
Future<Integer> f2 =
executor.submit(()->getPriceByS2());
// 异步向电商 S3 询价
Future<Integer> f3 =
executor.submit(()->getPriceByS3());
// 获取电商 S1 报价并保存
r=f1.get();
executor.execute(()->save(r));
// 获取电商 S2 报价并保存
r=f2.get();
executor.execute(()->save(r));
// 获取电商 S3 报价并保存
r=f3.get();
executor.execute(()->save(r));
针对这种一堆任务一起异步,想要哪个先返回结果哪个先执行后续操作。我们用到了阻塞队列。这样就保证了先获取到报价的先保存到数据库。
// 创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
// 电商 S1 报价异步进入阻塞队列
executor.execute(()-> bq.put(f1.get()));
// 电商 S2 报价异步进入阻塞队列
executor.execute(()-> bq.put(f2.get()));
// 电商 S3 报价异步进入阻塞队列
executor.execute(()-> bq.put(f3.get()));
// 异步保存所有报价
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}
利用 CompletionService 实现询价系统
对应上面的阻塞队列,Java给出了CompletionService
接口。它的本质就是内部维护一个阻塞队列,外加一个线程池。用来解决批量任务异步时,想要先返回的任务先执行后续操作的问题。
CompletionService 的实现原理是内部维护一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中, 不过CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。
如何创建 CompletionService
CompletionService 接口的实现类是ExecutorCompletionService,这个实现类的构造方法有两个:
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
两个构造方法都传入一个线程池。阻塞队列参数,不传则默认使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到这个阻塞队列中。
以下是执行代码示例。使用无界的 LinkedBlockingQueue,submit() 方法提交了三个询价操作并异步执行,最后通过 take() 方法获取一个 Future 对象,调用 Future 对象的 get() 方法返回询价操作的执行结果。
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
CompletionService 接口说明
该接口共有5个方法:
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
两个submit方法,一个入参是Callable<V> task
,另外一个入参是Runnable task
和V result
,result参数也作为结果返回,和前面讲过的一样。
其余的3个方法是和阻塞队列相关的:take()、poll() 都是从阻塞队列中获取并移除一个元素;如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。poll()方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。
利用 CompletionService 实现 Dubbo 中的 Forking Cluster
Dubbo 中有一种叫做Forking 的集群模式,该模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。例如同时调用3个地图商的API以保证高可用:
geocoder(addr) {
// 并行执行以下 3 个查询服务,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
// 只要 r1,r2,r3 有一个返回
// 则返回
return r1|r2|r3;
}
实现:创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future<Integer>
类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,把这些 Future 对象保存在列表 futures 中。通过调用 cs.take().get()
,就能拿到最快返回的任务执行结果:
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures = new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
总结
需要批量提交异步任务的时候建议使用 CompletionService。CompletionService 融合了线程池 Executor 和阻塞队列 BlockingQueue ,让批量异步任务的管理更简单。
CompletionService 能够让异步任务先执行完的先进入阻塞队列,利用这个特性,可以谁先执行完谁进行后续操作,避免无谓的等待,也可以实现 Forking Cluster 这样的需求。
CompletionService 的实现类 ExecutorCompletionService,需要自己创建线程池,看上去有些啰嗦,好处是你可以让多个 ExecutorCompletionService 的线程池隔离,避免几个特别耗时的任务拖垮整个应用。
提问
如何解决批量异步任务谁先返回谁先执行后续操作?
CompletionService接口的实现原理?实现类?实现类两个构造方法的区别?
CompletionService接口有哪些方法?
如何实现调用多个查询服务,只要有一个返回整个服务就可以返回?
ExecutorCompletionService有必要自己提供线程池吗?
26.Fork/Join:单机版的MapReduce
概述
线程池、Future、CompletableFuture 和 CompletionService这些工具类,都是属于多线程“互斥、同步、分工”三大类操作的“分工”。它们让我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间如何协作的细节上(比如线程之间如何实现等待、通知等)。
对于简单的并行任务,可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。
这些工具类为我们提供了并行、聚合、批量并行这三种任务模型的解决方案。基本上能够覆盖日常工作中的并发场景,不过还剩下一个“分治”的场景未覆盖,这个则由我们今天要提到的Fork/Join框架来解决。
分治,简单来说,就是指把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解的思想,例如归并排序、快速排序都属于分治算法,二分法查找、大数据领域知名的计算框架 MapReduce。
分治任务模型
分治任务模型可分为两个阶段:一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图:
在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。
Fork/Join 的使用
该框架是Java用来支持分治的框架。
这个计算框架里的Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。
ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,它们都是用递归的方式来处理分治任务的。它们都定义了抽象方法 *compute()*。区别是 RecursiveAction
定义的 compute() 没有返回值,而 RecursiveTask
定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要定义子类去扩展。
以下是用 Fork/Join 计算斐波那契数列的示例:
先创建一个分治任务线程池以及计算斐波那契数列的分治任务。之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1)
使用了异步子任务,这是通过 f1.fork()
这条语句实现的。
static void main(String[] args){
// 创建分治任务线程池
ForkJoinPool fjp = new ForkJoinPool(4);
// 创建分治任务
Fibonacci fib = new Fibonacci(30);
// 启动分治任务
Integer result = fjp.invoke(fib);
// 输出结果
System.out.println(result);
}
// 递归任务
static class Fibonacci extends RecursiveTask<Integer>{
final int n;
Fibonacci(int n){this.n = n;}
protected Integer compute(){
if (n <= 1) return n;
Fibonacci f1 = new Fibonacci(n - 1);
// 创建子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// 等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}
ForkJoinPool 工作原理
通过前文我们知道,ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个任务队列。
ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能,ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。
如果工作线程对应的任务队列空了,ForkJoinPool 支持一种叫做“任务窃取”的机制,它可以“窃取”其他工作任务队列里的任务。如下图,T2可以窃取T1的任务。
ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。
模拟 MapReduce 统计单词数量
以下是用Fork/Join统计单词数量的示例:
先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果。
下面用一个字符串数组 String[] fc
来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute()
这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())。
static void main(String[] args){
String[] fc = {"hello world",
"hello me",
"hello fork",
"hello join",
"fork join in world"};
// 创建 ForkJoin 线程池
ForkJoinPool fjp = new ForkJoinPool(3);
// 创建任务
MR mr = new MR(fc, 0, fc.length);
// 启动任务
Map<String, Long> result = fjp.invoke(mr);
// 输出结果
result.forEach((k, v)->
System.out.println(k+":"+v));
}
//MR 模拟类
static class MR extends RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start, end;
// 构造函数
MR(String[] fc, int fr, int to){
this.fc = fc;
this.start = fr;
this.end = to;
}
@Override protected
Map<String, Long> compute(){
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start + end) / 2;
MR mr1 = new MR(fc, start, mid);
mr1.fork();
MR mr2 = new MR(fc, mid, end);
// 计算子任务,并返回合并的结果
return merge(mr2.compute(), mr1.join());
}
}
// 合并结果
private Map<String, Long> merge(
Map<String, Long> r1,
Map<String, Long> r2) {
Map<String, Long> result = new HashMap<>();
result.putAll(r1);
// 合并结果
r2.forEach((k, v) -> {
Long c = result.get(k);
if (c != null)
result.put(k, c+v);
else
result.put(k, v);
});
return result;
}
// 统计单词数量
private Map<String, Long> calc(String line) {
Map<String, Long> result = new HashMap<>();
// 分割单词
String [] words = line.split("\\s+");
// 统计单词数量
for (String w : words) {
Long v = result.get(w);
if (v != null)
result.put(w, v+1);
else
result.put(w, 1L);
}
return result;
}
}
注意事项
不要用两次fork()。
用两次fork()在join的时候,需要用这样的顺序:a.fork(); b.fork(); b.join(); a.join();这个要求在JDK官方文档里有说明。
如果是一不小心写成a.fork(); b.fork(); a.join(); b.join();就会有廖雪峰老师说的问题。
建议还是用fork()+compute(),这种方式的执行过程普通人还是能理解的,fork()+fork()内部做了很多优化,不好理解。
廖雪峰老师的例子理解fork()+compute()很到位。
总结
Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。
Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 内部有多个双端任务队列,支持任务窃取机制。
Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数。计算密集型没问题。如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。
如果对 ForkJoinPool 详细的实现细节感兴趣,可参考Doug Lea 的论文。
提问
Fork/Join框架用来解决什么问题?
分治算法怎么执行?任务模型?
使用Fork/Join计算斐波那契数列?
ForkJoinPool 工作原理?几个任务队列?窃取机制?双端队列?
用Fork/Join统计单词数量?
使用Fork/Join要注意什么?
对于一个 CPU 密集型计算程序,在单核 CPU 上,使用 Fork/Join 并行计算框架是否能够提高性能呢?
27.并发工具类使用注意事项
1.使用while(true) 时要格外注意终止条件
2.尽量使用signalAll(),比signal()更安全
3.Semaphore 需要锁中锁
Semaphore 允许多个线程访问一个临界区,这也是一把双刃剑,当多个线程进入临界区时,如果需要访问共享变量就会存在并发问题,所以必须加锁,也就是说 Semaphore 需要锁中锁。
4.锁的申请和释放要成对出现
成对出现是重点,例如锁升级后,try{}finally{}的finally{}中解锁的要是升级后的锁。
5.回调总要关心执行线程是谁
当遇到回调函数的时候,你应该本能地问自己:执行回调函数的线程是哪一个?这个在多线程场景下非常重要。因为不同线程 ThreadLocal 里的数据是不同的,有些框架比如 Spring 就用 ThreadLocal 来管理事务,如果不清楚回调函数用的是哪个线程,很可能会导致错误的事务管理,并最终导致数据不一致。当看到回调函数的时候,一定问一问执行回调函数的线程是谁。
6. 共享线程池:有福同享就要有难同当
用共享线程池优点是能快速实现,缺点就是当有阻塞式 I/O 时,可能导致所有的 ForkJoinPool 线程都阻塞,进而影响整个系统的性能。所以更倾向于推荐隔离的方案。
7.线上问题定位的利器:线程栈 dump
定位线上并发问题,方案很简单,就是通过查看线程栈来定位问题。重点是查看线程状态,分析线程进入该状态的原因是否合理。参考Java线程的生命周期即可。
为了便于分析定位线程问题,你需要给线程赋予一个有意义的名字,对于线程池可以通过自定义 ThreadFactory 来给线程池中的线程赋予有意义的名字,也可以在执行 run() 方法时通过Thread.currentThread().setName();
来给线程赋予一个更贴近业务的名字。
本部分总结
串讲:
14解决互斥问题、15解决同步问题、16信号量用来解决多个线程同时访问临界区问题(比如池),17用细粒度锁-读写锁来解决读多写少的场景。18用细粒度锁-读写乐观锁来解决简单的读多写少场景。19讲了用于线程间通信的两个类,用来取代使用线程池时的join()方法。20讲了并发容器的用法。21讲了无锁操作原理及它的实现原子类。22讲了线程池原理、用法(execute()方法)及使用注意事项。23讲了使用线程池时提交任务后如何用Future获取执行返回值(submit()方法)。24讲了Java1.8异步编程的用法,CompletableFuture即是异步执行器也是结果,所以可以链式编程。25讲了如何批量提交异步任务,谁先有结果谁先执行不用相互等待。核心就是CompletionService,它的实质就是整合了线程池的阻塞队列。26前篇讲了并行、聚合、批量任务的处理。本节则讲了最后一种情况,分治:Fork/Join 计算框架,包括分治线程池ForkJoinPool和分治任务 ForkJoinTask。
22-26总结:都是从任务的视角,来讲分工。22-25:对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。22-25覆盖了并行、聚合、批量。26则讲Java分治框架Fork/Join的用法。
Java几种任务函数:Runnable、Callable、Consumer、Fucntion,分别是无参无返回、无参有返回、有参无返回、有参有返回。*
总结所有锁的常用API:
三、并发设计模式
略。
四、案例分析
略。
五、其他并发模型
42.Actor模型:面向对象原生的并发模型
Hello Actor (Akka)
在 Actor 模型中,所有的计算都是在 Actor 中执行的。在面向对象编程里面,一切都是对象;在 Actor 模型里,一切都是 Actor,并且 Actor 之间是完全隔离的,不会共享任何变量。
并发问题的根源就在于共享变量,而 Actor 模型中 Actor 之间不共享变量,那用 Actor 模型解决并发问题,一定是相当顺手。的确是这样,所以很多人就把 Actor 模型定义为一种并发计算模型。
但是 Java 语言本身并不支持 Actor 模型,想在 Java 语言里使用 Actor 模型,需要借助类库,如Akka。
以下是使用Akka的示例代码:
我们首先创建了一个 ActorSystem(Actor 不能脱离 ActorSystem 存在);之后创建了一个 HelloActor,Akka 中创建 Actor 并不是 new 一个对象出来,而是通过调用 system.actorOf() 方法创建的,该方法返回的是 ActorRef,而不是 HelloActor;最后通过调用 ActorRef 的 tell() 方法给 HelloActor 发送了一条消息 “Actor” 。
// 该 Actor 当收到消息 message 后,
// 会打印 Hello message
static class HelloActor extends UntypedActor {
@Override
public void onReceive(Object message) {
System.out.println("Hello " + message);
}
}
public static void main(String[] args) {
// 创建 Actor 系统
ActorSystem system = ActorSystem.create("HelloSystem");
// 创建 HelloActor
ActorRef helloActor = system.actorOf(Props.create(HelloActor.class));
// 发送消息给 HelloActor
helloActor.tell("Actor", ActorRef.noSender());
}
Actor 模型和面向对象编程契合度非常高,完全可以用 Actor 类比面向对象编程里面的对象,而且 Actor 之间的通信方式完美地遵守了消息机制,而不是通过对象方法来实现对象之间的通信。
消息和对象方法的区别
Actor 中的消息机制,可以类比这现实世界里的写信。Actor 内部有一个邮箱(Mailbox),接收到的消息都是先放到邮箱里,如果邮箱里有积压的消息,那么新收到的消息就不会马上得到处理,也正是因为 Actor 使用单线程处理消息,所以不会出现并发问题。你可以把 Actor 内部的工作模式想象成只有一个消费者线程的生产者 - 消费者模式。
在 Actor 模型里,发送消息仅仅是把消息发出去而已,接收消息的 Actor 在接收到消息后,也不一定会立即处理,也就是说Actor 中的消息机制完全是异步的。而调用对象方法,实际上是同步的,对象方法 return 之前,调用方会一直等待。
除此之外,调用对象方法,需要持有对象的引用,所有的对象必须在同一个进程中。而在 Actor 中发送消息,类似于现实中的写信,只需要知道对方的地址就可以,发送消息和接收消息的 Actor 可以不在一个进程中,也可以不在同一台机器上。因此,Actor 模型不但适用于并发计算,还适用于分布式计算。
Actor 的规范化定义
Actor 是一种基础的计算单元,具体来讲包括三部分能力,分别是:
- 处理能力,处理接收到的消息。
- 存储能力,Actor 可以存储自己的内部状态,并且内部状态在不同 Actor 之间是绝对隔离的。
- 通信能力,Actor 可以和其他 Actor 之间通信。
当一个 Actor 接收的一条消息之后,这个 Actor 可以做以下三件事:
- 创建更多的 Actor;
- 发消息给其他 Actor;
- 确定如何处理下一条消息。
总结
在 Java 领域,除了可以使用 Akka 来支持 Actor 模型外,还可以使用 Vert.x,不过相对来说 Vert.x 更像是 Actor 模型的隐式实现,对应关系不像 Akka 那样明显,不过本质上也是一种 Actor 模型。
Actor 可以创建新的 Actor,这些 Actor 最终会呈现出一个树状结构,非常像现实世界里的组织结构,所以利用 Actor 模型来对程序进行建模,和现实世界的匹配度非常高。Actor 模型和现实世界一样都是异步模型,理论上不保证消息百分百送达,也不保证消息送达的顺序和发送的顺序是一致的,甚至无法保证消息会被百分百处理,这个是目前厂商在解决的难点。
43.软件事务内存:借鉴数据库的并发经验
软件事务内存(Software Transactional Memory,简称 STM)。
数据库事务发展了几十年了,目前被广泛使用的是MVCC(全称是 Multi-Version Concurrency Control),也就是多版本并发控制。
MVCC 可以简单地理解为数据库事务在开启的时候,会给数据库打一个快照,以后所有的读写都是基于这个快照的。当提交事务的时候,如果所有读写过的数据在该事务执行期间没有发生过变化,那么就可以提交;如果发生了变化,说明该事务和有其他事务读写的数据冲突了,这个时候是不可以提交的。
STM 借鉴的是数据库的经验,数据库虽然复杂,但仅仅存储数据,而编程语言除了有共享变量之外,还会执行各种 I/O 操作,很显然 I/O 操作是很难支持回滚的。所以,STM 也不是万能的。目前支持 STM 的编程语言主要是函数式语言,函数式语言里的数据天生具备不可变性,利用这种不可变性实现 STM 相对来说更简单。
示例:
class Account {
// 余额
private TxnRef<Integer> balance;
// 构造方法
public Account(int balance) {
this.balance = new TxnRef<Integer>(balance);
}
// 转账操作
public void transfer(Account target, int amt){
STM.atomic((txn)->{
Integer from = balance.getValue(txn);
balance.setValue(from-amt, txn);
Integer to = target.balance.getValue(txn);
target.balance.setValue(to+amt, txn);
});
}
}
44.协程:更轻量级的线程
线程太重,因此出了线程池等一堆复杂的工具类。为了更好的使用并发,Golang提供了更轻便的协程。
我们可以把协程简单地理解为一种轻量级的线程。从操作系统的角度来看,线程是在内核态中调度的,而协程是在用户态调度的,所以相对于线程来说,协程切换的成本更低。
协程也有自己的栈,但是相比线程栈要更小,典型的线程栈大小差不多有 1M,而协程栈的大小往往只有几 K 或者几十 K。
Golang 中的协程
要让 hello() 方法在一个新的协程中执行,只需要go hello("World")
这一行代码就搞定:
import (
"fmt"
"time"
)
func hello(msg string) {
fmt.Println("Hello " + msg)
}
func main() {
// 在新的协程中执行 hello 方法
go hello("World")
fmt.Println("Run in main")
// 等待 100 毫秒让协程执行结束
time.Sleep(100 * time.Millisecond)
}
协程是为了解决易用性问题,摒弃复杂的线程池。Java OpenSDK 中 Loom 项目的目标就是支持协程,未来Java也将支持协程。
45.CSP模型:Golang的主力队员
Golang 提供了两种不同的方案解决线程/协程间的协作问题:
- 一种方案支持协程之间以共享内存的方式通信,Golang 提供了管程和原子类来对协程进行同步控制,这个方案与 Java 语言类似;
- 另一种方案支持协程之间以消息传递(Message-Passing)的方式通信,本质上是要避免共享,Golang 的这个方案是基于CSP(Communicating Sequential Processes)模型实现的。Golang 比较推荐的方案是后者。
什么是 CSP 模型
前文中介绍了 Actor 模型,Actor 模型中 Actor 之间就是不能共享内存的,彼此之间通信只能依靠消息传递的方式。Golang 实现的 CSP 模型和 Actor 模型看上去非常相似,Golang 程序员中有句格言:“不要以共享内存方式通信,要以通信方式共享内存(Don’t communicate by sharing memory, share memory by communicating)。”
Golang 中协程之间通信推荐的是使用 channel,channel 你可以形象地理解为现实世界里的管道。另外,calc() 方法的返回值是一个只能接收数据的 channel ch,它创建的子协程会把计算结果发送到这个 ch 中,而主协程也会将这个计算结果通过 ch 读取出来。以下是4个协程累加的示例:
import (
"fmt"
"time"
)
func main() {
// 变量声明
var result, i uint64
// 单个协程执行累加操作
start := time.Now()
for i = 1; i <= 10000000000; i++ {
result += i
}
// 统计计算耗时
elapsed := time.Since(start)
fmt.Printf(" 执行消耗的时间为:", elapsed)
fmt.Println(", result:", result)
// 4 个协程共同执行累加操作
start = time.Now()
ch1 := calc(1, 2500000000)
ch2 := calc(2500000001, 5000000000)
ch3 := calc(5000000001, 7500000000)
ch4 := calc(7500000001, 10000000000)
// 汇总 4 个协程的累加结果
result = <-ch1 + <-ch2 + <-ch3 + <-ch4
// 统计计算耗时
elapsed = time.Since(start)
fmt.Printf(" 执行消耗的时间为:", elapsed)
fmt.Println(", result:", result)
}
// 在协程中异步执行累加操作,累加结果通过 channel 传递
func calc(from uint64, to uint64) <-chan uint64 {
// channel 用于协程间的通信
ch := make(chan uint64)
// 在协程中执行累加操作
go func() {
result := from
for i := from + 1; i <= to; i++ {
result += i
}
// 将结果写入 channel
ch <- result
}()
// 返回结果是用于通信的 channel
return ch
}
CSP 模型与生产者 - 消费者模式
可以简单地把 Golang 实现的 CSP 模型类比为生产者 - 消费者模式,而 channel 可以类比为生产者 - 消费者模式中的阻塞队列。不过,需要注意的是 Golang 中 channel 的容量可以是 0,容量为 0 的 channel 在 Golang 中被称为无缓冲的 channel,容量大于 0 的则被称为有缓冲的 channel。
无缓冲的 channel 类似于 Java 中提供的 SynchronousQueue,主要用途是在两个协程之间做数据交换。比如上面累加器的示例代码中,calc() 方法内部创建的 channel 就是无缓冲的 channel。
而创建一个有缓冲的 channel 也很简单,在下面的示例代码中,我们创建了一个容量为 4 的 channel,同时创建了 4 个协程作为生产者、4 个协程作为消费者。
// 创建一个容量为 4 的 channel
ch := make(chan int, 4)
// 创建 4 个协程,作为生产者
for i := 0; i < 4; i++ {
go func() {
ch <- 7
}()
}
// 创建 4 个协程,作为消费者
for i := 0; i < 4; i++ {
go func() {
o := <-ch
fmt.Println("received:", o)
}()
}
Golang 中的 channel 是语言层面支持的,所以可以使用一个左向箭头(<-)来完成向 channel 发送数据和读取数据的任务,使用上还是比较简单的。Golang 中的 channel 是支持双向传输的,所谓双向传输,指的是一个协程既可以通过它发送数据,也可以通过它接收数据。
不仅如此,Golang 中还可以将一个双向的 channel 变成一个单向的 channel,在累加器的例子中,calc() 方法中创建了一个双向 channel,但是返回的就是一个只能接收数据的单向 channel,所以主协程中只能通过它接收数据,而不能通过它发送数据,如果试图通过它发送数据,编译器会提示错误。对比之下,双向变单向的功能,如果以 SDK 方式实现,还是很困难的。
CSP 模型与 Actor 模型的区别
第一个最明显的区别就是:Actor 模型中没有 channel。Actor 模型中的 mailbox 对于程序员来说是“透明”的,mailbox 明确归属于一个特定的 Actor,是 Actor 模型中的内部机制;而且 Actor 之间是可以直接通信的,不需要通信中介。但 CSP 模型中的 channel 就不一样了,它对于程序员来说是“可见”的,是通信的中介,传递的消息都是直接发送到 channel 中的。
第二个区别是:Actor 模型中发送消息是非阻塞的,而 CSP 模型中是阻塞的。Golang 实现的 CSP 模型,channel 是一个阻塞队列,当阻塞队列已满的时候,向 channel 中发送数据,会导致发送消息的协程阻塞。
第三个区别则是关于消息送达的:Actor 模型理论上不保证消息百分百送达,而在 Golang 实现的CSP 模型中,是能保证消息百分百送达的。不过这种百分百送达也是有代价的,那就是有可能会导致死锁。
比如,下面这段代码就存在死锁问题,在主协程中,我们创建了一个无缓冲的 channel ch,然后从 ch 中接收数据,此时主协程阻塞,main() 方法中的主协程阻塞,整个应用就阻塞了。这就是 Golang 中最简单的一种死锁。
func main() {
// 创建一个无缓冲的 channel
ch := make(chan int)
// 主协程会阻塞在此处,发生死锁
<- ch
}
Java 领域可以借助第三方的类库JCSP来支持 CSP 模型,不过JCSP 并没有经过广泛的生产环境检验,所以并不建议在生产环境中使用。
CSP 模型是托尼·霍尔(Tony Hoare)在 1978 年提出的,霍尔在并发领域还有一项重要成就,那就是提出了霍尔管程模型,Java 领域解决并发问题的理论基础就是它。
转载请注明来源