-
程序是由指令和数据组成的,但这些指令要运行起来,数据要读写,就必须将指令加载至 CPU,数据加载至内存。再指令运行过程中还需要用到磁盘、网络等设备。而进程就是用来加载指令、管理内存、管理IO的。
-
当一个程序被运行,从磁盘加载这个程序的代码到内存,这时就开启了一个进程。
-
进程可以视为程序的一个实例。有些程序可以同时运行多个实例进程(记事本、画图、浏览器等),有的程序只能启动一盒实例进程(360、网易云等)
-
一个进程中可以分为一到多个线程。
-
一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给CPU执行。
-
在 Java 中,线程作为最小调度单位,进程作为资源分配的最小单位。在 Windows 中进程是不活动的,只是作为线程的容器。
-
进程基本上相互独立,而线程存在于进程内,是进程的子集。
-
进程拥有共享的资源 ,如内存空间等,供其内部的线程共享。
-
进程间通信较为复杂:
-
同一台计算机的进程通信称为 IPC(Inter process communication)
-
不同计算机之间的进程通信,需要通过网络,并遵守协议,如 HTTP等。
-
-
线程通信相对简单,因为它们共享进程内的内存(比如多个线程可以访问到同一个共享变量)
-
线程更轻量,线程上下文(CPU时间片)切换成本一般要比进程上下切换低。
并行:同一时刻执行。
并发:同一时间段执行。
-
涉及到线程上下文切换,将 cpu时间片 分给不同的线程使用。(Windows 时间片最小约为15毫秒)
-
假设8核cpu开启了16个线程,但不足以同时满足1000人秒杀,这种情况操作系统的任务调度器就会让线程轮流使用cpu。
从方法调用的角度看:
-
方法的调用者需要等待结果返回,才能继续运行即是同步。
-
方法的调用者不需要等待结果返回,发起方法调用后,就可以返回,继续运行即是异步。
Thread t = new Thread("t1") {
@Override
public void run() {
log.debug("new Thread");
}
};
// 启动线程
t.start();
log.debug("main...");new Thread(() ->{
log.debug("Runnable...");
}, "t1").start();
log.debug("main...");FutureTask<Integer> future = new FutureTask<>(() -> {
log.debug("FutureTask...");
return 100;
});
new Thread(future, "t1").start();
// get() 阻塞同步等待 task 执行的结果
log.debug("result: {}", future.get());FutureTask 能够接收 Callable 类型的参数,用来处理有返回结果的情况。
-
任务管理器可以查看进程和线程数,也可以 kill 进程。
-
tasklist查看进程。 -
taskkill /F /PID <PID>kill 线程。
-
ps -fe查看 Java 进程。 -
ps -fT -p <PID>查看指定进程。 -
kill -9 <PID>kill 进程。 -
top查看进程,按 H 切换显示线程。 -
top -H -p <PID>查看指定进程的所以线程。
-
jps查看所以 Java 进程。 -
jstack <PID>查看指定 Java 进程的所以线程状态。 -
jconsole查看指定 Java 进程中线程的运行情况(可视化界面)。
jconsole 远程监控配置(Linux):
- 需要以如下配置运行 Java 类或 jar(java -jar)。
java -Djava.rmi.server.hostname=ip地址 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=连接端口 -Dcom.sun.management.jmxremote.ssl=是否安全连接 -Dcom.sun.management.jmxremote.authenticate=是否认证 Java类或jar-
修改 /etc/hosts 文件将 127.0.0.1 映射至主机名
-
如果需要认证访问,还需要几个步骤(具体Google,就不再赘述)
| API | 描述 |
|---|---|
start() |
启动一个新线程,在新的线程中运行 run 方法,但是 start 只是让线程进入就绪状态,并不是立刻执行,CPU的时间片还没分给它。每个线程对象的 start 方法只能调用一次,多次调用会出现 IllegalThreadStateException。 |
run() |
新线程启动后会调用的方法。如果在Thread对象中传递了 Runnable 参数,那么线程启动后会调用 Runnable 中的 run 方法。否则默认不执行任何操作。但可以创建 Thread 的子类,来覆盖默认行为。 |
join() |
等待线程运行结束。 |
join(long n) |
等待线程运行结束,最多等待 n 毫秒。 |
getId() |
获取线程的长整型id,id唯一。 |
getPriority() |
获取线程优先级。 |
setPriority() |
设置线程优先级。Java中线程优先级是1~10的整数,默认优先级5,较大的优先级能提高该线程被CPU调度的几率。 |
getState() |
获取线程状态。Java中线程状态使用枚举类表示的。分别为 NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING、TERMINATED。 |
isInterrupted() |
判断是否被中断,不会清除中断标记。 |
isAlive() |
线程是否存活,为true表示还没有执行完任务。 |
interrupt() |
中断线程,如果被中断的线程正在 sleep、wait、join 会导致被中断的线程抛出 InterruptedException,并清除中断标记;如果中断正在运行的线程,则会设置中断标记;park(LockSupport) 的线程被中断,也会设置中断标记。 |
sleep(long n) |
让当前指执行的线程休眠 n 毫秒,休眠期间会让出 cpu 的时间片(休眠期间不会占用cpu)。 |
-
调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)。
-
其它线程可以使用 interrupt() 方法中断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException。
-
所谓的中断,就是把正在睡眠的线程"叫醒"。
-
睡眠结束后的线程未必会立即得到执行(等待分配CPU时间片)。
-
建议使用 TimeUnit 的 sleep 替代 Thread 的 sleep 来获得更好的可读性。
-
yield():通知线程调度器让出当前线程对 cpu 的使用。 -
调用
yield()会让当前线程从 Running 进入 Runnable 状态(就绪)。然后调度器执行其它线程。(本身可能也会继续得到) -
具体的实现依赖操作系统的任务调度器。
join:等待线程运行结束,方法是同步的。
join(long n):等待线程运行结束,最多等待 n 毫秒。限时同步。但是如果线程提前执行完了,会导致 join 结束。(如果 join 等待3秒,但是线程两秒就执行完业务了,那么 join 会提前结束,这时等价于 无参 join)
中断正在睡眠(sleep、wait、join)的线程,会清空中断标记(false)。
如果中断正常执行的线程,则不会InterruptException,会将中断标记置为true。被中断的线程可以用Thread.currentThread().isInterrupted()判断是否被中断,然后执行对应的逻辑。可以用来停止线程。
graph TD
A("while(true)") --> B
B("有没有被打断?") --是--> C[收尾工作]
C --> G((结束循环))
B --否--> D(睡眠指定时间)
D --无异常--> E(执行监控记录)
D --有异常--> F(设置打断标记)
E --> A
F --> A
-
使用线程对象的
stop()方法停止线程- stop 方法会 kill 线程,如果这时线程锁住了共享资源,那么当它被 kill 后就没有机会释放锁了,其它线程也将无法获取锁。
-
使用
System.exit(int n)方法停止线程- 目的是仅停止一个线程。但这种做法会让整个程序都停止。
默认情况下,Java 进程需要等待所以线程都运行结束,才会结束。
守护(Daemon)线程:只要其它非守护线程运行结束了,即时守护线程的代码没有执行完,也会被强制结束。
-
垃圾回收器线程就是一个守护线程。
-
Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以当 Tomcat 接收到 shutdown 命令后,不会等待 Acceptor 和 Poller 处理完当前请求。
这是从操作系统层面来描述的:
-
初始状态:仅仅创建了线程对象,还没有和操作系统线程关联。
-
可运行状态:(就绪状态,yield)与操作系统线程关联,可以被 CPU 调度执行。
-
运行状态:获得了 CPU 时间片。当CPU时间片用完,会从运行状态转换中可运行状态,会导致线程的上下文切换。
-
阻塞状态:调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到CPU,会导致线程上下文切换,进入阻塞状态。当BIO操作完成,会由操作系统唤醒阻塞的线程,转换至可运行状态。
-
终止状态:生命周期结束,线程任务执行完了。
Java API 层面描述:
-
NEW:创建线程对象。 -
RUNNABLE:调用了 start() 方法。RUNNABLE状态涵盖了操作系统层面的可运行状态、运行状态以及阻塞状态(BIO操作、阻塞API)。 -
BLOCKED:synchronized。 -
WAITING:join()、wait()、LockSupport.park()。 -
TIMED_WAITING:sleep(long)、wait(long)、join(long)、LockSupport。 -
TERMINATED:生命周期结束,线程任务执行完了。
虚拟机栈、栈帧、程序计数器、局部变量表、返回地址 ...
管程:Monitor
i++不是原子操作,在字节码上对应四条指令:
getstatic i // 获取静态变量 i 的值
iconst_1 // 把数字1放到操作数栈中
iadd // 自增
putstatic i // 将修改后的值存入静态变量 i如果此时有A、B两个线程在多线程下修改静态变量 i 的值,因为两个线程在字节码种都会有putstatic(将值写入内存(方法区,JDK8是元空间,在直接内存中存储)),此时 i 的值就会有两种可能。
负数情况:
sequenceDiagram
participant t1 as 线程A
participant t2 as 线程B
participant i as static i
i ->> t2 : getstatic i 读取 0
t2 ->> t2 : iconst_1 准备常数 1
t2 ->> t2 : isub, 线程内 i = -1
t2 ->> t1 : 上下文切换(时间片用完)
i ->> t1 : getstatic i 读取 0
t1 ->> t1 : iconst_1 准备常数 1
t1 ->> t1 : isub, 线程内 i = 1
t1 ->> i : putstatic i 写入 1
t1 ->> t2 : 上下文切换(时间片用完)
t2 ->> i : putstatic i 写入 -1
如果是正数的情况,则线程A、B执行顺序调换即可。这是由于指令交错执行造成的线程不安全。
在多线程对共享资源读写操作时会发生指令交错,这时就会出现问题(并发问题)。
一段代码块内如果存在对共享资源的多线程读写操作,则称这段代码块为临界区。
多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,则称之为发生了竞态条件。
避免临界区的竞态条件发生:
-
阻塞式:synchronized、Lock
-
非阻塞式:原子变量
synchronized,俗称对象锁,采用互斥的方式让同一时刻只能有一个线程可以持有对象锁。即使被 synchronized 锁住,线程也不可能一直执行,如果CPU时间片用完了,再去等待获取CPU时间片,期间是不会释放锁的。
synchronized 用对象锁保证了临界区内代码的原子性,临界区内的代码不会被线程切换打断。
// 成员方法
class Test {
public synchronized void test() {}
}
// 等价于
class Test {
public void test() {
synchronized(this) {}
}
}
// 静态方法
class Test {
public synchronized static void test() {}
}
// 等价于
class Test {
public static void test() {
synchronized(Test.class) {}
}
}以32位虚拟机为例,32位虚拟机空对象头占8字节,64位占16字节。
|----------------------------------------------------------------|
| Object Header (64 bits) |
|-------------------------------|--------------------------------|
| Mark Word (32 bits) | Klass Word (32 bits) |
|-------------------------------|--------------------------------||---------------------------------------------------------------------------------------------------|
| Object Header (96 bits) |
|-------------------------------|--------------------------------|----------------------------------|
| Mark Word (32 bits) | Klass Word (32 bits) | array length (32 bits) |
|-------------------------------|--------------------------------|----------------------------------||----------------------------------------------------------------------|------------------------|
| Mark Word (32 bits) | State |
|----------------------------------------------------------------------|------------------------|
| hashcode:25 | age:4 | biased_lock:0 | 01 | Normal |
|----------------------------------------------------------------------|------------------------|
| thread:23 | epoch:2 | age:4 | biased_lock:1 | 01 | Biased |
|----------------------------------------------------------------------|------------------------|
| ptr_to_lock_record:30 | 00 | Lightweight Locked |
|----------------------------------------------------------------------|------------------------|
| ptr_to_heavyweight_monitor:30 | 10 | Heavyweight Locked |
|----------------------------------------------------------------------|------------------------|
| | 11 | Marked for GC |
|----------------------------------------------------------------------|------------------------|biased_lock:0:表示是否是偏向锁,紧接着最后2位01表示加锁状态。
|--------------------------------------------------------------------------------------------------------------|
| Mark Word (64 bits) | State |
|-------------------------------------------------------------------------------------|------------------------|
| unused:25 |hashcode:25 | unused:1 | age:4 | biased_lock:0 | 01 | Normal |
|-------------------------------------------------------------------------------------|------------------------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | 01 | Biased |
|-------------------------------------------------------------------------------------|------------------------|
| ptr_to_lock_record:62 | 00 | Lightweight Locked |
|-------------------------------------------------------------------------------------|------------------------|
| ptr_to_heavyweight_monitor:62 | 10 | Heavyweight Locked |
|-------------------------------------------------------------------------------------|------------------------|
| | 11 | Marked for GC |
|-------------------------------------------------------------------------------------|------------------------|Monitor:监视器。
每个 Java 对象都可以关联一个 Monitor(操作系统提供的对象) 对象,如果使用 synchronized 给对象加锁(重量级)之后,该对象头的 Mark Word 中就会被设置指向 Monitor 对象的指针。
Monitor 结构如下:
-
刚开始 Monitor 中 Owner 为 null。
-
当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 设置为 Thread-2,Monitor 中只能同时存在一个 Owner。
-
在 Thread-2 加锁的过程中,如果 Thread-3、Thread-4、Thread-5 也执行到 synchronized(obj),就会进入 EntryList BLOCKED。
-
等待 Thread-2 执行完同步代码块中的逻辑后,然后唤醒 EntryList 中等待的线程来竞争锁,竞争锁时是非公平的。
-
WaitSet 中的 Thread-0、Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程。
synchronized 必须是获取到同一个对象的 monitor 才有上述的效果。
public class App {
static final Object lock = new Object();
static int counter = 0;
public static void main(String[] args) {
synchronized (lock) {
counter++;
}
}
}字节码:
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: getstatic #2 // 加载静态变量 lock引用 到操作数栈上, (synchronized 开始)
3: dup // 复制一份 lock引用,解锁用
4: astore_1 // 将 lock引用 从操作数栈弹出,存储局部变量表 slot 1
5: monitorenter // (加锁)将 lock对象 MarkWord 设置为 Monitor 指针
6: getstatic #3 // 加载静态变量 counter 到操作数栈上
9: iconst_1 // 准备常数 1,弹入到操作数栈上
10: iadd // +1
11: putstatic #3 // 将修改后 counter 的值写回静态变量
14: aload_1 // 将局部变量表 slot 1 加载到操作数栈 (lock引用)
15: monitorexit // (解锁)将 lock 对象 MarkWord 重置,唤醒 EntryList
16: goto 24
19: astore_2 // 出现异常了,将异常信息从操作数栈弹出,存入局部变量表 slot 2
20: aload_1 // 从局部变量表 slot 1 加载 lock引用 到操作数栈上
21: monitorexit // (解锁)将 lock 对象 MarkWord 重置,唤醒 EntryList
22: aload_2 // 将局部变量表 slot 2 异常信息加载到操作数栈上
23: athrow // throw e
24: return
// 会监测同步代码块中的异常,即时出现异常,也能正常解锁
Exception table:
from to target type
6 16 19 any
19 22 19 any
LineNumberTable: ...
LocalVariableTable:
Start Length Slot Name Signature
0 25 0 args [Ljava/lang/String;-
轻量级锁(Lightweight Lock),也称为偏向锁(Biased Locking),是Java中用于提高多线程同步性能的一种机制。它是一种乐观锁的实现方式,用于解决竞争不激烈的情况下对同步操作的性能影响。
-
轻量级锁的基本思想是在对象头中添加一些标志位来表示锁的状态,通过CAS(Compare and Swap)操作来实现锁的获取和释放。当一个线程尝试获取锁时,首先会尝试使用CAS将对象头中的锁标志位更新为自己的线程ID,如果成功获取锁,则表示该线程获取到了轻量级锁。如果CAS操作失败,说明有竞争发生,此时轻量级锁会膨胀为重量级锁,进而采用传统的互斥量方式进行同步。
-
轻量级锁的优势在于避免了传统的互斥量同步带来的性能开销,适用于线程间竞争不激烈的场景。它的主要特点包括:
-
使用CAS操作实现锁的获取和释放,避免了线程阻塞和唤醒带来的开销;
-
锁的状态保存在对象头中,不需要额外的内存空间;
-
对于没有竞争的情况下,几乎没有额外的性能开销。
-
-
需要注意的是,轻量级锁并不适用于存在大量线程间竞争的情况,因为在竞争激烈的场景下,轻量级锁会频繁膨胀为重量级锁,这样反而会增加锁操作的开销。因此,在设计并发应用时,需要根据实际情况选择合适的锁机制。
直白讲就是:
-
虚拟机首先将在当前线程的栈帧创建一块空间用于存储锁记录(lock record),把对象头中的Mark Word复制到锁记录中,拷贝成功后,虚拟机将使用CAS操作将对象Mark Word更新为指向Lock Record的指针。(并将Lock Record里的owner指针指向对象的Mark Word)。
-
若操作成功,那就完成了轻量锁操作,把对象头里的tag改成00,表示此对象处于轻量级锁定状态。
-
如果失败,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行;
-
否则说明多个线程在竞争锁:若当前只有一个等待线程,尝试自旋获取锁,自旋超过一定的次数,或者多个线程竞争,则需要在当前对象上生成重量锁。
偏向锁使用场景:冲突很少,基本上就一个线程使用。
轻量级锁在没有竞争时,每次锁重入仍然需要执行 CAS 操作。在 Java6 引入了偏向锁来做进一步优化,只有第一次使用 CAS 时才将线程ID设置到对象的 Mark Word 中,之后锁重入发现这个线程ID是自己的,就表示没有竞争,不用再 CAS。以后只要不发生锁竞争,这个锁对象就归该线程所有。
|--------------------------------------------------------------------------------------------------------------|
| Mark Word (64 bits) | State |
|-------------------------------------------------------------------------------------|------------------------|
| unused:25 |hashcode:25 | unused:1 | age:4 | biased_lock:0 | 01 | Normal |
|-------------------------------------------------------------------------------------|------------------------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | 01 | Biased |
|-------------------------------------------------------------------------------------|------------------------|
| ptr_to_lock_record:62 | 00 | Lightweight Locked |
|-------------------------------------------------------------------------------------|------------------------|
| ptr_to_heavyweight_monitor:62 | 10 | Heavyweight Locked |
|-------------------------------------------------------------------------------------|------------------------|
| | 11 | Marked for GC |
|-------------------------------------------------------------------------------------|------------------------|-
如果开启了偏向锁(默认开启),那么对象创建后,Mark Word值为 0x05,即最后3位为 101,这时它的 thread、epoch、age都为0。
-
偏向锁默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以使用 VM 参数
-XX:BiasedLockingStartupDelay=0来禁用延迟。 -
如果没有开启偏向锁(
-XX:UseBiasedLocking=false),当对象创建后,Mark Word 值为 0x01,即最后3位为 001,这时它的 hashcode、age都为0,第一次用到 hashcode 时才会赋值。 -
只要没有发生锁竞争,对象头 Mark Word 就会一直存储着上一次的线程ID。
调用对象 hashCode、其它线程使用锁对象、调用 wait/notify 都会导致偏向锁撤销。wait/notify 只有重量级锁才有。
由可偏向锁撤销到不可偏向锁。
偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。
偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,检查持有偏向锁的线程是否处于活动状态(同步代码块是否执行完),不处于活动状态则将对象头设置为无锁状态,线程可以使用CAS竞争锁。处于活动状态,则升级到轻量级锁,唤醒暂停线程继续执行。
在Java中,批量重偏向是一种优化技术,用于解决偏向锁的不适用场景。当一个线程获取对象的偏向锁后,如果其他线程也想获取该对象的锁,那么需要撤销原有的偏向锁,并将锁状态进行升级,这个过程称为批量重偏向。
批量重偏向的目的是为了避免偏向锁在多线程竞争下的频繁撤销和重偏向操作,从而提高并发性能。在某些场景下,当对象的偏向锁被多个线程竞争时,频繁的偏向锁撤销和重偏向操作会导致性能下降。为了避免这种情况,JVM引入了批量重偏向机制。
具体来说,当一个偏向锁的对象被多个线程竞争时,JVM会在达到一定条件时触发批量重偏向操作。这个条件通常是当对象的偏向锁数量超过一定阈值(超过20次)时,或者当系统经过一段时间后,判断当前偏向锁的竞争情况不再适合偏向锁优化时。
当撤销偏向锁超过阈值(20次)后,会给这些对象加锁时重新偏向至加锁线程。
当撤销偏向锁超过阈值(40次)后,加锁对象会变为不可偏向(无锁状态),新建的对象也是不可偏向的。
即类的实例会变成不可偏向。原因是偏向是指类偏性,而不是对象实例偏向;一个类只能有一个偏向。
Java锁膨胀是指在多线程环境下,当某个线程获取锁之后,如果其他线程也需要获取相同锁,并且无法立即获取到,就会触发锁膨胀的过程。
Java中锁膨胀主要涉及到两种类型的锁(偏向锁和轻量级锁):
-
偏向锁膨胀:偏向锁是为了解决无竞争的情况下的性能问题。当一个线程获取了偏向锁后,如果其他线程也需要获取该锁,就会触发偏向锁膨胀,偏向锁会升级为轻量级锁。
-
轻量级锁膨胀:当一个线程获取锁时,如果该锁的标记位为轻量级锁,并且有其他线程尝试获取该锁但失败了,那么该锁会膨胀为重量级锁,即锁的实现由CAS操作变为互斥量操作。
锁膨胀的过程涉及到锁的升级,从偏向锁到轻量级锁,再到重量级锁,每一次升级都会增加锁操作的开销,因此在设计并发程序时,应该尽量避免不必要的锁膨胀,以提高程序的性能和并发能力。
锁升级后,解锁操作需按照升级后锁的解锁方式进行解锁。
重量级锁竞争的时候,还可以使用自旋来进行优化。在 Java6 之后自旋锁是自适应的,自旋成功的可能性较高,就会多自旋几次,反之,就会减少自旋次数或者不自旋。Java7之后不能控制是否开启自旋功能。
即锁对象没有逃离方法作用域,JVM 会认为可以不用加锁操作。
-XX:-EliminateLocks 关闭锁消除优化。
BLOCKED 是等待锁,而 WAITING 是已经获得锁,但是又放弃了锁。BLOCKED 和 WAITING都处于阻塞状态,不占用CPU时间片。
BLOCKED 线程会在 Owner 线程释放锁时被唤醒。WAITING 线程会在 Owner 线程调用 notify/notifyAll 时被唤醒。WAITING 线程被唤醒后不会立即获得锁,仍需进入 EntryList 重新竞争。
| API | 描述 |
|---|---|
obj.wait() |
让进入 obj 监视器的线程到 WaitSet 等待。wait() 方法会释放锁。有参 wait(long n) 会被提前唤醒。 |
obj.notify() |
随机唤醒一个在 obj 上正在 WaitSet 等待的线程。 |
obj.notifyAll() |
唤醒全部在 obj 上正在 WaitSet 等待的线程。 |
它们都是线程之间进行协作的手段,属于 Object 对象的方法,必须获得此对象,才能调用方法。
-
sleep 是 Thread 方法,wait 是 Object 的方法。
-
sleep 可以不用配合 synchronized 使用,wait 需要和 synchronized 配合使用。
-
sleep 不会释放锁,wait 会释放锁。
wait 无参即无限等待,有参就是有限等待。
sleep(long n) 和 wait(long n) 都是 TIMED_WAITING 状态。
Guarded Suspension,用在一个线程等待另一个线程的执行结果:
-
当有一个结果需要从一个线程传递到另一个线程,让这两个线程关联同一个 GuardedObject。
-
join、Future 的实现,采用的就是 Guarded Suspension 方式。
-
但是当一个线程传递多个结果到另一个线程时,那么可以使用消息队列(生产者/消费者)
FutureTask 内部实现采用下面两个方法进行同步的。
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)每个线程都有一个 Parker 对象,由三部分组成_counter、_cond、_mutex。调用 park() 时,_counter 减1,调用 unpark() 加1。如果先调用 unpark(),_counter=1,那么后调用 park() 时直接减1即可,无需等待阻塞。(0为加锁,1为解锁)
如果调用unpark()时线程在阻塞状态,则唤醒该线程。
调用多次 unpark() 的效果和调用一次 unpark() 一样。
检测死锁可以使用 jconsole 工具,或者使用 jps 定位进程id,再用 jstack 定位死锁。
两个线程互相持有对方想要的锁。
两个线程改变对方的结束条件。
生产者生产速度跟不上消费者消费速度。
ReentrantLock特点:
-
可中断。
-
可以设置超时时间。
-
可以设置公平锁。
-
支持多个条件变量。
与 synchronized 一样,都支持可重入。
基本语法:
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 临界区
} finally {
lock.unlock();
}可重入是指同一个线程如果首次获得了这把锁,可以有权利再次获取这把锁。如果是不可重入锁,那么第二次获得锁时,还是会被锁挡住。
public class App {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
m1();
}
public static void m1() {
lock.lock();
try {
m2();
} finally {
lock.unlock();
}
}
public static void m2() {
lock.lock();
try {
m3();
} finally {
lock.unlock();
}
}
public static void m3() {
lock.lock();
try {
System.out.println("m3");
} finally {
lock.unlock();
}
}
}中断处于阻塞等待的线程,防止无限制的阻塞。加锁使用ReentrantLock.lockInterruptibly(),别忘了解锁。
-
tryLock():获取不到锁立即返回false。 -
tryLock(long timeout, TimeUnit unit):获取不到锁时,等待指定等待时间,等待时间超时还没获取到锁,则返回false。
ReentrantLock 默认是不公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}wait()和notify()这两个方法主要用于多线程间的协同处理,即控制线程之间的等待、通知、切换及唤醒。而ReentrantLock也支持这样条件变量的能力。
条件变量维护的是一个单向链表的队列,调用await()在队列等待和当条件满足时signal()唤醒。
static ReentrantLock lock = new ReentrantLock();
static Condition condition1 = lock.newCondition();
static Condition condition2 = lock.newCondition();
public static void main(String[] args) {
new Thread(() -> {
lock.lock();
try {
condition1.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
lock.lock();
try {
condition1.signal();
} finally {
lock.unlock();
}
}).start();
}可以根据业务场景定义多个 Condition,用于存放不同等待事件的线程。
使用流程:
-
await 前需要获得锁。
-
await 执行后,会释放锁,然后进入 conditionObject 等待。
-
await 的线程被唤醒(中断、超时)后,可以重新竞争锁。
-
竞争锁成功后,从 await 后执行。
API:
// 等价 wait、wait(long n)
void await() throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void awaitUninterruptibly();
// 唤醒线程,等价于 notify、notifyAll
void signal();
void signalAll();JMM Java Memory Model,它定义了主存(线程共享内存)、工作内存(线程私有内存)的抽象概念。
JMM 体现在以下几个方面:
-
原子性:保证指令不会受到线程上下文切换的影响。
-
可见性:保证指令不会受到 CPU 缓存的影响。
-
有序性:保证指令不会受到 CPU 指令并行优化的影响。
synchronized、Atomic工具类、Lock...
修饰成员变量和静态变量。
一个线程对主存的数据进行修改,但对于另一个线程是不可见的(获取的是缓存中的值)。可以使用volatile关键字解决不可见。每次必须到主内存中读取。
synchronized 也能保证数据的可见性、原子性,但 synchronized 属于重量级操作,性能相对较低。被 synchronized 修饰的代码,在开始执行时会加锁,执行完成后会进行解锁,但在一个变量解锁之前,必须先把此变量同步回主存中。synchronized 会强制当前线程读取主内存中的值。
volatile不能保证原子性,仅用在一写多读的情况。只能保证看到最新值,无法解决指令交错。
同一个线程内,JVM 在不会改变程序结果的前提下,可以调整语句的执行顺序。这些指令的各个阶段可以通过重排序和组合来实现指令级并行。
比如 double-check-lock 单例模式,JVM 可能会优化为:先将引用地址赋值给 INSTANCE 变量后,再执行构造方法,那么另一个线程进来一看 INSTANCE 不为空了,可能拿到的是一个未初始化完成的单例对象。
在 JDK5 以上的版本 volatile 才会真正生效。volatile禁止指令重排序。防止之前的指令重排序,利用了内存屏障(写之后,读之前设置内存屏障)。
synchronized 不保证有序性。
sequenceDiagram
participant t1 as t1 线程
participant num as num=0
participant ready as volatile ready=false
participant t2 as t2 线程
t1 -->> t1 : num=2
t1 ->> ready : ready=true
Note over t1,ready: 写屏障
Note over num,t2: 读屏障
t2 ->> ready : 读取ready=true
t2 ->> num : 读取num=2
happens-before 规定了哪些写操作对其它线程的读操作可见,是一套可见性与有序性的规则总结。
以下方式都可以对共享变量读可见。
-
synchronized。
-
volatile。
-
start() 前对共享变量的修改。
-
线程结束前的写,对其它线程得知它结束后的读可见。
-
线程中断前的写。
-
对成员变量或静态变量的默认值(0、false、null)的写,对其它线程对该变量的读可见。
具有传递性,如果 x (happens-before) > y 并且 y (happens-before) > x,那么有 x (happens-before) > z
sequenceDiagram
participant t1 as 线程A
participant A as AtomicInteger对象
participant t2 as 线程B
t1 ->> A : 获取变量 0
t1 ->> t1 :自增1
t2 -->> A : 已经修改为 1 了
t1 ->> A : cas(0, 1) 交换失败
t1 ->> A : 获取变量 1
t1 ->> t1 :自增1
t2 -->> A : 已经修改为 2 了
t1 ->> A : cas(1, 2) 交换失败
t1 ->> A : 获取变量 2
t1 ->> t1 :自增1
t1 ->> A : cas(1, 2) 交换成功
在对数据操作之前,首先会比较当前值跟传入值是否一样,如果一样就更新,否则不执行更新操作直接返回失败状态。
CAS 底层使用的 lock cpmxchg指令(X86架构),在单核或多核CPU下都能够保证 compareAndSwap 的原子性。
在多核状态下,当某个核执行到带有 lock 的指令时,CPU 会让总线锁住,当这个核讲此指令执行完,再开启总线。这个过程中不会被线程的调度机制中断,保证了多个线程对内存操作的准确性,是原子操作。
// compareAndSet
/*
expect:获取的当前最新值
update:要修改的值
工作内存 expect 和 主内存 expect 比较,如果相等则将主内存 expect 替换为 update
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
// 自增
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
/*
var1:是当前 AtomicInteger 对象
var2:当前值(value)。(比如现在要实现 2+1)那么 var2 就是2,var4 就是 1
var5:获取底层当前的值(value),获取最新值
假设此时只有一个线程操作 AtomicInteger 对象,则 this.compareAndSwapInt(var1, var2, var5, var5 + var4) 的参数如下:
this.compareAndSwapInt(AtomicInteger, 2, 2, 2+1)
如果新值(主内存共享)和旧值(工作内存私有)相等,就将2+1写回主内存。在底层达到数据可见性。
不相等则一直循环,直到相等为止。
*/CAS 操作借助了 volatile 来达到交换并替换的效果。
-
CAS 是乐观锁。synchronized 是悲观锁。
-
CAS 体现的是无锁并发、无阻塞并发。但如果线程竞争激烈,必然会频繁重试,效率上也会受影响。线程数不超过CPU核心数时才能发挥 CAS 的性能。
CAS 底层是依赖于操作系统层的 CAS 指令。
以 AtomicInteger 为例:
AtomicInteger i = new AtomicInteger(0);
// i++
i.getAndIncrement();
// ++i
i.incrementAndGet();
// i--
i.getAndDecrement();
// --i
i.decrementAndGet();
// 等价于 i++,只不过不是加1,而是加指定步长
i.getAndAdd(5);
// 等价于 ++i,只不过不是加1,而是加指定步长
i.addAndGet(5);
// 使用函数式接口返回的结果更新当前值(value),返回更新后的值
i.updateAndGet(item -> {
return 5;
});
// 使用函数式接口返回的结果更新当前值(value),返回更新前的值
i.getAndUpdate(item -> {
return 5;
});-
AtomicReference
-
AtomicStampedReference:能够感知其它线程修改了共享变量对象,如果修改了,则自己的 cas 操作就算失败,然后再重试,不过 AtomicStampedReference cas 操作时会带一个版本号。
-
AtomicMarkableReference:简化版 AtomicStampedReference,使用布尔值标识共享变量是否被修改。
// AtomicStampedReference 基本用法
String obj = "A";
AtomicStampedReference<String> reference = new AtomicStampedReference<String>(obj, 0);
String oldReference = reference.getReference();
String newReference = "C";
int stamp = reference.getStamp();
reference.compareAndSet(oldReference, newReference, stamp, stamp+1);
// -------------------------------------------------------------------
// AtomicMarkableReference 基本用法
AtomicMarkableReference<String> markableReference = new AtomicMarkableReference<>(obj, true);
String oldReference = markableReference.getReference();
String newReference = "C";
markableReference.compareAndSet(oldReference, newReference, true, false);保护数组的元素变更时不受多线程影响(线程安全)。
原子更新数组的元素 Integer、Long、引用对象。AtomicReferenceArray类也支持比较并交换功能。即多线程下修改数组的元素 Integer、Long、引用对象 也是线程安全的。
-
AtomicIntegerArray
-
AtomicLongArray
-
AtomicReferenceArray
利用字段更新器,可以针对对象的某个域(Field,字段)进行原子操作。只能配合 volatile 修饰的字段使用,否则抛异常。java.lang.IllegalArgumentException: Must be volatile type
-
AtomicReferenceFieldUpdater
-
AtomicIntegerFieldUpdater
-
AtomicLongFieldUpdater
- LongAdder
cas锁,伪共享。
/**
* Table of cells. When non-null, size is a power of 2.
累加单元数组,懒惰初始化,容量是2次幂
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
基础值,如果没有竞争,则用 cas 累加
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
在 cells 创建或扩容时,置为 1,表示加锁(CAS实现加锁)
*/
transient volatile int cellsBusy;伪共享:即一个缓存行加入了多个Cell对象。
@sun.misc.Contended作用是防止缓存行伪共享,即一个 Cell 对应一个 缓存行。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法。用 cas 方式进行累加,cmp表示旧值,val表示新值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// ...
}| 从CPU到 | 大约需要的时钟周期 |
|---|---|
| 寄存器 | 1 cycle,(4GHz的PCU约为0.25ns) |
| L1 | 3~4cycle |
| L2 | 10~20cycle |
| L3 | 40~45cycle |
| 内存 | 120~240cycle |
因为CP与内存的速度差异很大,因此需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每隔缓存行对应一块内存,一般是64byte(8个long)
但是缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中。
CPU要保证数据的一致性,如果某个CPU核心更改了数据,那么其它CPU核心对应的整个缓存行必须失效。
如上图,因为Cell是数组形式,在内存中是连续存储的,一个Cell为24字节(16字节的对象头和8字节的value),因此缓存行可以存下2个Cell对象。那么问题来了:
-
Core-0要修改Cell[0]。
-
Core-1要修改Cell[1]。
无论谁修改成功,都会导致对象 Core 的缓存行失效。
@sun.misc.Contended注解就是用来解决这个问题的。它的原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,从而让CPU将对象预读至缓存时占用不同的缓存行。这样就不会造成对象缓存行的失效。如下图:
LongAdder adder = new subgraph Connector `NIO EndPoint`();
adder.increment();
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}graph LR
subgraph 分析 LongAdder.add 方法
A(当前线程)-->B(cells)
B --为空--> C(cas base 累加)
C --成功--> D(return)
C --失败--> E(longAccumulate)
B --不为空--> F(当前线程 cell 是否创建)
F --已创建--> G(cas cell 累加)
G --成功--> D
G --失败--> E
F --未创建--> E
end
graph LR
subgraph cells 创建
E --失败--> A
A(循环入口)-->B(cells不存在&加锁&未新建)
B --> C(加锁)
C --成功--> D(创建 cells 并初始化一个 cell)
C --失败--> E(cas base 累加)
D --> R(return)
E --成功--> R
end
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
/*
cells 未创建:
cellsBusy == 0:还没有其它线程对其加锁
cells == as:未新建
casCellsBusy():加锁
*/
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// 初始化累加单元数组长度为2
Cell[] rs = new Cell[2];
// x为要累加的值
rs[h & 1] = new Cell(x);
// 将初始化的 rs 赋值给成员变量 cells
cells = rs;
init = true;
}
} finally {
// 解锁
cellsBusy = 0;
}
if (init)
break;
}
/*
casBase():cas base 累加
*/
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}graph LR
subgraph cell 创建
S --失败--> A
A(循环入口)-->B(cells存在&cell未创建)
B --创建cell--> C(加锁)
C --失败--> A
C --成功--> S(数组槽位为空)
S --成功--> R(return)
end
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
/*
cell未创建
cells存在但cell未创建(另一个线程)
*/
if ((as = cells) != null && (n = as.length) > 0) {
// 如果为null,说明当前线程还没有创建累加单元 cell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
// 创建 cell
Cell r = new Cell(x); // Optimistically create
// 无锁的情况才尝试加锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
/*
rs[j = (m - 1) & h] == null:如果返回false,说明数组索引处不为空,有其它线程已经抢先一步占有了,继续循环。
*/
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 将创建的Cell赋值给数组对应索引处
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
/*
cells 未创建:
cellsBusy == 0:还没有其它线程对其加锁
cells == as:未新建
casCellsBusy():加锁
*/
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// 初始化累加单元数组长度为2
Cell[] rs = new Cell[2];
// x为要累加的值
rs[h & 1] = new Cell(x);
// 将初始化的 rs 赋值给成员变量 cells
cells = rs;
init = true;
}
} finally {
// 解锁
cellsBusy = 0;
}
if (init)
break;
}
/*
casBase():cas base 累加
*/
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}graph LR
subgraph cell 创建
G --> A
A(循环入口)-- cells存在&cell已创建 --> B(cas cell 累加)
B --成功--> R(return)
B --失败--> D(数组长度是否超过CPU上限)
D --是--> E(改变线程对应的cell)
E --> A
D --否--> F(加锁)
F --失败--> E
F --成功--> G(扩容)
end
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
/*
cell未创建
cells存在但cell未创建(另一个线程)
*/
if ((as = cells) != null && (n = as.length) > 0) {
// 如果为null,说明当前线程还没有创建累加单元 cell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
// 创建 cell
Cell r = new Cell(x); // Optimistically create
// 无锁的情况才尝试加锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
/*
rs[j = (m - 1) & h] == null:如果返回false,说明数组索引处不为空,有其它线程已经抢先一步占有了,继续循环。
*/
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 将创建的Cell赋值给数组对应索引处
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
/*
cas cell 累加
*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 累加成功
break;
/*
判断数组长度是否在于CPU数
*/
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
/*
n >= NCPU:如果为true,collide = false
会进入这个 else if;就不会进入到扩容的else if中了
*/
else if (!collide)
collide = true;
/*
扩容
*/
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
// 扩容为原来的两倍,默认Cell[]为2
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
// 旧数组对应下标内容拷贝到新数组中
rs[i] = as[i];
cells = rs;
}
} finally {
// 解锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
/*
改变线程对应的cell,即换一个cell累加单元试试
*/
h = advanceProbe(h);
}
/*
cells 未创建:
cellsBusy == 0:还没有其它线程对其加锁
cells == as:未新建
casCellsBusy():加锁
*/
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// 初始化累加单元数组长度为2
Cell[] rs = new Cell[2];
// x为要累加的值
rs[h & 1] = new Cell(x);
// 将初始化的 rs 赋值给成员变量 cells
cells = rs;
init = true;
}
} finally {
// 解锁
cellsBusy = 0;
}
if (init)
break;
}
/*
casBase():cas base 累加
*/
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}获取最终结果通过sum方法:
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}简单来说即:原子累加器会在有竞争的情况下创建数组,数组元素对应不同的线程,不同线程的索引都是懒加载的对象。没有竞争情况下,就是单线程只会使用一个long类型。
读源码最难的还是理清作者的逻辑和思路,框架逻辑不见得有这个复杂。
Unsafe对象提供了非常底层的操作内存、线程发方法。Unsafe对象不饿能直接调用,需要通过反射获取。
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
System.out.println(unsafe);
} catch (Exception e) {
e.printStackTrace();
}-
属性用 final 修饰保证了该属性是只读的,不能修改。但如果是引用类型,虽然对象不可变,但其内部的属性是可变的。
-
类用 final 修饰保证了该类中的方法不能被覆盖,该类也无法被继承,防止子类无意间破坏不可变性。
单个方法是线程安全的,但如个其多个方法的组合不一定是线程安全的。
final 也用到了写屏障,final 之前的指令不会跑到 final 之后执行。
场景:当需要复用数量有限的同一类对象时,如字符串常量池、线程池、包装类的缓存等。
ThreadPoolExecutor 使用 int 的高3位来表示线程池状态,低29位表示线程数量。这些信息存储在一个原子变量 ctl 中,目的是可以用一次 cas 原子操作进行赋值。
| 状态 | 高3位 | 接收新任务 | 处理阻塞任务 | 描述 |
|---|---|---|---|---|
| RUNNING | 111 | Y | Y | |
| SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
| STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
| TIDYING | 010 | - | - | 任务全部执行完毕,活动线程为0,即将进入终结 |
| TERMINATED | 011 | - | - | 终结 |
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}-
corePoolSize:核心线程数(最多保留的线程数)。 -
maximumPoolSize:最大线程数(减去核心线程数之后还能创建多少个临时线程)。 -
keepAliveTime:存活时间,即超过 corePoolSize 的线程多久被回收。 -
unit:存活时间单位。 -
workQueue:阻塞队列。 -
threadFactory:线程工厂,用来创建线程。 -
handler:拒绝策略。
阻塞队列满了,新添加的任务才会去创建临时线程。最大线程数满了才会执行拒绝策略。
执行流程:
1. 线程池刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个核心线程来执行该任务。
2. 当线程数达到 corePoolSize 后,这时新添加的任务会被加入到 workQueue 阻塞队列中,等待被线程池执行。
3. 如果 workQueue 是有界队列,那么当阻塞队列中任务数量超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的临时线程来执行新添加的任务。
4. 如果线程数达到 maximumPoolSize,且还有新任务添加,这时会触发拒绝策略。JDK 提供了四种拒绝策略的实现:
AbortPolicy:默认策略。抛 RejectedExecutionException 异常。
CallerRunsPolicy:调用者线程执行该任务(即调用execute这个方法的线程)
DiscardPolicy:放弃本次任务
DiscardOldestPolicy:放弃队列中最早的任务,该任务取而代之。
5. 当高峰过去后,超过 corePoolSize 的临时线程如果在一段时间没有执行任务,则需要结束线程资源。这个时间由 keepAliveTime和unit 控制。固定大小线程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}特点:
-
核心线程数 == 最大线程数,没有临时线程。
-
阻塞队列是无界的,可以添加任意数量的任务。
适用于任务量已知,相对耗时的任务。
带缓冲线程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}特点:
-
核心线程数是 0,最大线程数是 Integer.MAX_VALUE,临时线程的空闲时间是 60s。临时线程可以无限创建,都是临时线程,可被回收。
-
采用 SynchronousQueue 队列,该队列没有容量。将任务放到该队列中,不能马上返回(阻塞),需要等待另一个线程取出这个任务了,才能返回。最多只能放一个。多用于消息队列。
适用于任务数比较密集,但每个任务执行时间较短的情况。
单线程的线程池:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}使用场景:希望多个任务串行执行。线程数为 1,任务数超过 1时,会放入无界队列等待。
如果执行过程中发生异常,该线程池还会再创建一个线程,保证线程池的正常工作。
// 执行任务
void execute(Runnable command);
// 提交任务 task,可以获取执行的返回值
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中的任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中的任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中的任务,哪个任务先执行成功,就返回哪个任务的执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// 提交 tasks 中的任务,哪个任务先执行成功,就返回哪个任务的执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;-
线程池状态变为 SHUTDOWN。
-
不会接收新任务。
-
已提交的任务会继续执行。
-
不会阻塞调用线程的执行。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态变为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
}-
线程池状态变为 STOP。
-
不会接收新任务。
-
丢弃阻塞队列中的任务。
-
中断正在执行的任务。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}创建多少线程合适?:
-
CPU 密集型运算:通常采用
cpu 核数 + 1能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶替,保证 CPU 的时钟周期不被浪费。 -
IO 密集型运算:经验公式:
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
-
ScheduledThreadPoolExecutor.schedule():延时执行。 -
ScheduledThreadPoolExecutor.scheduleAtFixedRate():定时执行。固定计时。 -
ScheduledThreadPoolExecutor.scheduleWithFixedDelay():定时执行,以任务执行完成后开始计时。
线程池不会主动抛异常,需要手动捕获。或者使用 Callable,接收异常(会将异常封装到返回结果中)。
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
// 延时执行
executor.schedule(() -> {
System.out.println("ok");
}, 1, TimeUnit.SECONDS);
/*
第二个参数:(第一次执行)延时多长时间执行;
第三个参数:(非第一次执行)多久执行一次(频率);
第四个参数:时间单位。
*/
executor.scheduleAtFixedRate(() -> {
log.debug("ok");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 2, 1, TimeUnit.SECONDS);
/*
第二个参数:(第一次执行)延时多长时间执行;
第三个参数:(非第一次执行)多久执行一次(频率),注意:需等待上一个任务执行完后,才开始计时。
第四个参数:时间单位。
*/
executor.scheduleWithFixedDelay(() -> {
log.debug("ok");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 2, 1, TimeUnit.SECONDS);graph LR
subgraph Connector `NIO EndPoint`
L(LimitLatch) --> A(Acceptor)
A --> S1(SocketChannel1)
A --> S2(SocketChannel1)
S1 --读事件--> P(Poller)
S2 --读事件--> P
P --socketProcessor--> W1(worker1)
P --socketProcessor--> W2(worker2)
subgraph Executor
W1
W2
end
end
-
LimitLatch 用来限流,可以控制最大连接数,类似 JUC Semaphore。
-
Acceptor 负责接收新的 socket 连接。
-
Poller 负责监听 socket channel 是否有可读的 IO 事件。
-
一旦有可读事件,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理。
-
Executor 线程池中的工作线程负责处理请求。
Tomcat 的线程数如果达到了 maximumPoolSize,并不会立即抛 RejectedExecutionException。而是会再次尝试将任务放入队列中,如果还是失败,才抛 RejectedExecutionException。
Connector 配置
| 配置项 | 默认值 | 描述 |
|---|---|---|
| acceptorThreadCount | 1 | acceptor 线程数 |
| pollerThreadCount | 1 | poller 线程数 |
| minSpareThreads | 10 | 核心线程数,corePoolSize |
| maxThreads | 200 | 最大线程数,maximumPoolSize |
| executor | - | Executor 名称,用来自定义线程池,覆盖默认配置 |
Executor 线程配置
| 配置项 | 默认值 | 描述 |
|---|---|---|
| threadPriority | 5 | 线程优先级 |
| daemon | true | 是否守护线程 |
| minSpareThreads | 25 | 核心线程数,corePoolSize |
| maxThreads | 200 | 最大线程数,maximumPoolSize |
| maxIdleTime | 60000 | 线程存活时间,单位毫秒 ,默认1分钟 |
| maxQueueSize | Integer.MAX_VALUE | 队列长度 |
| prestartminSpareThreads | false | 核心线程是否再服务器启动时启动 |
Tomcat 当提交的任务数超过核心线程数,但没超过最大线程数,Tomcat 就会创建临时线程来处理任务。当提交的任务数超过最大线程数,才会将新任务放入队列。
JDK7 加入的新的线程池实现。体现了分治思想。适用于能够进行任务拆分的 CPU 密集型运算。所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直到不能拆分可以直接求解。
Fork/Join 可以把每个任务的分解和合并交给不同的线程来完成。提升运算效率。
Fork/Join 默认会创建与 CPU 核心数相同的线程池。
需要返回值就继承RecursiveTask,不需要返回值就继承RecusiveAction。fork()方法用于拆分任务,join()方法用于合并结果。
定义了一个对1~n之间的整数求和的任务:
public class Demo5 {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
forkJoinPool.invoke(new AddTask(1, 10));
}
static class AddTask extends RecursiveTask<Integer> {
int head;
int last;
AddTask(int head, int last) {
this.head = head;
this.last = last;
}
@Override
protected Integer compute() {
if (head == last) {
return head;
}
if (head == last - 1) {
System.out.println(head + "-" + last + "相加为" + (head + last));
return head + last;
}
int mid = (head + last) / 2;
AddTask addTask = new AddTask(head, mid);
/*
分解任务:
fork内部最终会执行 java.util.concurrent.RecursiveTask#exec()
其exec()方法内部又会调用compute()方法,相当于是递归调用了。直到遇到return为止
*/
addTask.fork();
AddTask addTask1 = new AddTask(mid + 1, last);
// 分解
addTask1.fork();
// 合并
int result = addTask.join() + addTask1.join();
System.out.println(head + "-" + last + "相加为" + result);
return result;
}
}
}类似二分查找法。深究的话,可以看作是递归调用,递归调用compute(),只不过是多线程下的递归调用。
AbstractQueuedSynchronizer,是阻塞式锁和相关同步器工具的框架。
特点:
-
用 state 属性来表示资源的状态(独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁。
-
getState()获取 state 状态。 -
setState()设置 state 状态。 -
compareAndSetState()cas 机制设置 state 状态。 -
独占模式下(锁)只能有一个线程能够访问资源,而共享模式下可以允许多个线程访问资源。
-
-
提供基于 FIFO 的等待队列,类似于 Monitor EntryList。
-
支持条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor WaitSet。
子类主要实现下面几个方法。(调用父类的会抛异常 UnsupportedOperationException)
-
tryAcquire() -
tryRelease() -
tryAcquireShared() -
tryReleaseShared() -
isHeldExclusively()
示例:
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 可以选择阻塞当前线程。加入阻塞队列,park()
}
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行,unpark()
}@Slf4j
public class App {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("加锁成功");
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("加锁成功");
} finally {
lock.unlock();
}
}, "t2").start();
}
/**
* 自定义锁,不可重入锁
*/
static class MyLock implements Lock {
/**
* 独占锁,同步器类
*/
@Slf4j
static class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
boolean flag;
if ((flag = compareAndSetState(0, 1))) {
// 加锁成功,并设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
}
log.debug("{}", flag);
return flag;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 是否持有独占锁
*
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
}没有发生竞争时:
源码:
private final Sync sync;
// 默认构造
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
// lock() 是抽象方法,由子类 NonfairSync、FairSync 实现
sync.lock();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 是不是很熟悉,和上面自定义锁使用方式一样
// 将state从0改为1,0是未加锁,1是加锁
if (compareAndSetState(0, 1))
// 将owner线程改为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}源码:
private final Sync sync;
// 默认构造
public ReentrantLock() {
sync = new NonfairSync();
}
public void lock() {
// lock() 是抽象方法,由子类 NonfairSync、FairSync 实现
sync.lock();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 是不是很熟悉,和上面自定义锁使用方式一样
// 将state从0改为1,0是未加锁,1是加锁
if (compareAndSetState(0, 1))
// 将owner线程改为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 加锁失败
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire():
public final void acquire(int arg) {
/*
尝试加锁,tryAcquire(arg)加锁失败返回法拉瑟,取反未true
进入acquireQueued()逻辑,加入到阻塞队列
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// parkAndCheckInterrupt() 阻塞线程,等待被unpark()唤醒
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 调整链表
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}加锁失败的流程:
-
CAS尝试将state由0改为1,结果失败。
-
进入tryAcquire()逻辑,这时state已经是1,结果仍然失败。
-
接下里进入到addWaiter()逻辑,构造Node队列:
-
下图中黄色三角形表示该Node的waitStatus状态,0为默认状态。
-
Node的创建是懒惰的。(第一次会创建两个Node)
-
第一个Node称为Dummy(哑元)或哨兵,用来占位,不关联线程。
-
-
当前线程再进入到acquireQueued()逻辑:
-
1、acquireQueued()会在自旋中不断尝试获得锁,失败后进入park阻塞:
-
2、如果当前线程紧挨着head(排第二个,即紧跟在Dummy节点后),那么再次tryAcquire()尝试获取锁,当然这时state仍然为1,获取锁失败。
-
3、进入shouldParkAfterFailedAcquire()逻辑,将前驱node(即head)的waitStatus改为-1,返回false。(-1 表示Dummy节点需要唤醒其后继节点)
-
-
shouldParkAfterFailedAcquire()执行完后,会再次tryAcquire()尝试获取锁,当然这时state仍为1,获取锁失败。
-
当再次执行shouldParkAfterFailedAcquire()时,这时因为其前驱node(Dummy节点)的waitStatus为-1,返回true。
-
进入parkAndCheckInterrupt()逻辑,将
Thread-1阻塞。(大概4次尝试获取锁时失败才会进入park阻塞状态)
假设此时有多个线程都经历上述过程且竞争失败,此时状态如下图所示(-1 表示需要唤醒后继节点):
释放锁源码:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}Thread-0 释放锁,进入tryRelease()流程,如果成功:
-
设置exclusiveOwnerThread为null。
-
state = 0。
如果释放锁成功,如果当前队列(park阻塞线程的队列)不为null,且head的waitStatus=-1,则会进入unparkSuccessor()逻辑。
找到队列中离head最近的一个Node(没取消的节点),调用unpark()恢复其运行。
Thread-1被唤醒,执行获取锁流程,acquireQueued流程:
如果加锁成功(没有竞争),则会设置:
-
exclusiveOwnerThread设置为Thread-1,state = 1。
-
head指向Thread-1所在的Node,该Node清空Thread。
-
将head从链表断开,可被垃圾回收。
如果这时候有其它线程来竞争(非公平锁的体现),假如此时Thread-4抢占锁了:
-
Thread-4被设置为exclusiveOwnerThread,state = 1。
-
Thread-1再次进入acquireQueued()流程,获取锁失败,重新进入park阻塞。
abstract static class Sync extends AbstractQueuedSynchronizer {
// 锁重入
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得锁,且owner设置的是当前线程,表示发生锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 继承自父类方法
// 解锁
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入,当state减为0时,锁才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}-
不可打断模式:
- 即使被打断,仍会驻留在AQS队列中,等待获得锁后才能继续运行(只是打断标记设置为true)
public void lock() {
sync.lock();
}
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// acquireQueued() 返回true说明线程park被打断且获取到锁
// 重新产生一次中断
selfInterrupt();
}
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是true,则park会失效
LockSupport.park(this);
// Thread.interrupted() 会清除打断标记,下次park还是会阻塞
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 需要获得锁后,才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果 interrupted 被唤醒,打断状态设置为 true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}- 可打断模式:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 可打断锁的获取锁流程
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 在 park 过程中如果被 interrupt,会进入if内
// 抛 InterruptedException 有异常x
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}回顾一下非公平锁的实现:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 没有其他线程抢占锁
if (c == 0) {
// 尝试用cas获取锁,不检查AQS队列,体现了非公平性
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}公平锁实现:
- 与非公平锁的主要区别在于tryAcquire()方法的实现。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查AQS队列中是否有前驱节点,没有才竞争锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
/*
h != t 表示队列中有Node
(s = h.next) == null 表示此时只有一个Dummy节点
s.thread != Thread.currentThread() Dummy节点的后继节点不是当前线程
*/
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject。
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 加入到阻塞队列
Node node = addConditionWaiter();
// 2. 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. park() 阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
public final void signal() {
// ownerThread 才有资格唤醒,否则抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// transferForSignal() 转移成功跳出循环
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入到AQS队列中,如果成功返回前驱节点,当前例子中则返回Thread-3
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
}-
await流程:
-
Thread-0持有锁,调用await(),进入ConditionObject的addConditionWaiter流程:
- 创建新的Node状态为-2(Node.CONDITION),关联Thread-0,加入到等待队列尾部。
-
接下来进入AQS的fullyRelease()流程,释放同步器上的锁。
- park()阻塞Thread-0。
-
-
signal流程:
由Thread-1唤醒Thread-0。
进入 ConditionObject 的doSignal()流程,获取等待队列中第一个Node,即Thread-0所在的Node。执行transferForSignal()流程,将该Node假如到AQS队列尾部,将Thread-0的waitStatus设置为0,Thread-3的waitStatus设置为-1。
读读并发,读写互斥,写写互斥
当读操作远高于写操作时,可以使用读写锁让读操作可以并发,提高性能。类似于MySQL中的select ... from ... lock in share mode。
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
ReentrantReadWriteLock.WriteLock rw = rwl.writeLock();
ReentrantReadWriteLock.ReadLock rr = rwl.readLock();
re.lock();
rr.lock();-
注意事项:
-
读锁不支持条件变量。
-
不支持重入时升级:即在持有读锁的情况下去获取写锁,会导致写锁永久等待。
r.lock(); try { w.lock(); try { } finally { w.unlock(); } } finally { r.unlock(); }
-
支持重入时降级:即在持有写锁的情况下去获取读锁(同一线程)。
-
读写锁使用的是同一个Sync同步器。因此等待队列、state也是同一个。
源码部分:
// w.lock
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// 1. c != 0:写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// r.lock()
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前一个节点
final Node p = node.predecessor();
// 判断p是否是Dummy节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire(p, node):把node节点的前驱节点的waitStatus设置为-1
// 循环三次获取锁还是失败,shouldParkAfterFailedAcquire(p, node)返回true
// parkAndCheckInterrupt() park 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}-
t1 w.lock():
- t1成功获取锁,获取锁流程与ReentrantLock相比没有特殊之处,不同的是写锁状态占了state的低16位,而读锁使用的是state的高16位。
-
t2 r.lock():
-
t2执行r.lock(),这时进入读锁的acquireShared(1)流程。首先会进入tryAcquireShared()流程。如果有写锁占据,那么tryAcquireShared()返回-1,表示获取锁失败。
-
tryAcquireShared()返回值说明:
-
-1表示失败。
-
0表示成功。不会继续唤醒后继节点。
-
大于0表示成功且还有几个后继节点需要唤醒,读写锁返回1。
-
-
-
这时会进入doAcquireShared()流程。首先是调用addWaiter()添加节点,不同之处在于节点被设置为Node.SHARED而非Node.EXCLUSIVE。此时t2仍处于活跃状态。
-
t2会检查自己的节点是不是Dummy节点的后一个节点。如果是,还会再次调用tryAcquireShared(1)尝试获取锁。
-
如果还是获取锁失败,则把前驱节点的waitStatus设置为-1。再循环尝试tryAcquireShared(1)获取锁,(总共循环三次)如果还是失败,则会在parkAndCheckInterrupt() park阻塞。
-
-
t3 r.lock()&t4 w.lock():
- 假设又有t3加读锁,t4加写锁,这期间t1仍然持有锁,则阻塞队列如下所示:
- 读锁是Node.SHARED,写锁是Node.EXCLUSIVE
t1.unlock()源码部分:
// w.unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计数-1
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// t2唤醒执行
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁,高16位
int r = sharedCount(c);
// 读锁不应该被阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}-
t1.unlock():
- 写锁释放会调用sync.release(1),其内部如果调用tryRelease(1)成功,则同步器状态如下:
-
接下来执行唤醒流程unparkSuccessor(),即让Dummy节点的后继节点恢复运行(唤醒t2)。这时t2在parkAndCheckInterrupt()中恢复运行。
-
t2恢复运行后,再执行一次tryAcquireShared(1),如果成功则让读锁计数+1。
- 这时t2已经恢复运行了,接下来t2调用setHeadAndPropagate(node, r),将原head节点移除,将t2所在的节点置为head节点。
- 在setHeadAndPropagate方法内部还会检查新head节点的后继节点是不是shared,如果是则调用doReleaseShared()将head的状态从-1置为0,并唤醒其后继节点。此时t3在parkAndCheckInterrupt()中恢复运行。
- t3被唤醒后,也会再执行tryAcquireShared(1),如果成功则让读锁计数+1。重复t2被唤醒时执行的步骤。
- 此时新head节点的后继节点不是shared,因此不会继续唤醒t4所在节点。
t2.unlock()源码部分:
// r.unlock()
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}-
t2.unlock()、t3.unlock():
- t2调用unlock(),进入releaseShared(1),其内部调用tryReleaseShared(1),但由于计数还不为零。tryReleaseShared()返回false
- t3调用unlock(),进入releaseShared(1),其内部调用tryReleaseShared(1),此时计数为零,进入doReleaseShared()将head节点从-1置为0,并唤醒head后继节点。
- t4在parkAndCheckInterrupt()中恢复运行,判断t4前驱节点是不是head,如果是,再次执行tryAcquire(1)。此时没有其它线程竞争,获取写锁成功,修改head节点(t4所在节点置为head,原head移除),流程结束。
该类是在JDK8引入的,是为了进一步优化读性能。
Semaphore(信号量),用来限制同时访问共享资源的线程上限。可以理解为限流。
应用:使用Semaphore限流,在访问高峰期时,让请求线程阻塞,等待高峰期过去再释放许可。只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(如连接数,参考Tomcat LimitLatch的实现)
演示:
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 获取信号量
semaphore.acquire();
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放信号量
semaphore.release();
}
}).start();
}加锁源码:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// AQS中的方法。与ReentrantLock#doReleaseShared()逻辑一致
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}-
加锁流程:
- 初始化时,permits(state)为3,但有5个线程来获取资源。
- 假设其中Thread-1、2、4竞争锁成功。而Thread-0、3竞争失败,调用doAcquireSharedInterruptibly(),进入AQS队列park阻塞。
-
doAcquireSharedInterruptibly()流程与ReentrantLock中的类似,就不再详述了。
-
此时Thread-4释放permits,其Sync状态如下:
- 接着Thread-0竞争成功,permits再次设置为0,设置Thread-0所在节点为head节点,移除原head节点。unpark()唤醒后继节点Thread-3节点,但permits为0,因此Thread-3在尝试获取锁失败后再次进入park()状态。
CountdownLatch用来进行线程同步协作,等待所有线程完成倒计时
其构造参数用来初始化等待计数值,await()用来等待计数归零,countDown()用来让计数减1。
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(3);
try {
new Thread(() -> {
latch.countDown();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
latch.countDown();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
latch.countDown();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}应用:
-
等待多线程准备完毕。
-
等待多个远程调用结束
CyclicBarrier(循环栅栏),用来进行线程协作,等待线程满足某个计数。调用构造函数传递参数『计数个数』,线程需要同步时则调用await()方法进行等待,当等待的线程数满足『计数个数』时,才可以被唤醒。继续执行。
计数可重用。当计数为0后,再次调用await(),计数会恢复至指定的个数。
计数为0时线程被唤醒。线程数需要和计数个数保持一致。
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(2);
new Thread(() -> {
try {
// 等待计数为1,不满足CyclicBarrier计数个数2,等待
// 2-1=1
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
// 等待2秒后,等待计数为2,继续执行
// 1-1=0
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}Collections系列方法内部重写了Map方法,并使用Synchronized修饰代码块。
-
早期版本如:Hashtable、Vector
-
使用Collections装饰的线程安全集合:
-
Collections.synchronizedCollection()
-
Collections.synchronizedList()
-
Collections.synchronizedMap()
-
Collections.synchronizedSet()
-
Collections.synchronizedNavigableMap()
-
Collections.synchronizedNavigableSet()
-
Collections.synchronizedSortedMap()
-
Collections.synchronizedSortedSet()
-
-
java.util.concurrent.*
-
BlockingXxx:阻塞队列,大部分基于锁实现,提供阻塞方法。
-
CopyOnWriteXxx:适用于读多写少的场景。写时拷贝。
-
ConcurrentXxx:内部大部分使用cas优化,弱一致性。
-
遍历时如果发生了修改,对于非安全容器来讲,fail-fast(即让遍历立刻失败) 机制会抛出 ConcurrentModificationException,不再继续遍历
线程安全的容器是 fail-safe 机制
并发扩容无限循环问题只在JDK7中会复现。因为JDK7的HashMap是头插法,JDK8的HashMap改为尾插法。JDK7、8的扩容机制和hash的计算方法不一样。
测试代码:
public static void main(String[] args) {
final HashMap<Integer, Integer> map = new HashMap<>();
map.put(2, null);
map.put(3, null);
map.put(4, null);
map.put(5, null);
map.put(6, null);
map.put(7, null);
map.put(8, null);
map.put(9, null);
map.put(10, null);
map.put(16, null);
map.put(35, null);
map.put(1, null);
new Thread(() -> {
map.put(50, null);
}, "t1").start();
new Thread(() -> {
map.put(50, null);
}, "t2").start();
}transfer()源码:
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;//得到新数组的长度
// 遍历整个数组对应下标下的链表,e代表一个节点
for (Entry<K,V> e : table) {
// 当e == null时,则该链表遍历完了,继续遍历下一数组下标的链表
while(null != e) {
// 先把e节点的下一节点存起来
Entry<K,V> next = e.next;
if (rehash) { //得到新的hash值
e.hash = null == e.key ? 0 : hash(e.key);
}
// 在新数组下得到新的数组下标
int i = indexFor(e.hash, newCapacity);
// 将e的next指针指向新数组下标的位置
e.next = newTable[i];
// 将该数组下标的节点变为e节点
newTable[i] = e;
// 遍历链表的下一节点
e = next;
}
}
}究其原因,多个线程扩容导致链表链接循环。
/*
默认为0
当初始化时,为-1
当发生扩容时,为 -(1 + 扩容线程数)
当初始化或扩容完成后,sizeCtl为下一次的扩容的阈值
*/
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的新hash表
private transient volatile Node<K,V>[] nextTable;
/*
扩容时如果某个 Node[i] 迁移完成,则用 ForwardingNode 作为旧的 table 的头节点
表示 Node[i] 迁移完成,如果发生get操作,发现是 ForwardingNode,则去新的table中获取
*/
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent,用来占位,方法执行完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 TreeBin(红黑树) 的头节点,存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 TreeNode(红黑树节点) 的节点,存储 parent、left、right
static final class TreeNode<K,V> extends Node<K,V> {}
// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {}
// cas 修改 Node[] 中第 i 个 Node 的值,c为旧值,v为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {}
// 直接修改 Node[] 中第 i 个 Node 的值,v为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {}
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations实现懒惰初始化,在构造方法中仅计算table的大小,在第一个put时才会真正创建。
/**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {}
/*
initialCapacity:初始容量
loadFactor:负载因子,扩容阈值,0.75
concurrencyLevel:并发度
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// tableSizeFor() 保证 capacity 是2^n
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// spread() 方法能够确保key的hash是正数
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头节点是要查找的key,立即返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash为负数的情况表示:该桶下标正在扩容中或是正在转换为 TreeBin
//则 调用 find() 方法来查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 正常遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}数组可以理解为table,链表可以理解为bin
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key-value都不能为null,区别于HashMap
if (key == null || value == null) throw new NullPointerException();
// spread() 方法会综合高位低位,具有更好的散列性
int hash = spread(key.hashCode());
// 链表长度或TreeBin的深度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
/*
f:链表头节点
i:链表在数组中的下标
fh:链表头节点 hash
*/
Node<K,V> f; int n, i, fh;
// 第一次put则初始化table,默认size为16
if (tab == null || (n = tab.length) == 0)
// 初始化table使用了 cas
tab = initTable();
// Node[i]为null,则创建链表头节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 添加链表头使用了 cas
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果正在调整大小,则可以帮助转移。helpTransfer()会锁住当前链表进行帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// hash冲突,链表结构,synchronized锁住链表头节点,即锁对象是 Node[i]
V oldVal = null;
// 锁住头节点
synchronized (f) {
// 再次确认链表头节点有没有被移动,Double Check
if (tabAt(tab, i) == f) {
// 链表。hash大于0是普通节点,小于0可能是TreeBin或ForwardingNode
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果key相同,默认新值替换旧值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 更新
if (!onlyIfAbsent)
e.val = value;
break;
}
// 添加到链表尾部
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果链表长度 >= 树化阈值(8),尝试链表转为红黑树
if (binCount >= TREEIFY_THRESHOLD)
// 如果链表长度大于8,数组容量小于64,会先尝试数组扩容
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// size++
addCount(1L, binCount);
return null;
}private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 让出CPU使用权
Thread.yield(); // lost initialization race; just spin
// 尝试将 sizeCtl 设置为 -1(表示正在初始化 table)
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 修改成功表示获取到锁,执行创建 table流程
// 如果这时有其它线程来,会 Thread.yield() 直至 table 创建完成
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 下次扩容阈值(链表长度阈值)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}/*
addCount() 逻辑和 LongAdder 思想类似,都是用通过 Cell 累加单元进行计数累加
check:旧的 binCount 值(链表长度或红黑树深度)
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果 counterCells 不为空或者 cas baseCount 累加失败,则进入if逻辑
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
/*
counterCells 为空
或者 cell 未创建
或者 cell cas 累加计数失败
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
) {
// 创建累加单元数组和 cell,累加重试
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 获取元素个数
s = sumCount();
}
// 如果链表长度大于1,有可能需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
/*
s >= (long)(sc = sizeCtl):链表长度大于等于sizeCtl,满足扩容条件
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// 正在扩容
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 需要扩容,newTable 此时未创建
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}size计算实际发生在put、remove改变集合元素的逻辑中。
-
没有发生竞争,向 baseCount 累加计数。
-
有发生竞争,新建 counterCells,向其中一个 cell 累加计数
-
counterCells默认初始化容量为2
-
如果计数竞争较为激烈,则会创建新的 cell 来累加计数
-
但最终结果不是精确值。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// 将 baseCount 计数与所有 cell 计数累加
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}/*
tab:旧 table
nextTab:新 table,延迟初始化,一开始是null
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n表示扩容前的数组长度
int n = tab.length, stride;
/*
stride 表示分配给线程任务的步长,默认就是 16
MIN_TRANSFER_STRIDE 线程迁移数据【最小步长】,控制线程迁移任务的最小区间,默认16
*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
/*
如果当前线程为触发本次扩容的线程,需要做一些扩容准备工作,【协助线程不做这一步】
创建新的 table,新容量为原 table 长度的两倍
*/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 把新表赋值给对象属性 nextTable,方便其他线程获取新表
nextTable = nextTab;
// 记录迁移数据整体位置的一个标记,transferIndex 计数从1开始不是 0,所以这里是长度,不是长度-1
transferIndex = n;
}
// 新数组的长度
int nextn = nextTab.length;
// 当某个桶位数据处理完毕后,将此桶位设置为 fwd 节点,其它写线程或读线程看到后,可以获取到新表
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 推进标记
boolean advance = true;
// 完成标记
boolean finishing = false; // to ensure sweep before committing nextTab
// 节点迁移,倒序迁移
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 给当前线程【分配任务区间】
while (advance) {
// 分配任务的开始下标,分配任务的结束下标
int nextIndex, nextBound;
// --i 让当前线程处理下一个索引,true说明当前的迁移任务尚未完成,false说明线程已经完成或者还未分配
if (--i >= bound || finishing)
advance = false;
// 迁移的开始下标,小于0说明没有区间需要迁移了,设置当前线程的 i 变量为 -1 跳出循环
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 逻辑到这说明还有区间需要分配,然后给当前线程分配任务
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
// 判断区间是否还够一个步长,不够就全部分配
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
// 当前线程的结束下标
bound = nextBound;
// 当前线程的开始下标,上一个线程结束的下标的下一个索引就是这个线程开始的下标
i = nextIndex - 1;
// 任务分配结束,跳出循环执行迁移操作
advance = false;
}
}
/*
【分配完成,开始数据迁移操作】
i < 0 成立表示当前线程未分配到任务,或者任务执行完了
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 迁移已完成
if (finishing) {
nextTable = null;
// 新表赋值给当前对象(原旧数组,此时新旧数据一致)
table = nextTab;
// 扩容阈值为 2n - n/2 = 3n/2 = 0.75*(2n)
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 当前线程完成了分配的任务区间,可以退出,先把 sizeCtl 赋值给 sc 保留
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断当前线程是不是最后一个线程,不是的话直接 return
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 最后一个线程退出的时候,sizeCtl 的低 16 位为 1
finishing = advance = true;
// 这里表示最后一个线程需要重新检查一遍是否有漏掉的区间
i = n; // recheck before commit
}
}
// 如果链表头是null,说明这个链表已经被处理完了,将链表头替换为fwd(ForwardingNode)
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 已经是ForwardingNode,处理下一个链表
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 链表还没有被处理
else {
// 锁住链表头
synchronized (f) {
// 二次检查,防止头节点已经被修改了
if (tabAt(tab, i) == f) {
// ln 表示低位链表引用
// hn 表示高位链表引用
Node<K,V> ln, hn;
// 头节点hash大于0,表示普通节点
if (fh >= 0) {
/*
和 HashMap 的处理方式一致,与旧数组长度 & 运算,16 是 10000
判断对应的 1 的位置上是 0 或 1 分成高低位链表
*/
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树节点
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}ConcurrentHashMap在JDK7的实现中,维护了一个segment数组,每个segment对应一把锁。将整张表分成了多个数组(Segment),每个数组又是一个类似 HashMap 数组的结构。
-
优点:如果多个线程访问不同的segment,实际是没有冲突的,这与JDK8是类似的。
-
缺点:Segments数组默认大小为16,容量初始化指定后就不能扩容了,并且不是懒惰初始化。
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
// ssize 表示Segments数组容量,默认是16,ssize 是2^n次幂
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// segmentShift(移位) 默认 32 - 4 = 28
this.segmentShift = 32 - sshift;
// segmentMask(掩码) 默认是15 即 0000 0000 0000 1111
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
// HashEntry 内部又是数组+链表的结构
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// 创建 segments[] 以及 segments[0]
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}构造完成后,结构如下图所示:
其中 segmentShift 和 segmentMask 的作用是决定将 key 的 hash结果匹配到哪个 segment。
先将高位向低位移动 segmentShift 位:
再将移位后的值与 segmentMask 做 & 运算,最终得到下标为10的segment:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 计算出 segment 下标
int j = (hash >>> segmentShift) & segmentMask;
// 获取 segment 对象,为空则创建该 segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 此时无法确定是否真的为空,因为其它线程也会发现该 segment 为 null
// 因此在 ensureSegment() 中用 cas 方式保证该 segment 安全性
s = ensureSegment(j);
// 执行 segment 的 put 流程
return s.put(key, hash, value, false);
}segment 继承了可重入锁(ReentrantLock),其put方法如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
/*
尝试加锁,如果不成功则进入 scanAndLockForPut() 逻辑
scanAndLockForPut() 会循环尝试获取锁:
如果是多核CPU,最多 tryLock 64次,然后超过64次进入 lock 流程
在尝试期间,顺被检查该节点在链表中有没有,如果没有则顺被创建出来
*/
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
// 成功加锁
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 相同key,新值替换旧值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
// 添加到链表尾部、新增节点
else {
/*
新增
node != null 说明等待锁时已经被创建
next指向链表 head 节点
*/
if (node != null)
node.setNext(first);
else
// 创建新 node
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 将 node 作为链表的 head 节点,即 node 作为桶下标的元素
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
}扩容发生在put中,此时已经获得锁,扩容时不需要考虑线程安全。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 扩容完成,才加入新的节点
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
// 替换为新的 HashEntry table
table = newTable;
}get时未加锁,使用Unsafe保证可见性,扩容过程中,如果get先发生就从旧表取值,如果get后发生则从新表取值。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
// u 为 segment 对象在数组中的偏移量(可以理解为地址值)
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// s 即 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}-
计算元素个数前,先尝试不加锁计算两次,如果前后两次结果一样,则认为个数正确,将结果返回。
-
如果不一样,进行重试,重试超过3次,则将所有 segment 锁住,重新计算个数返回。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
// 超过重试次数,需要为所有 segment 加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node(真正的后继节点)
* - this Node, meaning the successor is head.next(指向自己,发生在出队)
* - null, meaning there is no successor (this is the last node)(没有后继节点了)
*/
Node<E> next;
Node(E x) { item = x; }
}
}-
入队:
-
出队:
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }












































