Java 并发之容器
同步容器
同步容器简介
在 Java 中,同步容器主要包括 2 类:
Vector
、Stack
、Hashtable
Vector
- Vector
实现了 List
接口。Vector
实际上就是一个数组,和 ArrayList
类似。但是 Vector
中的方法都是 synchronized
方法,即进行了同步措施。
Stack
- Stack
也是一个同步容器,它的方法也用 synchronized
进行了同步,它实际上是继承于 Vector
类。
Hashtable
- Hashtable
实现了 Map
接口,它和 HashMap
很相似,但是 Hashtable
进行了同步处理,而 HashMap
没有。
Collections
类中提供的静态工厂方法创建的类(由 Collections.synchronizedXXX
等方法)
同步容器的问题
同步容器的同步原理就是在其 get
、set
、size
等主要方法上用 synchronized
修饰。 synchronized
可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块。
想详细了解 synchronized
用法和原理可以参考:Java 并发核心机制
性能问题
synchronized
的互斥同步会产生阻塞和唤醒线程的开销。显然,这种方式比没有使用 synchronized
的容器性能要差很多。
注:尤其是在 Java 1.6 没有对 synchronized
进行优化前,阻塞开销很高。
安全问题
同步容器真的绝对安全吗?
其实也未必。在做复合操作(非原子操作)时,仍然需要加锁来保护。常见复合操作如下:
- 迭代:反复访问元素,直到遍历完全部元素;
- 跳转:根据指定顺序寻找当前元素的下一个(下 n 个)元素;
- 条件运算:例如若没有则添加等;
❌ 不安全的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class VectorDemo {
static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) { while (true) { vector.clear();
for (int i = 0; i < 10; i++) { vector.add(i); }
Thread thread1 = new Thread() { @Override public void run() { for (int i = 0; i < vector.size(); i++) { vector.remove(i); } } };
Thread thread2 = new Thread() { @Override public void run() { for (int i = 0; i < vector.size(); i++) { vector.get(i); } } };
thread1.start(); thread2.start();
while (Thread.activeCount() > 10) { System.out.println("同时存在 10 个以上线程,退出"); return; } } }
}
|
以上程序执行时可能会出现数组越界错误。
Vector
是线程安全的,那为什么还会报这个错?
这是因为,对于 Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:
当某个线程在某个时刻执行这句时:
1 2
| for(int i=0;i<vector.size();i++) vector.get(i);
|
假若此时 vector 的 size 方法返回的是 10,i 的值为 9
然后另外一个线程执行了这句:
1 2
| for(int i=0;i<vector.size();i++) vector.remove(i);
|
将下标为 9 的元素删除了。
那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。
✔️️️ 安全示例
因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class VectorDemo2 {
static Vector<Integer> vector = new Vector<Integer>();
public static void main(String[] args) { while (true) { for (int i = 0; i < 10; i++) { vector.add(i); }
Thread thread1 = new Thread() { @Override public void run() { synchronized (VectorDemo2.class) { for (int i = 0; i < vector.size(); i++) { vector.remove(i); } } } };
Thread thread2 = new Thread() { @Override public void run() { synchronized (VectorDemo2.class) { for (int i = 0; i < vector.size(); i++) { vector.get(i); } } } };
thread1.start(); thread2.start();
while (Thread.activeCount() > 10) { System.out.println("同时存在 10 个以上线程,退出"); return; } } }
}
|
ConcurrentModificationException
异常
在对 Vector
等容器并发地进行迭代修改时,会报 ConcurrentModificationException
异常,关于这个异常将会在后续文章中讲述。
但是在并发容器中不会出现这个问题。
并发容器简介
同步容器将所有对容器状态的访问都串行化,以保证线程安全性,这种策略会严重降低并发性。
Java 1.5 后提供了多种并发容器,使用并发容器来替代同步容器,可以极大地提高伸缩性并降低风险。
J.U.C 包中提供了几个非常有用的并发容器作为线程安全的容器:
并发容器 |
对应的普通容器 |
描述 |
ConcurrentHashMap |
HashMap |
Java 1.8 之前采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性;Java 1.8 之后基于 CAS 实现。 |
ConcurrentSkipListMap |
SortedMap |
基于跳表实现的 |
CopyOnWriteArrayList |
ArrayList |
|
CopyOnWriteArraySet |
Set |
基于 CopyOnWriteArrayList 实现。 |
ConcurrentSkipListSet |
SortedSet |
基于 ConcurrentSkipListMap 实现。 |
ConcurrentLinkedQueue |
Queue |
线程安全的无界队列。底层采用单链表。支持 FIFO。 |
ConcurrentLinkedDeque |
Deque |
线程安全的无界双端队列。底层采用双向链表。支持 FIFO 和 FILO。 |
ArrayBlockingQueue |
Queue |
数组实现的阻塞队列。 |
LinkedBlockingQueue |
Queue |
链表实现的阻塞队列。 |
LinkedBlockingDeque |
Deque |
双向链表实现的双端阻塞队列。 |
J.U.C 包中提供的并发容器命名一般分为三类:
Concurrent
- 这类型的锁竞争相对于
CopyOnWrite
要高一些,但写操作代价要小一些。
- 此外,
Concurrent
往往提供了较低的遍历一致性,即:当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。代价就是,在获取容器大小 size()
,容器是否为空等方法,不一定完全精确,但这是为了获取并发吞吐量的设计取舍,可以理解。与之相比,如果是使用同步容器,就会出现 fail-fast
问题,即:检测到容器在遍历过程中发生了修改,则抛出 ConcurrentModificationException
,不再继续遍历。
CopyOnWrite
- 一个线程写,多个线程读。读操作时不加锁,写操作时通过在副本上加锁保证并发安全,空间开销较大。
Blocking
- 内部实现一般是基于锁,提供阻塞队列的能力。
:x: 错误示例,产生 ConcurrentModificationException
异常:
1 2 3
| public void removeKeys(Map<String, Object> map, final String... keys) { map.keySet().removeIf(key -> ArrayUtil.contains(keys, key)); }
|
:x: 错误示例,产生 ConcurrentModificationException
异常:
1 2 3 4 5 6
| public static <K, V> Map<K, V> removeKeys(Map<String, Object> map, final String... keys) { for (K key : keys) { map.remove(key); } return map; }
|
并发场景下的 Map
如果对数据有强一致要求,则需使用 Hashtable
;在大部分场景通常都是弱一致性的情况下,使用 ConcurrentHashMap
即可;如果数据量在千万级别,且存在大量增删改操作,则可以考虑使用 ConcurrentSkipListMap
。
并发场景下的 List
读多写少用 CopyOnWriteArrayList
。
写多读少用 ConcurrentLinkedQueue
,但由于是无界的,要有容量限制,避免无限膨胀,导致内存溢出。
Map
Map 接口的两个实现是 ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果你需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。
使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空,否则会抛出NullPointerException
这个运行时异常。
ConcurrentHashMap
ConcurrentHashMap
是线程安全的 HashMap
,用于替代 Hashtable
。
ConcurrentHashMap
的特性
ConcurrentHashMap
实现了
ConcurrentMap
接口,而 ConcurrentMap
接口扩展了 Map
接口。
1 2 3 4
| public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { }
|
ConcurrentHashMap
的实现包含了 HashMap
所有的基本特性,如:数据结构、读写策略等。
ConcurrentHashMap
没有实现对 Map
加锁以提供独占访问。因此无法通过在客户端加锁的方式来创建新的原子操作。但是,一些常见的复合操作,如:“若没有则添加”、“若相等则移除”、“若相等则替换”,都已经实现为原子操作,并且是围绕 ConcurrentMap
的扩展接口而实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public interface ConcurrentMap<K, V> extends Map<K, V> {
V putIfAbsent(K key, V value);
boolean remove(Object key, Object value);
boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value); }
|
不同于 Hashtable
,ConcurrentHashMap
提供的迭代器不会抛出 ConcurrentModificationException
,因此不需要在迭代过程中对容器加锁。
:bell: 注意:一些需要对整个 Map
进行计算的方法,如 size
和 isEmpty
,由于返回的结果在计算时可能已经过期,所以并非实时的精确值。这是一种策略上的权衡,在并发环境下,这类方法由于总在不断变化,所以获取其实时精确值的意义不大。ConcurrentHashMap
弱化这类方法,以换取更重要操作(如:get
、put
、containesKey
、remove
等)的性能。
ConcurrentHashMap 的用法
示例:不会出现 ConcurrentModificationException
ConcurrentHashMap
的基本操作与 HashMap
的用法基本一样。不同于 HashMap
、Hashtable
,ConcurrentHashMap
提供的迭代器不会抛出 ConcurrentModificationException
,因此不需要在迭代过程中对容器加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
Map<Integer, Character> map = new ConcurrentHashMap<>();
Thread wthread = new Thread(() -> { System.out.println("写操作线程开始执行"); for (int i = 0; i < 26; i++) { map.put(i, (char) ('a' + i)); } }); Thread rthread = new Thread(() -> { System.out.println("读操作线程开始执行"); for (Integer key : map.keySet()) { System.out.println(key + " - " + map.get(key)); } }); wthread.start(); rthread.start(); Thread.sleep(1000); } }
|
ConcurrentHashMap 的原理
ConcurrentHashMap
一直在演进,尤其在 Java 1.7 和 Java 1.8,其数据结构和并发机制有很大的差异。
- Java 1.7
- 数据结构:数组+单链表
- 并发机制:采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性。
- Java 1.8
- 数据结构:数组+单链表+红黑树
- 并发机制:取消分段锁,之后基于 CAS + synchronized 实现。
Java 1.7 的实现
分段锁,是将内部进行分段(Segment),里面是 HashEntry
数组,和 HashMap
类似,哈希相同的条目也是以链表形式存放。
HashEntry
内部使用 volatile
的 value
字段来保证可见性,也利用了不可变对象的机制,以改进利用 Unsafe
提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe
中的很多操作都是 JVM intrinsic 优化过的。

在进行并发写操作时,ConcurrentHashMap
会获取可重入锁(ReentrantLock
),以保证数据一致性。所以,在并发修改期间,相应 Segment
是被锁定的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
final Segment<K,V>[] segments;
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; }
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; } }
|
Java 1.8 的实现
- 数据结构改进:与 HashMap 一样,将原先 数组+单链表 的数据结构,变更为 数组+单链表+红黑树 的结构。当出现哈希冲突时,数据会存入数组指定桶的单链表,当链表长度达到 8,则将其转换为红黑树结构,这样其查询的时间复杂度可以降低到 $$O(logN)$$,以改进性能。
- 并发机制改进:
- 取消 segments 字段,直接采用
transient volatile HashEntry<K,V>[] table
保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
- 使用 CAS +
sychronized
操作,在特定场景进行无锁并发操作。使用 Unsafe、LongAdder 之类底层手段,进行极端情况的优化。现代 JDK 中,synchronized 已经被不断优化,可以不再过分担心性能差异,另外,相比于 ReentrantLock,它可以减少内存消耗,这是个非常大的优势。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
|
ConcurrentHashMap 的实战
示例摘自:极客时间教程 - Java 业务开发常见错误 100 例
ConcurrentHashMap 错误示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| private static int THREAD_COUNT = 10;
private static int ITEM_COUNT = 1000;
public static void main(String[] args) throws InterruptedException { ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100); System.out.println("init size:" + concurrentHashMap.size()); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> { int gap = ITEM_COUNT - concurrentHashMap.size(); System.out.println("gap size:" + gap); concurrentHashMap.putAll(getData(gap)); })); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); System.out.println("finish size:" + concurrentHashMap.size()); }
private static ConcurrentHashMap<String, Long> getData(int count) { return LongStream.rangeClosed(1, count) .boxed() .collect( Collectors.toConcurrentMap( i -> UUID.randomUUID().toString(), i -> i, (o1, o2) -> o1, ConcurrentHashMap::new)); }
|
初始大小 900 符合预期,还需要填充 100 个元素。
预期结果为 1000 个元素,实际大于 1000 个元素。
【分析】
ConcurrentHashMap 对外提供的方法或能力的限制:
- 使用了 ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。
- 诸如 size、isEmpty 和 containsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。显然,利用 size 方法计算差异值,是一个流程控制。
- 诸如 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能会获取到部分数据。
ConcurrentHashMap 错误示例修正 1.0 版
通过 synchronized 加锁,当然可以保证数据一致性,但是牺牲了 ConcurrentHashMap 的性能,没哟真正发挥出 ConcurrentHashMap 的特性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
private static int THREAD_COUNT = 10;
private static int ITEM_COUNT = 1000;
public static void main(String[] args) throws InterruptedException { ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100); System.out.println("init size:" + concurrentHashMap.size()); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> { synchronized (concurrentHashMap) { int gap = ITEM_COUNT - concurrentHashMap.size(); System.out.println("gap size:" + gap); concurrentHashMap.putAll(getData(gap)); } })); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); System.out.println("finish size:" + concurrentHashMap.size()); }
private static ConcurrentHashMap<String, Long> getData(int count) { return LongStream.rangeClosed(1, count) .boxed() .collect( Collectors.toConcurrentMap( i -> UUID.randomUUID().toString(), i -> i, (o1, o2) -> o1, ConcurrentHashMap::new)); }
|
ConcurrentHashMap 错误示例修正 2.0 版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
private static int LOOP_COUNT = 10000000;
private static int THREAD_COUNT = 10;
private static int ITEM_COUNT = 1000;
public static void main(String[] args) throws InterruptedException { StopWatch stopWatch = new StopWatch(); stopWatch.start("normaluse"); Map<String, Long> normaluse = normaluse(); stopWatch.stop(); Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error"); Assert.isTrue(normaluse.values().stream() .mapToLong(aLong -> aLong).reduce(0, Long::sum) == LOOP_COUNT , "normaluse count error"); stopWatch.start("gooduse"); Map<String, Long> gooduse = gooduse(); stopWatch.stop(); Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error"); Assert.isTrue(gooduse.values().stream() .mapToLong(l -> l) .reduce(0, Long::sum) == LOOP_COUNT , "gooduse count error"); System.out.println(stopWatch.prettyPrint()); }
private static Map<String, Long> normaluse() throws InterruptedException { ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); synchronized (freqs) { if (freqs.containsKey(key)) { freqs.put(key, freqs.get(key) + 1); } else { freqs.put(key, 1L); } } } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); return freqs; }
private static Map<String, Long> gooduse() throws InterruptedException { ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); freqs.computeIfAbsent(key, k -> new LongAdder()).increment(); } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); return freqs.entrySet().stream() .collect(Collectors.toMap( e -> e.getKey(), e -> e.getValue().longValue()) ); }
|
List
CopyOnWriteArrayList
CopyOnWriteArrayList
是线程安全的 ArrayList
。CopyOnWrite
字面意思为写的时候会将共享变量新复制一份出来。复制的好处在于读操作是无锁的(也就是无阻塞)。
CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会非常糟糕。
CopyOnWriteArrayList 原理
CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。

- lock - 执行写时复制操作,需要使用可重入锁加锁
- array - 对象数组,用于存放元素
1 2 3 4 5
| final transient ReentrantLock lock = new ReentrantLock();
private transient volatile Object[] array;
|

(1)读操作
在 CopyOnWriteAarrayList
中,读操作不同步,因为它们在内部数组的快照上工作,所以多个迭代器可以同时遍历而不会相互阻塞(图 1,2,4)。
CopyOnWriteArrayList 的读操作是不用加锁的,性能很高。
1 2 3 4 5 6
| public E get(int index) { return get(getArray(), index); } private E get(Object[] a, int index) { return (E) a[index]; }
|
(2)写操作
所有的写操作都是同步的。他们在备份数组(图 3)的副本上工作。写操作完成后,后备阵列将被替换为复制的阵列,并释放锁定。支持数组变得易变,所以替换数组的调用是原子(图 5)。
写操作后创建的迭代器将能够看到修改的结构(图 6,7)。
写时复制集合返回的迭代器不会抛出 ConcurrentModificationException
,因为它们在数组的快照上工作,并且无论后续的修改(2,4)如何,都会像迭代器创建时那样完全返回元素。
添加操作 - 添加的逻辑很简单,先将原容器 copy 一份,然后在新副本上执行写操作,之后再切换引用。当然此过程是要加锁的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
|
删除操作 - 删除操作同理,将除要删除元素之外的其他元素拷贝到新副本中,然后切换引用,将原容器引用指向新副本。同属写操作,需要加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public E remove(int index) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) setArray(Arrays.copyOf(elements, len - 1)); else { Object[] newElements = new Object[len - 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { lock.unlock(); } }
|
CopyOnWriteArrayList 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class CopyOnWriteArrayListDemo {
static class ReadTask implements Runnable {
List<String> list;
ReadTask(List<String> list) { this.list = list; }
public void run() { for (String str : list) { System.out.println(str); } } }
static class WriteTask implements Runnable {
List<String> list; int index;
WriteTask(List<String> list, int index) { this.list = list; this.index = index; }
public void run() { list.remove(index); list.add(index, "write_" + index); } }
public void run() { final int NUM = 10; CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < NUM; i++) { list.add("main_" + i); } ExecutorService executorService = Executors.newFixedThreadPool(NUM); for (int i = 0; i < NUM; i++) { executorService.execute(new ReadTask(list)); executorService.execute(new WriteTask(list, i)); } executorService.shutdown(); }
public static void main(String[] args) { new CopyOnWriteArrayListDemo().run(); } }
|
CopyOnWriteArrayList 实战
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| @Slf4j public class WrongCopyOnWriteList {
public static void main(String[] args) { testRead(); testWrite(); }
public static Map testWrite() { List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); StopWatch stopWatch = new StopWatch(); int loopCount = 100000; stopWatch.start("Write:copyOnWriteArrayList"); IntStream.rangeClosed(1, loopCount) .parallel() .forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); stopWatch.start("Write:synchronizedList"); IntStream.rangeClosed(1, loopCount) .parallel() .forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); log.info(stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList", copyOnWriteArrayList.size()); result.put("synchronizedList", synchronizedList.size()); return result; }
private static void addAll(List<Integer> list) { list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList())); }
public static Map testRead() { List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); addAll(copyOnWriteArrayList); addAll(synchronizedList); StopWatch stopWatch = new StopWatch(); int loopCount = 1000000; int count = copyOnWriteArrayList.size(); stopWatch.start("Read:copyOnWriteArrayList"); IntStream.rangeClosed(1, loopCount) .parallel() .forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); stopWatch.start("Read:synchronizedList"); IntStream.range(0, loopCount) .parallel() .forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); log.info(stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList", copyOnWriteArrayList.size()); result.put("synchronizedList", synchronizedList.size()); return result; }
}
|
读性能差不多是写性能的一百倍。
Set
Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的。
Queue
Java 并发包里面 Queue 这类并发容器是最复杂的,你可以从以下两个维度来分类。一个维度是阻塞与非阻塞,所谓阻塞指的是:当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识。
BlockingQueue
BlockingQueue
顾名思义,是一个阻塞队列。**BlockingQueue
基本都是基于锁实现。在 BlockingQueue
中,当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞**。
BlockingQueue
接口定义如下:
1
| public interface BlockingQueue<E> extends Queue<E> {}
|
核心 API:
1 2 3 4
| E take() throws InterruptedException;
void put(E e) throws InterruptedException;
|
BlockingQueue
对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:
- 抛出异常;
- 返回特殊值(
null
或 true
/false
,取决于具体的操作);
- 阻塞等待此操作,直到这个操作成功;
- 阻塞等待此操作,直到成功或者超时指定时间。
总结如下:
|
Throws exception |
Special value |
Blocks |
Times out |
Insert |
add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
Remove |
remove() |
poll() |
take() |
poll(time, unit) |
Examine |
element() |
peek() |
not applicable |
not applicable |
BlockingQueue
的各个实现类都遵循了这些规则。
BlockingQueue
不接受 null
值元素。
JDK 提供了以下阻塞队列:
ArrayBlockingQueue
- 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue
- 一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue
- 一个支持优先级排序的无界阻塞队列。
SynchronousQueue
- 一个不存储元素的阻塞队列。
DelayQueue
- 一个使用优先级队列实现的无界阻塞队列。
LinkedTransferQueue
- 一个由链表结构组成的无界阻塞队列。
BlockingQueue
基本都是基于锁实现。
PriorityBlockingQueue 类
PriorityBlockingQueue
类定义如下:
1 2
| public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {}
|
PriorityBlockingQueue 要点
PriorityBlockingQueue
可以视为 PriorityQueue
的线程安全版本。
PriorityBlockingQueue
实现了 BlockingQueue
,也是一个阻塞队列。
PriorityBlockingQueue
实现了 Serializable
,支持序列化。
PriorityBlockingQueue
不接受 null
值元素。
PriorityBlockingQueue
的插入操作 put
方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
PriorityBlockingQueue 原理
PriorityBlockingQueue
有两个重要成员:
1 2
| private transient Object[] queue; private final ReentrantLock lock;
|
queue
是一个 Object
数组,用于保存 PriorityBlockingQueue
的元素。
- 而可重入锁
lock
则用于在执行插入、删除操作时,保证这个方法在当前线程释放锁之前,其他线程不能访问。
PriorityBlockingQueue
的容量虽然有初始化大小,但是不限制大小,如果当前容量已满,插入新元素时会自动扩容。
ArrayBlockingQueue 类
ArrayBlockingQueue
是由数组结构组成的有界阻塞队列。
ArrayBlockingQueue 要点
ArrayBlockingQueue
类定义如下:
1 2 3 4 5 6 7
| public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public ArrayBlockingQueue(int capacity) { public ArrayBlockingQueue(int capacity, boolean fair) { public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { }
|
说明:
ArrayBlockingQueue
实现了 BlockingQueue
,也是一个阻塞队列。
ArrayBlockingQueue
实现了 Serializable
,支持序列化。
ArrayBlockingQueue
是基于数组实现的有界阻塞队列。所以初始化时必须指定容量。
ArrayBlockingQueue 原理
ArrayBlockingQueue
的重要成员如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
|
ArrayBlockingQueue
内部以 final
的数组保存数据,数组的大小就决定了队列的边界。
ArrayBlockingQueue
实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。
- 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
- 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除,然后唤醒写线程队列的第一个等待线程。
对于 ArrayBlockingQueue
,我们可以在构造的时候指定以下三个参数:
- 队列容量,其限制了队列中最多允许的元素个数;
- 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
- 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。
LinkedBlockingQueue 类
LinkedBlockingQueue
是由链表结构组成的有界阻塞队列。容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE
,成为了无界队列。
LinkedBlockingQueue 要点
LinkedBlockingQueue
类定义如下:
1 2
| public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {}
|
LinkedBlockingQueue
实现了 BlockingQueue
,也是一个阻塞队列。
LinkedBlockingQueue
实现了 Serializable
,支持序列化。
LinkedBlockingQueue
是基于单链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
LinkedBlockingQueue
中元素按照插入顺序保存(FIFO)。
LinkedBlockingQueue 原理
LinkedBlockingQueue
中的重要数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
|
参考资料