多執行緒系列:Blocking Queue 概念與實作
Blocking Queue 也可稱為 Bounded Buffer,只有兩個對外方法:enqueue 和 dequeue。enqueue 是往 queue 內添加一個 item;dequeue 是從 queue 中刪除 1 個 item 並且回傳。當 queue 內的容量滿了的時候會阻擋(block)其他要呼叫 enqueue 的 thread;當 queue 中沒有 item 時阻擋其他呼叫 dequeue 的 thread。
BlockingQueue:
public class BlockingQueue<T> { T[] array; //此 array 拿來存放 queue 中的資料 int size = 0; //當前已填容量 int capacity; //總容量 int head = 0; //頭座標 int tail = 0; //尾座標 public BlockingQueue(int capacity) { array = (T[]) new Object[capacity]; this.capacity = capacity; } public void enqueue(T item) { //TO DO } public T dequeue() { //TO DO } }
讓我們從 enqueue 方法開始。如果 queue 的當前大小 == 容量 (代表已經滿了),那麼我們知道要阻止當前此方法的調用者。我們可以通過在 while 循環中適當地調用 wait() 方法來做到這一點。 while 循環的條件是 queue 的大小等於最大容量。一旦另一個線程執行 dequeue,循環的條件就會變為 false。
請注意,每當我們測試 size 變量的值時,我們還需要確保沒有其他線程在操縱 size 的值。這可以通過 synchronized 關鍵字來確保,因為它只允許單個線程調用 queue 上的 enqueue/dequeue 方法。
最後,隨著 queue 的增長,它將到達我們的 array 的末尾,因此我們需要將隊列的尾部重置為零。請注意,由於我們僅在隊列大小 < 容量時才允許繼續將 item enqueue,因此我們保證 tail 不會覆蓋現有 item。
enqueue:
public synchronized void enqueue(T item) throws InterruptedException { // 如果 size 等於 capacity 代表此 queue 已滿,阻塞 while (size == capacity) { wait(); } // 如果 tail 已經在 array 尾,則重設 tail 到 array 頭 if (tail == capacity) { tail = 0; } // 將 item 放入 array,tail 加 1 且 size 加 1 完整表示 array // 中的 item 增加 1 個 array[tail] = item; size++; tail++; //這邊要記得喚醒其他要準備進行 enqueue 或是 dequeue 的 thread notifyAll(); }
請注意,最後我們調用的是 notifyAll() 方法。由於我們剛剛向 queue 中添加了一個 item,因此有可能在queue class 的 dequeue 中阻塞了一個消費者線程,等待一個項目變為可用,因此我們有必要發送一個信號來喚醒任何等待的線程。
如果沒有線程在等待,那麼該信號將不會被注意到並被忽略,這不會影響我們 class 的正確工作。
現在讓我們來設計 dequeue 方法。與 enqueue 類似,如果沒有什麼可以dequeue,即 size == 0,我們需要阻塞 dequeue 的調用者
我們需要將 queue 的頭部重置為零,以防它指向數組的末尾。我們也需要將 size 減 1 ,因為 queue 現在將少一個 item。
最後,我們記得調用 notifyAll() ,因為如果 queue 已滿,那麼 enqueue 方法中可能會阻塞。程式如下所示:
dequeue:
public synchronized T dequeue() throws InterruptedException { T item = null; // wait for atleast one item to be enqueued while (size == 0) { wait(); } // reset head to start of array if its past the array if (head == capacity) { head = 0; } // store the reference to the object being dequeued // and overwrite with null item = array[head]; array[head] = null; head++; size--; // don't forget to call notify, there might be another thread // blocked in the enqueue method. notifyAll(); return item; }
我們看到 dequeue 方法類似於 enqueue 方法。
喜欢我的作品吗?别忘了给予支持与赞赏,让我知道在创作的路上有你陪伴,一起延续这份热忱!
- 来自作者
- 相关推荐