曹工說JDK源碼(2)–ConcurrentHashMap的多線程擴容,說白了,就是分段取任務

前言

先預先說明,我這邊jdk的代碼版本為1.8.0_11,同時,因為我直接在本地jdk源碼上進行了部分修改、調試,所以,導致大家看到的我這邊貼的代碼,和大家的不太一樣。

不過,我對源碼進行修改、重構時,會保證和原始代碼的功能、邏輯嚴格一致,更多時候,可能只是修改變量名,方便理解。

大家也知道,jdk代碼寫得實在是比較深奧,變量名經常都是單字符,i,j,k啥的,實在是很難理解,所以,我一般會根據自己的理解,去重命名,為了減輕我們的頭腦負擔。

至於怎麼去修改代碼並調試,可以參考我之前的文章:

曹工力薦:調試 jdk 中 rt.jar 包部分的源碼(可自由增加註釋,修改代碼並debug)

文章中,我改過的代碼放在:

https://gitee.com/ckl111/jdk-debug

sizeCtl field的初始化

大家知道,concurrentHashMap底層是數組+鏈表+紅黑樹,數組的長度假設為n,在hashmap初始化的時候,這個n除了作為數組長度,還會作為另一個關鍵field的值。

    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

該字段非常關鍵,根據取值不同,有不同的功能。

使用默認構造函數時

    public ConcurrentHashMap() {
    }

此時,sizeCtl被初始化為0.

使用帶初始容量的構造函數時

此時,sizeCtl也是32,和容量一致。

使用另一個map來初始化時

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

此時,sizeCtl,直接使用了默認值,16.

使用初始容量、負載因子來初始化時

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

這裏重載了:

這裏,我們傳入的負載因子為0.75,這也是默認的負載因子,傳入的初始容量為14.

這裏面會根據: 1 + 14/0.75 = 19,拿到真正的size,然後根據size,獲取到第一個大於19的2的n次方,即32,來作為數組容量,然後sizeCtl也被設置為32.

initTable時,對sizeCtl field的修改

實際上,new一個hashmap的時候,我們並沒有創建支撐數組,那,什麼時候創建數組呢?是在真正往裡面放數據的時候,比如put的時候。

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());

        int binCount = 0;
        ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO();
        vo.setBinCount(0);
        for (Node<K,V>[] tab = table;;) {
            int tableLength;
            // 1
            if (tab == null) {
                tab = initTable();
                continue;
            }
            ...
        }

1處,即會去初始化table。

/**
     * Initializes table, using the size recorded in sizeCtl.
     * 初始化hashmap,使用sizeCtl作為容量
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            sc = sizeCtl;
            if (sc < 0){
                Thread.yield(); // lost initialization race; just spin
                continue;
            }

            /**
             * 走到這裏,說明sizeCtl大於0,大於0,代表什麼,可以去看下其構造函數,此時,sizeCtl表示
             * capacity的大小。
             * {@link #ConcurrentHashMap(int)}
             *
             * cas修改為-1,如果成功修改為-1,則表示搶到了鎖,可以進行初始化
             *
             */
            // 1
            boolean bGotChanceToInit = U.compareAndSwapInt(this, SIZECTL, sc, -1);
            if (bGotChanceToInit) {
                try {
                    tab = table;
                    /**
                     * 如果當前表為空,尚未初始化,則進行初始化,分配空間
                     */
                    if (tab == null || tab.length == 0) {
                        /**
                         * sc大於0,則以sc為準,否則使用默認的容量
                         */
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                        table = tab = nt;
                        /**
                         * n >>> 2,無符號右移2位,則是n的四分之一。
                         * n- n/4,結果為3/4 * n
                         * 則,這裏修改sc為 3/4 * n
                         * 比如,默認容量為16,則修改sc為12
                         */
                        // 2
                        sc = n - (n >>> 2);
                    }
                } finally {
                    /**
                     * 修改sizeCtl到field
                     */
                    // 3
                    sizeCtl = sc;
                }
                break;
            }
        }

        return tab;
    }
  • 1處,cas修改sizeCtl為-1,成功了的,獲得初始化table的權利
  • 2處,修改局部變量sc為: n – (n >>> 2),也就是修改為 0.75n,假設此時的數組容量為16,那麼sc就是16 * 0.75 = 12.
  • 3處,將sc賦值到field: sizeCtl

經過上面的分析,initTable時,這個字段可能有兩種取值:

  • -1,有線程正在對該table進行初始化
  • 0.75*數組長度,此時,已經初始化完成

上面說的是,在put的時候去initTable,實際上,這個initTable,也會在以下函數中被調用,其共同點就是,都是往裡面放數據的操作:

擴容時機

上面說了很多,目前,我們知道的是,在initTable后,sizeCtl的值,是舊的數組的長度 * 0.75。

接下來,我們看看擴容時機,在put時,會調用putVal,這個函數的大體步驟:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 1
    int hash = spread(key.hashCode());

    int binCount = 0;
    System.out.println("binCount:" + binCount);
    // 2
    ConcurrentHashMapPutResultVO vo = new ConcurrentHashMapPutResultVO();
    vo.setBinCount(0);
    for (Node<K,V>[] tab = table;;) {
        int tableLength;
        // 3
        if (tab == null) {
            tab = initTable();
            continue;
        }
        
        tableLength = tab.length;
        if (tableLength == 0) {
            tab = initTable();
            continue;
        }

        int entryNodeHashCode;
		
        // 4
        int entryNodeIndex = (tableLength - 1) & hash;
        Node<K,V> entryNode = tabAt(tab,entryNodeIndex);

        /**
         * 5 如果我們要放的桶,還是個空的,則直接cas放進去
         */
        if (entryNode == null) {
            Node<K, V> node = new Node<>(hash, key, value, null);

            // no lock when adding to empty bin
            boolean bSuccess = casTabAt(tab, entryNodeIndex, null, node);
            if (bSuccess) {
                break;
            } else {
                /**
                 * 如果沒成功,則繼續下一輪循環
                 */
                continue;
            }
        }
		
        entryNodeHashCode = entryNode.hash;
        /**
         * 6 如果要放的這個桶,正在遷移,則幫助遷移
         */
        if (entryNodeHashCode == MOVED){
            tab = helpTransfer(tab, entryNode);
            continue;
        }


        /**
         * 7 對entryNode加鎖
         */
        V oldVal = null;
        System.out.println("sync");
        synchronized (entryNode) {
            /**
             * 這一行是判斷,在我們執行前面的一堆方法的時候,看看entryNodeIndex處的node是否變化
             */
            if (tabAt(tab, entryNodeIndex) != entryNode) {
                continue;
            }

            /**
             * 8 hashCode大於0,說明不是處於遷移狀態
             */
            if (entryNodeHashCode >= 0) {
                /**
                 * 9 鏈表中找到合適的位置並放入
                 */
                findPositionAndPut(key, value, onlyIfAbsent, hash, vo, entryNode);
                binCount = vo.getBinCount();
                oldVal = (V) vo.getOldValue();
            }
            else if (entryNode instanceof TreeBin) {
                ...
            }
        }
		
        System.out.println("binCount:" + binCount);
        // 10
        if (binCount != 0) {
            if (binCount >= TREEIFY_THRESHOLD)
                treeifyBin(tab, entryNodeIndex);
            if (oldVal != null)
                return oldVal;
            break;
        }
    }
    // 11
    addCount(1L, binCount);
    return null;
}
  • 1處,計算key的hashcode

  • 2處,我這邊new了一個對象,裏面兩個字段:

    public class ConcurrentHashMapPutResultVO<V> {
        int binCount;
    
        V oldValue;
    }
    

    其中,oldValue用來存放,如果put進去的key/value,其中key已經存在的話,一般會直接覆蓋之前的舊值,這裏主要存放之前的舊值,因為我們需要返回舊值。

    binCount,則存放:在找到對應的hash桶之後,在鏈表中,遍歷了多少個元素,該值後面會使用,作為一個標誌,當該標誌大於0的時候,才去進一步檢查,看看是否擴容。

  • 3處,如果table為null,說明table里沒有任何一個鍵值對,數組也還沒創建,則初始化table

  • 4處,根據hashcode,和(數組長度 – 1)相與,計算出應該存放的哈希桶在數組中的索引

  • 5處,如果要放的哈希桶,還是空的,則直接cas設置進去,成功則跳出循環,否則重試

  • 6處,如果要放的這個桶,該節點的hashcode為MOVED(一個常量,值為-1),說明有其他線程正在擴容該hashmap,則幫助擴容

  • 7處,對要存放的hash桶的頭節點加鎖

  • 8處,如果頭節點的hashcode大於0,說明是拉了一條鏈表,則調用子方法(我這邊自己抽的),去找到合適的位置並插入到鏈表

  • 9處,findPositionAndPut,在鏈表中,找到合適的位置,並插入

  • 10處,在findPositionAndPut函數中,會返回:為了找到合適的位置,遍歷了多少個元素,這個值,就是binCount。

    如果這個binCount大於8,則說明遍歷了8個元素,則需要轉紅黑樹了。

  • 11處,因為我們新增了一個元素,總數自然要加1,這裏面會去增加總數,和檢查是否需要擴容。

其中,第9步,因為是自己抽的函數,所以這裏貼出來給大家看下:

/**
     * 遍歷鏈表,找到應該放的位置;如果遍歷完了還沒找到,則放到最後
     * @param key
     * @param value
     * @param onlyIfAbsent
     * @param hash
     * @param vo
     * @param entryNode
     */
    private void findPositionAndPut(K key, V value, boolean onlyIfAbsent, int hash, ConcurrentHashMapPutResultVO vo, Node<K, V> entryNode) {
        vo.setBinCount(1);

        for (Node<K,V> currentIterateNode = entryNode;
                ;
             vo.setBinCount(vo.getBinCount() + 1)) {


            /**
             * 如果當前遍歷指向的節點的hash值,與參數中的key的hash值相等,則,
             * 繼續判斷
             */
            K currentIterateNodeKey = currentIterateNode.key;
            boolean bKeyEqualOrNot = Objects.equals(currentIterateNodeKey, key);
            /**
             * key的hash值相等,且equals比較也相等,則就是我們要找的
             */
            if (currentIterateNode.hash == hash && bKeyEqualOrNot) {
                /**
                 * 獲取舊的值
                 */
                vo.setOldValue(currentIterateNode.val);

                /**
                 * 覆蓋舊的node的val
                 */
                if (!onlyIfAbsent)
                    currentIterateNode.val = value;
                // 這裏直接break跳出循環
                break;
            }

            /**
             * 把當前節點保存起來
             */
            Node<K,V> pred = currentIterateNode;
            /**
             * 獲取下一個節點
             */
            currentIterateNode = currentIterateNode.next;
            /**
             * 如果下一個節點為null,說明當前已經是鏈表的最後一個node了
             */
            if ( currentIterateNode  == null) {
                /**
                 * 則在當前節點後面,掛上新的節點
                 */
                pred.next = new Node<K,V>(hash, key,
                        value, null);
                break;
            }
        }

    }

第11步,也是我們要看的重點:

private final void addCount(long delta, int check) {
        CounterCell[] counterCellsArray = counterCells;
		// 1
        long b = baseCount;
    	// 2
        long newBaseCount = b + delta;

        /**
         * 3 直接cas在baseCount上增加
         */
        boolean bSuccess = U.compareAndSwapLong(this, BASECOUNT, b, newBaseCount);
        if ( counterCellsArray != null ||  !bSuccess) {
			...
            newBaseCount = sumCount();
        }
		
    	// 4
        if (check >= 0) {
            while (true) {

                Node<K,V>[] tab = table;
                Node<K,V>[] nt;
                int n = 0;
                // 5
                int sc =  sizeCtl;
                // 6
                boolean bSumExteedSizeControl = newBaseCount >= (long) sc;
				// 7
                boolean bContinue = bSumExteedSizeControl && tab != null && (n = tab.length) < MAXIMUM_CAPACITY;
                if (bContinue) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                                sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                                transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    } else if (U.compareAndSwapInt(this, SIZECTL, sc,
                            (rs << RESIZE_STAMP_SHIFT) + 2))
                        // 8
                        transfer(tab, null);
                    newBaseCount = sumCount();
                } else {
                    break;
                }
            }

        }
    }
  • 1處,baseCount是一個field,存儲當前hashmap中,有多少個鍵值對,你put一次,就一個;remove一次,就減一個。

  • 2處,b + delta,其中,b就是baseCount,是舊的數量;dalta,我們傳入的是1,就是要增加的元素數量

    所以,b + delta,得到的,就是經過這次put后,預期的數量

  • 3處,直接cas,修改baseCount這個field為 新值,也就是第二步拿到的值。

  • 4處,這裏檢查check是否大於0,check,是第二個形參;這個參數,我們外邊怎麼傳的?

    addCount(1L, binCount);

    不就是bincount嗎,也就是說,這裏檢查:我們在put過程中,在鏈表中遍歷了幾個元素,如果遍歷了至少1個元素,這裏要進入下面的邏輯:檢查是否要擴容,因為,你binCount大於0,說明可能已經開始出現哈希衝突了。

  • 5處,取field:sizeCtl的值,給局部變量sc

  • 6處,判斷當前的新的鍵值對總數,是否大於sc了;比如容量是16,那麼sizeCtl是12,如果此時,hashmap中存放的鍵值對已經大於等於12了,則要檢查是否擴容了

  • 7處,幾個組合條件,查看是否要擴容,其中,主要的條件就是第6步的那個。

  • 8處,調用transfer,進行擴容

總結一下,經過前面的第6處,我們知道,如果存放的鍵值對總數,已經大於等於0.75*哈希桶(也就是底層數組的長度)的數量了,那麼,就基本要擴容了。

擴容的大體過程

擴容也是一個相對複雜的過程,這裏只說大概,詳細的放下講。

假設,現在底層數組長度,128,也就是128個哈希桶,當存放的鍵值對數量,大於等於 128 * 0.75的時候,就會開始擴容,擴容的過程,大概是:

  • 申請一個256(容量翻倍)的數組
  • 現在有128個桶,相當於,需要對128個桶進行遍歷,遍歷每個桶拉出去的鏈表或紅黑樹,查看每個鍵值對,是需要放到新數組的什麼位置

這個過程,昨天的博文,畫了個圖,這裏再貼一下。

擴容后:

可是,如果我們要一個個去遍歷所有哈希桶,然後遍歷對應的鏈表/紅黑樹,會不會太慢了?完全是單線程工作啊。

換個思路,我們能不能加快點呢?比如,線程1可以去處理數組的 0 -15這16個桶,16- 31這16個桶,完全可以讓線程2去做啊,這樣的話,不就多線程了嗎,不是就快了嗎?

沒錯,jdk就是這麼乾的。

jdk維護了一個field,這個field,專門用來存當前可以獲取的任務的索引,舉個例子:

大家看上圖就懂了,一開始,這裏假設我們有128個桶,每次每個線程,去拿16個桶來處理。

剛開始的時候,field:transferIndex就等於127,也就是最後一個桶的位置,然後我們要從后往前取,那麼,127 到112,剛好就是16個桶,所以,申請任務的時候,就會用cas去更新field為112,則表示,自己取到了112 到127這一個區間的hash桶遷移任務。

如果自始至終,只有一個線程呢,它處理完了112 – 127這一批hash桶后,會繼續取下一波任務,96 – 112;以此類推。

如果多線程的話呢,也是類似的,反正都是去嘗試cas更新transferIndex的值為任務區間的開始下標的值,成功了,就算任務認領成功了。

多線程,怎麼知道需要去幫助擴容呢? 發起擴容的線程,在處理完bucket[k]時,會把老的table中的對應的bucket[k]的頭節點,修改為下面這種類型的節點:

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
    }

其他線程,在put或者其他操作時,發現頭結點變成了這個,就會去協助擴容了。

多線程擴容,和分段取任務的差別?

我個人感覺,差別不大,多線程擴容,就是多線程去獲取自己的那一段任務,然後來完成。我這邊寫了簡單的demo,不過感覺還是很有用的,可以幫助我們理解。

import sun.misc.Unsafe;

import java.lang.reflect.Field;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

public class ConcurrentTaskFetch {

    /**
     * 空閑任務索引,獲取任務時,從該下標開始,往前獲取。
     * 比如當前下標為10,表示tasks數組中,0-10這個區間的任務,沒人領取
     */
    // 0
    private  volatile int freeTaskIndexForFetch;
	
    // 1
    private static final int TASK_COUNT_PER_FETCH = 16;
	
    // 2
    private String[] tasks = new String[128];

    public static void main(String[] args) {
        ConcurrentTaskFetch fetch = new ConcurrentTaskFetch();
        // 3
        fetch.init();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        executor.prestartAllCoreThreads();

        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
		
        // 4
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
					
                    // 5
                    FetchedTaskInfo fetchedTaskInfo = fetch.fetchTask();
                    if (fetchedTaskInfo != null) {
                        System.out.println("thread:" + Thread.currentThread().getName() + ",get task success:" + fetchedTaskInfo);
                        try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        System.out.println("thread:" + Thread.currentThread().getName()  +  ", process task finished");
                    }
                }
            });
        }


        LockSupport.park();
    }

    public void init() {
        for (int i = 0; i < 128; i++) {
            tasks[i] = "task" + i;
        }
        freeTaskIndexForFetch = tasks.length;
    }

	// 6
    public FetchedTaskInfo fetchTask() {
        System.out.println("Thread start fetch task:"+Thread.currentThread().getName()+",time: "+System.currentTimeMillis());

        while (true){
			// 6.1
            if (freeTaskIndexForFetch == 0) {
                System.out.println("thread:" + Thread.currentThread().getName() + ",get task failed,there is no task");
                return null;
            }

            /**
             * 6.2 獲取當前任務的集合的上界
             */
            int subTaskListEndIndex = this.freeTaskIndexForFetch;

            /**
             * 6.3 獲取當前任務的集合的下界
             */
            int subTaskListStartIndex = subTaskListEndIndex > TASK_COUNT_PER_FETCH ?
                    subTaskListEndIndex - TASK_COUNT_PER_FETCH : 0;

            /**
             * 6.4
             * 現在,我們拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex)
             * 該區間為前開后閉,所以,實際的區間為:
             * [subTaskListStartIndex,subTaskListEndIndex - 1]
             */

            /**
             * 6.5 使用cas,嘗試更新{@link freeTaskIndexForFetch} 為 subTaskListStartIndex
             */
            if (U.compareAndSwapInt(this, FREE_TASK_INDEX_FOR_FETCH, subTaskListEndIndex, subTaskListStartIndex)) {
                // 6.6 
                FetchedTaskInfo info = new FetchedTaskInfo();
                info.setStartIndex(subTaskListStartIndex);
                info.setEndIndex(subTaskListEndIndex - 1);


                return info;
            }
        }

    }



    // Unsafe mechanics
    private static final sun.misc.Unsafe U;

    private static final long FREE_TASK_INDEX_FOR_FETCH;

    static {
        try {
//            U = sun.misc.Unsafe.getUnsafe();
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            U = (Unsafe) f.get(null);
            Class<?> k = ConcurrentTaskFetch.class;
            FREE_TASK_INDEX_FOR_FETCH = U.objectFieldOffset
                    (k.getDeclaredField("freeTaskIndexForFetch"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }


    static class FetchedTaskInfo{
        int startIndex;
        int endIndex;

        public int getStartIndex() {
            return startIndex;
        }

        public void setStartIndex(int startIndex) {
            this.startIndex = startIndex;
        }

        public int getEndIndex() {
            return endIndex;
        }

        public void setEndIndex(int endIndex) {
            this.endIndex = endIndex;
        }

        @Override
        public String toString() {
            return "FetchedTaskInfo{" +
                    "startIndex=" + startIndex +
                    ", endIndex=" + endIndex +
                    '}';
        }
    }
}

  • 0處,定義了一個field,類似於前面的transferIndex

        /**
         * 空閑任務索引,獲取任務時,從該下標開始,往前獲取。
         * 比如當前下標為10,表示tasks數組中,0-10這個區間的任務,沒人領取
         */
        // 0
        private  volatile int freeTaskIndexForFetch;
    
  • 1,定義了每次取多少個任務,這裏也是16個

    private static final int TASK_COUNT_PER_FETCH = 16;
    
  • 2,定義任務列表,共128個任務

  • 3,main函數中,進行任務初始化

    public void init() {
        for (int i = 0; i < 128; i++) {
            tasks[i] = "task" + i;
        }
        freeTaskIndexForFetch = tasks.length;
    }
    

    主要初始化任務列表,其次,將freeTaskIndexForFetch 賦值為128,後續取任務,從這個下標開始

  • 4處,啟動10個線程,每個線程去執行取任務,按理說,我們128個任務,每個線程取16個,只能有8個線程取到任務,2個線程取不到

  • 5處,線程邏輯里,去獲取任務

  • 6處,獲取任務的方法定義

  • 6.1 ,如果可獲取的任務索引為0了,說明沒任務了,直接返回

  • 6.2,獲取當前任務的集合的上界

  • 6.3,獲取當前任務的集合的下界,減去16就行了

  • 6.4,拿到了集合的上下界,即[subTaskListStartIndex,subTaskListEndIndex)

  • 6.5, 使用cas,更新field為:6.4中的任務下界。

執行效果演示:

可以看到,8個線程取到任務,2個線程沒取到。

該思想在內存分配時的應用

其實jvm內存分配時,也是類似的思路,比如,設置堆內存為200m,那這200m是啟動時立馬從操作系統分配了的。

接下來,就是每次new對象的時候,去這個大內存里,找個小空間,這個過程,也是需要cas去競爭的,比如肯定也有個全局的字段,來表示當前可用內存的索引,比如該索引為100,表示,第100個字節后的空間是可以用的,那我要new個對象,這個對象有3個字段,需要大概30個字節,那我是不是需要把這個索引更新為130。

這中間是多線程的,所以也是要cas操作。

道理都是類似的。

總結

時間倉促,有問題在所難免,歡迎及時指出或加群討論。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

網頁設計最專業,超強功能平台可客製化

※別再煩惱如何寫文案,掌握八大原則!

帶你學夠浪:Go語言基礎系列 – 8分鐘學控制流語句

文章每周持續更新,原創不易,「三連」讓更多人看到是對我最大的肯定。可以微信搜索公眾號「 後端技術學堂 」第一時間閱讀(一般比博客早更新一到兩篇)

對於一般的語言使用者來說 ,20% 的語言特性就能夠滿足 80% 的使用需求,剩下在使用中掌握。基於這一理論,Go 基礎系列的文章不會刻意追求面面俱到,但該有知識點都會覆蓋,目的是帶你快跑趕上 Golang 這趟新車。

Hurry up , Let’s go !

控制語句是程序的靈魂,有了它們程序才能完成各種邏輯,今天我們就來學習 Go 中的各種控制語句。

通過本文的學習你將掌握以下知識:

  • if 條件語句
  • for 循環語句
  • switch 語句
  • defer 延遲調用

if 條件語句

與大多數編程語言一樣,if 用於條件判斷,當條件表達式 exprtrue 執行 {} 包裹的消息體語句,否則不執行。

語法是這樣的:

if expr {
    // some code
}

**注意:**語法上和 c 語言不同的是不用在條件表達式 expr 外帶括號,和 python 的語法類似。

當然,如果想在條件不滿足的時候做點啥,就可以 if 后帶 else 語句。語法:

if expr {
    // some code
} else {
    // another code
}

不僅僅是 if

除了可以在 if 中做條件判斷之外,在 Golang 中你甚至可以在 if 的條件表達式前執行一個簡單的語句。

舉個例子:

if x2 := 1; x2 > 10 { 
    fmt.Println("x2 great than 10")
} else {
    fmt.Println("x2 less than 10", x2)
}

上面的例子在 if 語句中先聲明並賦值了 x2,之後對 x2 做條件判斷。

注意:此處在 if 內聲明的變量 x2 作用域僅限於 if 和else 語句。

for循環語句

當需要重複執行的時候需要用到循環語句,Go 中只有 for 這一種循環語句。

標準的for循環語法:

for 初始化語句; 條件表達式; 後置語句 {
    // some code
}

這種語法形式和 C 語言中 for 循環寫法還是很像的,不同的是不用把這三個部分用 () 括起來。循環執行邏輯:

  • 初始化語句:初始循環時執行一次,做一些初始化工作,一般是循環變量的聲明和賦值。
  • 條件表達式:在每次循環前對條件表達式求值操作,若求值結果是
    true 則執行循環體內語句,否則不執行。
  • 後置語句:在每次循環的結尾執行,一般是做循環變量的自增操作。

舉個例子:

sum := 0
for i := 0; i < 10; i++ {
    sum += i // i作用域只在for語句內
    fmt.Println(i, sum)
}

注意:循環變量i 的作用域只在 for 語句內,超出這個範圍就不能使用了。

while循環怎麼寫?

前面說了,Golang 中只有 for 這一種循環語法,那有沒有類似 C 語言中 while 循環的寫法呢?答案是有的:把 for 語句的前後兩部分省略,只留中間的「條件表達式」的 for 語句等價於 while 循環。

像下面這樣:

sum1 := 0
for ;sum1 < 10; { // 可以省略初始化語句和後置語句
    sum1++
    fmt.Println(sum1)
}

上面的示例沒有初始化語句和後置語句,會循環執行 10 次後退出。

當然你要是覺得前後的分號也不想寫了,也可以省略不寫,上面的代碼和下面是等效的:

sum1 := 0
for sum1 < 10 { // 可以省略初始化語句和後置語句,分號也能省略
    sum1++
    fmt.Println(sum1)
}

在 Golang 中死循環可以這樣寫,相當於 C 語言中的 while(true)

 for { // 死循環
  // your code
 }

switch 語句

switch 語句可以簡化多個 if-else 條件判斷寫法,避免代碼看起來雜亂。

可以先定義變量,然後在 switch 中使用這個變量。

 a := 1
 switch a {
 case 1: 
  fmt.Println("case 1") // 不用寫break 執行到這自動跳出
 case 2:
  fmt.Println("case 2")
 default:
  fmt.Printf("unexpect case")
 }
輸出:case 1

從 C 語言過來的朋友一定有這樣的經歷:經常會在 case 語句中漏掉 break 導致程序繼續往下執行,從而產生奇奇怪怪的 bug ,這種問題在 Golang 中不復存在了。

Golang 在每個 case 後面隱式提供 break 語句。 除非以 fallthrough 語句結束,否則分支會自動終止。

 switch a := 1; a { //這裡有分號
 case 1: // case 無需為常量,且取值不必為整數。
  fmt.Println("case 1") // 不用寫break 執行到自動跳出 除非以 fallthrough 語句結束
  fallthrough
 case 2:
  fmt.Println("case 2")
 default:
  fmt.Printf("unexpect case")
 }
輸出:
case 1
case 2

還可以直接在 switch 中定義變量后使用,但是要注意變量定義之後又分號,比如下面這樣:

 switch b :=1; b { //注意這裡有分號
 case 1: 
  fmt.Println("case 1") 
 case 2:
  fmt.Println("case 2")
 default:
  fmt.Printf("unexpect case")
 }

沒有條件的switch

沒有條件的 switch 同 switch true 一樣,只有當 case 中的表達式值為「真」時才執行,這種形式能簡化複雜的 if-else-if else 語法。

下面是用 if 來寫多重條件判斷,這裏寫的比較簡單若是再多幾個 else if 代碼結構看起來會更糟糕。

    a := 1
    if a > 0 {
        fmt.Println("case 1") 
    } else if a < 0 {
        fmt.Println("case 2")   
    } else {
        fmt.Printf("unexpect case")   
    }

如果用上不帶條件的 switch 語句,寫出來就會簡潔很多,像下面這樣。

 a := 1
 switch {    // 相當於switch true
 case a > 0: // 若表達式為「真」則執行 
  fmt.Println("case 1") 
 case a < 0:
  fmt.Println("case 2")
 default:
  fmt.Printf("unexpect case")
 }

defer 語句

defer 語句有延遲調用的效果。具體來說defer後面的函數調用會被壓入堆棧,當外層函數返回才會對壓棧的函數按後進先出順序調用。說起來有點抽象,舉個例子:

package main

import "fmt"

func main() {
 fmt.Println("entry main")
 for i := 0; i < 6; i++ {
  defer fmt.Println(i)
 }
 fmt.Println("exit main")
}

fmt.Println(i) 不會每次立即執行,而是在 main 函數返回之後才依次調用,編譯運行上述程序的輸出:

entry main
exit main  //外層函數返回
5
4
3
2
1
0

上面是簡單的使用示例,實際使用中defer 通常用來釋放函數內部變量,因為它可以在外層函數 return 之後繼續執行一些清理動作。

這在文件類操作異常處理中非常實用,比如用於釋放文件描述符,我們以後會講解這塊應用,總之先記住 defer 延遲調用的特點。

總結

通過本文的學習,我們掌握了 Golang 中基本的控制流語句,利用這些控制語句加上一節介紹的變量等基礎知識,可以構成豐富的程序邏輯,就能用 Golang 來做一些有意思的事情了。

感謝各位的閱讀,文章的目的是分享對知識的理解,技術類文章我都會反覆求證以求最大程度保證準確性,若文中出現明顯紕漏也歡迎指出,我們一起在探討中學習.

今天的技術分享就到這裏,我們下期再見。

創作不易,白票不是好習慣,如果在我這有收穫,動動手指「點贊」「關注」是對我持續創作的最大支持。

微信搜索公眾號「 後端技術學堂 」回復「資料」「1024」有我給你準備的各種編程學習資料。文章每周持續更新,我們下期見!

本文使用 mdnice 排版

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

※回頭車貨運收費標準

聊一聊高併發高可用那些事 – Kafka篇

目錄

為什麼需要消息隊列

1.異步 :一個下單流程,你需要扣積分,扣優惠卷,發短信等,有些耗時又不需要立即處理的事,可以丟到隊列里異步處理。

2.削峰 :按平常的流量,服務器剛好可以正常負載。偶爾推出一個優惠活動時,請求量極速上升。由於服務器 Redis,MySQL 承受能力不一樣,如果請求全部接收,服務器負載不了會導致宕機。加機器嘛,需要去調整配置,活動結束後用不到了,即麻煩又浪費。這時可以將請求放到隊列里,按照服務器的能力去消費。

3.解耦 :一個訂單流程,需要扣積分,優惠券,發短信等調用多個接口,出現問題時不好排查。像發短信有很多地方需要用到, 如果哪天修改了短信接口參數,用到的地方都得修改。這時可以將要發送的內容放到隊列里,起一個服務去消費, 統一發送短信。

高吞吐、高可用 MQ 對比分析

看了幾個招聘網站,提到較多的消息隊列有:RabbitMQ、RocketMQ、Kafka 以及 Redis 的消息隊列和發布訂閱模式。

Redis 隊列是用 List 數據結構模擬的,指定一端 Push,另一端 Pop,一條消息只能被一個程序所消費。如果要一對多消費的,可以用 Redis 的發布訂閱模式。Redis 發布訂閱是實時消費的,服務端不會保存生產的消息,也不會記錄客戶端消費到哪一條。在消費的時候如果客戶端宕機了,消息就會丟失。這時就需要用到高級的消息隊列,如 RocketMQ、Kafka 等。

ZeroMQ 只有點對點模式和 Redis 發布訂閱模式差不多,如果不是對性能要求極高,我會用其它隊列代替,畢竟關解決開發環境所需的依賴庫就夠折騰的。

RabbitMQ 多語言支持比較完善,特性的支持也比較齊全,但是吞吐量相對小些,而且基於 Erlang 語言開發,不利於二次開發和維護。

RocketMQ 和 Kafka 性能差不多,基於 Topic 的訂閱模式。RocketMQ 支持分佈式事務,但在集群下主從不能自動切換,導致了一些小問題。RocketMQ 使用的集群是 Master-Slave ,在 Master 沒有宕機時,Slave 作為災備,空閑着機器。而 Kafka 採用的是 Leader-Slave 無狀態集群,每台服務器既是 Master 也是 Slave。

Kafka 相關概念

在高可用環境中,Kafka 需要部署多台,避免 Kafka 宕機后,服務無法訪問。Kafka集群中每一台 Kafka 機器就是一個 Broker。Kafka 主題名稱和 Leader 的選舉等操作需要依賴 ZooKeeper。

同樣地,為了避免 ZooKeeper 宕機導致服務無法訪問,ZooKeeper 也需要部署多台。生產者的數據是寫入到 Kafka 的 Leader 節點,Follower 節點的 Kafka 從 Leader 中拉取數據同步。在寫數據時,需要指定一個 Topic,也就是消息的類型。

一個主題下可以有多個分區,數據存儲在分區下。一個主題下也可以有多個副本,每一個副本都是這個主題的完整數據備份。Producer 生產消息,Consumer 消費消息。在沒給 Consumer 指定 Consumer Group 時會創建一個臨時消費組。Producer 生產的消息只能被同一個 Consumer Group 中的一個 Consumer 消費。

  • Broker:Kafka 集群中的每一個 Kafka 實例
  • Zookeeper:選舉 Leader 節點和存儲相關數據
  • Leader:生產者與消費者只跟 Leader Kafka 交互
  • Follower:Follower 從 Leader 中同步數據
  • Topic:主題,相當於發布的消息所屬類別
  • Producer:消息的生產者
  • Consumer:消息的消費者
  • Partition:分區
  • Replica:副本
  • Consumer Group:消費組

分區、副本、消費組

  • 分區

主題的數據會按分區數分散存到分區下,把這些分區數據加起來才是一個主題的完整的數據。分區數最好是副本數的整數倍,這樣每個副本分配到的分區數比較均勻。同一個分區寫入是有順序的,如果要保證全局有序,可以只設置一個分區。

如果分區數小於消費者數,前面的消費者會配到一個分區,後面超過分區數的消費者將無分區可消費,除非前面的消費者宕機了。如果分區數大於消費者數,每個消費者至少分配到一個分區的數據,一些分配到兩個分區。這時如果有新的消費者加入,會把有兩個分區的調一個分配到新的消費者。

分區數可以設置成 6、12 等數值。比如 6,當消費者只有一個時,這 6 個分區都歸這個消費者,後面再加入一個消費者時,每個消費者都負責 3 個分區,後面又加入一個消費者時,每個消費者就負責 2 個分區。每個消費者分配到的分區數是一樣的,可以均勻地消費。

  • 副本

主題的副本數即數據備份的個數,如果副本數為 1 , 即使 Kafka 機器有多個,當該副本所在的機器宕機后,對應的數據將訪問失敗。

集群模式下創建主題時,如果分區數和副本數都大於 1,主題會將分區 Leader 較均勻的分配在有副本的 Kafka 上。這樣客戶端在消費這個主題時,可以從多台機器上的 Kafka 消息數據,實現分佈式消費。

副本數不是越多越好,從節點需要從主節點拉取數據同步,一般設置成和 Kafka 機器數一樣即可。如果只需要用到高可用的話,可以採用 N+1 策略,副本數設置為 2,專門弄一台 Kafka 來備份數據。然後主題分佈存儲在 “N” 台 Kafka 上,”+1″ 台 Kafka 保存着完整的主題數據,作為備用服務。

Replicas 表示在哪些 Kafka 機器上有主題的副本,Isr 表示當前有副本的 Kafka 機器上還存活着的 Kafka 機器。主題分區中所涉及的 Leader Kafka 宕機時,會將宕機 Kafka 涉及的分區分配到其它可用的 Kafka 節點上。如下:

  • 消費組

每一個消費組記錄者各個主題分區的消費偏移量,在消費的時候,如果沒有指定消費組,會默認創建一個臨時消費組。生產者生產的消息只能被同一消費組下某個消費者消費。如果想要一條消息可以被多個消費者消費,可以加入不同的消費組。

偏移量最大值,消息存儲策略

  • 偏移量的最大值

long 類型最大值是(2^63)-1 (為什麼要減一呢?第一位是符號位,正的有262,負的有262,其中+0 和 -0 是相等的 , 只不過有的語言把0算到負裏面,有的語言把0算到正裏面)。 偏移量是一個 long 類型,除去負數,包含0,其最大值為 2^62。

  • 消息存儲策略

Kafka 配置項提供兩種策略, 一種是基於時間:log.retention.hours=168,另一種是基於大小:log.retention.bytes=1073741824 。符合條件的數據會被標記為待刪除,Kafka會在恰當的時候才真正刪除。

Zookeeper 上存的 Kafka 相關數據

如何確保消息只被消費一次

前面已經講到,同一主題里的分區數據,只能被相同消費組裡其中一個消費者消費。當有多個消費者同時消費同一主題時,將這些消費者都加入相同的消費組,這時生產者的消息只能被其中一個消費者消費。

重複消費和數據丟失問題

  • 生產者

生產者發送消息成功后,不等 Kafka 同步完成的確認,繼續發送下一條消息。在發的過程中如果 Leader Kafka 宕機了,但生產者並不知情,發出去的信息 Kafka 就收不到,導致數據丟失。解決方案是將 Request.Required.Acks 設置為 -1,表示生產者等所有副本都確認收到后才發送下一條消息。

Request.Required.Acks=0 表示發送消息即完成發送,不等待確認(可靠性低,延遲小,最容易丟失消息)

Request.Required.Acks=1 表示當 Leader 提交同步完成后才發送下一條消息

  • 消費者

消費者有兩種情況,一種是消費的時候自動提交偏移量導致數據丟失:拿到消息的同時偏移量加一,如果業務處理失敗,下一次消費的時候偏移量已經加一了,上一個偏移量的數據丟失了。

另一種是手動提交偏移量導致重複消費:等業務處理成功后再手動提交偏移量,有可能出現業務處理成功,偏移量提交失敗,那下一次消費又是同一條消息。

怎麼解決呢?這是一個 or 的問題,偏移量要麼自動提交要麼手動提交,對應的問題是要麼數據丟失要麼重複消費。如果消息要求實時性高,丟個一兩條沒關係的話可以選擇自動提交偏移量。如果消息一條都不能丟的話可以選擇手動提交偏移量,然後將業務設計成冪等,不管這條消息消費多少次最終和消費一次的結果一樣。

Linux Kafka 操作命令

  • 查看 Kafka 中 Topic
  • 查看 Kafka 詳情
  • 消費 Topic
  • 查看所有消費組
  • 查看消費組的消費情況

Windows 可視化工具 Kafka Tool

  • 配置 Hosts 文件
123.207.79.96 ZooKeeper-Kafka-01
  • 配置 Kafka Tool 連接信息

  • 查看 Kafka 主題數據

生產者和消費者使用代碼

  • 具體操作參考 github.com/wong-winnie/library

訂閱號:偉洪winnie

  • 訂閱號回復關鍵字【聊聊高併發高可用那些事】獲取專欄文章

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

※回頭車貨運收費標準

java作品集:企業信息門戶webtap

作品背景

隨着企業應用的軟件越來越多,並且信息軟件基本以B/S為主了,很多時候各種軟件的地址,讓大家記的頭昏腦脹,並且一堆密碼要記,而且大部分系統之間無法互通,雖然市面上有各種集成方案,但無法做到簡單有效,都是大型軟件廠商的PPT解決方案加一堆開發工作和大量的成本支出,最重要的是大部分都是體驗極差、毫無美感的東西。

解決方案

基於上述背景,個人利用業餘時間在持續完善做一款小作品,或多或少的解決一點問題,雖然目前還沒成熟,但是考慮再三,先開源出來,希望有志同道合的人一起完善。

作品說明

1.首頁

首頁主要功能有

  • app显示
  • 文件夾分類
  • 應用搜索
  • 登錄
  • 登錄后快捷新增應用
  • 背景自動每天同步bing搜索引擎的的壁紙
  • 應用和新聞站點鏈接(未完成)

通過點擊應用上的鎖 icon即可查看應用的賬號和密碼,在沒有單點登錄的功能情況下這個功能非常有用

2.系統登錄

點擊首頁右上角的 sigin 到登錄頁面

3.應用列表

後台管理 主功能只有新建應用、應用列表、系統設置,極其簡約,好不好看只是個人風格,默認登錄進來及显示應用列表。

4、新增應用

添加應用除了常規功能還增加了敏感信息輸入,敏感信息只能登錄后才能查看;
查看密碼功能考慮到很多時候連接地址需要密碼才能訪問;
查看權限目前只實現了登錄可見以及自己可見(權限功能還需要繼續完善);

5、系統設置

系統設置里可以進行基本信息維護,個人登錄信息維護,用戶管理,app分類管理,及多組織管理,主要介紹以下2重點功能。

常規設置

基本設置里為當前組織的組織名稱,訪問短鏈接地址(多組織情況下),以及組織的logo

多組織管理

技術架構

技術棧

  • springboot
  • mysql5.7
  • gradle
  • thymeleaf
  • vue2.0

代碼結構

數據庫結構

源碼地址

https://github.com/robotbird/webtap

https://gitee.com/robotbird/webtap

使用方法

  • 1、mysql 新建webtap數據庫
  • 2、導入工程目錄下doc/db/webtap.sql
  • 3、設置好application-dev.properties 里的數據庫密碼,默認root/root
  • 4、打個war包放到tomcat下即可運行,這地方沒有用jar包的原因是考慮上傳目錄採用jar包不知道什麼樣的方式合適。
  • 5、登錄管理員默認賬號robotbird@qq.com,密碼123456(暫時只支持郵箱登錄)

在線體驗

體驗地址:http://webtap.cn/
由於服務器在國外,訪問時候還請耐心等候。

總結

作品當前還未實現的功能,企業內部信息搜索集成、單點登錄集成、權限管理,以及後續考慮的小程序功能,但是依然放出來,激勵自己繼續完善下去。
此作品完全個人原創,開源遵從GNU General Public License v3.0,版權所屬個人所有,如果有同學對這個作品比較感興趣可以微信聯繫robotbird798

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

OAuth + Security – 5 – Token存儲升級(數據庫、Redis)

PS:此文章為系列文章,建議從第一篇開始閱讀。

在我們之前的文章中,我們當時獲取到Token令牌時,此時的令牌時存儲在內存中的,這樣顯然不利於我們程序的擴展,所以為了解決這個問題,官方給我們還提供了其它的方式來存儲令牌,存儲到數據庫或者Redis中,下面我們就來看一看怎麼實現。

不使用Jwt令牌的實現

  • 存儲到數據庫中(JdbcTokenStore)

使用數據庫存儲方式之前,我們需要先準備好對應的表。Spring Security OAuth倉庫可以找到相應的腳本:https://github.com/spring-projects/spring-security-oauth/blob/master/spring-security-oauth2/src/test/resources/schema.sql。該腳本為HSQL,所以需要根據具體使用的數據庫進行相應的修改,以MySQL為例,並且當前項目中,只需要使用到oauth_access_token和oauth_refresh_token數據表,所以將創建這兩個庫表的語句改為MySQL語句:

CREATE TABLE oauth_access_token (
	token_id VARCHAR ( 256 ),
	token BLOB,
	authentication_id VARCHAR ( 256 ),
	user_name VARCHAR ( 256 ),
	client_id VARCHAR ( 256 ),
	authentication BLOB,
	refresh_token VARCHAR ( 256 ) 
);

CREATE TABLE oauth_refresh_token ( 
token_id VARCHAR ( 256 ), 
token BLOB, authentication BLOB 
);

然後我們需要去配置對應的認證服務器,主要就是修改之前文章中TokenConfigure類中的tokenStore()方法:

// 同時需要注入數據源
@Autowired
private DataSource dataSource;
    
@Bean
    public TokenStore tokenStore() {
        return new JdbcTokenStore(dataSource);
    }

同時需要添加jdbc的依賴:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

認證服務器中的配置保持本系列第一章的配置不變,此處不再贅述。

其中若有不理解的地方,請參考該系列的第一篇文章

關於數據源的補充:

在我們項目中配置的數據源,可能不一定是使用的官方提供的格式,比如我們自定義的格式,或者使用第三方的數據源,那麼我們如何去配置呢?這裏以mybatis-plus的多數據源為例:

@Configuration
@EnableAuthorizationServer
public class FebsAuthorizationServerConfigure extends AuthorizationServerConfigurerAdapter {

    ......
    
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    @Bean
    public TokenStore tokenStore() {
        DataSource dimplesCloudBase = dynamicRoutingDataSource.getDataSource("dimples_cloud_base");
        return new JdbcTokenStore(febsCloudBase);
    }
    ......
}

然後啟動項目,重新獲取Token進行測試,能正確的獲取到Token:

我們查看數據中,看是否已經存入相關信息到數據庫中:

  • 存儲到redis(RedisTokenStore)

令牌存儲到redis中相比於存儲到數據庫中來說,存儲redis中,首先在性能上有一定的提升,其次,令牌都有有效時間,超過這個時間,令牌將不可再用,而redis的可以給對應的key設置過期時間,完美切合需求,所有令牌存儲到redis中也是一種值得使用的方法。

首先,我們需要在項目中添加redis依賴,同時配置redis

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

新建配置類RedisConfigure

@Configuration
public class RedisConfigure{

    @Bean
    @ConditionalOnClass(RedisOperations.class)
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(mapper);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key採用 String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的 key也採用 String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式採用 jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的 value序列化方式採用 jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }
    
}

配置redis的連接

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    lettuce:
      pool:
        min-idle: 8
        max-idle: 500
        max-active: 2000
        max-wait: 10000
    timeout: 5000

這時我們啟動項目測試,會發現項目將會報錯:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericObjectPoolConfig

這是由於我們配置RedisConfigure時,使用到了一個ObjectMapper類,這個類需要我們引入Apache的一個工具包

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

最後,我們需要在認證服務器中配置token的存儲方式,還是同jdbc的配置,在tokenStore()方法中配置,打開我們的TokenConfigure類,然後做如下配置:

@Autowired
private RedisConnectionFactory redisConnectionFactory;

@Bean
public TokenStore tokenStore() {
    return new RedisTokenStore(redisConnectionFactory);
}

認證服務器中的配置還是跟之前的一樣,保持不變,啟動項目,獲取Token測試:

我們查看Redis中,看是否已經存入相關信息到數據庫中:

  • 其它

我們打開TokenStore接口的實現類,會發現,還有一種JwkTokenStore,這種實際上就是將我們UUID格式令牌變成可以帶特殊含義的jwt格式的令牌,我們已經在第三篇文章中介紹過了,可以參考前面的文章。

但是我們需要明白一點的是,這種令牌還是存儲在內存中的,後期我們如何將其存儲到redis中是我們研究的方向。

在上面的token存儲到數據庫和存儲到redis中,我們會發現一個問題,那就是我們多次獲取令牌,但是其值是固定不變的,為了解決這個問題,我們可以使用如下方式解決:

@Bean
public TokenStore tokenStore() {
    RedisTokenStore redisTokenStore = new RedisTokenStore(redisConnectionFactory);
    // 解決每次生成的 token都一樣的問題
    redisTokenStore.setAuthenticationKeyGenerator(oAuth2Authentication -> UUID.randomUUID().toString());
    return redisTokenStore;
}

使用JWT令牌的實現

在之前的所有使用方法中,我們要麼是將Token存儲在數據庫或者redis中的時候,令牌的格式是UUID的無意義字符串,或者是使用JWT的有意義字符串,但是確是將令牌存儲在內存中,那麼,我們現在想使用JWT令牌的同時,也想將令牌存儲到數據庫或者Redis中。我們該怎麼配置呢?下面我們以Redis存儲為例,進行探索:

首先,我們還是要使用到TokenConfigure中的JWT和Redis配置:

@Autowired
private RedisConnectionFactory redisConnectionFactory;

@Bean
public JwtAccessTokenConverter accessTokenConverter() {
    JwtAccessTokenConverter converter = new JwtAccessTokenConverter();
    //對稱秘鑰,資源服務器使用該秘鑰來驗證
    converter.setSigningKey(SIGNING_KEY);
    return converter;
}

@Bean
public TokenStore tokenStore() {
    return new RedisTokenStore(redisConnectionFactory);
}

接下來,是配置認證服務器,在認證服務器中,跟之前的配置有一點區別,如下:

@Autowired
private JwtAccessTokenConverter jwtAccessTokenConverter;

@Autowired
private TokenStore tokenStore;

@Override
public void configure(AuthorizationServerEndpointsConfigurer endpoints) {
    endpoints
            // 配置密碼模式管理器
            .authenticationManager(authenticationManager)
            // 配置授權碼模式管理器
            .authorizationCodeServices(authorizationCodeServices())
            // 令牌管理
//                .tokenServices(tokenServices());
            .accessTokenConverter(jwtAccessTokenConverter)
            .tokenStore(tokenStore);

}

啟動項目,進行測試,獲取token

然後使用該令牌請求資源

至此,我們差不多完成了Token令牌的存儲和獲取

源碼傳送門: https://gitee.com/dimples820/dimples-explore

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準

台中搬家公司費用怎麼算?

IEA:再生能源對武漢肺炎適應力最佳 將是滿足2020需求增長的唯一能源

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準

「封城新時代」垃圾量爆增! 防疫優先環保擺一邊

摘錄自2020年05月08日TVBS新聞網全球報導

全球進入「封城新時代」,人類減少外出,大大改善了地球環境,不過塑膠污染卻愈來愈嚴重。美國各州,為了防止重複使用的購物袋傳播病毒,開放使用塑膠袋,許多環保措施因此停擺。而宅經濟正夯,網購的貨品包裝,也製造出大量垃圾。

加州塑膠袋全面啟用,當地原先規定購買塑膠袋必須支付10美分,大約台幣3元錢,如今禁令暫停60天,因為衛生安全問題,暫時大於環保問題。就連舊金山,身為全美最先禁用塑膠袋的城市之一,也已經宣布,禁止消費者攜帶自己的環保杯、環保袋等用品到店內。

無論是外出購物,還是網購,似乎怎麼做都會製造塑膠廢棄物,知名零垃圾專家就教大家運用「5R方針」,包括Refuse拒絕垃圾、Reduse減量、Reuse重複使用、Recycle回收,以及Rot把廚餘做成堆肥。零廢棄專家BeaJohnson:「這5R在世界各地都適用,不管你身處什麼情況,包括全球大流行疫情期間也是。」

公害污染
污染治理
國際新聞
美國
塑膠袋
塑膠袋禁用政策
一次性包裝
一次性塑膠袋
廢棄物

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

※產品缺大量曝光嗎?你需要的是一流包裝設計!

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

LG印度工廠化學氣體外洩已11死 村民抬遺體抗議要求關廠

摘錄自2020年5月10日自由時報報導

印度東南部維沙卡帕特南(Visakhapatnam)7日時,南韓LG集團子公司的聚合物工廠發生苯乙烯(styrene)氣體大規模外洩,據外媒,目前已知最少11死,逾千人住院,現仍有數百人在院內接受治療。當地時間週六,憤怒的村民將受害者遺體陳列於工廠大門前,要求工廠即刻關閉,同時逮捕工廠最高管理階層。

這間工廠屬於「南韓樂金聚合物(LG Polymers)」為南韓最大化工企業「LG化學(LG Chem Ltd)」的子公司。同日該公司發表聲明向受害者道歉,聲明也透露,初步調查結果證實,這場悲劇是由苯乙烯儲存槽蒸氣外洩而引起的。

苯乙烯被廣泛用於塑膠及橡膠製品製造,據我國勞動部職業安全衛生署2015「職業性苯乙烯、二苯乙烯中毒認定參考指引」,吸入或食入高濃度苯乙烯可能刺激呼吸道,引起昏睡、頭痛、精神混亂、協調感喪失及意識不清等症狀,若食入或在嘔吐下吸入肺部,可能導致嚴重肺組織損傷,引起肺組織壞死,甚至致死。

公害污染
空氣污染
毒物
污染治理
國際新聞
印度
化學工廠
化學氣體

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※推薦台中搬家公司優質服務,可到府估價

不只陸上生物會感染 科學家擔憂武漢肺炎恐可傳「鯨魚」

摘錄自2020年5月9日自由時報報導

武漢肺炎(新型冠狀病毒疾病,COVID-19)衝擊全球,不僅導致近400萬人染疫,與人類親近的犬、貓,甚至貂和大型貓科動物都有確診案例;專家則警告,雖目前還未有正式的科學期刊證實,但有研究顯示武漢肺炎可感染的潛在生物範圍恐擴及「鯨魚」。

根據加國媒體《Nunatsiaq news 》報導,目前科學界認為其他動物也會感染武漢肺炎的原因在於,牠們呼吸道細胞表面的蛋白質類似武漢肺炎的受體ACE2,當武漢肺炎病毒進入體內時,會經由ACE2進入細胞並開始複製,進而侵入呼吸道,造成感染。日前加利福尼亞大學戴維斯分校的研究人員創建了一份ACE2受體的動物清單,雖目前仍須更多研究佐證,但專家恩威亞(Martin Nweeia)日前在威爾遜國際學者中心的討論會上強調,北極海域的海洋哺乳動物有感染武漢肺炎的可能性。

雖然人類多透過飛沫感染到武漢肺炎,但根據國際文獻指出,人的糞便、尿液也會殘留病毒,當這些含有病毒的廢水排入水中時,有可能導致某些動物感染,雖然病毒可能被海水中的某些物質破壞,但是北極海有多個鹹淡水入口,以及因全球暖化冰川徑流和夏季冰融化增加了淡水系統,可能影響海洋生物以及病毒的存活。

生活環境
生態保育
物種保育
生物多樣性
國際新聞
武漢肺炎
鯨魚
動物與大環境變遷
公共衛生

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

網頁設計最專業,超強功能平台可客製化

疫情衝擊酪農 日本北海道知事推牛奶挑戰

摘錄自2020年5月10日中央通訊社報導

日本境內2019冠狀病毒疾病(COVID-19)疫情延燒,連帶衝擊到北海道酪農生乳銷量,讓北海道知事鈴木直道推出「牛奶挑戰」,呼籲民眾多喝牛奶,增加牛奶或優格購買量。日本放送協會(NHK)報導,鈴木直道在一段影片中單手拿著一杯牛奶,然後一口氣喝完,這個稱為「牛奶挑戰」的活動,希望能提升牛奶消費量。

日本境內受到疫情影響,學校臨時停課,所以沒有提供營養午餐,讓牛奶消費量下滑;民眾避免外出及出遊,也造成牛奶製品的冰淇淋、蛋糕等甜點銷量銳減。牛奶消費量減少直接衝擊酪農,而北海道的農業生產額中,酪農占了4成,在北海道農產占有相當重要地位。

一般來說,需求減少可以減產因應,但事實上不然,北海道江別市酪農川口谷仁說,每天要幫牛擠3次奶,如果不這麼做,牛就會生病,所以無法停止擠奶。加上北海道每年5月到6月的氣候,對牛來說是最舒服的時候,也是一年中最容易增加生乳產量的時期。而再這樣下去的話,最糟的情況就是造成廠商無法再收購,每天固定生產的生乳將不得不廢棄。

生活環境
國際新聞
日本
北海道
酪農
牛奶
武漢肺炎
疫情下的食衣住行
公共衛生
經濟動物
動物福利

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計最專業,超強功能平台可客製化

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家公司費用怎麼算?