首頁(yè)技術(shù)文章正文

Java培訓(xùn):高階源碼分析-ConcurrentHashMap

更新時(shí)間:2022-08-26 來源:黑馬程序員 瀏覽量:

  一、文章導(dǎo)讀:

  這部分內(nèi)容讓大家讀懂ConcurrentHashMap源碼的底層實(shí)現(xiàn)從而在工作中合理去使用他并且在面試中能做到游刃有余,主要內(nèi)容如下:

  * 核心構(gòu)造方法

  * 核心成員變量

  * put方法過程數(shù)據(jù)的完整過程

  * get方法的無鎖實(shí)現(xiàn)

  二、文章講解內(nèi)容

  JDK8中ConcurrentHashMap的結(jié)構(gòu)是:數(shù)組+鏈表+紅黑樹。

   因?yàn)樵趆ash沖突嚴(yán)重的情況下,鏈表的查詢效率是O(n),所以jdk8中改成了單個(gè)鏈表的個(gè)數(shù)大于8時(shí),數(shù)組長(zhǎng)度小于64就擴(kuò)容,數(shù)組長(zhǎng)度大于等于64,則鏈表會(huì)轉(zhuǎn)換為紅黑樹,這樣以空間換時(shí)間,查詢效率會(huì)變O(nlogn)。

   紅黑樹在Node數(shù)組內(nèi)部存儲(chǔ)的不是一個(gè)TreeNode對(duì)象,而是一個(gè)TreeBin對(duì)象,TreeBin內(nèi)部維持著一個(gè)紅黑樹。

   在JDK8中ConcurrentHashMap最經(jīng)點(diǎn)的實(shí)現(xiàn)是使用CAS+synchronized+volatile 來保證并發(fā)安全。

  一、JDK7中ConcurrentHashMap的實(shí)現(xiàn)

   在JDK1.7中ConcurrentHashMap是通過定義多個(gè)Segment來實(shí)現(xiàn)的分段加鎖,一個(gè)Segment對(duì)應(yīng)一個(gè)數(shù)組,如果多線程對(duì)同一個(gè)數(shù)組中的元素進(jìn)行添加那么多個(gè)線程會(huì)去競(jìng)爭(zhēng)同一把鎖,他鎖的是一個(gè)數(shù)組,有多少個(gè)segment那么就有多少把鎖,這個(gè)力度還是比較粗的。

1661477207247_1.jpg

   JDK8的實(shí)現(xiàn)是下文要重點(diǎn)探討的內(nèi)容,同時(shí)下文全部都是JDK8的實(shí)現(xiàn)。

  二、核心構(gòu)造方法

/**
     * Creates a new, empty map with the default initial table size (16).
        無參構(gòu)造函數(shù),new一個(gè)默認(rèn)table容量為16的ConcurrentHashMap
     */
    public ConcurrentHashMap() {
    }
/**
     * Creates a new, empty map with an initial table size
     * accommodating the specified number of elements without the need
     * to dynamically resize.
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative
     做了3件事:
     1.如果入?yún)?lt;0,拋出異常。
     2.如果入?yún)⒋笥谧畲笕萘?,則使用最大容量(是2的30次方)
     3.tableSizeFor方法得到一個(gè)大于負(fù)載因子入?yún)⑶易罱咏?的N次方的數(shù)作為容量
     4.設(shè)置sizeCtl的值等于初始化容量。未對(duì)table進(jìn)行初始化,table的初始化要在第一次put的時(shí)候進(jìn)行。
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

 
    特別注意:未對(duì)table進(jìn)行初始化,table的初始化要在第一次put的時(shí)候進(jìn)行。

  三、核心成員變量

// 該字段控制table(也被稱作hash桶數(shù)組)的初始化和擴(kuò)容
private transient volatile int sizeCtl;
// table最大容量是2的30次方
private static final int MAXIMUM_CAPACITY = 1 << 30;
// table的默認(rèn)初始化容量-擴(kuò)容總是2的n次方
private static final int DEFAULT_CAPACITY = 16;
// table的負(fù)載因子。當(dāng)前已使用容量 >= 負(fù)載因子*總?cè)萘康臅r(shí)候,進(jìn)行resize擴(kuò)容
private static final float LOAD_FACTOR = 0.75f;
// 存儲(chǔ)數(shù)據(jù)的數(shù)組-注意添加了volatile
transient volatile Node<K,V>[] table;
// 當(dāng)桶內(nèi)鏈表長(zhǎng)度>=8時(shí),會(huì)將鏈表轉(zhuǎn)成紅黑樹-注意需要和MIN_TREEIFY_CAPACITY結(jié)合起來用 
static final int TREEIFY_THRESHOLD = 8;
// table的總?cè)萘?,要大?4,桶內(nèi)鏈表才轉(zhuǎn)換為樹形結(jié)構(gòu),否則當(dāng)桶內(nèi)鏈表長(zhǎng)度>=8時(shí)會(huì)擴(kuò)容
static final int MIN_TREEIFY_CAPACITY = 64;
// 當(dāng)桶內(nèi)node小于6時(shí),紅黑樹會(huì)轉(zhuǎn)成鏈表。
static final int UNTREEIFY_THRESHOLD = 6;

  四、put存儲(chǔ)數(shù)據(jù)

   先從JDK的源碼中復(fù)制一段上頭的代碼,如下代碼就完成了數(shù)據(jù)的添加,在添加的時(shí)候還完成了數(shù)組的初始化、擴(kuò)容、CAS修改、分段鎖的實(shí)現(xiàn),大家大體的對(duì)該代碼有一個(gè)認(rèn)識(shí)即可我們后面會(huì)詳細(xì)的畫圖分析該過程。

public V put(K key, V value) {
        return putVal(key, value, false);
    }
 
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 如果key或者value為空,拋出異常
        if (key == null || value == null) throw new NullPointerException();
        // 得到hash值
        int hash = spread(key.hashCode());
        // 用來記錄所在table數(shù)組中的桶的中鏈表的個(gè)數(shù),后面會(huì)用于判斷是否鏈表過長(zhǎng)需要轉(zhuǎn)紅黑樹
        int binCount = 0;
        // for循環(huán),直到put成功插入數(shù)據(jù)才會(huì)跳出
        for (Node<K,V>[] tab = table;;) {
            // f=桶頭節(jié)點(diǎn)  n=table的長(zhǎng)度  i=在數(shù)組中的哪個(gè)下標(biāo)  fh=頭節(jié)點(diǎn)的hash值
            Node<K,V> f; int n, i, fh;
            // 如果table沒有初始化
            if (tab == null || (n = tab.length) == 0)              
                // 初始化table
                tab = initTable();
            // 根據(jù)數(shù)組長(zhǎng)度減1再對(duì)hash值取余得到在node數(shù)組中位于哪個(gè)下標(biāo)
            // 用tabAt獲取數(shù)組中該下標(biāo)的元素
            // 如果該元素為空
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 直接將put的值包裝成Node用tabAt方法放入數(shù)組內(nèi)這個(gè)下標(biāo)的位置中
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果頭結(jié)點(diǎn)hash值為-1,則為ForwardingNode結(jié)點(diǎn),說明正再擴(kuò)容,
            else if ((fh = f.hash) == MOVED)  
                // 調(diào)用hlepTransfer幫助擴(kuò)容
                tab = helpTransfer(tab, f);
            // 否則鎖住槽的頭節(jié)點(diǎn)
            else {
                V oldVal = null;
                // 鎖桶的頭節(jié)點(diǎn)
                synchronized (f) {  
                    // 雙重鎖檢測(cè),看在加鎖之前,該桶的頭節(jié)點(diǎn)是不是被改過了
                    if (tabAt(tab, i) == f) {
                        // 如果桶的頭節(jié)點(diǎn)的hash值大于0
                        if (fh >= 0) {
                            binCount = 1;
                            // 遍歷鏈表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 如果遇到節(jié)點(diǎn)hash值相同,key相同,看是否需要更新value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //如果到鏈表尾部都沒有遇到相同的
                                if ((e = e.next) == null) {
                                    // 就生成Node掛在鏈表尾部,該Node成為一個(gè)新的鏈尾。
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果桶的頭節(jié)點(diǎn)是個(gè)TreeBin
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            // 用紅黑樹的形式添加節(jié)點(diǎn)或者更新相同hash、key的值。
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }  
                if (binCount != 0) {
                    // 如果鏈表長(zhǎng)度>需要樹化的閾值
                    if (binCount >= TREEIFY_THRESHOLD)
                        //調(diào)用treeifyBin方法將鏈表轉(zhuǎn)換為紅黑樹,而這個(gè)方法中會(huì)判斷數(shù)組值是否大于64,如果沒有大于64則只擴(kuò)容
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        // 如果是修改,不是新增,則返回被修改的原值
                        return oldVal;
                    break;
                }
            }
        }
        // 計(jì)數(shù)器加1,完成新增后,table擴(kuò)容,就是這里面觸發(fā)
        addCount(1L, binCount);
        // 新增后,返回Null
        return null;
    }

  4.1 高效的hash算法

int hash = spread(key.hashCode());
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

  該算法其實(shí)在HashMap中我們就已經(jīng)重點(diǎn)說過了,他既保存了key值得hash的高16位也保存了低16位,從而讓key值在不同的數(shù)組中盡可能的散列,從而避免hash沖突。

  4.2 數(shù)組的初始化

for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();

  注意是在循環(huán)中判斷的table是否為空如果為空則會(huì)調(diào)用initTable完成數(shù)組的默認(rèn)初始化。

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;

  以上代碼做了如下事情:

  1.如果table為空,進(jìn)入while準(zhǔn)備開始初始化。

  2.將sizeCtl賦值給sc。如果sizeCtl<0,線程等待,小于零時(shí)表示有其他線程在執(zhí)行初始化。

  3.如果能將sizeCtl設(shè)置為-1,則開始進(jìn)行初始化操作

  4.用戶有指定初始化容量,就用用戶指定的,否則用默認(rèn)的16.

  5.生成一個(gè)長(zhǎng)度為16的Node數(shù)組,把引用給table。

  6.重新設(shè)置sizeCtl=數(shù)組長(zhǎng)度 - (數(shù)組長(zhǎng)度 >>>2) 這是忽略符號(hào)的移位運(yùn)算符,可以理解成 n - (n / 2)。

1661477618162_2.jpg

  
       4.3 不沖突的數(shù)據(jù)添加過程

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
     new Node<K,V>(hash, key, value, null)))
        break;       // no lock when adding to empty bin
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

  高效的尋址算法: (n - 1) & hash這個(gè)在我們的HashMap中也是講過的了就不在贅述。

   通過高效的尋址算法得到一個(gè)索引并獲取該索引的數(shù)據(jù)如果不為空則調(diào)用casTabAt方法進(jìn)行CAS的原子修改,CAS在Java層面是無鎖的所以速度會(huì)很快,但是他在硬件層面是有鎖的,他會(huì)在硬件的拉鏈散列表中加鎖。

   如果有多個(gè)線程同時(shí)hash到了該索引我們的CAS也能保證只有一個(gè)線程能添加成功其他的線程其實(shí)是走分段加鎖的過程。

1661477697191_3.jpg

  4.4 分段加鎖策略

  需要大家特別注意的是synchronized (f)鎖了一個(gè)f對(duì)象,那么這個(gè)f對(duì)象是什么呢?其實(shí)就是我們高效尋址算法的到的一個(gè)下標(biāo)中存儲(chǔ)的對(duì)象,注意他鎖的是我們尋址到的這個(gè)對(duì)象并沒有鎖這個(gè)數(shù)組,所以我們當(dāng)前的鎖一共有多少把呢?就看你的size有多少了默認(rèn)是16那么就會(huì)有16把鎖可以來并發(fā)的修改,這個(gè)其實(shí)就是JDK1.8的分段鎖拉,他比1.7鎖的粒度更細(xì)那么并發(fā)的效果就會(huì)更好。

1661477714864_4.jpg

  五、無鎖獲取

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

  以上代碼中的內(nèi)容幾乎我們都講過了,所以就不在贅述,但是值得一說的是獲取的過程中并沒有對(duì)數(shù)組或者某一個(gè)Node元素添加鎖,獲取是無鎖的所有性能高。

   還有一個(gè)問題需要注意的是獲取是無鎖的那么他如果出現(xiàn)多線程修改或者寫入的時(shí)候他就有可能會(huì)出現(xiàn)可見性的問題,因?yàn)槊恳粋€(gè)線程都有自己的工作內(nèi)存,那么ConcurrentHashMap是如何解決可見性的問題呢?

transient volatile Node<K,V>[] table;

  從變量的申明中我們可以看到存儲(chǔ)數(shù)據(jù)的table其實(shí)是添加了volatile關(guān)鍵字的,所以其他線程修改了以后我們要求其他的線程把數(shù)據(jù)刷新主內(nèi)存從而保證數(shù)據(jù)的可見性。

  五、總結(jié):

  1. JDK1.7 使用分段鎖實(shí)現(xiàn)

  2. JDK1.8 使用CAS+synchronized+volatile 的具體實(shí)現(xiàn)

  3. put方法的復(fù)雜實(shí)現(xiàn)過程

  4. get方法的無鎖實(shí)現(xiàn)尤其是volatile關(guān)鍵字的使用

分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!