Thursday, February 28, 2019

Java Concurrency - BlockingQueue Interface

A java.util.concurrent.BlockingQueue interface is a subinterface of Queue interface, and additionally supports operations such as waiting for the queue to become non-empty before retrieving an element, and wait for space to become available in the queue before storing an element.

BlockingQueue Methods

Sr.No.Method & Description
1
boolean add(E e)
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if no space is currently available.
2
boolean contains(Object o)
Returns true if this queue contains the specified element.
3
int drainTo(Collection<? super E> c)
Removes all available elements from this queue and adds them to the given collection.
4
int drainTo(Collection<? super E> c, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given collection.
5
boolean offer(E e)
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently available.
6
boolean offer(E e, long timeout, TimeUnit unit)
Inserts the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.
7
E poll(long timeout, TimeUnit unit)
Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.
8
void put(E e)
Inserts the specified element into this queue, waiting if necessary for space to become available.
9
int remainingCapacity()
Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.
10
boolean remove(Object o)
Removes a single instance of the specified element from this queue, if it is present.
11
E take()
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

Example

The following TestThread program shows usage of BlockingQueue interface in thread based environment.
 Live Demo
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }  


   static class Producer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }    
   }

   static class Consumer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue) {
         this.queue = queue;
      }
      
      @Override
      public void run() {
         
         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}
This will produce the following result.

Output

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27

No comments:

Post a Comment

Concurrent Navigable Map Interface

A java.util.concurrent.ConcurrentNavigableMap interface is a subinterface of ConcurrentMap interface, and supports NavigableMap operations...