BlockingQueue interface in Java
BlockingQueue is an interface in the java.util.concurrent package, which holds Java's concurrency utilities. It extends the Queue interface with operations that block the calling thread: an insertion waits while a bounded queue is at full capacity, and a removal waits while the queue is empty. A typical use is the producer-consumer pattern, in which one or more producer threads insert elements into the queue while one or more consumer threads remove them. Implementations are thread-safe and do not accept null elements.
Implementations of a BlockingQueue
The following classes in java.util.concurrent implement the BlockingQueue interface, among others such as LinkedBlockingDeque:
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
Types of BlockingQueue
A BlockingQueue implementation may be of two types:
- Unbounded queue: If we do not specify a capacity, the queue grows as we insert elements, up to a capacity of Integer.MAX_VALUE. Insertions therefore practically never block, although removals still block while the queue is empty.
BlockingQueue<Integer> bq = new LinkedBlockingDeque<>();
- Bounded queue: In a bounded queue, we fix the maximum size of the queue by passing the capacity argument to the constructor. Insertions block once the queue holds that many elements.
BlockingQueue<Integer> bq = new LinkedBlockingDeque<>(10);
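The difference between the two types can be observed through remainingCapacity(). The following is a minimal sketch; the class name CapacityDemo and its helper methods are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class CapacityDemo {
    // Remaining capacity of a queue created without a capacity argument.
    static int unboundedCapacity() {
        BlockingQueue<String> unbounded = new LinkedBlockingDeque<>();
        return unbounded.remainingCapacity();
    }

    // Remaining capacity of a queue limited to `capacity` elements,
    // after a single element has been inserted.
    static int boundedCapacityAfterOneInsert(int capacity) {
        BlockingQueue<String> bounded = new LinkedBlockingDeque<>(capacity);
        bounded.offer("first");
        return bounded.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(unboundedCapacity() == Integer.MAX_VALUE); // prints true
        System.out.println(boundedCapacityAfterOneInsert(3));         // prints 2
    }
}
```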
Working of a BlockingQueue
A BlockingQueue is typically shared between producer threads, which keep inserting elements into the queue, and consumer threads, which keep retrieving elements from it. A producer thread blocks when the queue has no remaining capacity and waits until a consumer thread removes an element. Similarly, a consumer thread blocks when there are no more elements to retrieve and waits until a producer thread inserts one.
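To make the waiting on both sides visible, the sketch below uses a queue with room for a single element, so the producer has to wait for the consumer before almost every insert. The class name HandoffDemo and the method run are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class HandoffDemo {
    // Runs one producer and one consumer over a queue of capacity 1 and
    // returns the values in the order the consumer received them.
    static List<Integer> run(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // waits while the single slot is occupied
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // waits while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            // Wait for both threads so the list is complete before returning.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Neither thread polls or sleeps here; the queue itself coordinates them, and the elements arrive in insertion order.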
Methods of Java BlockingQueue
Below are the commonly used methods of the Java BlockingQueue interface, where E is the element type of the queue.
Methods | Description |
---|---|
boolean add(E e) | Inserts the element if space is available and returns true; throws IllegalStateException if the queue is full |
boolean contains(Object o) | Returns true if the queue contains the specified element |
int drainTo(Collection<? super E> c) | Removes all available elements from the queue and adds them to the specified collection. It returns the number of elements transferred |
int drainTo(Collection<? super E> c, int maxElements) | Removes at most maxElements elements from the queue and adds them to the specified collection. It returns the number of elements transferred |
boolean offer(E e) | Inserts the element if space is available and returns true; returns false immediately if the queue is full |
boolean offer(E e, long timeout, TimeUnit unit) | Inserts the element, waiting up to the specified timeout for space to become available; returns false if the timeout elapses first |
E poll(long timeout, TimeUnit unit) | Retrieves and removes the head of the queue, waiting up to the specified timeout for an element to become available; returns null if the timeout elapses first |
void put(E e) | Inserts the element, waiting as long as necessary for space to become available |
int remainingCapacity() | Returns the number of additional elements the queue can accept without blocking, or Integer.MAX_VALUE if there is no limit |
boolean remove(Object o) | Removes a single instance of the specified element from the queue if it is present |
E take() | Retrieves and removes the head of the queue, waiting as long as necessary for an element to become available |
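As a small illustration of drainTo() and remainingCapacity() from the table above, the following sketch moves a batch of elements out of a queue. The class DrainDemo and its helper methods are illustrative names, not part of the Java API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DrainDemo {
    // Builds a queue of capacity 5 that already holds three elements.
    static BlockingQueue<String> sampleQueue() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(5);
        bq.add("Java");
        bq.add("C");
        bq.add("C++");
        return bq;
    }

    // Moves at most `max` elements from the head of the queue into a new list.
    static List<String> drainAtMost(BlockingQueue<String> bq, int max) {
        List<String> batch = new ArrayList<>();
        bq.drainTo(batch, max);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> bq = sampleQueue();
        System.out.println(bq.remainingCapacity()); // prints 2
        System.out.println(drainAtMost(bq, 2));     // prints [Java, C]
        System.out.println(bq);                     // prints [C++]
        System.out.println(bq.remainingCapacity()); // prints 4
    }
}
```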
Java BlockingQueue Example
This is a typical example of a Java BlockingQueue where one thread inserts elements into the queue while the other thread removes them. The producer sleeps for one second after each put() call, so the consumer usually finds the queue empty and blocks in take() until the next element arrives. The take() method retrieves and removes the head element of the queue.
import java.util.concurrent.*;

public class BlockingQueueDemo {
  public static void main(String[] args) {
    BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);
    insertElements add = new insertElements(bq);
    removeElements remove = new removeElements(bq);
    new Thread(add).start();
    new Thread(remove).start();
  }
}

// Producer: inserts three elements, pausing one second between inserts.
class insertElements implements Runnable {
  protected BlockingQueue<Integer> bq = null;

  insertElements(BlockingQueue<Integer> bq) {
    this.bq = bq;
  }

  public void run() {
    try {
      bq.put(10);
      Thread.sleep(1000);
      bq.put(20);
      Thread.sleep(1000);
      bq.put(30);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

// Consumer: take() blocks until the producer has inserted an element.
class removeElements implements Runnable {
  protected BlockingQueue<Integer> bq = null;

  removeElements(BlockingQueue<Integer> bq) {
    this.bq = bq;
  }

  public void run() {
    try {
      System.out.println(bq.take());
      System.out.println(bq.take());
      System.out.println(bq.take());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Output:
10
20
30
Example: Insert elements
Below is an example of inserting elements into a queue using the add(), offer() and put() methods of the Java BlockingQueue interface. The three methods differ only when the queue is full: add() throws an IllegalStateException, offer() returns false immediately, and put() blocks the thread until space becomes available. Here the capacity of the queue is 10, so all seven insertions succeed without blocking.
import java.util.concurrent.*;

public class BlockingQueueEx {
  // put() can throw the checked InterruptedException, so main declares it.
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
    bq.add("Java");
    bq.add("C");
    bq.add("C++");
    bq.add("Python");
    bq.add("Perl");
    System.out.println("Elements in the blockingqueue: " + bq);

    bq.offer("Ruby");
    bq.put("VBScript");
    System.out.println("Elements in the blockingqueue: " + bq);
  }
}

Output:
Elements in the blockingqueue: [Java, C, C++, Python, Perl]
Elements in the blockingqueue: [Java, C, C++, Python, Perl, Ruby, VBScript]
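The insertion methods only behave differently once the queue is full. The sketch below fills a queue of capacity 2 and then tries each non-blocking variant; put() is left out because it would wait until another thread removed an element. The class FullQueueDemo and its helper methods are hypothetical names used for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FullQueueDemo {
    // Builds a queue of capacity 2 that is already full.
    static BlockingQueue<String> fullQueue() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        bq.add("Java");
        bq.add("C");
        return bq;
    }

    // add() on a full queue throws IllegalStateException.
    static boolean addThrows() {
        try {
            fullQueue().add("Ruby");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // offer() on a full queue returns false immediately.
    static boolean offerResult() {
        return fullQueue().offer("Ruby");
    }

    // Timed offer() waits up to `millis` milliseconds, then returns false.
    static boolean timedOfferResult(long millis) {
        try {
            return fullQueue().offer("Ruby", millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(addThrows());           // prints true
        System.out.println(offerResult());         // prints false
        System.out.println(timedOfferResult(100)); // prints false
    }
}
```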
Example: Remove elements
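Below is an example of deleting elements from the queue using the remove(), take() and poll() methods of the Java BlockingQueue interface. The remove() method deletes the specified element, take() removes the head and blocks while the queue is empty, and poll() with a timeout removes the head, waiting at most the given time for an element to arrive.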
import java.util.concurrent.*;

public class BlockingQueueEx {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> bq = new ArrayBlockingQueue<String>(5);
    bq.add("Java");
    bq.add("C");
    bq.add("C++");
    bq.add("Python");
    bq.add("Perl");
    System.out.println("Elements in the blockingqueue: " + bq);

    bq.remove("C++");  // removes the specified element
    bq.take();         // removes the head, "Java"
    bq.offer("Ruby");
    bq.put("VBScript");
    System.out.println("Elements in the blockingqueue: " + bq);

    bq.poll(2, TimeUnit.SECONDS);  // removes the head, "C"
    System.out.println("Elements in the blockingqueue: " + bq);
  }
}

Output:
Elements in the blockingqueue: [Java, C, C++, Python, Perl]
Elements in the blockingqueue: [C, Python, Perl, Ruby, VBScript]
Elements in the blockingqueue: [Python, Perl, Ruby, VBScript]
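The removal methods differ mainly in what they do when there is nothing to remove. The following sketch tries them on an empty queue; take() is omitted because it would block until another thread inserted an element. The class EmptyQueueDemo and its helper methods are illustrative names only.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class EmptyQueueDemo {
    // poll() returns null immediately when there is nothing to remove.
    static String pollEmpty() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        return bq.poll();
    }

    // Timed poll() waits up to `millis` milliseconds, then returns null.
    static String timedPollEmpty(long millis) {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        try {
            return bq.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // remove(Object) returns false when the element is not present.
    static boolean removeAbsent() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        return bq.remove("C++");
    }

    // The no-argument remove() inherited from Queue throws on an empty queue.
    static boolean removeHeadThrows() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        try {
            bq.remove();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty());         // prints null
        System.out.println(timedPollEmpty(100)); // prints null
        System.out.println(removeAbsent());      // prints false
        System.out.println(removeHeadThrows());  // prints true
    }
}
```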
Example: Access elements
This example shows two methods that retrieve the head element of the queue, element() and peek(), which BlockingQueue inherits from the Queue interface. Both return the head without removing it from the queue; on an empty queue, peek() returns null while element() throws a NoSuchElementException. We can also check if an element exists in the queue using the contains() method.
import java.util.concurrent.*;

public class BlockingQueueEx {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> bq = new ArrayBlockingQueue<String>(5);
    bq.add("Java");
    bq.add("C");
    bq.add("C++");
    bq.add("Python");
    bq.add("Perl");
    System.out.println("Elements in the blockingqueue: " + bq);

    bq.remove("C++");
    bq.take();
    System.out.println("Elements in the queue after remove operation: " + bq);
    System.out.println("Head element: " + bq.peek());  // head stays in the queue

    bq.offer("Ruby");
    bq.put("VBScript");
    System.out.println("Elements in the blockingqueue: " + bq);

    bq.poll(2, TimeUnit.SECONDS);
    System.out.println("Elements in the blockingqueue: " + bq);
    System.out.println("Head element: " + bq.element());  // head stays in the queue
    System.out.println(bq.contains("Ruby"));
  }
}

Output:
Elements in the blockingqueue: [Java, C, C++, Python, Perl]
Elements in the queue after remove operation: [C, Python, Perl]
Head element: C
Elements in the blockingqueue: [C, Python, Perl, Ruby, VBScript]
Elements in the blockingqueue: [Python, Perl, Ruby, VBScript]
Head element: Python
true
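The difference between peek() and element() only shows on an empty queue. A minimal sketch of that case follows; the class PeekDemo and its helper methods are hypothetical names for illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PeekDemo {
    // peek() on an empty queue returns null.
    static String peekEmpty() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        return bq.peek();
    }

    // element() on an empty queue throws NoSuchElementException.
    static boolean elementThrowsOnEmpty() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        try {
            bq.element();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    // Neither method removes the head, so the size is unchanged afterwards.
    static int sizeAfterInspecting() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        bq.add("Java");
        bq.add("C");
        bq.peek();
        bq.element();
        return bq.size();
    }

    public static void main(String[] args) {
        System.out.println(peekEmpty());            // prints null
        System.out.println(elementThrowsOnEmpty()); // prints true
        System.out.println(sizeAfterInspecting());  // prints 2
    }
}
```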
Example: Iterating elements in BlockingQueue
We can iterate through the elements using the iterator() method, which BlockingQueue inherits from the Collection interface. The hasNext() method reports whether elements remain, and the next() method returns each element in turn. We can get the number of elements in the queue using the size() method and the space still available using the remainingCapacity() method.
import java.util.Iterator;
import java.util.concurrent.*;

public class BlockingQueueEx {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> bq = new ArrayBlockingQueue<String>(5);
    bq.add("Java");
    bq.add("C");
    bq.add("C++");
    bq.add("Python");
    bq.add("Perl");
    System.out.println("Size of the queue: " + bq.size());

    Iterator<String> it = bq.iterator();
    while (it.hasNext())
      System.out.println(it.next());

    System.out.println("Remaining capacity: " + bq.remainingCapacity());
  }
}

Output:
Size of the queue: 5
Java
C
C++
Python
Perl
Remaining capacity: 0
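One point worth knowing when iterating is that the iterator of ArrayBlockingQueue is documented as weakly consistent: the queue can be modified while an iteration is in progress without a ConcurrentModificationException being thrown. The sketch below shows only that the loop completes; which elements a weakly consistent iterator visits after a modification is not something to rely on. The class WeakIteratorDemo and its method are illustrative names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WeakIteratorDemo {
    // Removes an element while a for-each loop is in progress and
    // returns the size of the queue afterwards.
    static int removeWhileIterating() {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(5);
        bq.add("Java");
        bq.add("C");
        bq.add("C++");
        bq.add("Python");
        bq.add("Perl");

        for (String s : bq) {
            if (s.equals("Java")) {
                bq.remove("Perl"); // modifies the queue during iteration
            }
        }
        return bq.size();
    }

    public static void main(String[] args) {
        System.out.println(removeWhileIterating()); // prints 4
    }
}
```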