ReentrantLock

一个可重入,可多条件变量,可超时的锁

new

1
2
3
4
5
6
7
8
// 无参非公平
public ReentrantLock() {
sync = new NonfairSync();
}
// 公平或者非公平
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 状态码
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 等待状态
volatile int waitStatus;
// 上一个节点
volatile Node prev;
// 下一个节点
volatile Node next;
// 线程
volatile Thread thread;
// 下一个等待的节点
Node nextWaiter;
// 是否是共享节点
final boolean isShared() {
return nextWaiter == SHARED;
}
// 前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

lock-非公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// 无参构造器是非公平锁
// 公平与非公平差别: 非公平锁新进的线程会直接就去和aqs队列中的线程抢占锁,公平则是会去判定aqs队列中是否有其他的线程,没有才会去获取锁
public void lock() {
sync.lock();
}

final void lock() {
// 尝试获取锁
// 修改state为1
if (compareAndSetState(0, 1))
// 设置锁的owner为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 再次去获得锁
acquire(1);
}

public final void acquire(int arg) {
// 尝试获取锁
if (!tryAcquire(arg) &&
// 获取失败, 新建一个独占锁到队列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断自己
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 线程状态
int c = getState();
// 0 表示未被占
if (c == 0) {
// 尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程已经获得锁了
// 锁重入
else if (current == getExclusiveOwnerThread()) {
// 状态 + 1
// 锁重入
int nextc = c + acquires;
// 状态溢出(不太可能会重入20多亿次)
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 修改状态
setState(nextc);
return true;
}
return false;
}

// 添加新节点
private Node addWaiter(Node mode) {
// 创建一个节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 尾部的Node赋值给pred
// tail 不为 null
if (pred != null) {
// 当前传入的node的prev设置成队列中最后一个,当前node就成为队列中最后一个了
node.prev = pred;
// cas 修改 tail 成 node
// 重新设置末尾
if (compareAndSetTail(pred, node)) {
// tail的下一个node指向node
pred.next = node;
return node;
}
}
// 循环直到加到队列尾部
enq(node);
return node;
}


final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 当前node的前置node
final Node p = node.predecessor();
// 当前node的前置node等于head,表示当前node为第二个node
// 可以去获取锁
// 获取成功,将当前node设置成head
// 移出掉旧的head
if (p == head && tryAcquire(arg)) {
// 设置为head
setHead(node);
// 老的next设置为null,用于gc回收
p.next = null; // help GC
failed = false;
// 返回是否被打断
return interrupted;
}
// 如果没有获取到锁
if (shouldParkAfterFailedAcquire(p, node) &&
// 将自己park, 如果打断了返回打断标记
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果最终没有成功
if (failed)
// 将node给取消掉
cancelAcquire(node);
}
}

final Node predecessor() throws NullPointerException {
// 当前node的前一个node
Node p = prev;
// 空指针异常, p 等于 null
if (p == null)
throw new NullPointerException();
else
return p;
}

private void setHead(Node node) {
// head设置成node
head = node;
// head 没有 thread
node.thread = null;
// head 没有 prev
node.prev = null;
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 已经设置成功前置node的waitStatus为-1
// 前置通知node的waitStatus的状态码都是-1
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 跳过cancel状态node
// 因为这里仅cancel的node为大于0(为1)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 修改前置node的waitStatus为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
// 当前线程park住
LockSupport.park(this);
// 返回是否被中断了
return Thread.interrupted();
}

lock-公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 非公平锁,抢锁的时候并不是上来就抢,而是判定队列中是否有其他node,并且这个owner不是自己,就不会去抢占
public void lock() {
sync.lock();
}

final void lock() {
acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 这两个方法是和非公平lock共用的AQS的
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 状态码为0表示未上锁
if (c == 0) {
// 查看队列中是否还有其他node
// 没有才会执行下一个判定CAS获取锁
if (!hasQueuedPredecessors() &&
// cas state
compareAndSetState(0, acquires)) {
// 设置owner
setExclusiveOwnerThread(current);
return true;
}
}
// c !=0 并且当前线程就是owner, 重入锁了
else if (current == getExclusiveOwnerThread()) {
// state + 1
int nextc = c + acquires;
// 溢出
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 重新设置state
setState(nextc);
return true;
}
return false;
}

public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
// 下一个node等于null, 没有其他节点
// 或者第二个线程就是现在这个线程
((s = h.next) == null || s.thread != Thread.currentThread());
}

unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 尝试释放
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 释放h的下一个node
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
// 状态码 - 1
int c = getState() - releases;
// 非owner线程释放锁直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 状态码为0,释放成功
// 如果是重入锁, 则需要所有的锁都释放了, 才是真正的释放了
if (c == 0) {
free = true;
// owner设置为null
setExclusiveOwnerThread(null);
}
// 重置状态码
setState(c);
return free;
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 修改head的ws为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
// 后继node如果为null或者状态大于0表示,已经被释放或者取消掉了
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部向前遍历找到一个最前面的非取消的node
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 释放s
LockSupport.unpark(s.thread);
}

ConditionObject

1
2
3
4
5
6
7
8
9
public class ConditionObject implements Condition, java.io.Serializable {
// 第一个等待节点
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
// 最后一个等待节点
private transient Node lastWaiter;
....
}

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// 需要获取到当前owner才能await
public final void await() throws InterruptedException {
// 被中断抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 添加到条件等待队列
// 清理被中断或者取消的node
Node node = addConditionWaiter();
// 去释放owner,并且去unpark队列中的第二个node
// 并且会判定await方法在lock后去调用的,如果不是会抛出IllegalMonitorStateException捕获然后将当前node给取消
int savedState = fullyRelease(node);
int interruptMode = 0;
// 同步队列中是否存在
while (!isOnSyncQueue(node)) {
// 不存在park, 等待被唤醒
LockSupport.park(this);
// 检查等待的时候是否被中断了
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 上方有acquireQueued解析
// 尝试去获取锁,失败则park住
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // != -1
// 需要重新中断
interruptMode = REINTERRUPT; // 1
//清除cancelled
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// interruptMode != 0 表示等待过程中被中断过
// 根据 interruptMode看是抛出异常还是中断当前线程
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// 如果是 THROW_IE 则抛出异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
// 如果是 REINTERRUPT 则把自己给中断掉
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// lastWaiter 不为null 并且waitStatus不为 -2, 表示CONDITION被取消了
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建一个CONDITION节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// lastWaiter == null
if (t == null)
// 放在第一个
firstWaiter = node;
else
// 放在 lastWaiter后
t.nextWaiter = node;
// 更新下 lastWaiter
lastWaiter = node;
return node;
}

// 清理非CONDITION状态的node
private void unlinkCancelledWaiters() {
// 第一个node
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 下一个node
Node next = t.nextWaiter;
// 状态不对,被取消了
if (t.waitStatus != Node.CONDITION) {
// 设置当前node的下一个node为null, 准备放弃掉当前node
t.nextWaiter = null;
// trail 为null
if (trail == null)
// 更新firstWaiter, 跳过当前node
firstWaiter = next;
else
// 跳过当前node
trail.nextWaiter = next;
if (next == null) // 已经到尾部了
// 更新 lastWaiter
lastWaiter = trail;
}
else
// 上一个有效node
trail = t;
// 更新 t
t = next;
}
}

final int fullyRelease(Node node) {
// 是否释放失败
boolean failed = true;
try {
// 获取state
int savedState = getState();
// 因为这里await必须要获取了owner, 不然会抛出异常后将当前node给取消掉
// 释放owner,并且去队列中unpark其他node
if (release(savedState)) {
failed = false;
// 返回释放后的state大小
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 失败
if (failed)
// 将当前node取消掉
node.waitStatus = Node.CANCELLED;
}
}

final boolean isOnSyncQueue(Node node) {
// 不在aqs同步队列中
// waitStatus 是 CONDITION 表示未在aqs同步队列中, prev为 null则表示还没有加入aqs同步队列(因为在加入队列的时候才会去初始化node的前置节点)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 有后置节点,不是CONDITION
// 在同步队列中
if (node.next != null) // If has successor, it must be on queue
return true;
// 上方都不满足, 再去同步队列从尾部向前面查找(因为添加都是从尾部开始添加的,故尾部查找更快)
return findNodeFromTail(node);
}

// 是否在等待中被打断
private int checkInterruptWhileWaiting(Node node) {
// 当前线程是否被中断
return Thread.interrupted() ?
// 如果等待中被打断
// 将node转移到队列的尾部
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : // THROW_IE = -1, REINTERRUPT = 1
// 未被打断, 返回0
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 修改node waitStatus为0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 加入到队列尾部
enq(node);
return true;
}
// 如果不在队列中
while (!isOnSyncQueue(node))
// 让掉
Thread.yield();
// 转移失败
return false;
}

signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public final void signal() {
// 是否owner线程
// 不是抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 取出 firstWaiter
Node first = firstWaiter;
// first 不为null
if (first != null)
doSignal(first);
}

protected final boolean isHeldExclusively() {
// 当前owner线程是否等于当前线程
return getExclusiveOwnerThread() == Thread.currentThread();
}

private void doSignal(Node first) {
do {
// 如果 first 的下一个为null, 则表示只有一个等待节点
if ((firstWaiter = first.nextWaiter) == null)
// 将 lastWaite 设置为null
lastWaiter = null;
// 当前的node下一个设置为null
first.nextWaiter = null;
// 转移到同步队列并且唤醒, 成功就直接退出
// 或者first的下一个为null也会退出
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
// 如果失败,说明这个node别取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 追加到同步队列尾部
// 返回node的前置节点
Node p = enq(node);
// 前置节点的waitStatus
int ws = p.waitStatus;
// wx > 0 表示已经取消
// 修改前置节点的ws为-1, 如果修改失败,表示有其他线程也在操作这个node, 这样就直接取unpark node, 如果修改成功就仅放在队列里不管,后面等其他前置节点来唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒node
LockSupport.unpark(node.thread);
return true;
}

ReentrantReadWriteLock

  • 读锁不能升级写锁(读锁里面不能重入写锁, 会卡死),写锁可以降级读锁

  • 多条件、可重入、读读并发、读写互斥

  • 写锁读锁各占一半state, 高16位读锁,低16位写锁

new

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
....
}

public ReentrantReadWriteLock() {
// 父类 ReadWriteLock 非公平
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
// 公平或者非公平
sync = fair ? new FairSync() : new NonfairSync();
// 读锁获取
readerLock = new ReadLock(this);
// 写锁获取
writerLock = new WriteLock(this);
}

Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
abstract static class Sync extends AbstractQueuedSynchronizer {

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
// 返回读锁状态 高16位, 这里是将c直接向右无符号移动16位, 低16位被高16位覆盖
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
// 返回写锁状态 低16位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

公平-非公平区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static final class NonfairSync extends Sync {

// 总是返回false
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}

// 老二是否是独占的. 非Shared
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// h s 都不为null,并且s不为Shared, s的thread也不为null
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

/**
* Fair version of Sync
*/
static final class FairSync extends Sync {

final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
public final boolean hasQueuedPredecessors() {
// 尾
Node t = tail; // Read fields in reverse initialization order
// 头
Node h = head;
Node s;
return h != t && // 老二节点不是当前节点, 其他节点在占用
((s = h.next) == null || s.thread != Thread.currentThread());
}

lock-写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void lock() {
sync.acquire(1);
}

public final void acquire(int arg) {
// 尝试获取
if (!tryAcquire(arg) &&
// 与上方ReentrantLock lock一致
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 与上方一致
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
// 当前线程
Thread current = Thread.currentThread();
// state
int c = getState();
// 获取写锁state
int w = exclusiveCount(c);
// c 不等于0, 有锁但是不知道是写锁还是读锁
if (c != 0) { // 锁重入判定
// (Note: if c != 0 and w == 0 then shared count != 0)
// 写锁为0, 进来的是读锁, 但是当前线程又不等于owner线程 (读锁想要重入写锁)
// 直接不让获取返回 false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 写锁重入太多次,不太可能
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 写锁可直接添加 acquires
setState(c + acquires);
return true;
}
// 非公平一直都是 false
if (writerShouldBlock() ||
// 修改状态
!compareAndSetState(c, c + acquires))
return false;
// 设置owner
setExclusiveOwnerThread(current);
return true;
}

lock-读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
public void lock() {
sync.acquireShared(1);
}

public final void acquireShared(int arg) {
// 尝试获取读锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
// 当前线程
Thread current = Thread.currentThread();
// state
int c = getState();
// 写锁不为0 ,owner不为当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 返回-1
return -1;
// 获取读锁状态
int r = sharedCount(c);
// 当前节点不是老二节点
if (!readerShouldBlock() &&
// 读锁是否超过固定值
r < MAX_COUNT &&
// 读锁增加
compareAndSetState(c, c + SHARED_UNIT)) {
// 第一个读锁
if (r == 0) {
// 将当前线程赋值给 firstReader
firstReader = current;
// 第一个读锁获取数 = 1
// 第一个特殊标记用于通知其他获取读锁的线程尽快获取锁,减少堵塞
firstReaderHoldCount = 1;
// 如果 firstReader 等于当前线程
} else if (firstReader == current) {
// 第一个读锁获取数++
firstReaderHoldCount++;
} else {
// HoldCounter 用于释放读锁的时候,减少循环次数,并且方便获取对应线程读锁数量
// 以及线程ID用于避免同一个线程多次创建 HoldCounter
HoldCounter rh = cachedHoldCounter;
// rh如果为null 或者 rh的线程ID 不等于 当前线程的ID
if (rh == null || rh.tid != getThreadId(current))
// readHolds 中新建一个计数器
cachedHoldCounter = rh = readHolds.get(); // readHolds继承ThreadLocal调用ThreadLocal的get和set
else if (rh.count == 0)// 表示一个新的rh
// 将新的rh添加到 readHolds中
readHolds.set(rh);
// 计数器++
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
// 读锁计时器
HoldCounter rh = null;
for (;;) {
// state
int c = getState();
// 写锁不为0,
if (exclusiveCount(c) != 0) {
// owner线程不是当前线程
if (getExclusiveOwnerThread() != current)
// 返回 -1
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
// 读锁是否堵塞 公平和非公平不一样
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
// cachedHoldCounter获取
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
// readHolds中获取
rh = readHolds.get();
// 从未被使用过, 移除掉
if (rh.count == 0)
readHolds.remove();
}
}
// 被使用完了
if (rh.count == 0)
return -1;
}
}
// 溢出
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 与上方代码一致
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

private void doAcquireShared(int arg) {
// 新建SHARED节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// node的前置节点
final Node p = node.predecessor();
// 前置节点是head
if (p == head) {
// 又去尝试获取一次
int r = tryAcquireShared(arg);
if (r >= 0) {
// 会释放共享节点,直到遇见独占节点
setHeadAndPropagate(node, r);
// GC掉p
p.next = null; // help GC
// 是否中断
if (interrupted)
// 中断自己
selfInterrupt();
failed = false;
return;
}
}
// ....
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 跳过cancel节点,并且会释放节点
cancelAcquire(node);
}
}

unlock-写锁

1
2
3
4
// ReentrantLock一致
public void unlock() {
sync.release(1);
}

unlock-读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public void unlock() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
// 尝试释放共享节点
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int unused) {
// 当前线程
Thread current = Thread.currentThread();
// 如果当前线程等于第一个获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 只剩最后一个读锁了
if (firstReaderHoldCount == 1)
// 第一个读锁设置null
firstReader = null;
else
// 否则 计数器--
firstReaderHoldCount--;
} else {
// cacheHoldCounter中获取
HoldCounter rh = cachedHoldCounter;
// 没有或者线程ID不一致与当前线程
if (rh == null || rh.tid != getThreadId(current))
// readHolds中获取
rh = readHolds.get();
int count = rh.count;
// 快使用完了
if (count <= 1) {
// 移出当前计数器
readHolds.remove();
// 超过抛出异常
if (count <= 0)
throw unmatchedUnlockException();
}
// 计数器--
--rh.count;
}

for (;;) {
// 当前state
int c = getState();
// 读锁-1
int nextc = c - SHARED_UNIT;
// cas成功, 判定是否已经释放完了返回
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

private void doReleaseShared() {
for (;;) {
Node h = head;
// 头节点不为null, 且不止一个节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// h的waitStatus等于-1
if (ws == Node.SIGNAL) {
// CAS 修改成 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 释放老二,或者从尾部向前释放
unparkSuccessor(h);
}
// 释放老二成功后, 就会可能导致ws = 0,并且如果有后记节点进入, 那么head的ws又会变成SIGNAL
else if (ws == 0 &&
// CAS失败则跳过, 成功就退出
// 尽最大可能给去释放后继节点, 不停止
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

await和signal和ReentrantLock一致

Semaphore

  • 可限制资源访问数
  • 仅适合单机

new

1
2
3
4
5
6
7
8
9
10
11
12
public Semaphore(int permits) {
// 默认非公平
sync = new NonfairSync(permits);
}

NonfairSync(int permits) {
super(permits);
}
// 将permits赋值给state
Sync(int permits) {
setState(permits);
}

acquire-非公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 通过state值来限制访问量, 超过就放入队列park
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 当钱线程被中断抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 小于0放入队列
if (tryAcquireShared(arg) < 0)
// 分析过, 会添加新的节点,并且会去判定当前节点的前置节点是否是head如果是尝试获取锁,如果失败就park住
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取state
int available = getState();
// state - 1
int remaining = available - acquires;
// 小于0或者CAS成功返回剩余信号量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

acquire-公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁不一致
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 是否有老二节点,并且这个老二节点不是自己
if (hasQueuedPredecessors())
// 不参与争夺
return -1;
int available = getState();
int remaining = available - acquires;
// 访问数不足
if (remaining < 0 ||
// 或者访问数足够CAS成功
compareAndSetState(available, remaining))
return remaining;
}
}

release-公平-非公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void release() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
// 尝试释放节点
if (tryReleaseShared(arg)) {
// 释放后驱节点
// aqs中的方法, 已解析
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取状态码
int current = getState();
// 状态码 + 1
int next = current + releases;
// 溢出
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS状态码
if (compareAndSetState(current, next))
return true;
}
}

CountDownLatch

  • 用来进行线程同步协作,等待所有线程完成倒计时

  • 其中构造参数用来初始化等待计数值,await()用来等待计数归零,countDown用来计数减一

  • 只能使用一次,如果需要重复使用需要用 CyclicBarrier

new

1
2
3
4
5
6
7
8
9
10
11
public CountDownLatch(int count) {
// count 不能小于0
if (count < 0) throw new IllegalArgumentException("count < 0");
// 设置state
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
// Sync继承关系
private static final class Sync extends AbstractQueuedSynchronizer {...}

countDown

释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
// 返回0表示可以释放队列中的节点了
// 倒计时已经使用完了
if (tryReleaseShared(arg)) {
// 调用AQS的,释放后继节点,仅最大可能释放后继节点
doReleaseShared();
return true;
}
return false;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// state已经被使用完
if (c == 0)
return false;
int nextc = c-1;
// CAS减少倒计时
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

await

获取锁,失败进入队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// < 0未获取到锁
if (tryAcquireShared(arg) < 0)
// aqs中的方法,会将节点加入队列中,然后park住
// 如果当前节点是老二节点,会尝试再去获取锁并且释放锁(SHARED)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
// 当前状态是等于0,是返回1否则返回 -1
// 如果被释放完了才能获取锁, 倒计时使用完了才能
return (getState() == 0) ? 1 : -1;
}

CyclicBarrier

  • 可重复使用的 CountDownLatch计数
  • 当满足多个线程调用await后才能继续往下执行

new

1
2
3
4
5
6
7
8
9
10
11
12
13
public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
// parties不能小于等于0
if (parties <= 0) throw new IllegalArgumentException();
// 线程数
this.parties = parties;
this.count = parties;
// 任务
this.barrierCommand = barrierAction;
}

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 参数是否超时,超时多久
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

private static class Generation {
boolean broken = false;
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 使用的 ReentrantLock 锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

// 是否中断
if (g.broken)
throw new BrokenBarrierException();
// 是否打断
if (Thread.interrupted()) {
// 中断唤醒所有
breakBarrier();
throw new InterruptedException();
}

// 获取计数
int index = --count;
// 计数消耗完
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 构造器中传入进来的任务
final Runnable command = barrierCommand;
if (command != null)
// 执行任务
command.run();
// 已执行任务
ranAction = true;
// 开始另一次计数,新的一轮任务
nextGeneration();
// 已经执行完成一次
return 0;
} finally {
// 异常中断
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 不是开启超时
if (!timed)
// 直接加入等待队列,park
trip.await();
else if (nanos > 0L)
// 计时等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) { // 被中断异常
// 未被中断
if (g == generation && ! g.broken) {
// 中断掉
breakBarrier();
throw ie;
} else {
// 直接打断
Thread.currentThread().interrupt();
}
}
// 被中断了
if (g.broken)
throw new BrokenBarrierException();

// 不是同一个 generation, 是否已经被更新了
// 已经执行完一轮了, 故两个generation不一致
if (g != generation)
return index;

// 超时处理
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}

// 中断并且唤醒所有
private void breakBarrier() {
generation.broken = true;
// 重置count
count = parties;
trip.signalAll();
}

// 新的一轮计数
private void nextGeneration() {
// signal completion of last generation
// 唤醒所有
trip.signalAll();
// set up next generation
// 重置计数
count = parties;
// 新建一个 generation
generation = new Generation();
}

LongAdder

高效率并发累加器,通过cells将操作分解到多个单元,减少竞争,从而增加效率

new

1
public LongAdder() {}

add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// 防止行共享
@sun.misc.Contended static final class Cell {...}

public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// cells不为null, 或者CAS基础值失败(成功就结束掉)
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// as为null或者长度小于0, 无法操作问题
if (as == null || (m = as.length - 1) < 0 ||
// 当前 getProbe哈希 & m 下标的单元不存在, 会进入下方的 longAccumulate
(a = as[getProbe() & m]) == null ||
// 当前单元cas 失败, 也会进入下方 longAccumulate
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// hash等于0
if ((h = getProbe()) == 0) {
// 强制初始化s
ThreadLocalRandom.current(); // force initialization
// 重新获取hash
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells存在,且长度大于0, 已经初始化
// cells中的某些单元可能没有初始化
if ((as = cells) != null && (n = as.length) > 0) {
// 当前下标的单元未初始化
if ((a = as[(n - 1) & h]) == null) {
// cellsBusy == 0未上锁
if (cellsBusy == 0) { // Try to attach new Cell
// 因为没有,所以给当前下标的单元新建一个
// 并且带值的单元
Cell r = new Cell(x); // Optimistically
// cellsBusy == 0 未上锁, 就尝试上锁
if (cellsBusy == 0 && casCellsBusy()) {
// 是否已经完成创建单元
boolean created = false;
try { // Recheck under lock
// 上方创建的一个新的cell存储到cells中
// 如果cells当前下标为null的话
Cell[] rs; int m, j;
// 又进行一次判定
// cells 不为null
// 这里两次判定, 为了防止多个线程进入这里然后把当前下标的单元给覆盖了
if ((rs = cells) != null &&
// 并且长度要大于0
(m = rs.length) > 0 &&
// 并且当前下标为null
rs[j = (m - 1) & h] == null) {
// 放入新建的cell
rs[j] = r;
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 创建成功,退出
if (created)
// 因为这是当前下标的第一个单元, 所以可以直接退出
// 退出循环
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 重置CAS失败的hash
// 让当前线程寻找一个其他下标的
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 对当前下标的单元进行CAS增值
// 成功则退出
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 超过cpu逻辑处理器个数
// 或者cells被其他线程扩容了, 导致两个不一致
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 会一直在这里循环
// 不在走下面的扩容了
else if (!collide)
collide = true;
// 开始扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 是同一个cells
if (cells == as) { // Expand table unless stale
// 长度扩大两倍
Cell[] rs = new Cell[n << 1];
// 遍历旧单元到新cells数组上
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 重新赋值cells
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 重新获取hash
h = advanceProbe(h);
}
// 未创建cells, 准备创建cells
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// 初始化cells
// 并且第一次长度是2的一个cell数组
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 初始化成功, 并且退出循环
if (init)
break;
}
// 当cells为空并且有其他线程在扩容, 新进来的线程也不能闲置着, 通过cas叠加base来增加效率
// base 会在后面sum中当成基础值来用
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

increment

1
2
3
4
public void increment() {
// 上方的add
add(1L);
}

sum

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public long sum() {
Cell[] as = cells; Cell a;
// 基础值
long sum = base;
// cells不是空的
if (as != null) {
// 遍历cells中的值
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
// 叠加
sum += a.value;
}
}
// 最终返回
return sum;
}

decrement

1
2
3
4
public void decrement() {
// 减1
add(-1L);
}

reset

1
2
3
4
5
6
7
8
9
10
11
// 将cells中的所有值和base都设置为0L
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}

Exchanger

可两个线程之间数据交换, 一个线程会等待另一个线程的数据到达后进行数据交换(exchange方法)

new

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Exchanger() {
participant = new Participant();
}

// 继承 ThreadLocal
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}

// 防止缓存行
@sun.misc.Contended static final class Node {
// 下标值
int index; // Arena index
// 允许交换边界值, 允许交换的最大位置
int bound; // Last recorded value of Exchanger.bound
// 边界值的CAS操作失败的次数
int collides; // Number of CAS failures at current bound
// 控制线程的自旋
int hash; // Pseudo-random for spins
// 当前线程要交换的项
Object item; // This thread's current item
// 与当前线程交换的项
volatile Object match; // Item provided by releasing thread
// 当前线程是否被挂起
volatile Thread parked; // Set to this thread when parked, else null
}

exchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public V exchange(V x) throws InterruptedException {
Object v;
// 对item初始化, 如果传入的是null,则是一个Object对象
// 否则就是传入的值
Object item = (x == null) ? NULL_ITEM : x; // translate null args
// 多槽位数组不为null
if ((arena != null ||
// 进行单槽位交换值, 如果这里有返回值就不会再执行 && 后面的了
(v = slotExchange(item, false, 0L)) == null) &&
// 并且当前线程是否被中断了
((Thread.interrupted() || // disambiguates null return
// 进行多槽位交换值
(v = arenaExchange(item, false, 0L)) == null)))
// 最终失败抛出异常
throw new InterruptedException();
// 成功返回交换值
return (v == NULL_ITEM) ? null : (V)v;
}

// 线程1: 先进来的线程, 线程2: 后进的线程
private final Object slotExchange(Object item, boolean timed, long ns) {
// 获取当前线程 Node
Node p = participant.get();
// 当前线程
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;

for (Node q;;) {
// 单槽不为null, 并且将slot赋值给q
// 第一次进来是null
if ((q = slot) != null) {
// 第2个线程进来
// 将slot CAS为null
if (U.compareAndSwapObject(this, SLOT, q, null)) {
// 保存第一个线程的值
Object v = q.item;
// 将自己传入值赋值
q.match = item;
// 第一个线程Thread
Thread w = q.parked;
if (w != null)
// 不为null, 释放掉
U.unpark(w);
// 线程1传入的值返回给线程二
return v;
}
// create arena on contention, but continue until slot null
// 多槽初始化....
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
// 退出准备使用多槽
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
// 线程1将传入的值赋值
p.item = item;
// 初始化slot, 线程2检测到slot不为null准备拿值并且将自己的放入
if (U.compareAndSwapObject(this, SLOT, null, p))
// CAS 成功退出当前循环
break;
// CAS 失败, 将刚才赋的值抹掉
p.item = null;
}
}

// await release
// 获取hash
int h = p.hash;
// 超时时间
long end = timed ? System.nanoTime() + ns : 0L;
// 自旋次数
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
// 后来线程的值为null
while ((v = p.match) == null) {
// 自旋优化
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
// slot不一致, 重试
else if (slot != p)
spins = SPINS;
// 线程未中断, 并且多槽为null, 并且超时关闭或者剩余超时时间大于0L
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
// 记录当前线程
U.putObject(t, BLOCKER, this);
// 将当前线程赋值
p.parked = t;
// slot未更改
if (slot == p)
// park当前线程
U.park(false, ns);
// 释放后清空p.parked
p.parked = null;
// 移除当前线程
U.putObject(t, BLOCKER, null);
}
// 超时处理
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
// 如果被中断则返回null, 否则返回TIMED_OUT Object对象
// 上面方法会去判定返回值是否是TIMED_OUT, 如果是就抛出超时异常
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
// 退出
break;
}
}
// 清空 MATCH
U.putOrderedObject(p, MATCH, null);
// 清空 item
p.item = null;
// 重置hash
p.hash = h;
// 返回
return v;
}

FutureTask

可用于异步任务, 可用 CompletableFuture 代替

new

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 状态值
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

// 任务
private Callable<V> callable;
// 返回值
private Object outcome;
// 执行call方法时候需要的锁
private volatile Thread runner;
// 等待队列
private volatile WaitNode waiters;

public FutureTask(Callable<V> callable) {
// 不能传入null
if (callable == null)
throw new NullPointerException();
// 赋值callable
this.callable = callable;
// 设置状态
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
// RunnableAdapter适配下
this.callable = Executors.callable(runnable, result);
// 设置线程状态
this.state = NEW; // ensure visibility of callable
}

static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
// runnable 执行完了, 直接返回result。
// 这个返回值是new的时候就设置好了
// 并不是线程中返回的
task.run();
return result;
}
}

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果未完成, 进入等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 抛出异常或者返回值
return report(s);
}

static final class WaitNode {
// 当前线程
volatile Thread thread;
// 下一个线程
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 线程被中断
if (Thread.interrupted()) {
// 移出当前q
removeWaiter(q);
throw new InterruptedException();
}
// 当前状态
int s = state;
// 线程已经完成
if (s > COMPLETING) {
// 当前线程不为空
if (q != null)
// 线程置空
q.thread = null;
// 返回线程状态
return s;
}
// 线程正在完成中
else if (s == COMPLETING) // cannot time out yet
// 让掉
Thread.yield();
// q为空
else if (q == null)
// 新建一个
q = new WaitNode();
// 未入队列
else if (!queued)
// 将旧的WaitNode最佳到最新的q后面
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 是否开启超时
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {// 已经超时
removeWaiter(q);// 移除掉q
return state; // 返回状态
}
// 通过限时park来控制
LockSupport.parkNanos(this, nanos);
}
else
// 无限时park
LockSupport.park(this);
}
}

run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public void run() {
// 非new状态
if (state != NEW ||
// 修改runner, 加锁
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// c不为null且状态为新建
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 设置异常
setException(ex);
}
if (ran)
// 设置返回值
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
// 如果s为中断, 那就等待中断完成
handlePossibleCancellationInterrupt(s);
}
}

protected void set(V v) {
// 修改状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 返回结果
outcome = v;
// 再次修改state
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 释放 waiters
finishCompletion();
}
}

private void finishCompletion() {
// assert state > COMPLETING;
// 遍历 waiters
for (WaitNode q; (q = waiters) != null;) {
// waiters 设置为 null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 遍历所有 WaitNode 并且释放掉
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 一个一个唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

// 没内容
done();
// callable 设置为 null
callable = null; // to reduce footprint
}

cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean cancel(boolean mayInterruptIfRunning) {
// 可以CAS成中断或者取消(根据 mayInterruptIfRunning)
// 必须在NEW的时候cancel掉
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
// CAS状态为打断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 结束处理
finishCompletion();
}
return true;
}

ConcurrentHashMap

高效 + 并发安全的 HashMap

new

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 代表移动状态, 由ForwardingNode占位
static final int MOVED = -1; // hash for forwarding nodes
// 红黑树
static final int TREEBIN = -2; // hash for roots of trees
// 用于一些特殊方法添加节点的时候占位用的, 因为这个节点最后可能不会添加(computeIfAbsent)
static final int RESERVED = -3; // hash for transient reservations
// 用于求正数hash
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// CPU逻辑处理器个数
static final int NCPU = Runtime.getRuntime().availableProcessors();

// 有参无参都不会对table进行初始化,有参仅仅是对table的容量进行计算
// 无参
/**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {}

// 有参
public ConcurrentHashMap(int initialCapacity) {
// 容量小于0,抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 初始化大小大于 最大容量无符号右移1位,否则采用 tableSizeFor 算出2^n的大小容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// 初始容量 + 1/2初始容量 + 1,超过1.5倍的初始容量
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// table大小
this.sizeCtl = cap;
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 加载因子小于0、初始化大于0、并发等级小于0
// 抛出参数非法异常
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 初始容量 < 并发等级 则采用并发等级容量
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// 算出实际容量大小
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 判定是否超过最大容量
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// table大小
this.sizeCtl = cap;
}

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 未采用加锁的方式
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 获取非负数的hash ((h ^ (h >>> 16)) & HASH_BITS), HASH_BITS = Integer的最大值,故最高位为0,所以最后怎么&都是一个正数或者0
int h = spread(key.hashCode());

// 如果table已经初始化或者table的长度>0
if ((tab = table) != null && (n = tab.length) > 0 &&
// 获取当前hash所在的下标的头node,并且不为null
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头node的hash 等于 传入进来的key的hash
if ((eh = e.hash) == h) {
// 并且头node的key 等于 传进来的key 或者 头node的key equals 传入进来的 key
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// 直接返回头node的值
return e.val;
}
// 小于0 表示为 ForwardingNode 节点,也有可能是红黑树(因为第一个的hash默认为-1,红黑树 的hash默认为-2)
else if (eh < 0)
// 调用 ForwardingNode 或者 红黑树 的find方法查找值返回
return (p = e.find(h, key)) != null ? p.val : null;
// 链表,遍历获取值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}


// 获取hash方式
static final int spread(int h) {
// 无符号右移16位 & 0x7fffffff
// 获取一个正数的hash
return (h ^ (h >>> 16)) & HASH_BITS;
}

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public V put(K key, V value) {
// 参数列表: 键,值,仅对不存在的key才赋值(false表示相同key进行覆盖,true表示没有当前key才会去添加)
return putVal(key, value, false);
}


final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 或者 value 为null,直接抛出空指针异常, 与Hashtable 一致(Hashtable的key不是手动抛出的而是调用hashCode()造成的)
if (key == null || value == null) throw new NullPointerException();
// 获取hash
int hash = spread(key.hashCode());
// 桶的操作次数
int binCount = 0;
// 开始自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// table为null,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 下标头结点为null,进行新建头node
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果头node的hash 等于 -1,表示正需要迁移扩容table
else if ((fh = f.hash) == MOVED)
// 帮助其他线程迁移node
tab = helpTransfer(tab, f);
else {
// 修改或添加区域
V oldVal = null;
// 锁住当前头node
synchronized (f) {
// 如果当前table,i位置的node是上方获取的node
if (tabAt(tab, i) == f) {
// fh >= 0 表示正常hash
if (fh >= 0) {
// 桶操作数 = 1
binCount = 1;
// 开始自旋,寻找节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
// hash 一致
if (e.hash == hash &&
// 并且 key是一个
((ek = e.key) == key ||
// 或者key相同
(ek != null && key.equals(ek)))) {
// 保留查找到的node的value
oldVal = e.val;
// 是否不存在才覆盖
if (!onlyIfAbsent)
// 替换新值
e.val = value;
break;
}
// 更新e,已经到达当前链表的末尾了,还是没有找到,就新建
Node<K,V> pred = e;
// 末尾了,但是节点为null
if ((e = e.next) == null) {
// 新建节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用红黑树的putTreeVal方法,添加数据
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 对数据修改过
if (binCount != 0) {
// binCount 次数 大于 8,故链表长度大于 8
// 红黑树最多为 2
// 什么都不操作为 0
if (binCount >= TREEIFY_THRESHOLD)
// 开始树化
treeifyBin(tab, i);
if (oldVal != null)
// 返回旧值
return oldVal;
break;
}
}
}
// 操作次数 + 1,并且判定是否需要扩容table等
addCount(1L, binCount);
return null;
}

initTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 初始化条件 table 等于 null 或者 table的长度 为 0
while ((tab = table) == null || tab.length == 0) {
// -1 < 0:表示有其他线程正在初始化
if ((sc = sizeCtl) < 0)
// 礼让给正在初始化table的线程
Thread.yield(); // lost initialization race; just spin
// 否则进行初始化table,对sizeCtl进行cas修改成-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再判断一次,table是否为null,或者长度为0
if ((tab = table) == null || tab.length == 0) {
// 重置容量大小,大于0用本身,否则用默认 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 新建table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 赋值给table、tab
table = tab = nt;
// n - 1/4*n,故相当于乘以HashMap中的加载因子
sc = n - (n >>> 2);
}
} finally {
// 重置sizeCtl
sizeCtl = sc;
}
break;
}
}
// 返回tab
return tab;
}

casTabAt

1
2
3
4
5
// 通过一个基本偏移量 + i << ASHIFT 偏移量 = tab中下标为i的值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

helpTransfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// tab 不为null 并且 f 属于 ForwardingNode
if (tab != null && (f instanceof ForwardingNode) &&
// 当前node头的nextTab不为null
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// nextTab没有变化并且tablet也没有变,才帮忙做迁移
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 进行数据迁移
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果nextTab == null, 就初始化一个nextTab
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果已经搬迁完了,就把新建的 ForwardingNode贴上去
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 已经转移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 搬迁代码
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) { // 链表搬迁
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 红黑树搬迁
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

CopyOnWriteArrayList

一种线程安全的集合,它是 List 接口的实现之一。它的主要特点是读操作不需要锁,因此适用于读多写少的场景。每次写操作(添加、删除、更新元素)都会创建一个新的底层数组的副本,这样可以确保读操作不受写操作的干扰,从而实现线程安全。

new

1
2
3
4
5
6
7
8
9
10
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
// 存储数据对象数组
private transient volatile Object[] array;

public CopyOnWriteArrayList() {
// 新建一个长度为0的对象数组
setArray(new Object[0]);
}

get

1
2
3
4
5
6
7
8
public E get(int index) {
return get(getArray(), index);
}

private E get(Object[] a, int index) {
// 直接通过下标的方式获取
return (E) a[index];
}

set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 通过copy旧值, 然后修改copy的值。从而读写不影响
public E set(int index, E element) {
// 是用的是可重入锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);
// 即将设置的数据和存储的数据不一致
if (oldValue != element) {
// 对象原数组长度
int len = elements.length;
// copy一个新的数组
Object[] newElements = Arrays.copyOf(elements, len);
// 覆盖掉旧值; 有越界异常
newElements[index] = element;
// 设置新的对象数组
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
// 确保 volatile 写入
setArray(elements);
}
// 返回旧值
return oldValue;
} finally {
// 释放锁
lock.unlock();
}
}

add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 拷贝一个副本
// 每次都会拷贝一个副本, 并且长度就 + 1
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 末尾添加数据
newElements[len] = e;
// 设置新对象数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 旧数组
Object[] elements = getArray();
// 旧数组长度
int len = elements.length;
// 旧值
E oldValue = get(elements, index);
// 需要移动个数
int numMoved = len - index - 1;
// 刚好删除的是最后一个元素
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
// 复制两次
Object[] newElements = new Object[len - 1];
// 删除位置处
System.arraycopy(elements, 0, newElements, 0, index);
// 删除位置处 + 1, 跳过删除的值
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
// 设置新的数组
setArray(newElements);
}
// 返回旧值
return oldValue;
} finally {
lock.unlock();
}
}

subList

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public List<E> subList(int fromIndex, int toIndex) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 开始下标和结束下标限制
if (fromIndex < 0 || toIndex > len || fromIndex > toIndex)
throw new IndexOutOfBoundsException();
// 返回一个记录下标的对象, 和ArrayList的SubList有点类似, 通过记录子数组的开始和结束位置
// 添加和删除的时候还是对原数组操作, 只不过会 rangeCheck 范围判定和checkForComodification对原数组是否改变的判定
return new COWSubList<E>(this, fromIndex, toIndex);
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

一个链表阻塞队列

new

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 容量
private final int capacity;

// 记录个数
private final AtomicInteger count = new AtomicInteger();

// 链表头
transient Node<E> head;

// 链表尾
private transient Node<E> last;

/** Lock held by take, poll, etc */
// 获取锁
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
// 非空等待Condition
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
// 放入锁
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
// 满了等待Condition
private final Condition notFull = putLock.newCondition();


public LinkedBlockingQueue() {
// 无参初始容量是Integer的最大值
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
// 不能小于1
if (capacity <= 0) throw new IllegalArgumentException();
// 容量大小
this.capacity = capacity;
// 初始化一个空Node, 并且head和last指向它
last = head = new Node<E>(null);
}

poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 长度不为0
if (count.get() > 0) {
// 保存出队值, 先进先出
x = dequeue();
// 长度 - 1
c = count.getAndDecrement();
// 长度 > 1
if (c > 1)
// 唤醒其他消费
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 满了
if (c == capacity)
// 唤醒放入的
signalNotFull();
// 返回x
return x;
}

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void put(E e) throws InterruptedException {
// 不可以传入null值
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
// 放入锁, 放入和拿取是两把锁..
final ReentrantLock putLock = this.putLock;
// 长度
final AtomicInteger count = this.count;
// 可被中断锁
putLock.lockInterruptibly();
try {
// 容量满了
while (count.get() == capacity) {
// 进行等待
notFull.await();
}
// 入队
enqueue(node);
// 长度 + 1
c = count.getAndIncrement();
// 未满状态, 唤醒上面等待的
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 尽早唤醒消费线程
if (c == 0)
signalNotEmpty();
}

take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 长度为0, 等待
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 通知其他 take
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 让生产的线程尽早的唤醒
if (c == capacity)
signalNotFull();
return x;
}

offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 满的就直接返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 长度小于容量才继续入队
if (count.get() < capacity) {
enqueue(node);
// 增加长度
c = count.getAndIncrement();
// 队列还没有满, 唤醒生产线程继续添加
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 尽早通知消费线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}