进程与线程
进程:当一个程序被开启,即开启了一个进程
线程:一个进程之内可以分为多个线程、一个线程就是一个指令流、Java中线程是最小的调度单位,进程是作为资源分配的最小单位
对比:
进程基本上是互相独立的,二线程存在于进程内,是进程的一个子集
进程拥有共享的资源,如内存空间等,使其内部的线程共享
进程间通信更复杂
线程通信相对简单(因为它们共享一个进程的内存),并且更加轻量
线程的上下文切换比进程的上下文切换成本低
并发&并行
并发:同一时间应对多件事情的能力
并行:同一时间做多件事情的能力
异步同步
需要等级结果返回,才能继续运行就是同步
不需要等待结果返回,就能继续运行就是异步
同步 1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) { FileReader fileReader = new FileReader ("E:\\todo_files\\login.bat" ); String s = fileReader.readString(); System.out.println(s); log.debug("hello" ); } f: cd F:\pycharm\code\demo python Login.py pause 18 :23 :38.423 asyncsync.Async [main] - hello
异步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) { new Thread (()-> { FileReader fileReader = new FileReader ("E:\\todo_files\\login.bat" ); String s = fileReader.readString(); System.out.println(s); }).start(); log.debug("hello" ); } 18 :25 :29.591 asyncsync.Sync [main] - hellof: cd F:\pycharm\code\demo python Login.py pause
创建运行线程 Thread
继承或者直接new
重写run方法
1 2 3 4 5 6 7 8 9 10 public class c_thread { public static void main (String[] args) { Thread thread = new Thread (() -> { System.out.println(222 ); }); thread.setName("child" ); thread.start(); System.out.println("main" ); } }
Runnable
实现Runnable接口
重写run方法
1 2 3 4 5 6 7 8 9 10 11 public class c_runnable implements Runnable { @Override public void run () { System.out.println("child" ); } public static void main (String[] args) { new Thread (new c_runnable ()).start(); System.out.println("main" ); } }
Thread和Runnable区别
推荐使用 Runnable:
因为Thread是线程和任务在一起的,并没有分开。
Runnable是将线程和任务分开的,从而更容易与线程池等高级API配合
根据设计模式应该尽量用组合或者聚合代替继承
并且Java是类单继承多实现的语言
FutureTask
继承了Runnable
run方法会抛出异常,并且有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j public class c_futuretask { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> fk = new FutureTask <>(() -> { log.debug("child" ); Thread.sleep(1000 ); return 1 ; }); Thread thread = new Thread (fk, "fk" ); thread.start(); log.debug("返回值: {}" , fk.get()); } }
查看线程方法 windows
任务管理器
tasklist查看进程
taskkill杀死进程
linux
ps -fe 查看所有进程
ps -fT -p 查看某个
top -H -p 查看某个进程
kill 杀死进程
java
jps
jstack 查看某个Java进程的所有线程状态
jconsole可以查看(并且也可以远程)
线程运行原理 栈和栈帧
JVM是由堆、栈、方法去所组成,其中栈内存是给线程所使用,每一个线程启动后,虚拟机都会为其分配一块栈内存
每个栈由多个栈帧组成,对应着每次方法调用时所占用的内存
每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
图解
cup调度main线程执行main方法,加入main方法的栈帧,并且main方法中的局部变量args指向堆中中的一个字符串数组,然后调用method1方法,加入栈帧,初始化一些局部变量,并且返回地址指向调用者,最后method2加入栈帧同样局部变量n指向堆中的new Object(),返回地址指向method1(调用者),最后method2执行完成弹出,继续执行method1后再弹出,然后再执行main方法,执行完成后弹出,程序执行完毕。
线程上下文
应为一些原因导致cpu不在执行当前线程,转而执行另一个线程
线程cpu时间片用完
垃圾回收
有更高的优先级的线程需要执行
线程自己调用了sleep、yield、wait、join、park、synchronized、lock等方法
线程常见方法 start()
启动线程
getState()
获取线程状态信息
sleep()
睡眠,将线程正在running状态改为timed waiting状态
可用TimeUnit代替 (TimeUnit.SECONDS.sleep(1))
interrupt()
中断线程,谁调谁被打断,会抛出异常 (InterruptedException)
并不能被打断,需要配合打断表记
yield()
让出cpu的使用权
不一定能成功,因为即便让出了,任务调度器也有可能再次执行当前线程
join()
插队,等待线程执行结束,谁调用谁就会先执行结束
多个join的等待时间是最大那个
底层是wait
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }); Thread t2 = new Thread (() -> { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }); long start = System.currentTimeMillis(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(System.currentTimeMillis() - start); }
wait() stop()
直接停止线程,会释放资源和锁
如果直接stop,会释放资源,但是不安全,可能会导致程序并不会执行完毕,最终导致数据不一致问题
与此类似的还有 suspend()、resume()方法
isInterrupted()
查看是否打断正在运行的线程,不会清除打断标记
如果是阻塞线程会是 false (因为异常会重置打断标记)
如果是正常的 true
interrupted()
查看是否打断正在运行的线程,但是会清除标记
park()
LockSupport.park(),阻塞线程
底层是UnSafe类的park方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) throws InterruptedException { Thread thread = new Thread (() -> { log.debug("park" ); LockSupport.park(); log.debug("unpark" ); Thread.interrupted(); LockSupport.park(); log.debug("park again" ); }); thread.start(); Thread.sleep(1000 ); thread.interrupt(); }
notify()
在正在waitSet中等待的线程挑选一个唤醒
notifyAll()
将waitSet中的等待的所有的线程唤醒
防止cpu占用100%
使用sleep
wait或条件变量
后两种需要加锁,并且需要相应的唤醒操作,一般适用于同步场景
sleep适用于无需加锁的同步场景
两阶段停止模式 xxxxxxxxxx RPM(ReadHat Package Manager) rpm -qa 查询已安装软件rpm -qi 软件名称 是否已经安装此软件rpm -e 软件包 卸载软件rpm -e –nodeps 软件包 卸载软件,不考虑依赖关系rpm -ivh rpm包名 安装软件-i install-v –verbose,显示详细信息-h –hash,进度条–nodeps 安装前不检查依赖推荐yum install 软件名yum remove 软件名yum update 升级软件参数—install 安装 update 更新check-update 检查是否有可用更新remove 移除list 显示软件包信息clean 清理yum过期的缓存deplist 显示yum软件包的所有依赖关系更改软件更新下载镜像源🎈🎈🎈todomarkdown
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) throws InterruptedException { Thread thread = new Thread (() -> { while (true ) { if (Thread.currentThread().isInterrupted()) { log.debug("退出" ); break ; } try { Thread.sleep(1000 ); log.debug("监控" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); thread.start(); Thread.sleep(4000 ); thread.interrupt(); }
主线程与守护线程
默认情况下,Java进程需要所有线程结束后才会终止程序,有一种特殊线程叫做守护线程,只要其他非守护线程运行结束了,即使守护线程的代码没有执行完毕,也会强制结束。
线程生命周期
新建、准备、运行、阻塞、终结
新建、运行、带时等待、等待、锁定、终结 (Java中的)
代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public static void main (String[] args) { Thread thread1 = new Thread (() -> { while (true ) ; }); Thread thread2 = new Thread (() -> { synchronized (LifeState.class) { while (true ) ; } }); Thread thread3 = new Thread (() -> { try { Thread.sleep(1000000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }); Thread thread4 = new Thread (() -> { try { thread3.join(); } catch (InterruptedException e) { throw new RuntimeException (e); } }); Thread thread5 = new Thread (() -> { synchronized (LifeState.class) { System.out.println(Thread.currentThread().getName()); } }); Thread thread6 = new Thread (() -> { }); thread2.start(); thread3.start(); thread4.start(); thread5.start(); thread6.start(); System.out.println(thread1.getState()); System.out.println(thread2.getState()); System.out.println(thread3.getState()); System.out.println(thread4.getState()); System.out.println(thread5.getState()); System.out.println(thread6.getState()); }
共享模型管理 共享问题
i++并非原子操作,它会被分为3条指令:
获取i值
进行自增或自减
对i进行赋值
synchronized
对代进行加锁
public void static synchronized a(){}; 此时锁住的是类对象
public void synchronized a(){}; 此时锁住的是this对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public static int counter = 0 ;public static void main (String[] args) throws InterruptedException { Thread thread1 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { counter++; } }); Thread thread2 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { counter--; } }); thread1.start(); thread2.start(); Thread.sleep(2000 ); System.out.println(counter); } public static int counter = 0 ;public static void main (String[] args) throws InterruptedException { Thread thread1 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { synchronized (ThreadSecurity.class) { counter++; } } }); Thread thread2 = new Thread (() -> { for (int i = 0 ; i < 100000 ; i++) { synchronized (ThreadSecurity.class) { counter--; } } }); thread1.start(); thread2.start(); Thread.sleep(2000 ); System.out.println(counter); -- counter 每次值都为 0 }
线程安全分析 成员变量和静态变量是否安全
如果共享了并且有写操作就不安全
局部变量是否线程安全
局部变量是安全的,但是局部引用变量不一定是安全的
常见安全类
String(其中存储字符的String的是个char数组, 并且其中的所有修改方法都是会再new 一个String)
还有一些StringBuffer、Hashtable、以及一些包装类(Integer、Double…)
练习
购票
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public static void main (String[] args) throws InterruptedException { Ticket ticket = new Ticket (); ThreadLocalRandom random = ThreadLocalRandom.current(); List<Thread> list = new ArrayList <>(); List<Integer> ret = new Vector <>(); for (int i = 0 ; i < 5000 ; i++) { Thread thread = new Thread (() -> { int res = ticket.buyTicket(random.nextInt(1 , 10 )); ret.add(res); }); list.add(thread); thread.start(); } Thread.sleep(1000 ); int sum = ret.stream().mapToInt(i -> i).sum(); System.out.println(ticket.count); System.out.println(sum); } } class Ticket { int count = 500 ; public synchronized int buyTicket (int num) { if (this .count >= num) { this .count -= num; return num; } return 0 ; }
转账
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class PracticeTransfer { public static void main (String[] args) throws InterruptedException { Account account1 = new Account (); Account account2 = new Account (); Thread thread1 = new Thread (() -> { for (int i = 0 ; i < 1000 ; i++) { account1.transfer(account2, 500 ); } }); Thread thread2 = new Thread (() -> { for (int i = 0 ; i < 1000 ; i++) { account2.transfer(account1, 500 ); } }); thread2.start(); thread1.start(); Thread.sleep(1000 ); System.out.println(account1); System.out.println(account2); } } class Account { int balance = 1000 ; public void transfer (Account target, int money) { synchronized (Account.class) { if (this .balance > money) { target.balance += money; this .balance -= money; } } } @Override public String toString () { return "Account{" + "balance=" + balance + '}' ; } }
Monitor 原理
Monitor(监视器), 每个Java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级) 之后,该对象的Mark Word就被设置指向Monitor对象的指针
Java对象头
Java对象由两部分组成: 对象头 + 对象中的成员变量
普通对象
Object Header (64 bits)
Mark Word(32 bits)
Klass Word(32 bits)
数组对象
Object Header (96 bits)
Mark Word(32 bits)
Klass Word(32 bits)
array length(32 bits)
Mark Word结构
Mark Word (32 bits)
State
hashcode:25 age:4 biased_lock:0 01
Normal
thread:23 epoch:2 age:4 biased_lock:1 01
Biased
ptr_to_lock_record:30 00
Lightweight Locked
ptr_to_heavyweight_monitor:30 10
Heavyweight Locked
11
Marked for GC
synchronized 原理
加锁原理:当一个线程被synchronized的方法或者块时,这个线程的MarkWord会指向一个Monitor,这个Monitor只能同时被一个线程访问,假如现在有其他线程访问的话,会被加入到Monitor中的等待队列中,当正在访问的线程执行完后, 对MarkWord进行重置,唤醒EntryList,才由等待中的线程来竞争访问。
轻量级锁
如果一个对象有多个线程访问,但多线程访问的时间是错开的(也就是没有竞争),那么可以用轻量级锁来优化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class LightweightLock { final static Object obj = new Object (); public void test () { synchronized (obj) { test2(); } } public void test2 () { synchronized (obj) { } } }
每个线程栈帧都会包含一个锁记录,这个锁记录可以存储锁定对象的Mark Word,其中的Object reference 会指向锁对象,线程会尝试使用cas替换锁对象的Mark Word,将Mark Word存储到自己的锁记录中。将自己的锁记录中的锁记录地址和状态给锁对象存储。
如果cas失败了:
被其他线程占有了,进入锁膨胀
自己占有,继续在栈帧中添加锁记录,但是锁记录中的锁记录的地址和状态为null
最后当退出synchronized解锁时,如果锁记录为null,表示为重入锁,并且锁记录减1
其中解锁可能失败也可能成功:
成功,表示解锁成功
失败,说明轻量级锁进入了锁膨胀,已经升级为重量级锁,进入重量级锁解锁流程
锁膨胀
当有另外一个线程进来需要获得锁时,发现锁被占有,于是锁对象将申请Monitor锁,让锁对象指向Monitor,然后将进来的线程放入到EntryList中开始等待,当正占有锁的线程执行完后,释放锁后,将唤醒正在EntryList中的线程。
自旋优化
重量级锁竞争的时候,还可以使用自旋(通过循环几次获取锁,Java6以后自适应,无法自己控制)来进行优化,如果当前线程自旋成功(即这时候线程已经退出了同步块,释放了锁)这时候线程就可以避免阻塞。
自旋成功
自旋失败
偏向锁
轻量级锁在没有竞争的时候,每次重入仍需要执行cas操作
Java6中引入了偏向锁来做进一步优化:只有第一次使用cas将线程ID设置成到对象的Mark Word头,之后发现这个线程ID是自己的就表示没有竞争,不用重新cas,以后只要不发生竞争,那么这个锁对象就属于该线程(默认开启)
其中如果调用了对象的hashCode(),会导致偏向锁被撤销。此时升级为轻量级锁
有其他线程来访问(没有竞争),升级为轻量级
调用wait()/notify(),升级为重量级,因为此方法是重量级才有的
当 2 中的其他线程频繁来访问撤销偏向锁超过20次(没有竞争),此时会重新替换偏向锁 (批量重偏向)
当撤销次数超过40次后,会让这个类的对象和新建的对象编程不可偏向的(批量撤销)
锁消除
Java运行时,JIT 即时编译器会对热点代码进行优化,可能会对锁进行消除
wait/notify
处理等待的线程会被放到Monitor中的waitSet中,此时需要处于Owner的线程notify或者notifyAll来唤醒,唤醒后将会被放入到EntrySet中进行下次线程的竞争
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class WaitThread { public static void main (String[] args) throws InterruptedException { Integer i = new Integer (1 ); Thread thread = new Thread (() -> { synchronized (i) { try { i.wait(); System.out.println("wake up" ); } catch (InterruptedException e) { throw new RuntimeException (e); } } }); Thread thread2 = new Thread (() -> { synchronized (i) { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } i.notify(); } }); thread.start(); thread2.start(); } }
wait 和 sleep 的区别
sleep 是Thread的方法,wait是Object的
sleep不需要和synchronized配合使用,wait需要
sleep使时不会释放锁,wait 会
下方代码,如果用的Thread.sleep的话就会导致其他线程都得等待她sleep完,才能获得锁,换成wait就能解决。当然interrupt也能解决
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Slf4j public class PracticeWaitNotify { static final Object room = new Object (); static boolean hasCigarette = false ; public static void main (String[] args) throws InterruptedException { Thread thread = new Thread (() -> { log.debug("有烟没{}" , hasCigarette); synchronized (room) { if (!hasCigarette) { log.debug("没烟休息.." ); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } } }, "小娜" ); thread.start(); TimeUnit.SECONDS.sleep(1 ); for (int i = 0 ; i < 5 ; i++) { new Thread (() -> { synchronized (room) { log.debug("开始干活" ); } }, "其他人" ).start(); } new Thread (() -> { synchronized (room) { log.debug("烟到了!" ); hasCigarette = true ; } }, "小红" ).start(); } }
虚假唤醒
如果多个线程在等待,此时使用notify来唤醒wait的线程,就可能会唤醒不是自己想要的线程。
可以用notifyAll+while 来处理,while可以防止被不必要的唤醒
1 2 3 4 5 6 7 8 9 10 11 synchronized (locak) { while (条件不成立) { locak.wait(); } } synchronized (lock) { lock.notifyAll(); }
设计模式-保护性暂停
Guarded Supension, 用在一个线程等待另一个线程执行的结果
通过while + wait + notifyAll 来制作一个线程等待时会释放锁,并且可超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class GuardendObject { public static void main (String[] args) throws InterruptedException { Guardend guardend = new Guardend (); new Thread (() -> { try { log.debug("get start" ); Object ret = guardend.get(2000 ); log.debug("{}" , ret); } catch (InterruptedException e) { throw new RuntimeException (e); } }, "get-thread" ).start(); Thread.sleep(5000 ); new Thread (()-> { log.debug("set start" ); guardend.set("666" ); }, "set_thread" ).start(); } } class Guardend { private final Object lock = new Object (); private Object response; public Object get (long timeout) throws InterruptedException { synchronized (lock) { long base = System.currentTimeMillis(); long now = 0 ; while (response == null ) { long delay = timeout - now; if (delay <= 0 ) break ; lock.wait(delay); now = System.currentTimeMillis() - base; } return response; } } public void set (Object response) { synchronized (lock) { this .response = response; lock.notifyAll(); } } }
扩展,多个类之间使用Guardend对象,作为参数传递不方便,可使用下图来设计
设计模式-生产者/消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class ProductorAndConsumer { public static void main (String[] args) throws InterruptedException { MessageQueue messageQueue = new MessageQueue (); for (int i = 0 ; i < 3 ; i++) { int val = i; new Thread (()->{ try { messageQueue.put("添加值为: " + val); } catch (InterruptedException e) { } }, "productor-" + i).start(); } TimeUnit.SECONDS.sleep(1 ); new Thread (()-> { try { messageQueue.get(); } catch (InterruptedException e) { throw new RuntimeException (e); } }, "consumer" ).start(); } } @Slf4j class MessageQueue { final LinkedList<String> queue = new LinkedList <>(); public void get () throws InterruptedException { synchronized (queue) { while (true ) { if (queue.isEmpty()) { log.debug("队列为空" ); queue.wait(); } String ret = queue.removeFirst(); log.debug("获得 {}" , ret); queue.notifyAll(); } } } public void put (String val) throws InterruptedException { synchronized (queue) { while (queue.size() == 2 ) { log.debug("队列已满" ); queue.wait(); } queue.add(Thread.currentThread().getName()); queue.notifyAll(); } } }
线程状态转换
NEW -> RUNNABLE
RUNNABLE <–> WAITING
RUNNABLE <–> TIMED_WAITING
RUNNABLE <–> BLOCKED
RUNNABLE <–> TERMINATED
活跃性 死锁
一个线程同时获取多把所
t1想获取a对象的锁,然后又想获取b对象的锁 (在获取a对象锁和b对象锁之间,加入一点延时,100%死锁)
t2想获取b对象的锁,然后又想获取a对象的锁
定位死锁
jconsole可以或者jps定位线程id
哲学家就餐问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 @Slf4j public class DeadLock { public static void main (String[] args) { Chopstick c1 = new Chopstick ("1号筷子" ); Chopstick c2 = new Chopstick ("2号筷子" ); Chopstick c3 = new Chopstick ("3号筷子" ); Chopstick c4 = new Chopstick ("4号筷子" ); Chopstick c5 = new Chopstick ("5号筷子" ); List<Philosopher> philosopherList = new ArrayList <>(); philosopherList.add(new Philosopher ("苏格拉底" , c1, c2)); philosopherList.add(new Philosopher ("柏拉图" , c2, c3)); philosopherList.add(new Philosopher ("孔子" , c3, c4)); philosopherList.add(new Philosopher ("亚里士多德" , c4, c5)); philosopherList.add(new Philosopher ("勒奈·笛卡尔" , c5, c1)); philosopherList.stream().forEach((item) -> { new Thread (() -> { eat(item); }, item.getName()).start(); }); } public static void eat (Philosopher philosopher) { while (true ) { synchronized (philosopher.getLeft()) { synchronized (philosopher.getRight()) { log.debug("{} 正在吃" , philosopher.getName()); } } } } } class Philosopher { String name; Chopstick left; Chopstick right; public String getName () { return name; } public Chopstick getLeft () { return left; } public Chopstick getRight () { return right; } public Philosopher (String name, Chopstick left, Chopstick right) { this .name = name; this .left = left; this .right = right; } } class Chopstick { String name; public Chopstick (String name) { this .name = name; } } @Slf4j public class DeadLock { public static void main (String[] args) { Chopstick c1 = new Chopstick ("1号筷子" ); Chopstick c2 = new Chopstick ("2号筷子" ); Chopstick c3 = new Chopstick ("3号筷子" ); Chopstick c4 = new Chopstick ("4号筷子" ); Chopstick c5 = new Chopstick ("5号筷子" ); List<Philosopher> philosopherList = new ArrayList <>(); philosopherList.add(new Philosopher ("苏格拉底" , c1, c2)); philosopherList.add(new Philosopher ("柏拉图" , c2, c3)); philosopherList.add(new Philosopher ("孔子" , c3, c4)); philosopherList.add(new Philosopher ("亚里士多德" , c4, c5)); philosopherList.add(new Philosopher ("勒奈·笛卡尔" , c5, c1)); philosopherList.stream().forEach((item) -> { new Thread (() -> { eat(item); }, item.getName()).start(); }); } public static void eat (Philosopher philosopher) { while (true ) { if (philosopher.getLeft().tryLock()) { if (philosopher.getRight().tryLock()) { log.debug("{} 正在吃" , philosopher.getName()); philosopher.getRight().unlock(); } philosopher.getLeft().unlock(); } } } } class Philosopher { String name; Chopstick left; Chopstick right; public String getName () { return name; } public Chopstick getLeft () { return left; } public Chopstick getRight () { return right; } public Philosopher (String name, Chopstick left, Chopstick right) { this .name = name; this .left = left; this .right = right; } } class Chopstick extends ReentrantLock { String name; public Chopstick (String name) { this .name = name; } }
活锁
两个线程互相改变对象结果的条件,最后谁也无法结束
通过改变执行时间,可以解决,例如Thread.sleep的时间改为随机的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Slf4j public class AliveLock { static volatile int i = 10 ; public static void main (String[] args) { new Thread (() -> { while (i > 0 ) { ThreadUtil.sleep(100 ); i--; log.debug("i = {}" , i); } }, "t1" ).start(); new Thread (() -> { while (i < 20 ) { ThreadUtil.sleep(100 ); i++; log.debug("i = {}" , i); } }, "t2" ).start(); } }
饥饿
Lock Park&Unpark
LockSupport类中的方法,实则调用的Unsafe类的park和Unpark方法
wait¬ify 区别
wait、notifyAll和notify需要配合Monitor一起使用, 而park、unpark不需要
park & unpark是以线程为单位来指定 阻塞和唤醒的,notify是随机一个,notifyAll则是所有等待线程,所以park & unpark 是很精确的
park & unpark 可以先 unpark,而wait & notify 则不可以
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { Thread t1 = new Thread (() -> { log.debug("park" ); LockSupport.park(); }); t1.start(); ThreadUtil.sleep(1 , TimeUnit.SECONDS); log.debug("unpark" ); LockSupport.unpark(t1); }
原理 每个线程都有自己的Parker对象,这个对象由_mutex、_counter、_cond 组成
调用park
当调用Unsafe.park()时
检查_counter, 这时获得_mutex互斥锁
线程进入_cond条件阻塞
设置_counter = 0
调用unpark
当调用Unsafe.unpark时
设置_counter = 1
线程再次调用unpark,时发现_counter = 1,让线程继续往下执行,不在阻塞
设置_counter = 0
多把锁
多把不相干的锁
好处,可以增强并发度
坏处,可能会死锁,如果同时获取两把锁
ReentrantLock
可重入锁
可中断
可设置超时时间
可设置为公平锁(会降低并发度)
支持多个条件变量
实现原理
加锁-NonfairSync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 static final class NonfairSync extends Sync abstract static class Sync extends AbstractQueuedSynchronizer final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
释放-NonfairSync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
可重入-NonfairSync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
不可打断 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
可打断 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
公平锁 公平锁是队列中的线程与刚进来的线程进行竞争,所以说是公平的,就算你的前驱节点唤醒了你,此时有新的线程进入开始CAS,如果被新的线程抢先了,还是得继续park
1 2 3 4 5 6 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); }
不公平锁 不公平则是会去判定队列中是否有前置节点,如果没有则可以CAS, 它会优先让队列中的线程先执行,然后才去获得锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 final void lock () { acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } .... } public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
条件变量-await 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public final boolean await (long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
条件变量-signal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
————————– 可重入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { reenter(); } public static void reenter () { lock.lock(); log.debug("lock 1" ); reenterSec(); lock.unlock(); } public static void reenterSec () { lock.lock(); log.debug("lock 2" ); }
可中断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void main (String[] args) { Thread thread = new Thread (() -> { try { lock.lockInterruptibly(); } catch (InterruptedException e) { log.debug("被中断" ); } finally { lock.unlock(); } }); lock.lock(); thread.start(); ThreadUtil.sleep(1000 ); thread.interrupt(); }
可设置超时时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { new Thread (() -> { try { if (!lock.tryLock(1 , TimeUnit.SECONDS)) { log.debug("超时" ); } } catch (InterruptedException e) { log.debug(e.getMessage()); } }).start(); lock.lock(); ThreadUtil.sleep(2000 ); }
可设置公平锁
1 static final ReentrantLock lock = new ReentrantLock (true );
支持多个条件变量
可指定唤醒 wait的线程
使用流程
await 前需要锁
await 执行后,会释放锁,进入confitionObject等待
await 的线程被唤醒(或打断、超时)重新竞争lock锁
竞争lock锁成功后,从await后继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static final ReentrantLock lock = new ReentrantLock (true );static final Condition condition = lock.newCondition();public static void main (String[] args) throws InterruptedException { new Thread (() -> { lock.lock(); try { try { condition.await(); } catch (InterruptedException e) { throw new RuntimeException (e); } } finally { lock.unlock(); } }, "condition" ).start(); ThreadUtil.sleep(1000 ); log.debug("唤醒 condition" ); lock.lock(); condition.signalAll(); lock.unlock(); }
ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,从而提高性能
读-读 不互斥;读-写 互斥; 写-写 互斥; 故有写即互斥
类似于数据库中的select…from…lock in share mode
测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock ();ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock(); new Thread (() -> { try { readLock.lock(); log.debug("thread-1" ); ThreadUtil.sleep(1 , TimeUnit.SECONDS); } finally { readLock.unlock(); } }).start(); new Thread (() -> { try { readLock.lock(); log.debug("thread-2" ); } finally { readLock.unlock(); } }).start(); 20 :22 :15.807 lock.ReentrantReadWriteLockTest [Thread-0 ] - thread-1 20 :22 :15.807 lock.ReentrantReadWriteLockTest [Thread-1 ] - thread-2
注意事项
读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获得写锁,会导致写锁永久等待(不能用读锁去嵌套写锁,但是可以先获取读锁,获取数据再去释放读锁,然后再去获取写锁,修改数据,再去降级成读锁获取读锁,再释放写锁,释放读锁;即获取和释放2次读锁,获取和释放1次写锁)
重入时降级支持:即持有写锁的情况下去获取读锁
事项2-官方解决例子
读写锁应用 StampedLock
此类JDK8加入,StampedLock支持 tryOptimisticRead() 方法(乐观读) , 读取完毕后需要一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,否则需要重新获取读锁,保证数据安全
可用作缓存,多读少写
实现原理
流程与 ReentrantLock 类似,不同的是写锁状态占了state的低16位,读锁状态占了高16为
加锁-写 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
加锁-读 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
释放-写 1 2 3 4 5 6 7 8 9 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
释放-读 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); .... for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
StampedLock
需要配合戳使用
1 2 3 4 5 6 7 long stamp = lock.readLock();lock.unlockRead(stamp); long stamp = lock.writeLock();lock.unlockWrite(stamp);
StampedLock支持 tryOptimisticRead() 方法(乐观读) , 读取完毕后需要一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,否则需要重新获取读锁,保证数据安全
但是不支持可重入, 和条件变量
支持读写锁相互转换
1 2 3 4 5 long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)){ }
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Slf4j public class StampedLockTest { public static void main (String[] args) { Test test = new Test (); new Thread (() -> { log.debug(test.getName(0 )); }).start(); new Thread (() -> { log.debug(test.getName(10 )); }).start(); new Thread (() -> { log.debug("写数据" ); test.setName("ac" ); }).start(); new Thread (() -> { log.debug(test.getName(0 )); }).start(); } } @Slf4j class Test { private final StampedLock lock = new StampedLock (); private String name = "default" ; public void setName (String name) { long stamp = lock.writeLock(); try { this .name = name; } finally { lock.unlockWrite(stamp); } } public String getName (long sleepTime) { String ans = name; long stamp = lock.tryOptimisticRead(); ThreadUtil.sleep(sleepTime); if (!lock.validate(stamp)) { log.debug("数据被修改: {}" , stamp); stamp = lock.readLock(); try { ans = name; } finally { lock.unlockRead(stamp); } } return ans; } }
原理
state:64位,前7位表示读锁状态、第8位表示写锁状态、9-64位表示stamp状态writeLock
writeLock() 1 2 3 4 5 6 7 8 9 10 public long writeLock () { long s, next; return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this , STATE, s, next = s + WBIT)) ? next : acquireWrite(false , 0L )); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 private long acquireWrite (boolean interruptible, long deadline) { WNode node = null , p; for (int spins = -1 ;;) { long m, s, ns; if ((m = (s = state) & ABITS) == 0L ) { if (U.compareAndSwapLong(this , STATE, s, ns = s + WBIT)) return ns; } else if (spins < 0 ) spins = (m == WBIT && wtail == whead) ? SPINS : 0 ; else if (spins > 0 ) { if (LockSupport.nextSecondarySeed() >= 0 ) --spins; } else if ((p = wtail) == null ) { WNode hd = new WNode (WMODE, null ); if (U.compareAndSwapObject(this , WHEAD, null , hd)) wtail = hd; } else if (node == null ) node = new WNode (WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this , WTAIL, p, node)) { p.next = node; break ; } } for (int spins = -1 ;;) { WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0 ) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1 ; for (int k = spins;;) { long s, ns; if (((s = state) & ABITS) == 0L ) { if (U.compareAndSwapLong(this , STATE, s, ns = s + WBIT)) { whead = node; node.prev = null ; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0 ) break ; } } else if (h != null ) { WNode c; Thread w; while ((c = h.cowait) != null ) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null ) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null ) (p = np).next = node; } else if ((ps = p.status) == 0 ) U.compareAndSwapInt(p, WSTATUS, 0 , WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null ) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L ) time = 0L ; else if ((time = deadline - System.nanoTime()) <= 0L ) return cancelWaiter(node, node, false ); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this ); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L ) && whead == h && node.prev == p) U.park(false , time); node.thread = null ; U.putObject(wt, PARKBLOCKER, null ); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true ); } } } }
Semaphore
信号量,用来限制同时访问共享资源的线程上限(只适合单机线程数)
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public class SemaphoreTest { public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { semaphore.acquire(); log.debug("running。。" ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { semaphore.release(); log.debug("release" ); } }).start(); } } }
重写连接池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 @Slf4j public class SemaphoreTest { public static void main (String[] args) { ConnectionPool connectionPool = new ConnectionPool (1 ); for (int i = 0 ; i < 5 ; i++) { new Thread (() -> { Connection connection = connectionPool.get(); log.debug("线程 {}, 获取到了 {}" , Thread.currentThread().getName(), connection.getName()); connectionPool.free(connection); }, "therad-" + i).start(); } } } @Slf4j class ConnectionPool { private AtomicIntegerArray locks; private List<Connection> connections; private Semaphore semaphore; private int PoolSize; public ConnectionPool (int poolSize) { this .PoolSize = poolSize; this .semaphore = new Semaphore (poolSize); this .locks = new AtomicIntegerArray (PoolSize); this .connections = new ArrayList <>(PoolSize); for (int i = 0 ; i < poolSize; i++) { connections.add(new Connection (UUID.randomUUID().toString().split("-" )[0 ])); } } public Connection get () { while (true ) { try { semaphore.acquire(); } catch (InterruptedException e) { throw new RuntimeException (e); } for (int i = 0 ; i < PoolSize; i++) { if (locks.get(i) == 0 && locks.compareAndSet(i, 0 , 1 )) { return connections.get(i); } } } } public void free (Connection coon) { for (int i = 0 ; i < PoolSize; i++) { if (connections.get(i) == coon) { locks.set(i, 0 ); semaphore.release(); log.debug("释放 {}" , coon.getName()); return ; } } log.debug("释放失败, 未找到对应Connection" ); } } class Connection { private String name; public Connection () { } public Connection (String name) { this .name = name; } public String getName () { return name; } }
原理 加锁-公平 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
释放-公平 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } }
CountdownLatch
用来进行线程同步协作,等待所有线程完成倒计时
其中构造参数用来初始化等待计数值,await()用来等待计数归零,countDown用来计数减一
简单使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (5 ); for (int i = 0 ; i < 5 ; i++) { int finalI = i; new Thread (()->{ log.debug("{}" , finalI); ThreadUtil.sleep(100 ); countDownLatch.countDown(); }).start(); } countDownLatch.await(); log.debug("finish" ); } public static void main (String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3 ); CountDownLatch countDownLatch = new CountDownLatch (5 ); for (int i = 0 ; i < 5 ; i++) { int finalI = i; executorService.submit(() -> { log.debug("{}" , finalI); ThreadUtil.sleep(500 ); countDownLatch.countDown(); }); } executorService.shutdown(); countDownLatch.await(); log.debug("finish" ); }
原理 countDown() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
await() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; }
CyclicBarrier
可重复使用的 CountdownLatch
简单使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2 ); CyclicBarrier cyclicBarrier = new CyclicBarrier (2 , ()-> { log.debug("finish" ); }); for (int i = 0 ; i < 10 ; i++) { int k = i; executorService.submit(() -> { log.debug("线程: {}" , k); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException (e); } }); } executorService.shutdown(); }
原理 同步顺序控制 wait notify版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static final Object lock = new Object ();public static void main (String[] args) { Thread t1 = new Thread (() -> { synchronized (lock) { try { lock.wait(); log.debug("t1" ); } catch (InterruptedException e) { throw new RuntimeException (e); } } }); Thread t2 = new Thread (() -> { synchronized (lock) { log.debug("t2" ); lock.notify(); } }); t1.start(); t2.start(); }
park unpark 版本 1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { LockSupport.park(); log.debug("t1" ); }); Thread t2 = new Thread (() -> { log.debug("t2" ); LockSupport.unpark(t1); }); t1.start(); t2.start(); }
await signal 版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static final ReentrantLock lock = new ReentrantLock ();static final Condition condition = lock.newCondition();public static void main (String[] args) { Thread t1 = new Thread (() -> { lock.lock(); try { try { condition.await(); log.debug("t1" ); } catch (InterruptedException e) { throw new RuntimeException (e); } } finally { lock.unlock(); } }); Thread t2 = new Thread (() -> { lock.lock(); try { condition.signal(); log.debug("t2" ); } finally { lock.unlock(); } }); t1.start(); t2.start(); }
交替输出 wait notify版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 static boolean flag = true ;static final Object lock = new Object ();public static void main (String[] args) { Thread t1 = new Thread (() -> { while (true ) { synchronized (lock) { if (flag) { log.debug("t1" ); flag = !flag; } } } }); Thread t2 = new Thread (() -> { while (true ) { synchronized (lock) { if (!flag) { log.debug("t2" ); flag = !flag; } } } }); t1.start(); t2.start(); }
park unpark 版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static int flag = 1 ;public static void main (String[] args) { Thread t1 = new Thread (() -> { while (true ) { LockSupport.park(); log.debug("t1" ); } }); Thread t2 = new Thread (() -> { while (true ) { LockSupport.park(); log.debug("t2" ); } }); t1.start(); t2.start(); while (true ) { if (flag % 2 == 0 ) { LockSupport.unpark(t1); } else { LockSupport.unpark(t2); } flag++; } }
await signal 版本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static final ReentrantLock lock = new ReentrantLock ();static Condition c1 = lock.newCondition();static Condition c2 = lock.newCondition();public static void main (String[] args) { Thread t1 = new Thread (() -> { while (true ) { lock.lock(); try { try { c2.signal(); c1.await(); log.debug("t1" ); } catch (InterruptedException e) { throw new RuntimeException (e); } } finally { lock.unlock(); } } }); Thread t2 = new Thread (() -> { while (true ) { lock.lock(); try { try { c1.signal(); c2.await(); log.debug("t2" ); } catch (InterruptedException e) { throw new RuntimeException (e); } } finally { lock.unlock(); } } }); t1.start(); t2.start(); }
共享内存模型 Java内存模型
JMM 即 Java Memory Model,它定义了主存、工作内抽象概念,底层对应着CPU寄存器、缓存、硬件内存、CPU指令优化等
JMM体现在以下几个方面
原子性 - 保证执行不会受到线程线程上下文切换的影响
可见性 - 保证指令不会受cpu缓存的影响
有序性 - 保证指令不会受到cpu执行并行优化的影响
synchronized 和 volatile 都支持可见性、有有序性 ,但是synchronized支持原子性,volatile不可以
synchronized 是能解决原子性、可见性、有序性。但是有序性必须代码都在synchronized的临界区中(每次只能一个线程执行,并不影响最终结果),但是如果有不在这个临界区外的代码是有可能产生有序性的问题,例如,单例模式双检锁种如果不对需要单例的对象使用volatile关键字的话,就会造成一些线程拿到的是一个未初始化完成的对象。
两阶段终止模式 volatile版 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static volatile boolean isRun = true ;public static void main (String[] args) throws InterruptedException { Thread thread = new Thread (() -> { while (true ) { if (!isRun) { log.debug("退出" ); break ; } try { Thread.sleep(1000 ); log.debug("监控" ); } catch (InterruptedException e) { } } }); thread.start(); Thread.sleep(2000 ); isRun = false ; }
同步模式 Balking
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 static volatile boolean isOpen = false ;private static final Object lock = new Object ();public static void main (String[] args) { start(); start(); ThreadUtil.sleep(3000 ); stop(); } public static void start () { synchronized (lock) { if (isOpen) { log.debug("已开启监控!" ); return ; } isOpen = true ; } new Thread (() -> { while (isOpen) { ThreadUtil.sleep(1000 ); log.debug("监控中。。。" ); } }).start(); } public static void stop () { isOpen = false ; log.debug("已关闭" ); }
指令重排序优化 现代处理器会被设计为一个时钟周期完成一条执行时间最长的cpu指令,每条指令都可以分为:取指令 - 指令编码 - 执行指令 - 内存访问 - 数据写回;本质上,流水线技术并不能缩短单条指令的执行时间,但是它能变相地提高了指令地吞吐率,其中串行的方式执行指令并不能把缓存、ALU都能使用起来,反而是每次就运行其中一个,如果使用流水线的方式,则尽可能得将这些都运用起来
volatile 原理
volatile 底层实现原理是内存屏障
写屏障
写屏障保障在该屏障之前的,对共享变量的改动,都同步到主存当中
1 2 3 4 public void test () { num = 2 ; read = true ; }
读屏障
读屏障保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
1 2 3 4 5 6 7 public void test () { if (read) { ... } }
double check locking 问题
典型例子:单例模式 双检索
下方 Singleton 如果不加 volatile 的话,可能会导致获得的是一个未初始化完成的对象: singleton 原本应该在 Singleton 对象初始化后才能获得其地址,但是由于指令重排会将 耗时的 初始化对象放到 singleton 获得地址之后,也就是 singleton 先获得地址,然后才去初始化 Singleton,这样就会导致新来一个线程singleton == null 判定发现 singleton 不等于null,此时直接返回 singleton,但是此时返回的是一个未初始化完成的 Singleton对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static volatile Singleton singleton;private static final Object lock = new Object ();private static Singleton getInstance () { if (singleton == null ) { synchronized (lock) { if (singleton != null ) { return singleton; } singleton = new Singleton (); } } return singleton; } public static void main (String[] args) { Singleton instance1 = Singleton.getInstance(); Singleton instance2 = Singleton.getInstance(); System.out.println(instance2 == instance1); }
共享模型之无锁 CAS与volatile
CAS Compare And Swap 比较并替换,是原子的
并且无锁效率是很高的,在多核CPU并且线程数不要超过CPU核心数的情况下可发挥其效率,因为synchronized 加锁释放锁会导致上下文切换,发生阻塞。无锁则能一直运行重试失败,没有阻塞,直到CAS成功后退出线程。
CAS比较适合多读少写
其中 balance.compareAndSet(expect, update)方法是原子性的,于是1000个线程都通过while循环来进行CAS,如果当前的expect值与当前balance值一样,则会修改,否则一直循环直到与自己的一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 final static AtomicInteger balance = new AtomicInteger (10000 );public static void main (String[] args) { List<Thread> list = new ArrayList <>(); for (int i = 0 ; i < 1000 ; i++) { Thread thread = new Thread (() -> { withdraw(10 ); }); list.add(thread); } list.forEach(Thread::start); ThreadUtil.sleep(500 ); log.debug("{}" , balance); } public static void withdraw (int money) { while (true ) { int expect = balance.get(); int update = expect - money; boolean b = balance.compareAndSet(expect, update); if (b) break ; } }
原子整数 AtomicInteger 原子整数,通过CAS保证原子性,类似的还有AtomicBoolea、AtomicLong…
get 获取最新值
getAndSet 获取并设置
compareAndSet 比较并替换,原子的
getAndUpdate 给予表达式操作
…
原子引用
AtomicReference
AtomicMarkableReference
AtomicStampedReference
ABA问题
比如有三个线程,main、t1、t2,如果t1 将 变量 temp = A 更改为了 B,线程t2又temp更改为了 B -> A,此时main线程中get到的数据还是A,于是就将A->C,其中main并不能感知到temp是否被其他线程修改过
可通过添加版本号解决,如果版本号不一致,则已经被其他线程修改过
1 2 3 4 5 6 7 8 9 10 11 12 13 AtomicStampedReference<String> tem = new AtomicStampedReference <String>("A" , 0 ); new Thread (()->{ tem.compareAndSet("A" , "B" , 0 , 1 ); }).start(); ThreadUtil.sleep(100 ); new Thread (()->{ tem.compareAndSet("B" , "A" , 1 , 2 ); }).start(); ThreadUtil.sleep(100 ); tem.compareAndSet("A" , "C" , 0 , 1 ); log.debug(tem.getReference()); log.debug("version: {}" , tem.getStamp());
1 2 3 4 5 6 7 8 AtomicMarkableReference<String> tmp = new AtomicMarkableReference <>("garbage" , true ); new Thread (()->{ tmp.compareAndSet("garbage" , "no garbage" , true , false ); }).start(); ThreadUtil.sleep(100 ); tmp.compareAndSet("garbage" , "no garbage2" , true , false ); log.debug(tmp.getReference());
原子数组 Unsafe中如果操作的呢?
1 2 3 4 5 6 7 8 9 10 public final boolean compareAndSet (int i, int expect, int update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); } private long checkedByteOffset (int i) { if (i < 0 || i >= array.length) throw new IndexOutOfBoundsException ("index " + i); return byteOffset(i); }
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
字段更新器
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class AtomicFiled { public static void main (String[] args) { Person tperson = new Person (); tperson.name = "jack" ; AtomicReferenceFieldUpdater<Person, String> person = AtomicReferenceFieldUpdater.newUpdater(Person.class, String.class, "name" ); person.compareAndSet(tperson, "jack" , "tom" ); System.out.println(tperson); } } class Person { volatile String name; @Override public String toString () { return "Person{" + "name='" + name + '\'' + '}' ; } }
原子累加器 Striped64 类专门数字累加和计算的的,其中实现类有, 其中效率是很高的。
效率高主要原因是,在CAS时,设置多个累加单元,thread-0累加cell[0],而thread-1 累加 cell[1]..最后将结果汇总。因此可减少CAS重试失败
累加—–
LongAdder
DoubleAdder
计算——
LongAccumulator
DoubleAccumulator
1 2 3 4 5 6 7 LongAdder longAdder = new LongAdder ();for (int i = 0 ; i < 10000 ; i++) { new Thread (longAdder::increment).start(); } ThreadUtil.sleep(100 ); System.out.println(longAdder.sum());
LongAdder 源码 1 2 3 4 5 6 7 8 @sun .misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , valueOffset, cmp, val); } .... }
CPU缓存,其中缓存速度从上往下L1、L2、L3、..到内存,其中缓存存储数据的单位是缓存行,如果某个cpu核心修改了某个缓存行的数据,就会导致其他cpu核心的这个缓存行都失效,从而让这个cpu核心重新到内存中区读取复制数据到缓存中。一个缓存行大小一般是64Byte
如果cpu 核心1去修改cell[0],cpu核心2去修改cell[1] 。这样就会导致整行都会失效,从而再次到内存中去读取最新数据
如果把cell[0]和cell[1]分别占用一个缓存行,就能避免这种情况(将一个缓存行用空数据填满,故就会一个cell占用一行)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; transient volatile int cellsBusy; final boolean casBase (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , BASE, cmp, val); } static final int getProbe () { return UNSAFE.getInt(Thread.currentThread(), PROBE); } public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } } final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell (x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell [n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell [2 ]; rs[h & 1 ] = new Cell (x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } }
Unsafe 这个类比较强大,可以操作内存、对内存的分配、复制、释放、对象的操作、线程调度、CAS操作、和一些系统信息
不可变 此类是一个不可变并且线程安全的类 (This class is immutable and thread-safe)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy/MM/dd" );for (int i = 0 ; i < 100 ; i++) { new Thread (()->{ try { Date parse = simpleDateFormat.parse("2023/8/12" ); log.debug("{}" , parse); } catch (ParseException e) { throw new RuntimeException (e); } }).start(); } DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd" );for (int i = 0 ; i < 100 ; i++) { new Thread (() -> { TemporalAccessor parse = formatter.parse("2023/08/12" ); log.debug("{}" , parse); }).start(); }
不可变对象设计 String 是不可变的,并且很多方法都是通过copy来赋值或返回数据,这样就能防止对原数据操作。
1 2 3 4 5 6 7 8 9 10 11 public String substring (int beginIndex) { if (beginIndex < 0 ) { throw new StringIndexOutOfBoundsException (beginIndex); } int subLen = value.length - beginIndex; if (subLen < 0 ) { throw new StringIndexOutOfBoundsException (subLen); } return (beginIndex == 0 ) ? this : new String (value, beginIndex, subLen); } ....
final原理 1 2 3 4 5 6 7 8 9 10 final int a = 100 ;LINENUMBER 8 L0 ALOAD 0 INVOKESPECIAL java/lang/Object.<init> ()V LINENUMBER 9 L1 ALOAD 0 BIPUSH 100 PUTFIELD nochange/connectionPool.a : I RETURN
手写-简单连接池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 @Slf4j public class ConnectionPool { private AtomicIntegerArray locks; private List<Connection> connections; private int PoolSize; public ConnectionPool (int poolSize) { this .PoolSize = poolSize; this .locks = new AtomicIntegerArray (PoolSize); this .connections = new ArrayList <>(PoolSize); for (int i = 0 ; i < poolSize; i++) connections.add(new Connection (UUID.randomUUID().toString().split("-" )[0 ])); } public Connection get () { while (true ) { for (int i = 0 ; i < PoolSize; i++) { if (locks.get(i) == 0 && locks.compareAndSet(i, 0 , 1 )) { return connections.get(i); } } synchronized (this ) { try { log.debug("等待中.." ); wait(); } catch (InterruptedException e) { } } } } public void free (Connection coon) { for (int i = 0 ; i < PoolSize; i++) { if (connections.get(i) == coon) { locks.compareAndSet(i, 1 , 0 ); synchronized (this ) { this .notifyAll(); log.debug("释放 {}" , coon.getName()); } return ; } } log.debug("释放失败, 未找到对应Connection" ); } public static void main (String[] args) { connectionPool connectionPool = new connectionPool (2 ); for (int i = 0 ; i < 10 ; i++) { new Thread (()->{ Connection connection = connectionPool.get(); log.debug("线程 {}, 获取到了 {}" , Thread.currentThread().getName(), connection.getName()); connectionPool.free(connection); }, "therad-" + i).start(); } } } class Connection { private String name; public Connection () { } public Connection (String name) { this .name = name; } public String getName () { return name; } }
线程池 线程池大概执行步骤 1 2 3 4 5 6 7 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false. 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none. 3. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task. 1.如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务来启动一个新线程。对addWorker的调用原子性地检查runState和workerCount,从而通过返回false来防止错误警报,这些错误警报会在不应该添加线程的情况下添加线程。 2.如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有线程自上次检查以来已经死亡),或者池自进入该方法以来已经关闭。因此,我们重新检查状态,如果有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。 3.如果任务队列满了,那么我们尝试添加一个新线程。如果它失败了,拒绝执行任务。
自定义线程池实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 public class ThreadPoolTest { public static void main (String[] args) { ThreadPool threadPool = new ThreadPool (2 , 2 , 3 , (queue, task) -> { log.debug("拒绝任务 {}" , task); }); for (int i = 0 ; i < 5 ; i++) { int cur = i; threadPool.excute(() -> { log.debug("任务-{}" , cur); }); } } } @Slf4j class ThreadPool { private BlockingQueue blockingQueue; private ArrayList<Worker> workers; private int workerNum; private int blockingQueueSize; private long delay; public ThreadPool (int workerNum, int blockingQueueSize, long delay, RejectPolicy rejectPolicy) { this .workerNum = workerNum; this .blockingQueueSize = blockingQueueSize; this .blockingQueue = new BlockingQueue (blockingQueueSize, rejectPolicy); this .workers = new ArrayList <>(workerNum); this .delay = TimeUnit.NANOSECONDS.convert(delay, TimeUnit.SECONDS); } public void excute (Runnable task) { synchronized (this ) { if (workers.size() != workerNum) { Worker worker = new Worker (task); workers.add(worker); worker.start(); } else { log.debug("放入" ); blockingQueue.rejectPut(task); } } } class Worker extends Thread { private Runnable task; @Override public void run () { while (task != null || (task = blockingQueue.get(delay)) != null ) { try { log.debug("正在执行任务" ); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (this ) { workers.remove(this ); log.debug("移除 {}" , this ); } } public Worker (Runnable task) { this .task = task; } } } class BlockingQueue { private int capacity; private ArrayDeque<Runnable> tasks; private ReentrantLock lock = new ReentrantLock (); private Condition emptySet = lock.newCondition(); private Condition fullSet = lock.newCondition(); private RejectPolicy<Runnable> rejectPolicy; public BlockingQueue (int capacity, RejectPolicy rejectPolicy) { this .capacity = capacity; this .tasks = new ArrayDeque <>(capacity); this .rejectPolicy = rejectPolicy; } public Runnable get (long delay) { lock.lock(); try { if (tasks.isEmpty()) { try { delay = emptySet.awaitNanos(delay); } catch (InterruptedException e) { e.printStackTrace(); } } Runnable task = tasks.peek() == null ? null : tasks.removeFirst(); fullSet.signal(); return task; } finally { lock.unlock(); } } public void put (Runnable task) { lock.lock(); try { if (capacity == tasks.size()) { try { fullSet.await(); } catch (InterruptedException e) { throw new RuntimeException (e); } } tasks.add(task); emptySet.signal(); } finally { lock.unlock(); } } public void rejectPut (Runnable task) { lock.lock(); try { if (tasks.size() == capacity) { rejectPolicy.reject(this , task); } else { tasks.add(task); emptySet.signal(); } } finally { lock.unlock(); } } } interface RejectPolicy <T> { void reject (BlockingQueue queue, T task) ; }
ThreadPoolExecutor 不建议用,会有OOM
线程池状态,int的高3为来表示状态,低29位表示线程数量,故大小顺序排序依次为 TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息次存储在一个原子变量中ctl,目的是将状态和线程数合二为一,以一次CAS操作就行了
1 2 3 4 5 6 7 8 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; }
构造方法 1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 生存时间-针对救急线程 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 阻塞队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler) {
JDK中拒绝策略的实现类
AortPolicy 让调用者抛出rejectException异常,这是默认策略
CallerPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
Executors newFixedThreadPool
没有救急线程,重建出来的全是核心线程,队列也是阻塞队列,可以放任意数量的任务
并且可以动态修改线核心线程个数
1 2 3 4 5 6 7 public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory); }
newCachedThreadPool
全部都是救急线程,无核心线程
60s 回收救急
队列采用 SynchronousQueue ,它没有容量,没有现成来取是放不进去的(一手交钱,一手交货)
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
newSingleThreadExecutor
单线程
较适合串行执行任务,如果出现异常,它会创建一个新的线程来代替当前异常的线程继续执行任务
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
提交任务 submit()
提交任务,可以是Runnable 或者是有返回值的Callable
invokeAll()
执行所有任务
invokeAny()
执行任意一个
shutdown()
不会接受新的任务,将队列中的任务执行完成,然后结束掉
异步的,调用者并不会等待他执行完成
shutdownnow()
不会接受新的任务,并且不会处理队列中的任务都不再执行,直接退出
会返回未执行的任务集合
工作线程模式
让有限的工作线程,来轮流执行任务。典型的就是线程池,体现出了享元模式设计模式
饥饿-固定线程池 线程池只有两个核心线程数,下方会导致线程数不够用,卡在获取submit结果的位置,因为没有现成去处理submit提交的任务。最终到将出现饥饿的现象, 不是死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 ExecutorService service = Executors.newFixedThreadPool(2 );service.execute(()->{ log.debug("进入" ); Future<String> submit = service.submit(() -> { return "ok" ; }); try { String str = submit.get(); log.debug(str); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException (e); } }); service.execute(()->{ log.debug("进入" ); Future<String> submit = service.submit(() -> { return "ok" ; }); try { String str = submit.get(); log.debug(str); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException (e); } });
创建多少线程合适
cpu密集型: cpu核数 + 1
io密集型: 线程数 = 核数 * 期望cpu利用率 * 总时间(cpu计算时间+等待时间) / cpu计算时间
例如: 4核 cpu 计算时间是 50%,其他等待时间是50%,期望cpu被100利用
4 * 100% * 100% / 50 % = 8
任务调度线程池 Timer
java.util.Timer来实现定时功能
Timer简单易用,但是只能串行执行,并且如果没有捕获异常会导致后面的任务也无法正常执行
ScheduledThreadPoolExecutor
非串行
异常不会导致后面任务失效
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10 ); executorService.schedule(()->{ log.debug("sc1" ); int i = 1 / 0 ; }, 2 , TimeUnit.SECONDS); executorService.schedule(()->{ log.debug("sc2" ); }, 1 , TimeUnit.SECONDS); } public static void main (String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10 ); executorService.scheduleAtFixedRate(()->{ log.debug("while" ); },0 , 1 , TimeUnit.SECONDS); }
Fork/Join
JDK7新增线程池实现,它体现分治的实现,适合用户任务拆分的cpu密集型运算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class ForkJoinTest { public static void main (String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool (10 ); ForkJoinTask forkJoinTask = new ForkJoinTask (3 ); Integer i = forkJoinPool.submit(forkJoinTask).get(); System.out.println(i); } } class ForkJoinTask extends RecursiveTask <Integer> { int n; public ForkJoinTask (int n) { this .n = n; } @Override protected Integer compute () { if (n == 0 ) { return 0 ; } ForkJoinTask forkJoinTask = new ForkJoinTask (n - 1 ); forkJoinTask.fork(); return forkJoinTask.join() + n; } }
AQS
AbstractQuueudSynchronized,阻塞式锁
用state属性来表示资源状态(分独占模式和共享模式),子类需要的定义如何维护换这个状态,控制获取和释放锁
提供基于FIFO的等待队列,类似monitor的entrylist
条件变量来实现等待、唤醒机制,支持对个条件变量,类似monitor的waitset
ReentrantLock 使用了此类
手写一个不可重入锁
ThreadLocal
仅本线程能获取到此对象,从而可防止多线程数据问题,可能会OOM
其中key是弱引用(当其他所有对key的引用没有后,gc会将此key给回收掉)
get 的时候发现key为null,会使value设置为null (可能也会无效)
set的时候发现key为null,也会使value设置为null (可能也会无效)
调用remove()推荐,也会使value为null
原理 线程安全集合类 Collections 通过集合工具类,来创建线程安全的集合对象, 底层还是通过synchronized锁来保证线程安全
synchronizedList
synchronizedMap
HashMap-JDK7-扩容死链 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 1 void transfer (Entry[] newTable, boolean rehash) { 2 int newCapacity = newTable.length; 3 for (Entry<K,V> e : table) { 4 while (null != e) { 5 Entry<K,V> next = e.next; 6 if (rehash) { 7 e.hash = null == e.key ? 0 : hash(e.key); 8 } 9 int i = indexFor(e.hash, newCapacity); 10 e.next = newTable[i]; 11 newTable[i] = e; 12 e = next;13 }14 }15 }
ConcurrentHashMap 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 transient volatile Node<K,V>[] table; private transient volatile Node<K,V>[] nextTable; private transient volatile long baseCount; private transient volatile int sizeCtl; private transient volatile int transferIndex; private transient volatile int cellsBusy; private transient volatile CounterCell[] counterCells; private transient KeySetView<K,V> keySet; private transient ValuesView<K,V> values; private transient EntrySetView<K,V> entrySet; static final class ForwardingNode <K,V> extends Node <K,V> static final class ReservationNode <K,V> extends Node <K,V> static final class TreeBin <K,V> extends Node <K,V> static final class TreeNode <K,V> extends Node <K,V>
构造器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public ConcurrentHashMap () { } public ConcurrentHashMap (int initialCapacity) { if (initialCapacity < 0 ) throw new IllegalArgumentException (); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1 ) + 1 )); this .sizeCtl = cap; } public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException (); if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; long size = (long )(1.0 + (long )initialCapacity / loadFactor); int cap = (size >= (long )MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int )size); this .sizeCtl = cap; }
get() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
put() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 public V put (K key, V value) { return putVal(key, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException (); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node <K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node <K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
initTable() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield (); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node <?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }
casTabAt() 1 2 3 4 5 static final <K,V> boolean casTabAt (Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long )i << ASHIFT) + ABASE, c, v); }
helpTransfer() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { transfer(tab, nextTab); break ; } } return nextTab; } return table; }
transfer() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 private final void transfer (Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null ) { try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node <?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode <K,V>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false ; else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt (this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); else if ((fh = f.hash) == MOVED) advance = true ; else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node <K,V>(ph, pk, pv, ln); else hn = new Node <K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null , loTail = null ; TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode <K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new TreeBin <K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin <K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } }
COpy 队列 LinkedBlockingQueue 原理 put() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); int c = -1 ; Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); } private void enqueue (Node<E> node) { last = last.next = node; }
poll() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public E poll () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; E x = null ; int c = -1 ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { if (count.get() > 0 ) { x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
ConcurrentLinkedQueue 原理 add() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public boolean offer (E e) { checkNotNull(e); final Node<E> newNode = new Node <E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null ) { if (p.casNext(null , newNode)) { casTail(t, newNode); return true ; } } else if (p == q) p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q; } }
Exchanger
线程之间进行数据交换,exchange()会一直堵塞,直到有人和它交换数据
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws InterruptedException { Exchanger<Object> exchanger = new Exchanger <>(); new Thread (() -> { try { Object exchange = exchanger.exchange(1 ); System.out.println(exchange); } catch (InterruptedException e) { throw new RuntimeException (e); } }, "t1" ).start(); exchanger.exchange(200 ); }
CompletableFuture
可代替FutureTask,可做异步任务
构造
CompletableFuture.runAsync() 无返回值
CompletableFuture.supplyAsync() 有返回值
方法
supplyAsync:开启异步任务
thenCompose:连接两个异步任务
thenCombine:合并两个异步任务
thenApply:任务的后置处理
applyToEither:获取最先完成的任务
exceptionally:异常处理
…