hashmap为什么是线程不安全的?

hashmap线程不安全

源码分析

put方法(下面是JDK7中的put)

public V put(K key, V value) {
    if (key == null)
        return putForNullKey(value);      
    int hash = hash(key.hashCode());
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    } 
 
    //modCount++ 是一个复合操作
    modCount++;
 
    addEntry(hash, key, value, i);
    return null;
}

关注点放到modCount++上,很明显这不是一个原子操作,因此会引发线程问题!

扩容期间取出的值不准确

public class HashMapNotSafe {
 
    public static void main(String[] args) {
        final Map<Integer, String> map = new HashMap<>();
 
        final Integer targetKey = 0b1111_1111_1111_1111; // 65 535
        final String targetValue = "v";
        map.put(targetKey, targetValue);
 
        new Thread(() -> {
            IntStream.range(0, targetKey).forEach(key -> map.put(key, "someValue"));
        }).start();
 
        while (true) {
            if (null == map.get(targetKey)) {
                throw new RuntimeException("HashMap is not thread safe.");
            }
        }
    }
}

key取65535是为了扩容往回填充数据的时候,不要太快,以便去捕捉错误!

用一个新的线程,使用InStream从0,1,一直往里填充数据。线程启动后主线程用while循环取数据!

put碰撞导致数据丢失

多个线程同时put元素且key相同,发生碰撞最后丢失一个数据!

可见性无法保证

如果一个数据结构声称自己是线程安全的,那么它需要保证线程可见性!

那么线程1put新值,线程2在获取对应key时,无法保证可见性!

死循环CPU100%

主要发生在扩容中!
参考文章:

ConcurrentHashMap在Java7和Java8不同

Java7中的ConcurrentHashMap


内部进行Segment分段,Segment继承ReentrantLock,各个Segment之间独立。相比于对整个对象加锁,分段加锁效率更高!
每个Segment和HashMap类似(数据加链表),默认16个,所以最多支持16线程并发。这个16在初始化时指定,之后无法扩容!

Java8中的ConcurrentHashMap

完全重写了代码(代码量由1000行---->6000行)!

图中节点有3中类型:

  1. 链表拉伸
  2. 红黑树

红黑树

是一个平衡二叉查找树,效率高!其查找时间复杂度为O(logn)级别!
特点:

  • 节点要么是红要么是黑,但是根节点永远是黑色的
  • 红色节点不能连续,也就是说红色节点的父子都不能是红色
  • 从任一节点到其叶子节点的路径都包含相同数量的黑色节点

分析Java8中的ConcurrentHashMap重要源码

Node节点

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    // ...
}

每个Node里面是key-value形式,并且value用volatile修饰。同时node还有一个指向下一个节点的next指针,以形成链表!

put

核心是putVal方法。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) {
        throw new NullPointerException();
    }
    //计算 hash 值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K, V>[] tab = table; ; ) {
        Node<K, V> f;
        int n, i, fh;
        //如果数组是空的,就进行初始化
        if (tab == null || (n = tab.length) == 0) {
            tab = initTable();
        }
        // 找该 hash 值对应的数组下标
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果该位置是空的,就用 CAS 的方式放入新值
            if (casTabAt(tab, i, null,
                    new Node<K, V>(hash, key, value, null))) {
                break;
            }
        }
        //hash值等于 MOVED 代表在扩容
        else if ((fh = f.hash) == MOVED) {
            tab = helpTransfer(tab, f);
        }
        //槽点上是有值的情况
        else {
            V oldVal = null;
            //用 synchronized 锁住当前槽点,保证并发安全
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //如果是链表的形式
                    if (fh >= 0) {
                        binCount = 1;
                        //遍历链表
                        for (Node<K, V> e = f; ; ++binCount) {
                            K ek;
                            //如果发现该 key 已存在,就判断是否需要进行覆盖,然后返回
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent) {
                                    e.val = value;
                                }
                                break;
                            }
                            Node<K, V> pred = e;
                            //到了链表的尾部也没有发现该 key,说明之前不存在,就把新值添加到链表的最后
                            if ((e = e.next) == null) {
                                pred.next = new Node<K, V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    //如果是红黑树的形式
                    else if (f instanceof TreeBin) {
                        Node<K, V> p;
                        binCount = 2;
                        //调用 putTreeVal 方法往红黑树里增加数据
                        if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent) {
                                p.val = value;
                            }
                        }
                    }
                }
            }
            if (binCount != 0) {
                //检查是否满足条件并把链表转换为红黑树的形式,默认的 TREEIFY_THRESHOLD 阈值是 8
                if (binCount >= TREEIFY_THRESHOLD) {
                    treeifyBin(tab, i);
                }
                //putVal 的返回是添加前的旧值,所以返回 oldVal
                if (oldVal != null) {
                    return oldVal;
                }
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

putVal逐步根据当前的槽点是未初始化、空、扩容、链表、红黑树等情况做不同的处理。

get

此方法比较简单!

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //计算 hash 值
    int h = spread(key.hashCode());
    //如果整个数组是空的,或者当前槽点的数据是空的,说明 key 对应的 value 不存在,直接返回 null
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
        //判断头结点是否就是我们需要的节点,如果是则直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //如果头结点 hash 值小于 0,说明是红黑树或者正在扩容,就用对应的 find 方法来查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //遍历链表来查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get的过程:

  1. 计算hash值,并由此值找到槽点
  2. 数组为空或槽点为空,返回null
  3. 槽点位置即所求,返回
  4. 红黑树或者正在扩容,用find方法查找
  5. 链表,遍历

Java7和Java8的异同和优点

数据结构

J7使用Segment分段锁实现,J8中的ConcurrentMap和HashMap一样使用数组+链表+红黑树

并发度

J7每个Segment独立加锁,最大并发是Segment个数,默认16
J8,理想情况下table数组就是并发最大个数

并发原理

J7采用分段锁,
J8采用Node+CAS+synchronized

Hash碰撞

和HashMap类似

时间复杂度

J7是O(n)
J8如果遍历红黑树,则为O(logn)

为什么Map桶中元素超过8个会转为红黑树?

链表长度大于8时会转化为红黑树,当然还得满足容量大于MIN_TREEIFY_CAPACITY(64),红黑树恢复到链表条件是减小到6个!

链表查找复杂度是O(n)
红黑树是O(logn)
当n很小时,两者差不多。但是长了就区别很明显!
但是为什么一开始不就是红黑树呢?


看注释:

Because TreeNodes are about twice the size of regular nodes,
use them only when bins contain enough nodes to warrant use
(see TREEIFY_THRESHOLD). And when they become too small (due 
removal or resizing) they are converted back to plain bins.

翻译:单个TreeNode需要占用的空间大约是普通Node的2倍,所以只有当包含足够多的nodes时才会>转为TreeNode,这个值就是TREEIFY_THRESHOLD。


值时8的原因:


 In usages with well-distributed user hashCodes, tree bins 
 are rarely used.  Ideally, under random hashCodes, the 
 frequency of nodes in bins follows a Poisson distribution 
 (http://en.wikipedia.org/wiki/Poisson_distribution) with a 
 parameter of about 0.5 on average for the default resizing 
 threshold of 0.75, although with a large variance because 
 of resizing granularity. Ignoring variance, the expected 
 occurrences of list size k are (exp(-0.5) * pow(0.5, k) / 
 factorial(k)). The first values are:
 
 0:    0.60653066
 1:    0.30326533
 2:    0.07581633
 3:    0.01263606
 4:    0.00157952
 5:    0.00015795
 6:    0.00001316
 7:    0.00000094
 8:    0.00000006
 more: less than 1 in ten million

翻译:如果hashCode分布良好情况下,也就四hash计算离散好,那么红黑树就很少被用到,也很 少出现链表很长。在理想情况下,链表长度符合泊松分布。当长度为8时,长度命中率仅为 0.000000006。Map通常不会存储这么多数据,即一般不会发生链表转红黑树!


当然,我们可以故意将hash算法变得不太友好:

@Override
public int hashCode() {
    return 1;
}

此时hash冲突变高,链表会被拉伸。

public class HashMapDemo {
 
    public static void main(String[] args) {
        HashMap map = new HashMap<HashMapDemo,Integer>(1);
        for (int i = 0; i < 1000; i++) {
            HashMapDemo hashMapDemo1 = new HashMapDemo();
            map.put(hashMapDemo1, null);
        }
        System.out.println("运行结束");
    }
 
    @Override
    public int hashCode() {
        return 1;
    }
}

把断点打在sout的位置,就可以看到数据结构为TreeNode

Map内部出现红黑树的结构,说明hash算法出了问题

ConcurrentHashMap和Hashtable的区别

同样是线程安全,两者有何区别?

出现的版本不同

Hashtable早在JDK1.0就出现了,ConcurrentHashMap在JDK1.5之后才出现!

实现线程安全的方式不同

hashtable使用了synchronized关键字!
以clear()为例:

public synchronized void clear() {
    Entry<?,?> tab[] = table;
    modCount++;
    for (int index = tab.length; --index >= 0; )
        tab[index] = null;
    count = 0;
}

所有方法都是被synchronized修饰的!

性能

当线程数量增加时,hashtable性能急剧下降!

迭代时修改的不同

hashtable和hashmap在迭代时不允许修改,否则抛出ConcurrentModificationException,其原理是检测modCount变量,迭代器的next()方法如下:

public T next() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
    return nextElement();
}

其中expectedModCount迭代器生成的时候生成的,不会改变!而每一次调用add、remove、rehash等方法都会修改ModCount,这样一来导致next检查不通过,抛出异常!

CopyOnWriteArrayList

在CopyOnWriteArrayList出现之前就有了ArrayList和LinkedLish,还有线程安全的Vector和Collections.synchronizedList()。所以首先看一下vector的size和get方法!

public synchronized int size() {
    return elementCount;
}
public synchronized E get(int index) {
    if (index >= elementCount)
        throw new ArrayIndexOutOfBoundsException(index);

    return elementData(index);
}

这是使用synchronized关键字来保证线程安全!并发量高时效率极低!

并且:前面这些Lish在迭代期间不允许编辑!
在JDK1.5之后并发包提供了使用CopyOnWrite机制实现的并发容器:

  • CopyOnWriteArrayList,作为主要的并发list
  • CopyOnWriteArraySet,底层还是CopyOnWriteArrayList

使用场景

  • 读操作可以尽可能快,而写慢一些没有关系
    比如:系统级别的信息,往往只需要加载,但是会被系统所有模块频繁访问。
  • 读多写少
    比如:黑名单----搜索引擎,搜索框中,输入关键字某些不允许被搜索!这些关键字放在一个黑名单中,此名单无需实时更新!

读写规则

  • 读写锁的规则
  • 对读写锁规则的升级:读取完全不用加锁,写入也不会阻塞读取操作。只有写入和写入之间才需要进行同步,也就是不允许多个写入同时发生!

特点

  • CopyOnWrite的含义
    当容器需要被修改时,先将容器Copy,完成修改之后,将原容器的引用指向新的容器
  • 迭代期间允许修改集合内容
    迭代器使用的还是原数组(可能是过期数据)
/**
* 描述: 演示CopyOnWriteArrayList迭代期间可以修改集合的内容
*/
public class CopyOnWriteArrayListDemo {
 
    public static void main(String[] args) {
 
        CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});
 
        System.out.println(list); //[1, 2, 3]
 
        //Get iterator 1
        Iterator<Integer> itr1 = list.iterator();
 
        //Add one element and verify list is updated
        list.add(4);
 
        System.out.println(list); //[1, 2, 3, 4]
 
        //Get iterator 2
        Iterator<Integer> itr2 = list.iterator();
 
        System.out.println("====Verify Iterator 1 content====");
 
        itr1.forEachRemaining(System.out::println); //1,2,3
 
        System.out.println("====Verify Iterator 2 content====");
 
        itr2.forEachRemaining(System.out::println); //1,2,3,4
 
    }
 
}

迭代器创建后,修改对象中的元素,迭代器不会显示变更!

缺点

  • 内存
    由于复制机制,内存中会同时驻扎两个对象的内存。
  • 元素较多或者复杂情况下,复制开销大
  • 数据一致性

源码分析

数据结构

/** 可重入锁对象 */
final transient ReentrantLock lock = new ReentrantLock();
 
/** CopyOnWriteArrayList底层由数组实现,volatile修饰,保证数组的可见性 */
private transient volatile Object[] array;
 
/**
 * 得到数组
*/
final Object[] getArray() {
    return array;
}
 
/**
 * 设置数组
*/
final void setArray(Object[] a) {
    array = a;
}
 
/**
 * 初始化CopyOnWriteArrayList相当于初始化数组
*/
public CopyOnWriteArrayList() {
    setArray(new Object[0]);
}
  • 有一个ReentrantLock,保证修改线程安全
  • array数组用volatile修饰,保证可见性

add

public boolean add(E e) {
 
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
 
        // 得到原数组的长度和元素
        Object[] elements = getArray();
        int len = elements.length;
 
        // 复制出一个新数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
 
        // 添加时,将新元素添加到新数组中
        newElements[len] = e;
 
        // 将volatile Object[] array 的指向替换成新数组
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

添加操作时需要获取到锁,同时复制新的数组。最后指向新数组!

get

有两个重载

public E get(int index) {
    return get(getArray(), index);
}
final Object[] getArray() {
    return array;
}
private E get(Object[] a, int index) {
    return (E) a[index];
}

get没有加锁!

迭代器COWIterator类

2个重要属性,Object[] snapshot(数组的快照) 和int cursor(迭代器的游标)!

private COWIterator(Object[] elements, int initialCursor) {
    cursor = initialCursor;
    snapshot = elements;
}

之后的操作都是基于snapshot的,比如

public E next() {
    if (! hasNext())
        throw new NoSuchElementException();
    return (E) snapshot[cursor++];
}
Last modification:April 16th, 2020 at 01:27 pm