进程与线程

进程:当一个程序被开启,即开启了一个进程

线程:一个进程之内可以分为多个线程、一个线程就是一个指令流、Java中线程是最小的调度单位,进程是作为资源分配的最小单位

对比:

  1. 进程基本上是互相独立的,二线程存在于进程内,是进程的一个子集
  2. 进程拥有共享的资源,如内存空间等,使其内部的线程共享
  3. 进程间通信更复杂
  4. 线程通信相对简单(因为它们共享一个进程的内存),并且更加轻量
  5. 线程的上下文切换比进程的上下文切换成本低

并发&并行

  • 并发:同一时间应对多件事情的能力
  • 并行:同一时间做多件事情的能力

异步同步

  • 需要等级结果返回,才能继续运行就是同步

  • 不需要等待结果返回,就能继续运行就是异步

同步
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
FileReader fileReader = new FileReader("E:\\todo_files\\login.bat");
String s = fileReader.readString();
System.out.println(s);
log.debug("hello");
}

// 结果
f:
cd F:\pycharm\code\demo
python Login.py
pause
18:23:38.423 asyncsync.Async [main] - hello
异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
new Thread(()-> {
FileReader fileReader = new FileReader("E:\\todo_files\\login.bat");
String s = fileReader.readString();
System.out.println(s);
}).start();
log.debug("hello");
}

// 结果
18:25:29.591 asyncsync.Sync [main] - hello
f:
cd F:\pycharm\code\demo
python Login.py
pause

创建运行线程

Thread
  1. 继承或者直接new
  2. 重写run方法
1
2
3
4
5
6
7
8
9
10
public class c_thread {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println(222);
});
thread.setName("child");
thread.start();
System.out.println("main");
}
}
Runnable
  1. 实现Runnable接口
  2. 重写run方法
1
2
3
4
5
6
7
8
9
10
11
public class c_runnable implements Runnable{
@Override
public void run() {
System.out.println("child");
}

public static void main(String[] args) {
new Thread(new c_runnable()).start();
System.out.println("main");
}
}
Thread和Runnable区别

推荐使用 Runnable:

  1. 因为Thread是线程和任务在一起的,并没有分开。
  2. Runnable是将线程和任务分开的,从而更容易与线程池等高级API配合
  3. 根据设计模式应该尽量用组合或者聚合代替继承
  4. 并且Java是类单继承多实现的语言
FutureTask

继承了Runnable

run方法会抛出异常,并且有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
public class c_futuretask {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> fk = new FutureTask<>(() -> {
log.debug("child");
Thread.sleep(1000);
return 1;
});

Thread thread = new Thread(fk, "fk");
thread.start();
log.debug("返回值: {}", fk.get()); // fk.get() 会阻塞, 直到有返回结果
}
}

查看线程方法

windows
  1. 任务管理器
  2. tasklist查看进程
  3. taskkill杀死进程
linux
  1. ps -fe 查看所有进程
  2. ps -fT -p 查看某个
  3. top -H -p 查看某个进程
  4. kill 杀死进程
java
  1. jps
  2. jstack 查看某个Java进程的所有线程状态
  3. jconsole可以查看(并且也可以远程)

线程运行原理

栈和栈帧

JVM是由堆、栈、方法去所组成,其中栈内存是给线程所使用,每一个线程启动后,虚拟机都会为其分配一块栈内存

  1. 每个栈由多个栈帧组成,对应着每次方法调用时所占用的内存
  2. 每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
图解

cup调度main线程执行main方法,加入main方法的栈帧,并且main方法中的局部变量args指向堆中中的一个字符串数组,然后调用method1方法,加入栈帧,初始化一些局部变量,并且返回地址指向调用者,最后method2加入栈帧同样局部变量n指向堆中的new Object(),返回地址指向method1(调用者),最后method2执行完成弹出,继续执行method1后再弹出,然后再执行main方法,执行完成后弹出,程序执行完毕。

image-20230731212612803

线程上下文

应为一些原因导致cpu不在执行当前线程,转而执行另一个线程

  • 线程cpu时间片用完
  • 垃圾回收
  • 有更高的优先级的线程需要执行
  • 线程自己调用了sleep、yield、wait、join、park、synchronized、lock等方法

线程常见方法

start()

启动线程

getState()

获取线程状态信息

sleep()

睡眠,将线程正在running状态改为timed waiting状态

可用TimeUnit代替 (TimeUnit.SECONDS.sleep(1))

interrupt()

中断线程,谁调谁被打断,会抛出异常 (InterruptedException)

  1. 并不能被打断,需要配合打断表记
yield()

让出cpu的使用权

不一定能成功,因为即便让出了,任务调度器也有可能再次执行当前线程

join()

插队,等待线程执行结束,谁调用谁就会先执行结束

多个join的等待时间是最大那个

底层是wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread t2 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
long start = System.currentTimeMillis();
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(System.currentTimeMillis() - start); // 2000
}
wait()
stop()

直接停止线程,会释放资源和锁

  1. 如果直接stop,会释放资源,但是不安全,可能会导致程序并不会执行完毕,最终导致数据不一致问题
  2. 与此类似的还有 suspend()、resume()方法
isInterrupted()

查看是否打断正在运行的线程,不会清除打断标记

  1. 如果是阻塞线程会是 false (因为异常会重置打断标记)
  2. 如果是正常的 true
interrupted()

查看是否打断正在运行的线程,但是会清除标记

park()

LockSupport.park(),阻塞线程

底层是UnSafe类的park方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
log.debug("park");
LockSupport.park();
log.debug("unpark");
// log.debug("{}", Thread.currentThread().isInterrupted());
Thread.interrupted();
// 并不会继续阻塞
// 所以park是根据打断状态来判定是否会执行的
// 所以使用 Thread.interrupted()来清除打断标记,则可以让park继续执行
LockSupport.park();
log.debug("park again");
});

thread.start();
Thread.sleep(1000);
thread.interrupt();
}

notify()

在正在waitSet中等待的线程挑选一个唤醒

notifyAll()

将waitSet中的等待的所有的线程唤醒

防止cpu占用100%
  1. 使用sleep
  2. wait或条件变量

后两种需要加锁,并且需要相应的唤醒操作,一般适用于同步场景

sleep适用于无需加锁的同步场景

两阶段停止模式

xxxxxxxxxx RPM(ReadHat Package Manager) ​rpm -qa 查询已安装软件rpm -qi 软件名称 是否已经安装此软件rpm -e 软件包 卸载软件rpm -e –nodeps 软件包 卸载软件,不考虑依赖关系​rpm -ivh rpm包名 安装软件-i install-v –verbose,显示详细信息-h –hash,进度条–nodeps 安装前不检查依赖​推荐yum install 软件名yum remove 软件名yum update 升级软件参数—install 安装 update 更新check-update 检查是否有可用更新remove 移除list 显示软件包信息clean 清理yum过期的缓存deplist 显示yum软件包的所有依赖关系​更改软件更新下载镜像源🎈🎈🎈todomarkdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
log.debug("退出");
break;
}
try {
Thread.sleep(1000);
log.debug("监控");
} catch (InterruptedException e) {
// 重置标记
Thread.currentThread().interrupt();
}
}
});
thread.start();
Thread.sleep(4000);
thread.interrupt();
}
主线程与守护线程

默认情况下,Java进程需要所有线程结束后才会终止程序,有一种特殊线程叫做守护线程,只要其他非守护线程运行结束了,即使守护线程的代码没有执行完毕,也会强制结束。

线程生命周期

  1. 新建、准备、运行、阻塞、终结
  2. 新建、运行、带时等待、等待、锁定、终结 (Java中的)
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static void main(String[] args) {
Thread thread1 = new Thread(() -> { // NEW
while (true) ;
});
Thread thread2 = new Thread(() -> { // RUNNABLE
synchronized (LifeState.class) {
while (true) ;
}
});
Thread thread3 = new Thread(() -> { // TIMED_WAITING
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread thread4 = new Thread(() -> { // WAITING
try {
thread3.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Thread thread5 = new Thread(() -> { // BLOCKED
synchronized (LifeState.class) {
System.out.println(Thread.currentThread().getName());
}
});
Thread thread6 = new Thread(() -> { // TERMINATED
});

thread2.start();
thread3.start();
thread4.start();
thread5.start();
thread6.start();

System.out.println(thread1.getState());
System.out.println(thread2.getState());
System.out.println(thread3.getState());
System.out.println(thread4.getState());
System.out.println(thread5.getState());
System.out.println(thread6.getState());
}

共享模型管理

共享问题

i++并非原子操作,它会被分为3条指令:

  1. 获取i值
  2. 进行自增或自减
  3. 对i进行赋值
synchronized

对代进行加锁

  • public void static synchronized a(){}; 此时锁住的是类对象
  • public void synchronized a(){}; 此时锁住的是this对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static int counter = 0;

public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
counter++;
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
counter--;
}
});

thread1.start();
thread2.start();

Thread.sleep(2000);
System.out.println(counter); // 最终counter的值并不一定是 0
}



public static int counter = 0;

public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
synchronized (ThreadSecurity.class) { // 加上 synchronized 锁
counter++;
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
synchronized (ThreadSecurity.class) { // 加上 synchronized 锁
counter--;
}
}
});

thread1.start();
thread2.start();

Thread.sleep(2000);
System.out.println(counter); -- counter 每次值都为 0
}
线程安全分析
成员变量和静态变量是否安全

如果共享了并且有写操作就不安全

局部变量是否线程安全

局部变量是安全的,但是局部引用变量不一定是安全的

常见安全类

String(其中存储字符的String的是个char数组, 并且其中的所有修改方法都是会再new 一个String)

还有一些StringBuffer、Hashtable、以及一些包装类(Integer、Double…)

练习

购票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) throws InterruptedException {
Ticket ticket = new Ticket();
ThreadLocalRandom random = ThreadLocalRandom.current();
List<Thread> list = new ArrayList<>();
List<Integer> ret = new Vector<>();
for (int i = 0; i < 5000; i++) {
Thread thread = new Thread(() -> {
int res = ticket.buyTicket(random.nextInt(1, 10));
ret.add(res);
});
list.add(thread);
thread.start();
}

Thread.sleep(1000);
int sum = ret.stream().mapToInt(i -> i).sum();
System.out.println(ticket.count);
System.out.println(sum);
}
}

class Ticket {
int count = 500;

public synchronized int buyTicket(int num) {
if (this.count >= num) {
this.count -= num;
return num;
}
return 0;
}

转账

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class PracticeTransfer {
public static void main(String[] args) throws InterruptedException {
Account account1 = new Account();
Account account2 = new Account();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
account1.transfer(account2, 500);
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
account2.transfer(account1, 500);
}
});

thread2.start();
thread1.start();
Thread.sleep(1000);
System.out.println(account1);
System.out.println(account2);

}
}

class Account {
int balance = 1000;

public void transfer(Account target, int money) {
synchronized (Account.class) { // 多个Account, 并不是只有this一个对象
if (this.balance > money) {
target.balance += money;
this.balance -= money;
}
}
}

@Override
public String toString() {
return "Account{" +
"balance=" + balance +
'}';
}
}
Monitor
原理

Monitor(监视器), 每个Java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级) 之后,该对象的Mark Word就被设置指向Monitor对象的指针

Java对象头

Java对象由两部分组成: 对象头 + 对象中的成员变量

普通对象

Object Header (64 bits)
Mark Word(32 bits) Klass Word(32 bits)

数组对象

Object Header (96 bits)
Mark Word(32 bits) Klass Word(32 bits) array length(32 bits)

Mark Word结构

Mark Word (32 bits) State
hashcode:25 age:4 biased_lock:0 01 Normal
thread:23 epoch:2 age:4 biased_lock:1 01 Biased
ptr_to_lock_record:30 00 Lightweight Locked
ptr_to_heavyweight_monitor:30 10 Heavyweight Locked
11 Marked for GC
synchronized 原理

加锁原理:当一个线程被synchronized的方法或者块时,这个线程的MarkWord会指向一个Monitor,这个Monitor只能同时被一个线程访问,假如现在有其他线程访问的话,会被加入到Monitor中的等待队列中,当正在访问的线程执行完后, 对MarkWord进行重置,唤醒EntryList,才由等待中的线程来竞争访问。

image-20230805125317400

轻量级锁

如果一个对象有多个线程访问,但多线程访问的时间是错开的(也就是没有竞争),那么可以用轻量级锁来优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LightweightLock {
final static Object obj = new Object();

public void test() {
synchronized (obj) {
test2();
}
}

public void test2() {
synchronized (obj) {

}
}
}

每个线程栈帧都会包含一个锁记录,这个锁记录可以存储锁定对象的Mark Word,其中的Object reference 会指向锁对象,线程会尝试使用cas替换锁对象的Mark Word,将Mark Word存储到自己的锁记录中。将自己的锁记录中的锁记录地址和状态给锁对象存储。

如果cas失败了:

  1. 被其他线程占有了,进入锁膨胀
  2. 自己占有,继续在栈帧中添加锁记录,但是锁记录中的锁记录的地址和状态为null

最后当退出synchronized解锁时,如果锁记录为null,表示为重入锁,并且锁记录减1

其中解锁可能失败也可能成功:

  1. 成功,表示解锁成功
  2. 失败,说明轻量级锁进入了锁膨胀,已经升级为重量级锁,进入重量级锁解锁流程

image-20230806005904485

image-20230806010811940

image-20230806010837425

image-20230806010859270

锁膨胀

当有另外一个线程进来需要获得锁时,发现锁被占有,于是锁对象将申请Monitor锁,让锁对象指向Monitor,然后将进来的线程放入到EntryList中开始等待,当正占有锁的线程执行完后,释放锁后,将唤醒正在EntryList中的线程。

image-20230806014819177

image-20230806014841542

自旋优化

重量级锁竞争的时候,还可以使用自旋(通过循环几次获取锁,Java6以后自适应,无法自己控制)来进行优化,如果当前线程自旋成功(即这时候线程已经退出了同步块,释放了锁)这时候线程就可以避免阻塞。

自旋成功

image-20230806101635454

自旋失败

image-20230806101850382

偏向锁

轻量级锁在没有竞争的时候,每次重入仍需要执行cas操作

Java6中引入了偏向锁来做进一步优化:只有第一次使用cas将线程ID设置成到对象的Mark Word头,之后发现这个线程ID是自己的就表示没有竞争,不用重新cas,以后只要不发生竞争,那么这个锁对象就属于该线程(默认开启)

  1. 其中如果调用了对象的hashCode(),会导致偏向锁被撤销。此时升级为轻量级锁
  2. 有其他线程来访问(没有竞争),升级为轻量级
  3. 调用wait()/notify(),升级为重量级,因为此方法是重量级才有的
  4. 当 2 中的其他线程频繁来访问撤销偏向锁超过20次(没有竞争),此时会重新替换偏向锁 (批量重偏向)
  5. 当撤销次数超过40次后,会让这个类的对象和新建的对象编程不可偏向的(批量撤销)
锁消除

Java运行时,JIT 即时编译器会对热点代码进行优化,可能会对锁进行消除

wait/notify

处理等待的线程会被放到Monitor中的waitSet中,此时需要处于Owner的线程notify或者notifyAll来唤醒,唤醒后将会被放入到EntrySet中进行下次线程的竞争

image-20230805125317400

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class WaitThread {
public static void main(String[] args) throws InterruptedException {
Integer i = new Integer(1);

Thread thread = new Thread(() -> {
synchronized (i) {
try {
i.wait();
System.out.println("wake up");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});

Thread thread2 = new Thread(() -> {
synchronized (i) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
i.notify();
}
});
thread.start();
thread2.start();
}
}
wait 和 sleep 的区别
  1. sleep 是Thread的方法,wait是Object的

  2. sleep不需要和synchronized配合使用,wait需要

  3. sleep使时不会释放锁,wait 会

    下方代码,如果用的Thread.sleep的话就会导致其他线程都得等待她sleep完,才能获得锁,换成wait就能解决。当然interrupt也能解决

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    @Slf4j
    public class PracticeWaitNotify {
    static final Object room = new Object();
    static boolean hasCigarette = false;

    public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(() -> {
    log.debug("有烟没{}", hasCigarette);
    synchronized (room) {
    if (!hasCigarette) {
    log.debug("没烟休息..");
    try {
    TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }
    }
    }, "小娜");
    thread.start();

    TimeUnit.SECONDS.sleep(1);
    for (int i = 0; i < 5; i++) {
    new Thread(() -> {
    synchronized (room) {
    log.debug("开始干活");
    }
    }, "其他人").start();
    }

    new Thread(() -> {
    synchronized (room) {
    log.debug("烟到了!");
    hasCigarette = true;
    }
    }, "小红").start();
    }
    }
虚假唤醒

如果多个线程在等待,此时使用notify来唤醒wait的线程,就可能会唤醒不是自己想要的线程。

可以用notifyAll+while 来处理,while可以防止被不必要的唤醒

1
2
3
4
5
6
7
8
9
10
11
// 待唤醒线程
synchronized(locak) {
while(条件不成立) {
locak.wait();
}
}

// 唤醒线程
synchronized(lock) {
lock.notifyAll();
}
设计模式-保护性暂停

Guarded Supension, 用在一个线程等待另一个线程执行的结果

image-20230806220116514

通过while + wait + notifyAll 来制作一个线程等待时会释放锁,并且可超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class GuardendObject {
public static void main(String[] args) throws InterruptedException {
Guardend guardend = new Guardend();
new Thread(() -> {
try {
log.debug("get start");
Object ret = guardend.get(2000);
log.debug("{}", ret);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "get-thread").start();

Thread.sleep(5000);
new Thread(()-> {
log.debug("set start");
guardend.set("666");
}, "set_thread").start();
}
}

class Guardend {
private final Object lock = new Object();

private Object response;

public Object get(long timeout) throws InterruptedException {
synchronized (lock) {
long base = System.currentTimeMillis();
long now = 0;
// 直接超时效果
while (response == null) {
// delay 剩余时间
long delay = timeout - now;
if (delay <= 0)
break;
lock.wait(delay);
now = System.currentTimeMillis() - base;
}
return response;
}
}

public void set(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}

扩展,多个类之间使用Guardend对象,作为参数传递不方便,可使用下图来设计

image-20230807021057445

设计模式-生产者/消费者

image-20230807112649932

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class ProductorAndConsumer {
public static void main(String[] args) throws InterruptedException {
MessageQueue messageQueue = new MessageQueue();
// 生产者
for (int i = 0; i < 3; i++) {
int val = i;
new Thread(()->{
try {
messageQueue.put("添加值为: " + val);
} catch (InterruptedException e) {
}
}, "productor-" + i).start();
}
// 消费者
TimeUnit.SECONDS.sleep(1);
new Thread(()-> {
try {
messageQueue.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "consumer").start();
}
}

@Slf4j
class MessageQueue {
final LinkedList<String> queue = new LinkedList<>();

public void get() throws InterruptedException {
synchronized (queue) {
while (true) {
if (queue.isEmpty()) {
log.debug("队列为空");
queue.wait();
}
String ret = queue.removeFirst();
log.debug("获得 {}", ret);
queue.notifyAll();
}
}
}

public void put(String val) throws InterruptedException {
synchronized (queue) {
while (queue.size() == 2) {
log.debug("队列已满");
queue.wait();
}
queue.add(Thread.currentThread().getName());
queue.notifyAll();
}
}
}
线程状态转换

image-20230807144953142

  1. NEW -> RUNNABLE
  2. RUNNABLE <–> WAITING
  3. RUNNABLE <–> TIMED_WAITING
  4. RUNNABLE <–> BLOCKED
  5. RUNNABLE <–> TERMINATED
活跃性
死锁

一个线程同时获取多把所

t1想获取a对象的锁,然后又想获取b对象的锁 (在获取a对象锁和b对象锁之间,加入一点延时,100%死锁)

t2想获取b对象的锁,然后又想获取a对象的锁

定位死锁

jconsole可以或者jps定位线程id

哲学家就餐问题

image-20230807205046083

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// 会死锁
@Slf4j
public class DeadLock {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1号筷子");
Chopstick c2 = new Chopstick("2号筷子");
Chopstick c3 = new Chopstick("3号筷子");
Chopstick c4 = new Chopstick("4号筷子");
Chopstick c5 = new Chopstick("5号筷子");
List<Philosopher> philosopherList = new ArrayList<>();
philosopherList.add(new Philosopher("苏格拉底", c1, c2));
philosopherList.add(new Philosopher("柏拉图", c2, c3));
philosopherList.add(new Philosopher("孔子", c3, c4));
philosopherList.add(new Philosopher("亚里士多德", c4, c5));
philosopherList.add(new Philosopher("勒奈·笛卡尔", c5, c1));

philosopherList.stream().forEach((item) -> {
new Thread(() -> {
eat(item);
}, item.getName()).start();
});
}


public static void eat(Philosopher philosopher) {
while (true) {
synchronized (philosopher.getLeft()) {
synchronized (philosopher.getRight()) {
log.debug("{} 正在吃", philosopher.getName());
}
}
}
}
}


class Philosopher {
String name;

Chopstick left;
Chopstick right;

public String getName() {
return name;
}

public Chopstick getLeft() {
return left;
}

public Chopstick getRight() {
return right;
}

public Philosopher(String name, Chopstick left, Chopstick right) {
this.name = name;
this.left = left;
this.right = right;
}
}

class Chopstick {
String name;

public Chopstick(String name) {
this.name = name;
}
}

// 可解决死锁问题
@Slf4j
public class DeadLock {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1号筷子");
Chopstick c2 = new Chopstick("2号筷子");
Chopstick c3 = new Chopstick("3号筷子");
Chopstick c4 = new Chopstick("4号筷子");
Chopstick c5 = new Chopstick("5号筷子");
List<Philosopher> philosopherList = new ArrayList<>();
philosopherList.add(new Philosopher("苏格拉底", c1, c2));
philosopherList.add(new Philosopher("柏拉图", c2, c3));
philosopherList.add(new Philosopher("孔子", c3, c4));
philosopherList.add(new Philosopher("亚里士多德", c4, c5));
philosopherList.add(new Philosopher("勒奈·笛卡尔", c5, c1));

philosopherList.stream().forEach((item) -> {
new Thread(() -> {
eat(item);
}, item.getName()).start();
});
}


public static void eat(Philosopher philosopher) {
while (true) {
if (philosopher.getLeft().tryLock()) {
if (philosopher.getRight().tryLock()) {
log.debug("{} 正在吃", philosopher.getName());
philosopher.getRight().unlock();
}
philosopher.getLeft().unlock();
}

}
}
}

class Philosopher {
String name;

Chopstick left;
Chopstick right;

public String getName() {
return name;
}

public Chopstick getLeft() {
return left;
}

public Chopstick getRight() {
return right;
}

public Philosopher(String name, Chopstick left, Chopstick right) {
this.name = name;
this.left = left;
this.right = right;
}
}

class Chopstick extends ReentrantLock {
String name;

public Chopstick(String name) {
this.name = name;
}
}

image-20230807235025476

活锁

两个线程互相改变对象结果的条件,最后谁也无法结束

通过改变执行时间,可以解决,例如Thread.sleep的时间改为随机的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class AliveLock {
static volatile int i = 10;

public static void main(String[] args) {
new Thread(() -> {
while (i > 0) {
ThreadUtil.sleep(100);
i--;
log.debug("i = {}", i);
}
}, "t1").start();
new Thread(() -> {
while (i < 20) {
ThreadUtil.sleep(100);
i++;
log.debug("i = {}", i);
}
}, "t2").start();
}
}
饥饿

image-20230807211437305

image-20230807211446589

Lock
Park&Unpark

LockSupport类中的方法,实则调用的Unsafe类的park和Unpark方法

wait&notify 区别

  • wait、notifyAll和notify需要配合Monitor一起使用, 而park、unpark不需要

  • park & unpark是以线程为单位来指定 阻塞和唤醒的,notify是随机一个,notifyAll则是所有等待线程,所以park & unpark 是很精确的

  • park & unpark 可以先 unpark,而wait & notify 则不可以

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public static void main(String[] args) {
    Thread t1 = new Thread(() -> {
    log.debug("park");
    LockSupport.park();
    });
    t1.start();
    ThreadUtil.sleep(1, TimeUnit.SECONDS);
    log.debug("unpark");
    LockSupport.unpark(t1);
    }
原理

每个线程都有自己的Parker对象,这个对象由_mutex、_counter、_cond 组成

调用park
  1. 当调用Unsafe.park()时
  2. 检查_counter, 这时获得_mutex互斥锁
  3. 线程进入_cond条件阻塞
  4. 设置_counter = 0
调用unpark
  1. 当调用Unsafe.unpark时
  2. 设置_counter = 1
  3. 线程再次调用unpark,时发现_counter = 1,让线程继续往下执行,不在阻塞
  4. 设置_counter = 0

image-20230807125443077

image-20230807144234204

多把锁

多把不相干的锁

  • 好处,可以增强并发度
  • 坏处,可能会死锁,如果同时获取两把锁

ReentrantLock

可重入锁

  1. 可中断
  2. 可设置超时时间
  3. 可设置为公平锁(会降低并发度)
  4. 支持多个条件变量
实现原理

image-20230814114509244

加锁-NonfairSync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 默认为 NonfairSync
static final class NonfairSync extends Sync
abstract static class Sync extends AbstractQueuedSynchronizer


final void lock() {
if (compareAndSetState(0, 1)) // 如果 cas stateOffset属性
setExclusiveOwnerThread(Thread.currentThread()); // ,如果成功,设置当前线程为owner
else
acquire(1); //
}


protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update); // 此处的 stateOffset 是 state对象内存偏移位置
}


public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试再次CAS
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 重试失败后,新增一个Node加入到
selfInterrupt();
}


protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}


final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 如果当前node 的前一个节点是 head节点的话,那么再次去获取锁;并且后面唤醒当前线程需要前一个线程来唤醒
setHead(node); // 设置node 为头结点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 检查并更新节点状态,如果p的waitStatus == -1,则直接返回true,如果>0,则跳过,<0,则设置waitStatus = -1
parkAndCheckInterrupt()) // 阻塞当前线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放-NonfairSync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒线程
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 重置 state
return free;
}


private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // head的next
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 这里释放了head的下一个线程,acquireQueued 方法中的线程 head的next线程会去竞争,如果有新线程加入的话。没有就cas成功
}
可重入-NonfairSync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 锁重入
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 锁重入,对state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 锁释放
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 对state--,直到state == 0,释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
不可打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 不可打断,如果线程在AQS队列中,则不可以打断,直到线程获得锁后才能再次打断
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); // 打断自己
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 获取锁后,返回打断标记
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 返回打断标记,并且重置标记为false,这样就导致上面下一次还能park住
}
可打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 可打断,如果当前线程被打断了,会直接抛出异常,导致for循环结束
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 直接抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
公平锁

公平锁是队列中的线程与刚进来的线程进行竞争,所以说是公平的,就算你的前驱节点唤醒了你,此时有新的线程进入开始CAS,如果被新的线程抢先了,还是得继续park

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1)) // 直接进来就开始竞争
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
不公平锁

不公平则是会去判定队列中是否有前置节点,如果没有则可以CAS, 它会优先让队列中的线程先执行,然后才去获得锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final void lock() {
acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 查看是否有前驱节点,没有然后再去CAS
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
....
}

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && // head 是否等于 tail,如果等于表示队列中无线程 直接false
((s = h.next) == null || // 并且如果不等于,而且第二个节点等于null
s.thread != Thread.currentThread()); // 或者当前线程不等于s线程
}
条件变量-await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 添加node到 ConditionObject
int savedState = fullyRelease(node); // 清空status,并且唤醒head的下一个节点,并且释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // acquireQueued() 尝试去获得锁,获得不了park, 对应下面signal unpark
}


private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 去掉 waitStatus != Node.CONDITION 的节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 新建Node,当前线程,放到 ConditionObject 的 firstWaiter 上 或者 lastWaiter 下一个节点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
条件变量-signal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public final void signal() {
if (!isHeldExclusively()) // 当前线程是否是owner(锁持有者),有锁才能执行
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 对wait的节点进行转移,将转移到aqs队列尾部,转移过程中可能会unpark当前node
(first = firstWaiter) != null);// 或者等到 first == null
}
// doSignalAll wait队列遍历完

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); // 将把node加入到aqs队列的tail,并且返回tail
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果p的waitStatus > 0 或者 将p的waitStatus修改成 Node.SIGNAL 任意一个成功,这unpark这个node
LockSupport.unpark(node.thread); // 这里unpark是为了处理 p节点的waitStatus > 0 或者无法CAS成功成-1,unpark交给上面for循环去处理状态信息
return true;
}
————————–
可重入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
reenter();
}

public static void reenter() {
lock.lock();
log.debug("lock 1");
reenterSec();
lock.unlock();
}
public static void reenterSec() {
lock.lock();
log.debug("lock 2");
}
可中断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
log.debug("被中断");
} finally {
lock.unlock();
}
});

lock.lock();
thread.start();
ThreadUtil.sleep(1000);
thread.interrupt();
}
可设置超时时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
new Thread(() -> {
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("超时");
}
} catch (InterruptedException e) {
log.debug(e.getMessage());
}
}).start();

lock.lock();
ThreadUtil.sleep(2000);
}
可设置公平锁
1
static final ReentrantLock lock = new ReentrantLock(true);
支持多个条件变量

可指定唤醒 wait的线程

使用流程

  1. await 前需要锁
  2. await 执行后,会释放锁,进入confitionObject等待
  3. await 的线程被唤醒(或打断、超时)重新竞争lock锁
  4. 竞争lock锁成功后,从await后继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final ReentrantLock lock = new ReentrantLock(true);
static final Condition condition = lock.newCondition();

public static void main(String[] args) throws InterruptedException {

new Thread(() -> {
lock.lock();
try {
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
lock.unlock();
}
}, "condition").start();
ThreadUtil.sleep(1000);
log.debug("唤醒 condition");
lock.lock();
condition.signalAll();
lock.unlock();
}

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,从而提高性能

读-读 不互斥;读-写 互斥; 写-写 互斥; 故有写即互斥

类似于数据库中的select…from…lock in share mode

测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 同时获得到读锁的
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

new Thread(() -> {
try {
readLock.lock();
log.debug("thread-1");
ThreadUtil.sleep(1, TimeUnit.SECONDS);
} finally {
readLock.unlock();
}
}).start();

new Thread(() -> {
try {
readLock.lock();
log.debug("thread-2");
} finally {
readLock.unlock();
}
}).start();

20:22:15.807 lock.ReentrantReadWriteLockTest [Thread-0] - thread-1
20:22:15.807 lock.ReentrantReadWriteLockTest [Thread-1] - thread-2
注意事项
  1. 读锁不支持条件变量
  2. 重入时升级不支持:即持有读锁的情况下去获得写锁,会导致写锁永久等待(不能用读锁去嵌套写锁,但是可以先获取读锁,获取数据再去释放读锁,然后再去获取写锁,修改数据,再去降级成读锁获取读锁,再释放写锁,释放读锁;即获取和释放2次读锁,获取和释放1次写锁)
  3. 重入时降级支持:即持有写锁的情况下去获取读锁
事项2-官方解决例子

image-20230816204204754

读写锁应用
StampedLock

此类JDK8加入,StampedLock支持 tryOptimisticRead() 方法(乐观读) , 读取完毕后需要一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,否则需要重新获取读锁,保证数据安全

可用作缓存,多读少写

实现原理

流程与 ReentrantLock 类似,不同的是写锁状态占了state的低16位,读锁状态占了高16为

加锁-写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // acquireQueued、addWaiter与ReentrantLock一致
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) { // acquires = 1
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 获取写锁状态,是还没被占用还是已经占用了 0 或者 > 0
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 写锁为 0,但是 status 不为0,说明有读锁但是想升级成写锁(读锁不能升级写锁); 第二种表示锁重入,非owner线程不让锁重入,这两种情况都直接返回false,后面直接让进入aqs队列。
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 锁重入次数超过一定上线,直接抛错误(不太可能,重入这么深的锁)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires); // 写锁 + 1
return true;
}
// c == 0 的情况
if (writerShouldBlock() || // 写是否该堵塞
!compareAndSetState(c, c + acquires)) // 修改写锁状态, 可能有其他线程先到故有cas不成功的情况
return false;
setExclusiveOwnerThread(current); // 设置 owner
return true;
}
加锁-读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public final void acquireShared(int arg) { // arg = 1
if (tryAcquireShared(arg) < 0) // 成功返回1,失败返回-1
doAcquireShared(arg);
}
// 尝试获取共享读锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && // 独占锁状态(写锁)不为0;这里在判定写锁降级为读锁
getExclusiveOwnerThread() != current) //
return -1;
int r = sharedCount(c); // 读锁状态大小
if (!readerShouldBlock() && // 读是否该堵塞
r < MAX_COUNT && // 读锁大小 小于 65535
compareAndSetState(c, c + SHARED_UNIT)) { // 增量读锁状态(读锁状态 + 1)
if (r == 0) { // 读锁状态等于 0
firstReader = current; // 第一个读节点
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {// 涉及到ThreadLocal
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);// 未cas成功, 里面还会再次去cas,以及对线程计数器的处理,其中每个线程有自己本地的线程计数器
}


private void doAcquireShared(int arg) {// 逻辑与 ReentrantLock 类似
final Node node = addWaiter(Node.SHARED); // SHARED类型节点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);// 设置头结点,并且释放一连串Shared节点(这就是为什么能读并发的原因)
p.next = null; // help GC
if (interrupted)
selfInterrupt();// 会将自己中断掉
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}


private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); // 设置node 为 head
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();// 释放Shared节点
}
}
释放-写
1
2
3
4
5
6
7
8
9
public final boolean release(int arg) { // 类似 RL
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放-读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
....
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT; // 读状态 - 1
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0; // 读状态为0为true,否则false
}
}

StampedLock

需要配合戳使用

1
2
3
4
5
6
7
long stamp = lock.readLock();
lock.unlockRead(stamp);

long stamp = lock.writeLock();
lock.unlockWrite(stamp);


  • StampedLock支持 tryOptimisticRead() 方法(乐观读) , 读取完毕后需要一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,否则需要重新获取读锁,保证数据安全
  • 但是不支持可重入, 和条件变量
  • 支持读写锁相互转换
1
2
3
4
5
long stamp = lock.tryOptimisticRead();
// 校验戳
if (!lock.validate(stamp)){
// 锁升级
}
使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Slf4j
public class StampedLockTest {
public static void main(String[] args) {
Test test = new Test();

new Thread(() -> {
log.debug(test.getName(0));
}).start();
new Thread(() -> {
log.debug(test.getName(10));
}).start();
new Thread(() -> {
log.debug("写数据");
test.setName("ac");
}).start();
new Thread(() -> {
log.debug(test.getName(0));
}).start();
}
}

@Slf4j
class Test {
private final StampedLock lock = new StampedLock();
private String name = "default";

public void setName(String name) {
long stamp = lock.writeLock();
try {
this.name = name;
} finally {
lock.unlockWrite(stamp);
}
}

public String getName(long sleepTime) {
String ans = name;
long stamp = lock.tryOptimisticRead();
ThreadUtil.sleep(sleepTime);
if (!lock.validate(stamp)) {
log.debug("数据被修改: {}", stamp);
stamp = lock.readLock();
try {
ans = name;
} finally {
lock.unlockRead(stamp);
}
}
return ans;
}
}
原理

state:64位,前7位表示读锁状态、第8位表示写锁状态、9-64位表示stamp状态writeLock

writeLock()
1
2
3
4
5
6
7
8
9
10
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
// state 默认为 256
// s && 255 表示state的第8位是否为 0
// 如果为0表示无锁, 进行cas
// 否则调用 acquireWrite
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// 其中会两次死循环两次
// 获取锁,如果失败加入队列
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
// 尝试获取锁
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
// 获取锁失败,对spins 赋值,然后进行自旋
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
// 初始化对p进行赋值wtail,和初始化whead
else if ((p = wtail) == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
// 初始化上方的node
else if (node == null)
node = new WNode(WMODE, p);
// 如果当前节点的前节点被改变(可能队列被其他线程又添加了node),重新赋值
else if (node.prev != p)
node.prev = p;
// 尝试将当前节点设置为队尾,成功就将上一次队尾的下一个改成当前节点,然后break退出
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}

//
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 头节点等于尾节点(没有其他节点),可以尝试再次自旋
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
whead = node;
node.prev = null;
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
}
// 帮助释放cowait中的读节点
else if (h != null) { // help release stale waiters
WNode c; Thread w;
// h.cowait != null, 释放读节点
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 头节点不变的情况下
if (whead == h) {
// node的prve改变了,不等于p
if ((np = node.prev) != p) {
if (np != null)
// 重置p并且将p.next再次指向node
(p = np).next = node; // stale
}
// 修改前驱节点状态
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
// 删除前驱节点(取消的)
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
// 超时取消节点
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}

Semaphore

信号量,用来限制同时访问共享资源的线程上限(只适合单机线程数)

使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
log.debug("running。。");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
log.debug("release");
}
}).start();
}
}
}
重写连接池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@Slf4j
public class SemaphoreTest {
public static void main(String[] args) {
ConnectionPool connectionPool = new ConnectionPool(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection connection = connectionPool.get();
log.debug("线程 {}, 获取到了 {}", Thread.currentThread().getName(), connection.getName());
connectionPool.free(connection);
}, "therad-" + i).start();
}
}
}

@Slf4j
class ConnectionPool {
private AtomicIntegerArray locks;
private List<Connection> connections;
private Semaphore semaphore;
private int PoolSize;

public ConnectionPool(int poolSize) {
this.PoolSize = poolSize;
this.semaphore = new Semaphore(poolSize);
this.locks = new AtomicIntegerArray(PoolSize);
this.connections = new ArrayList<>(PoolSize);
for (int i = 0; i < poolSize; i++) {
connections.add(new Connection(UUID.randomUUID().toString().split("-")[0]));
}
}

public Connection get() {
while (true) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < PoolSize; i++) {
if (locks.get(i) == 0 &&
locks.compareAndSet(i, 0, 1)) {
return connections.get(i);
}
}
// 循环一次都被占用了
// synchronized (this) {
// try {
// log.debug("等待中..");
// wait();
// } catch (InterruptedException e) {
// }
// }
}
}

public void free(Connection coon) {
for (int i = 0; i < PoolSize; i++) {
if (connections.get(i) == coon) {
locks.set(i, 0);
semaphore.release();
log.debug("释放 {}", coon.getName());
// synchronized (this) {
// // 释放锁
// this.notifyAll();
// log.debug("释放 {}", coon.getName());
// }
return;
}
}
log.debug("释放失败, 未找到对应Connection");
}


}

class Connection {
private String name;

public Connection() {
}

public Connection(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
原理
加锁-公平
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 小于0,才开始park其他线程
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || // 状态最多等于0就返回
compareAndSetState(available, remaining))
return remaining;
}
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 释放一系列shared Node
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放-公平
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();// 释放老二节点...
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // 修改状态到初始化阶段
return true;
}
}

CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时

其中构造参数用来初始化等待计数值,await()用来等待计数归零,countDown用来计数减一

简单使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);

for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(()->{
log.debug("{}", finalI);
ThreadUtil.sleep(100);
countDownLatch.countDown();
}).start();
}
countDownLatch.await(); // 会等待 status 减到 0, 类似join,但是使用更方便(可配合线程池使用),join更加底层,但是使用更加繁琐
log.debug("finish");
}


// 配合线程池
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(5);

for (int i = 0; i < 5; i++) {
int finalI = i;
executorService.submit(() -> {
log.debug("{}", finalI);
ThreadUtil.sleep(500);
countDownLatch.countDown();
});
}
executorService.shutdown();
countDownLatch.await();
log.debug("finish");
}
原理
countDown()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 释放老二节点
doReleaseShared();
return true;
}
return false;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 获取state
int c = getState();
// 如果 state == 0 直接返回false
if (c == 0)
return false;
// state - 1
int nextc = c-1;
// cas state如果nextc == 0 则返回true
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}


await()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 尝试获取一次锁
doAcquireSharedInterruptibly(arg);// 进入里面会让当前线程park 住,所以需要当state == 0 的时候,释放当前线程,这样就能形成一种计数的功能
}

protected int tryAcquireShared(int acquires) {
// status == 0 返回 1 否则 返回 0
return (getState() == 0) ? 1 : -1;
}

CyclicBarrier

可重复使用的 CountdownLatch

简单使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2); // 线程数需要和下面 CyclicBarrier构造器第一个参数一致,不然会任务一致不完成,或任务完成对不上
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, ()-> {
log.debug("finish");
});


for (int i = 0; i < 10; i++) {
int k = i;
executorService.submit(() -> {
log.debug("线程: {}", k);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
}
executorService.shutdown();
}
原理

同步顺序控制

wait notify版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final Object lock = new Object();

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
try {
lock.wait();
log.debug("t1");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock) {
log.debug("t2");
lock.notify();
}
});
t1.start();
t2.start();
}
park unpark 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
LockSupport.park();
log.debug("t1");
});
Thread t2 = new Thread(() -> {
log.debug("t2");
LockSupport.unpark(t1);
});

t1.start();
t2.start();
}
await signal 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static final ReentrantLock lock = new ReentrantLock();
static final Condition condition = lock.newCondition();

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
lock.lock();
try {
try {
condition.await();
log.debug("t1");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
lock.unlock();
}
});
Thread t2 = new Thread(() -> {
lock.lock();
try {
condition.signal();
log.debug("t2");
} finally {
lock.unlock();
}
});

t1.start();
t2.start();
}

交替输出

wait notify版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static boolean flag = true;
static final Object lock = new Object();

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
synchronized (lock) {
if (flag) {
log.debug("t1");
flag = !flag;
}
}
}
});
Thread t2 = new Thread(() -> {
while (true) {
synchronized (lock) {
if (!flag) {
log.debug("t2");
flag = !flag;
}
}
}
});
t1.start();
t2.start();
}
park unpark 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int flag = 1;

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
LockSupport.park();
log.debug("t1");
}
});
Thread t2 = new Thread(() -> {
while (true) {
LockSupport.park();
log.debug("t2");
}
});
t1.start();
t2.start();
while (true) {
if (flag % 2 == 0) {
LockSupport.unpark(t1);
} else {
LockSupport.unpark(t2);
}
flag++;
}
}
await signal 版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static final ReentrantLock lock = new ReentrantLock();
static Condition c1 = lock.newCondition();
static Condition c2 = lock.newCondition();

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
lock.lock();
try {
try {
c2.signal();
c1.await();
log.debug("t1");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
lock.unlock();
}
}
});
Thread t2 = new Thread(() -> {
while (true) {
lock.lock();
try {
try {
c1.signal();
c2.await();
log.debug("t2");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
lock.unlock();
}
}
});

t1.start();
t2.start();
}

共享内存模型

Java内存模型

JMM 即 Java Memory Model,它定义了主存、工作内抽象概念,底层对应着CPU寄存器、缓存、硬件内存、CPU指令优化等

JMM体现在以下几个方面

  • 原子性 - 保证执行不会受到线程线程上下文切换的影响
  • 可见性 - 保证指令不会受cpu缓存的影响
  • 有序性 - 保证指令不会受到cpu执行并行优化的影响

synchronized 和 volatile 都支持可见性、有有序性,但是synchronized支持原子性,volatile不可以

synchronized 是能解决原子性、可见性、有序性。但是有序性必须代码都在synchronized的临界区中(每次只能一个线程执行,并不影响最终结果),但是如果有不在这个临界区外的代码是有可能产生有序性的问题,例如,单例模式双检锁种如果不对需要单例的对象使用volatile关键字的话,就会造成一些线程拿到的是一个未初始化完成的对象。

两阶段终止模式 volatile版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static volatile boolean isRun = true;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
if (!isRun) {
log.debug("退出");
break;
}

try {
Thread.sleep(1000);
log.debug("监控");
} catch (InterruptedException e) {
}
}
});
thread.start();
Thread.sleep(2000);
isRun = false;
}
同步模式 Balking

Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

static volatile boolean isOpen = false;
private static final Object lock = new Object();

public static void main(String[] args) {
start();
start();
ThreadUtil.sleep(3000);
stop();

}

public static void start() {

synchronized (lock) {
if (isOpen) {
log.debug("已开启监控!");
return;
}
isOpen = true;
}

new Thread(() -> {
while (isOpen) {
ThreadUtil.sleep(1000);
log.debug("监控中。。。");
}
}).start();
}

public static void stop() {
isOpen = false;
log.debug("已关闭");
}
指令重排序优化

现代处理器会被设计为一个时钟周期完成一条执行时间最长的cpu指令,每条指令都可以分为:取指令 - 指令编码 - 执行指令 - 内存访问 - 数据写回;本质上,流水线技术并不能缩短单条指令的执行时间,但是它能变相地提高了指令地吞吐率,其中串行的方式执行指令并不能把缓存、ALU都能使用起来,反而是每次就运行其中一个,如果使用流水线的方式,则尽可能得将这些都运用起来

image-20230809121912300

image-20230809122102877

volatile 原理

volatile 底层实现原理是内存屏障

写屏障

写屏障保障在该屏障之前的,对共享变量的改动,都同步到主存当中

1
2
3
4
public void test() {
num = 2;
read = true; // read 是 volatile 写屏障,并且可防止之前的指令重排
}
读屏障

读屏障保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据

1
2
3
4
5
6
7
public void test() {
// 读屏障
// read 是 volatile 读取值带读屏障
if (read) {
...
}
}
double check locking 问题

典型例子:单例模式 双检索

下方 Singleton 如果不加 volatile 的话,可能会导致获得的是一个未初始化完成的对象: singleton 原本应该在 Singleton 对象初始化后才能获得其地址,但是由于指令重排会将 耗时的 初始化对象放到 singleton 获得地址之后,也就是 singleton 先获得地址,然后才去初始化 Singleton,这样就会导致新来一个线程singleton == null 判定发现 singleton 不等于null,此时直接返回 singleton,但是此时返回的是一个未初始化完成的 Singleton对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 反序列化会破坏单例
private static volatile Singleton singleton;
private static final Object lock = new Object();
private static Singleton getInstance() {
if (singleton == null) {
synchronized (lock) {
if (singleton != null) {
return singleton;
}
singleton = new Singleton();
}
}
return singleton;
}

public static void main(String[] args) {
Singleton instance1 = Singleton.getInstance();
Singleton instance2 = Singleton.getInstance();
System.out.println(instance2 == instance1);
}

// 防止反序列化

共享模型之无锁

CAS与volatile

CAS Compare And Swap 比较并替换,是原子的

并且无锁效率是很高的,在多核CPU并且线程数不要超过CPU核心数的情况下可发挥其效率,因为synchronized 加锁释放锁会导致上下文切换,发生阻塞。无锁则能一直运行重试失败,没有阻塞,直到CAS成功后退出线程。

CAS比较适合多读少写

其中 balance.compareAndSet(expect, update)方法是原子性的,于是1000个线程都通过while循环来进行CAS,如果当前的expect值与当前balance值一样,则会修改,否则一直循环直到与自己的一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final static AtomicInteger balance = new AtomicInteger(10000);
public static void main(String[] args) {
List<Thread> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Thread thread = new Thread(() -> {
withdraw(10);
});
list.add(thread);
}

list.forEach(Thread::start);
ThreadUtil.sleep(500);
log.debug("{}", balance);
}

public static void withdraw(int money) {
while (true) {
int expect = balance.get();
int update = expect - money;
boolean b = balance.compareAndSet(expect, update);
if (b)
break;
}
}
原子整数

AtomicInteger 原子整数,通过CAS保证原子性,类似的还有AtomicBoolea、AtomicLong…

  1. get 获取最新值
  2. getAndSet 获取并设置
  3. compareAndSet 比较并替换,原子的
  4. getAndUpdate 给予表达式操作
原子引用
  1. AtomicReference
  2. AtomicMarkableReference
  3. AtomicStampedReference
ABA问题
  1. 比如有三个线程,main、t1、t2,如果t1 将 变量 temp = A 更改为了 B,线程t2又temp更改为了 B -> A,此时main线程中get到的数据还是A,于是就将A->C,其中main并不能感知到temp是否被其他线程修改过
  2. 可通过添加版本号解决,如果版本号不一致,则已经被其他线程修改过
1
2
3
4
5
6
7
8
9
10
11
12
13
// AtomicStampedReference 可判定更改过的版本号
AtomicStampedReference<String> tem = new AtomicStampedReference<String>("A", 0); // 添加一个版本号 Stamp
new Thread(()->{
tem.compareAndSet("A", "B", 0, 1);
}).start();
ThreadUtil.sleep(100);
new Thread(()->{
tem.compareAndSet("B", "A", 1, 2);
}).start();
ThreadUtil.sleep(100);
tem.compareAndSet("A", "C", 0, 1);
log.debug(tem.getReference()); // A
log.debug("version: {}", tem.getStamp()); // 2
1
2
3
4
5
6
7
8
// 仅判定是否更改过
AtomicMarkableReference<String> tmp = new AtomicMarkableReference<>("garbage", true);
new Thread(()->{
tmp.compareAndSet("garbage", "no garbage", true, false);
}).start();
ThreadUtil.sleep(100);
tmp.compareAndSet("garbage", "no garbage2", true, false);
log.debug(tmp.getReference()); // no garbage
原子数组

Unsafe中如果操作的呢?

1
2
3
4
5
6
7
8
9
10
public final boolean compareAndSet(int i, int expect, int update) { // 通过值在对象中的偏移量,来进行CAS
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

private long checkedByteOffset(int i) { // 获取值的偏移量
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);

return byteOffset(i);
}
  1. AtomicIntegerArray
  2. AtomicLongArray
  3. AtomicReferenceArray
字段更新器
  1. AtomicIntegerFieldUpdater
  2. AtomicLongFieldUpdater
  3. AtomicReferenceFieldUpdater
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AtomicFiled {
public static void main(String[] args) {
Person tperson = new Person();
tperson.name = "jack";
AtomicReferenceFieldUpdater<Person, String> person =
AtomicReferenceFieldUpdater.newUpdater(Person.class, String.class, "name");
person.compareAndSet(tperson, "jack", "tom");
System.out.println(tperson);
}


}
class Person {
volatile String name;

@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
'}';
}
}
原子累加器

Striped64 类专门数字累加和计算的的,其中实现类有, 其中效率是很高的。

效率高主要原因是,在CAS时,设置多个累加单元,thread-0累加cell[0],而thread-1 累加 cell[1]..最后将结果汇总。因此可减少CAS重试失败

​ 累加—–

  1. LongAdder

  2. DoubleAdder

    计算——

  3. LongAccumulator

  4. DoubleAccumulator

1
2
3
4
5
6
7
LongAdder longAdder = new LongAdder();
for (int i = 0; i < 10000; i++) {
new Thread(longAdder::increment).start();
}

ThreadUtil.sleep(100);
System.out.println(longAdder.sum());
LongAdder 源码
1
2
3
4
5
6
7
8
@sun.misc.Contended static final class Cell { // Contended防止缓存行共享,防止一个缓存行共享多个cell对象
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
....
}
  • CPU缓存,其中缓存速度从上往下L1、L2、L3、..到内存,其中缓存存储数据的单位是缓存行,如果某个cpu核心修改了某个缓存行的数据,就会导致其他cpu核心的这个缓存行都失效,从而让这个cpu核心重新到内存中区读取复制数据到缓存中。一个缓存行大小一般是64Byte
  • 如果cpu 核心1去修改cell[0],cpu核心2去修改cell[1] 。这样就会导致整行都会失效,从而再次到内存中去读取最新数据
  • 如果把cell[0]和cell[1]分别占用一个缓存行,就能避免这种情况(将一个缓存行用空数据填满,故就会一个cell占用一行)

image-20230811205329597

image-20230811210320364

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// 几个重要的属性
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors(); // cpu核心数

/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells; // cell数组,大小每次x2

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base; // 基数

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy; // cell锁,创建cell时会使用


final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

static final int getProbe() { // 获取线程的探测值,类似线程的hash
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { // cells不等于null 或者 base进行CAS失败会进入
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || // cells等于null或者cells的长度小于0
(a = as[getProbe() & m]) == null || // 或者cells下标为 getProbe() & m 为 null
!(uncontended = a.cas(v = a.value, v + x))) // 或者上方的a进行CAS失败会进入下方
longAccumulate(x, null, uncontended);
}
}

// longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells存在,且长度大于0
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell cellsBusy == 0未上锁
Cell r = new Cell(x); // Optimistically create 新建一个Cell
if (cellsBusy == 0 && casCellsBusy()) { // cellsBusy == 0 且上锁
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j; // 上方创建的一个新的cell存储到cells中,如果cells当前下标为null的话
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created) // 创建成功,退出
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // 对a进行CAS,成功则退出
break;
else if (n >= NCPU || cells != as)// 超过cpu核心数
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale 扩容cells
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 初始cells
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base 再次对x进行Cas,成功则退出循环
}
}
Unsafe

这个类比较强大,可以操作内存、对内存的分配、复制、释放、对象的操作、线程调度、CAS操作、和一些系统信息

不可变

DateTimeFormatter

此类是一个不可变并且线程安全的类 (This class is immutable and thread-safe)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 不安全
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd");
for (int i = 0; i < 100; i++) {
new Thread(()->{
try {
Date parse = simpleDateFormat.parse("2023/8/12");
log.debug("{}", parse);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}).start();
}

// 安全
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd");
for (int i = 0; i < 100; i++) {
new Thread(() -> {
TemporalAccessor parse = formatter.parse("2023/08/12");
log.debug("{}", parse);
}).start();
}
不可变对象设计

String 是不可变的,并且很多方法都是通过copy来赋值或返回数据,这样就能防止对原数据操作。

1
2
3
4
5
6
7
8
9
10
11
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen); // 如果开始为0,则返回当前字符数组对象,否则new一个String对象
}
....
final原理
1
2
3
4
5
6
7
8
9
10
final int a = 100;

LINENUMBER 8 L0
ALOAD 0
INVOKESPECIAL java/lang/Object.<init> ()V
LINENUMBER 9 L1
ALOAD 0
BIPUSH 100 // 复制一份a的值,并不是getstatic指令
PUTFIELD nochange/connectionPool.a : I
RETURN
手写-简单连接池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Slf4j
public class ConnectionPool {
private AtomicIntegerArray locks;
private List<Connection> connections;

private int PoolSize;

public ConnectionPool(int poolSize) {
this.PoolSize = poolSize;
this.locks = new AtomicIntegerArray(PoolSize);
this.connections = new ArrayList<>(PoolSize);
for (int i = 0; i < poolSize; i++)
connections.add(new Connection(UUID.randomUUID().toString().split("-")[0]));
}

public Connection get() {
while (true) {
for (int i = 0; i < PoolSize; i++) {
if (locks.get(i) == 0 &&
locks.compareAndSet(i, 0, 1)) {
return connections.get(i);
}
}
// 循环一次都被占用了
synchronized (this) {
try {
log.debug("等待中..");
wait();
} catch (InterruptedException e) {
}
}
}
}

public void free(Connection coon) {
for (int i = 0; i < PoolSize; i++) {
if (connections.get(i) == coon) {
locks.compareAndSet(i, 1, 0);
synchronized (this) {
// 释放锁
this.notifyAll();
log.debug("释放 {}", coon.getName());
}
return;
}
}
log.debug("释放失败, 未找到对应Connection");
}

public static void main(String[] args) {
connectionPool connectionPool = new connectionPool(2);
for (int i = 0; i < 10; i++) {
new Thread(()->{
Connection connection = connectionPool.get();
log.debug("线程 {}, 获取到了 {}", Thread.currentThread().getName(), connection.getName());
connectionPool.free(connection);
}, "therad-" + i).start();
}
}
}

class Connection {
private String name;

public Connection() {
}

public Connection(String name) {
this.name = name;
}

public String getName() {
return name;
}
}

线程池

线程池大概执行步骤
1
2
3
4
5
6
7
1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false. 
2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
3. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.

1.如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务来启动一个新线程。对addWorker的调用原子性地检查runState和workerCount,从而通过返回false来防止错误警报,这些错误警报会在不应该添加线程的情况下添加线程。
2.如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有线程自上次检查以来已经死亡),或者池自进入该方法以来已经关闭。因此,我们重新检查状态,如果有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。
3.如果任务队列满了,那么我们尝试添加一个新线程。如果它失败了,拒绝执行任务。
自定义线程池实现

image-20230812131941708

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 2, 3, (queue, task) -> {
// queue.put((Runnable) task);
log.debug("拒绝任务 {}", task);
});

// 10个任务
for (int i = 0; i < 5; i++) {
int cur = i;
threadPool.excute(() -> {
log.debug("任务-{}", cur);
});
}

}
}

@Slf4j
class ThreadPool {
private BlockingQueue blockingQueue;
private ArrayList<Worker> workers;
private int workerNum;
private int blockingQueueSize;
private long delay;


public ThreadPool(int workerNum, int blockingQueueSize, long delay, RejectPolicy rejectPolicy) {
this.workerNum = workerNum;
this.blockingQueueSize = blockingQueueSize;
this.blockingQueue = new BlockingQueue(blockingQueueSize, rejectPolicy);
this.workers = new ArrayList<>(workerNum);
this.delay = TimeUnit.NANOSECONDS.convert(delay, TimeUnit.SECONDS);
}

public void excute(Runnable task) {
synchronized (this) {
if (workers.size() != workerNum) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
log.debug("放入");
// blockingQueue.put(task);
blockingQueue.rejectPut(task);
}
}
}

class Worker extends Thread {
private Runnable task;

@Override
public void run() {
// 直到把当前任务和任务队列,执行完
while (task != null || (task = blockingQueue.get(delay)) != null) {
try {
log.debug("正在执行任务");
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}

// 移除 worker
synchronized (this) {
workers.remove(this);
log.debug("移除 {}", this);
}
}

public Worker(Runnable task) {
this.task = task;
}
}
}

// 任务队列
class BlockingQueue {
private int capacity;
private ArrayDeque<Runnable> tasks;
private ReentrantLock lock = new ReentrantLock();
private Condition emptySet = lock.newCondition();
private Condition fullSet = lock.newCondition();
private RejectPolicy<Runnable> rejectPolicy;

public BlockingQueue(int capacity, RejectPolicy rejectPolicy) {
this.capacity = capacity;
this.tasks = new ArrayDeque<>(capacity);
this.rejectPolicy = rejectPolicy;
}

// 阻塞获取
public Runnable get(long delay) {
lock.lock();
try {
if (tasks.isEmpty()) {
try {
delay = emptySet.awaitNanos(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Runnable task = tasks.peek() == null ? null : tasks.removeFirst();
fullSet.signal();
return task;
} finally {
lock.unlock();
}
}

// 阻塞添加
public void put(Runnable task) {
lock.lock();
try {
if (capacity == tasks.size()) {
try {
fullSet.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
tasks.add(task);
emptySet.signal();
} finally {
lock.unlock();
}
}

// 带策略添加
public void rejectPut(Runnable task) {
lock.lock();
try {
if (tasks.size() == capacity) {
rejectPolicy.reject(this, task);
} else {
tasks.add(task);
emptySet.signal();
}
} finally {
lock.unlock();
}
}
}

interface RejectPolicy<T> {
void reject(BlockingQueue queue, T task);
}
ThreadPoolExecutor

不建议用,会有OOM

线程池状态,int的高3为来表示状态,低29位表示线程数量,故大小顺序排序依次为 TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

image-20230813104713577

这些信息次存储在一个原子变量中ctl,目的是将状态和线程数合二为一,以一次CAS操作就行了

1
2
3
4
5
6
7
8
// 对旧的c进行CAS,ctlOf是合并两个变量
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))

// c & 上capacity
private static int workerCountOf(int c) { return c & CAPACITY; }

// 合并两个变量
private static int ctlOf(int rs, int wc) { return rs | wc; }
构造方法
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 生存时间-针对救急线程
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler) { // 拒绝策略
JDK中拒绝策略的实现类

image-20230813113714713

  1. AortPolicy 让调用者抛出rejectException异常,这是默认策略
  2. CallerPolicy 让调用者运行任务
  3. DiscardPolicy 放弃本次任务
  4. DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
Executors
newFixedThreadPool
  1. 没有救急线程,重建出来的全是核心线程,队列也是阻塞队列,可以放任意数量的任务
  2. 并且可以动态修改线核心线程个数
1
2
3
4
5
6
7
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

newCachedThreadPool
  1. 全部都是救急线程,无核心线程
  2. 60s 回收救急
  3. 队列采用 SynchronousQueue ,它没有容量,没有现成来取是放不进去的(一手交钱,一手交货)
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor
  1. 单线程
  2. 较适合串行执行任务,如果出现异常,它会创建一个新的线程来代替当前异常的线程继续执行任务
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
提交任务
submit()

提交任务,可以是Runnable 或者是有返回值的Callable

invokeAll()

执行所有任务

invokeAny()

执行任意一个

shutdown()
  1. 不会接受新的任务,将队列中的任务执行完成,然后结束掉
  2. 异步的,调用者并不会等待他执行完成
shutdownnow()
  1. 不会接受新的任务,并且不会处理队列中的任务都不再执行,直接退出
  2. 会返回未执行的任务集合
工作线程模式

让有限的工作线程,来轮流执行任务。典型的就是线程池,体现出了享元模式设计模式

饥饿-固定线程池

线程池只有两个核心线程数,下方会导致线程数不够用,卡在获取submit结果的位置,因为没有现成去处理submit提交的任务。最终到将出现饥饿的现象, 不是死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(()->{
log.debug("进入");
Future<String> submit = service.submit(() -> {
return "ok";
});
try {
String str = submit.get();
log.debug(str);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
service.execute(()->{
log.debug("进入");
Future<String> submit = service.submit(() -> {
return "ok";
});
try {
String str = submit.get();
log.debug(str);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
创建多少线程合适
  1. cpu密集型: cpu核数 + 1

  2. io密集型: 线程数 = 核数 * 期望cpu利用率 * 总时间(cpu计算时间+等待时间) / cpu计算时间

    例如: 4核 cpu 计算时间是 50%,其他等待时间是50%,期望cpu被100利用

    ​ 4 * 100% * 100% / 50 % = 8

任务调度线程池
Timer
  1. java.util.Timer来实现定时功能
  2. Timer简单易用,但是只能串行执行,并且如果没有捕获异常会导致后面的任务也无法正常执行
ScheduledThreadPoolExecutor
  1. 非串行
  2. 异常不会导致后面任务失效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
executorService.schedule(()->{
log.debug("sc1");
int i = 1 / 0;
}, 2, TimeUnit.SECONDS);
executorService.schedule(()->{
log.debug("sc2");
}, 1, TimeUnit.SECONDS);
}

// scheduleAtFixedRate 定时循环任务, 任务中的sleep 或者 scheduleAtFixedRate 形参的间隔时间两个中的最大值
// scheduleWithFixedDelay 则是两个时间相加
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
executorService.scheduleAtFixedRate(()->{
log.debug("while");
},0, 1, TimeUnit.SECONDS);
}
Fork/Join

JDK7新增线程池实现,它体现分治的实现,适合用户任务拆分的cpu密集型运算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
ForkJoinTask forkJoinTask = new ForkJoinTask(3);
Integer i = forkJoinPool.submit(forkJoinTask).get();
System.out.println(i);
}
}

class ForkJoinTask extends RecursiveTask<Integer> {

int n;

public ForkJoinTask(int n) {
this.n = n;
}

@Override
protected Integer compute() {
if (n == 0) {
return 0;
}

ForkJoinTask forkJoinTask = new ForkJoinTask(n - 1);
forkJoinTask.fork();

return forkJoinTask.join() + n;
}
}

AQS

  1. AbstractQuueudSynchronized,阻塞式锁
  2. 用state属性来表示资源状态(分独占模式和共享模式),子类需要的定义如何维护换这个状态,控制获取和释放锁
  3. 提供基于FIFO的等待队列,类似monitor的entrylist
  4. 条件变量来实现等待、唤醒机制,支持对个条件变量,类似monitor的waitset
  5. ReentrantLock 使用了此类
手写一个不可重入锁
1

ThreadLocal

  1. 仅本线程能获取到此对象,从而可防止多线程数据问题,可能会OOM

  2. 其中key是弱引用(当其他所有对key的引用没有后,gc会将此key给回收掉)


  3. get 的时候发现key为null,会使value设置为null (可能也会无效)

  4. set的时候发现key为null,也会使value设置为null (可能也会无效)

  5. 调用remove()推荐,也会使value为null

原理

线程安全集合类

Collections

通过集合工具类,来创建线程安全的集合对象, 底层还是通过synchronized锁来保证线程安全

  • synchronizedList
  • synchronizedMap
HashMap-JDK7-扩容死链
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 // 假如有两个线程A和B,有段链表值是1->2->null, 此时线程A先进入到transfer的next = e.next(5)此处停下,上下文切换到线程B,线程B直接一下把这段链表个给扩容了,此时 2 - 1 - > null,但是A线程又将1头插入到新节点中,所有就导致了1->2 -> 1,这样就成死链了
1 void transfer(Entry[] newTable, boolean rehash) {
//获取新Entry数组的容量
2 int newCapacity = newTable.length;
//遍历旧的Entry数组
3 for (Entry<K,V> e : table) {
// e的下一个entry不为null则循环(链表上的循环)
4 while(null != e) {
5 Entry<K,V> next = e.next;
6 if (rehash) {
7 e.hash = null == e.key ? 0 : hash(e.key);
8 }
//根据e的hash值和容量计算出index值
9 int i = indexFor(e.hash, newCapacity);
//e的下一个Entry为新数组i位置上的值
10 e.next = newTable[i];
//e添加到新数组的i位置上
11 newTable[i] = e;
//next赋值给e
12 e = next;
13 }
14 }
15 }
ConcurrentHashMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// hash表
transient volatile Node<K,V>[] table;

/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;

/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;

/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;

/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;

/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;

/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;

// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

// 扩容时如果某个 bin 迁移完毕,用 ForwardingNode 作为旧的 table bin 的头节点
static final class ForwardingNode<K,V> extends Node<K,V>
// 用在 compute 以及 computeIfAbsent 时,用来占位,计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V>
// 用作 treebin 的头结点,存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V>
// 作为 treebin 的节点,存储 parent,left,right
static final class TreeNode<K,V> extends Node<K,V>

构造器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
   // 有参无参都不会对table进行初始化,有参仅仅是对table的容量进行计算
// 无参
/**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {
}

// 有参
public ConcurrentHashMap(int initialCapacity) {
// 容量小于0,抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 初始化大小大于 最大容量无符号右移1位,否则采用 tableSizeFor 算出2^n的大小容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// 初始容量 + 1/2初始容量 + 1,超过1.5倍的初始容量
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// table大小
this.sizeCtl = cap;
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 加载因子小于0、初始化大于0、并发等级小于0
// 抛出参数非法异常
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 初始容量 < 并发等级 则采用并发等级容量
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// 算出实际容量大小
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 判定是否超过最大容量
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// table大小
this.sizeCtl = cap;
}

get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 未采用加锁的方式

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 获取非负数的hash ((h ^ (h >>> 16)) & HASH_BITS), HASH_BITS = Integer的最大值,故最高位为0,所以最后怎么&都是一个正数或者0
int h = spread(key.hashCode());
// 如果table已经初始化或者table的长度>0
if ((tab = table) != null && (n = tab.length) > 0 &&
// 获取当前hash所在的下标的头node,并且不为null
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头node的hash 等于 传入进来的key的hash
if ((eh = e.hash) == h) {
// 并且头node的key 等于 传进来的key 或者 头node的key equals 传入进来的 key
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// 直接返回头node的值
return e.val;
}
// 小于0 表示为 ForwardingNode 节点,也有可能是红黑树(因为第一个的hash默认为-1,红黑树 的hash默认为-2)
else if (eh < 0)
// 调用 ForwardingNode 或者 红黑树 的find方法查找值返回
return (p = e.find(h, key)) != null ? p.val : null;
// 链表,遍历获取值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
put()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public V put(K key, V value) {
// 参数列表: 键,值,仅对不存在的key才赋值(false表示相同key进行覆盖,true表示没有当前key才会去添加)
return putVal(key, value, false);
}


final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 或者 value 为null,直接抛出空指针异常, 与Hashtable 一致(Hashtable的key不是手动抛出的而是调用hashCode()造成的)
if (key == null || value == null) throw new NullPointerException();
// 获取hash
int hash = spread(key.hashCode());
// 桶的操作次数
int binCount = 0;
// 开始自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// table为null,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 下标头结点为null,进行新建头node
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果头node的hash 等于 -1,表示正需要迁移扩容table
else if ((fh = f.hash) == MOVED)
// 帮助其他线程迁移node
tab = helpTransfer(tab, f);
else {
// 修改或添加区域
V oldVal = null;
// 锁住当前头node
synchronized (f) {
// 如果当前table,i位置的node是上方获取的node
if (tabAt(tab, i) == f) {
// fh >= 0 表示正常hash
if (fh >= 0) {
// 桶操作数 = 1
binCount = 1;
// 开始自旋,寻找节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
// hash 一致
if (e.hash == hash &&
// 并且 key是一个
((ek = e.key) == key ||
// 或者key相同
(ek != null && key.equals(ek)))) {
// 保留查找到的node的value
oldVal = e.val;
// 是否不存在才覆盖
if (!onlyIfAbsent)
// 替换新值
e.val = value;
break;
}
// 更新e,已经到达当前链表的末尾了,还是没有找到,就新建
Node<K,V> pred = e;
// 末尾了,但是节点为null
if ((e = e.next) == null) {
// 新建节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用红黑树的putTreeVal方法,添加数据
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 对数据修改过
if (binCount != 0) {
// binCount 次数 大于 8,故链表长度大于 8
// 红黑树最多为 2
// 什么都不操作为 0
if (binCount >= TREEIFY_THRESHOLD)
// 开始树化
treeifyBin(tab, i);
if (oldVal != null)
// 返回旧值
return oldVal;
break;
}
}
}
// 操作次数 + 1,并且判定是否需要扩容table等
addCount(1L, binCount);
return null;
}
initTable()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 初始化条件 table 等于 null 或者 table的长度 为 0
while ((tab = table) == null || tab.length == 0) {
// -1 < 0:表示有其他线程正在初始化
if ((sc = sizeCtl) < 0)
// 礼让给正在初始化table的线程
Thread.yield(); // lost initialization race; just spin
// 否则进行初始化table,对sizeCtl进行cas修改成-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再判断一次,table是否为null,或者长度为0
if ((tab = table) == null || tab.length == 0) {
// 重置容量大小,大于0用本身,否则用默认 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 新建table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 赋值给table、tab
table = tab = nt;
// n - 1/4*n,故相当于乘以HashMap中的加载因子
sc = n - (n >>> 2);
}
} finally {
// 重置sizeCtl
sizeCtl = sc;
}
break;
}
}
// 返回tab
return tab;
}
casTabAt()
1
2
3
4
5
// 通过一个基本偏移量 + i << ASHIFT 偏移量 = tab中下标为i的值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
helpTransfer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// tab 不为null 并且 f 属于 ForwardingNode
if (tab != null && (f instanceof ForwardingNode) &&
// 当前node头的nextTab不为null
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// nextTab没有变化并且tablet也没有变,才帮忙做迁移
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 进行数据迁移
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
transfer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果nextTab == null, 就初始化一个nextTab
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果已经搬迁完了,就把新建的 ForwardingNode贴上去
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 如果当前node的hash 等于 -2
// 等待下一次循环
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 搬迁代码
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) { // 链表搬迁
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 红黑树搬迁
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
COpy

队列

LinkedBlockingQueue

原理
put()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void put(E e) throws InterruptedException {
// 不能传入null数据,否者空指针异常
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
// 获取锁和原子自增
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
// 如果容量满了,就开始park住
while (count.get() == capacity) {
notFull.await();
}
// 加入队列
enqueue(node);
c = count.getAndIncrement();
// 如果还小于容量
if (c + 1 < capacity)
// 通知放数据
notFull.signal();
} finally {
// 释放
putLock.unlock();
}
if (c == 0)
// 通知拿数据
signalNotEmpty();
}

private void enqueue(Node<E> node) {
// 追加node
last = last.next = node;
}
poll()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public E poll() {
final AtomicInteger count = this.count;
// 等于表示没有数据,直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 大于0表示有数据
if (count.get() > 0) {
// 出队
x = dequeue();
// count - 1
c = count.getAndDecrement();
if (c > 1)
// 通知拿数据
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
// 通知放数据
signalNotFull();
return x;
}

private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
// 释放 head
h.next = h; // help GC
// 重置头结点
head = first;
E x = first.item;
// 将头结点内容置空
first.item = null;
// 返回新head的值
return x;
}

ConcurrentLinkedQueue

原理
add()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
casTail(t, newNode); // Failure is OK.
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}

Exchanger

线程之间进行数据交换,exchange()会一直堵塞,直到有人和它交换数据

使用
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws InterruptedException {
Exchanger<Object> exchanger = new Exchanger<>();

new Thread(() -> {
try {
Object exchange = exchanger.exchange(1);
System.out.println(exchange);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t1").start();
exchanger.exchange(200);
}

CompletableFuture

可代替FutureTask,可做异步任务

构造
  1. CompletableFuture.runAsync() 无返回值
  2. CompletableFuture.supplyAsync() 有返回值
方法
  1. supplyAsync:开启异步任务
  2. thenCompose:连接两个异步任务
  3. thenCombine:合并两个异步任务
  4. thenApply:任务的后置处理
  5. applyToEither:获取最先完成的任务
  6. exceptionally:异常处理