此为历史版本和 IPFS 入口查阅区,回到作品页
jessweb3
IPFS 指纹 这是什么

作品指纹

多執行緒系列:Blocking Queue 概念與實作

jessweb3
·
·
Queue 是個先前先出(First In First Out, FIFO)的資料結構;Blocking Queue 是為了 thread-safe 而誕生的,可以讓不同的生產者和消費者 threads 添加或刪除 item,依然保持資料正確


Blocking Queue 也可稱為 Bounded Buffer,只有兩個對外方法:enqueue 和 dequeue。enqueue 是往 queue 內添加一個 item;dequeue 是從 queue 中刪除 1 個 item 並且回傳。當 queue 內的容量滿了的時候會阻擋(block)其他要呼叫 enqueue 的 thread;當 queue 中沒有 item 時阻擋其他呼叫 dequeue 的 thread。

BlockingQueue:

public class BlockingQueue<T> {

    T[] array; //此 array 拿來存放 queue 中的資料
    int size = 0; //當前已填容量
    int capacity; //總容量
    int head = 0; //頭座標
    int tail = 0; //尾座標

    public BlockingQueue(int capacity) {
        array = (T[]) new Object[capacity];
        this.capacity = capacity;
    }

    public void enqueue(T item) {
        //TO DO
    }

    public T dequeue() {
        //TO DO     
    }
}

讓我們從 enqueue 方法開始。如果 queue 的當前大小 == 容量 (代表已經滿了),那麼我們知道要阻止當前此方法的調用者。我們可以通過在 while 循環中適當地調用 wait() 方法來做到這一點。 while 循環的條件是 queue 的大小等於最大容量。一旦另一個線程執行 dequeue,循環的條件就會變為 false。

請注意,每當我們測試 size 變量的值時,我們還需要確保沒有其他線程在操縱 size 的值。這可以通過 synchronized 關鍵字來確保,因為它只允許單個線程調用 queue 上的 enqueue/dequeue 方法。

最後,隨著 queue 的增長,它將到達我們的 array 的末尾,因此我們需要將隊列的尾部重置為零。請注意,由於我們僅在隊列大小 < 容量時才允許繼續將 item enqueue,因此我們保證 tail 不會覆蓋現有 item。

enqueue:

public synchronized void enqueue(T item) throws InterruptedException {
        // 如果 size 等於 capacity 代表此 queue 已滿,阻塞
        while (size == capacity) {
            wait();
        }

        // 如果 tail 已經在 array 尾,則重設 tail 到 array 頭
        if (tail == capacity) {
            tail = 0;
        }

        // 將 item 放入 array,tail 加 1 且 size 加 1 完整表示 array
        // 中的 item 增加 1 個
        array[tail] = item;
        size++;
        tail++;

        //這邊要記得喚醒其他要準備進行 enqueue 或是 dequeue 的 thread
        notifyAll(); 
    }

請注意,最後我們調用的是 notifyAll() 方法。由於我們剛剛向 queue 中添加了一個 item,因此有可能在queue class 的 dequeue 中阻塞了一個消費者線程,等待一個項目變為可用,因此我們有必要發送一個信號來喚醒任何等待的線程。

如果沒有線程在等待,那麼該信號將不會被注意到並被忽略,這不會影響我們 class 的正確工作。


現在讓我們來設計 dequeue 方法。與 enqueue 類似,如果沒有什麼可以dequeue,即 size == 0,我們需要阻塞 dequeue 的調用者

我們需要將 queue 的頭部重置為零,以防它指向數組的末尾。我們也需要將 size 減 1 ,因為 queue 現在將少一個 item。

最後,我們記得調用 notifyAll() ,因為如果 queue 已滿,那麼 enqueue 方法中可能會阻塞。程式如下所示:

dequeue:

public synchronized T dequeue() throws InterruptedException {

        T item = null;

        // wait for atleast one item to be enqueued
        while (size == 0) {
            wait();
        }

        // reset head to start of array if its past the array
        if (head == capacity) {
            head = 0;
        }
        
        // store the reference to the object being dequeued
        // and overwrite with null 
        item = array[head];
        array[head] = null;
        head++;
        size--;

        // don't forget to call notify, there might be another thread
        // blocked in the enqueue method.
        notifyAll();

        return item;
    }

我們看到 dequeue 方法類似於 enqueue 方法。

CC BY-NC-ND 2.0 授权