Thursday, February 28, 2019

Java Concurrency - Condition Interface

The java.util.concurrent.locks.Condition interface lets a thread suspend its execution until a given condition becomes true. A Condition object is always bound to a Lock and is obtained by calling that lock's newCondition() method. A thread must hold the lock before it calls any of the await or signal methods on the condition.
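As a minimal sketch of that binding, the hypothetical Gate class below creates a Condition from a ReentrantLock and waits on it while holding the lock. The class name and its awaitOpen() and open() methods are illustrative assumptions, not part of the JDK; only Lock, ReentrantLock, and Condition are real library types.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper (not a JDK class): a one-shot gate that threads can wait on.
class Gate {
   private final Lock lock = new ReentrantLock();
   // The Condition is created by, and bound to, this particular lock.
   private final Condition opened = lock.newCondition();
   private boolean open = false;

   // Blocks the caller until open() has been called.
   public void awaitOpen() throws InterruptedException {
      lock.lock();
      try {
         // Re-check the predicate in a loop to guard against spurious wakeups.
         while (!open) {
            // await() releases the lock while waiting and reacquires it before returning.
            opened.await();
         }
      } finally {
         lock.unlock();
      }
   }

   // Opens the gate and wakes every thread waiting on the condition.
   public void open() {
      lock.lock();
      try {
         open = true;
         opened.signalAll();
      } finally {
         lock.unlock();
      }
   }
}
```

The wait sits inside a while loop rather than an if because a waiting thread may wake up without the condition actually holding, so the predicate is checked again after every wakeup.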

Condition Methods

Following is the list of important methods available in the Condition interface.

Sr.No.   Method & Description

1   public void await() throws InterruptedException
    Causes the current thread to wait until it is signalled or interrupted.

2   public boolean await(long time, TimeUnit unit) throws InterruptedException
    Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. Returns false if the waiting time elapsed before the method returned.

3   public long awaitNanos(long nanosTimeout) throws InterruptedException
    Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. Returns an estimate of the remaining nanoseconds, or a value less than or equal to zero if the wait timed out.

4   public void awaitUninterruptibly()
    Causes the current thread to wait until it is signalled. Interrupts do not end the wait.

5   public boolean awaitUntil(Date deadline) throws InterruptedException
    Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses. Returns false if the deadline had elapsed when the method returned.

6   public void signal()
    Wakes up one waiting thread.

7   public void signalAll()
    Wakes up all waiting threads.
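The timed variants are easiest to understand in a loop. The sketch below is an illustration under assumptions: the TimedWaitDemo class, its flag field, and its waitForFlag() and setFlag() methods are hypothetical names. It uses awaitNanos() to wait for a flag with an overall timeout, feeding the returned remaining time back into the next wait.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class (not a JDK class): waits for a flag with a timeout.
class TimedWaitDemo {
   private final Lock lock = new ReentrantLock();
   private final Condition ready = lock.newCondition();
   private boolean flag = false;

   // Returns true if the flag was set within timeoutMillis, false on timeout.
   public boolean waitForFlag(long timeoutMillis) throws InterruptedException {
      long nanosLeft = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      lock.lock();
      try {
         while (!flag) {
            if (nanosLeft <= 0L) {
               return false; // the overall timeout has elapsed
            }
            // awaitNanos returns an estimate of the time still remaining,
            // or a value <= 0 if the wait timed out.
            nanosLeft = ready.awaitNanos(nanosLeft);
         }
         return true;
      } finally {
         lock.unlock();
      }
   }

   // Sets the flag and wakes all waiting threads.
   public void setFlag() {
      lock.lock();
      try {
         flag = true;
         ready.signalAll();
      } finally {
         lock.unlock();
      }
   }
}
```

Carrying the remaining time across iterations keeps the total wait bounded even if the thread wakes up early several times.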

Example

The following TestThread program demonstrates these methods of the Condition interface. Here we've used signal() to notify and await() to suspend the thread.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestThread {

   public static void main(String[] args) throws InterruptedException {
      ItemQueue itemQueue = new ItemQueue(10);

      //Create a producer and a consumer.
      Thread producer = new Producer(itemQueue);
      Thread consumer = new Consumer(itemQueue);

      //Start both threads.
      producer.start();
      consumer.start();

      //Wait for both threads to terminate.
      producer.join();
      consumer.join();
   }

   static class ItemQueue {
      private Object[] items = null;
      private int current = 0;
      private int placeIndex = 0;
      private int removeIndex = 0;

      private final Lock lock;
      private final Condition isEmpty;
      private final Condition isFull;

      public ItemQueue(int capacity) {
         this.items = new Object[capacity];
         lock = new ReentrantLock();
         isEmpty = lock.newCondition();
         isFull = lock.newCondition();
      }

      public void add(Object item) throws InterruptedException {
         lock.lock();
         try {
            //Wait while the queue is full.
            while(current >= items.length)
               isFull.await();

            items[placeIndex] = item;
            placeIndex = (placeIndex + 1) % items.length;
            ++current;

            //Notify the consumer that there is data available.
            isEmpty.signal();
         } finally {
            //Always release the lock, even if await() is interrupted.
            lock.unlock();
         }
      }

      public Object remove() throws InterruptedException {
         lock.lock();
         try {
            //Wait while the queue has no items.
            while(current <= 0) {
               isEmpty.await();
            }
            Object item = items[removeIndex];
            removeIndex = (removeIndex + 1) % items.length;
            --current;

            //Notify the producer that there is space available.
            isFull.signal();
            return item;
         } finally {
            //Always release the lock, even if await() is interrupted.
            lock.unlock();
         }
      }

      //Returns true when the queue currently holds no items.
      public boolean isEmpty() {
         lock.lock();
         try {
            return (current == 0);
         } finally {
            lock.unlock();
         }
      }
   }

   static class Producer extends Thread {
      private final ItemQueue queue;
      
      public Producer(ItemQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         String[] numbers =
            {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"};

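         try {
            //The numbers are only printed here; they are not placed on the queue.
            for(String number: numbers) {
               System.out.println("[Producer]: " + number);
            }
            //Enqueue a null sentinel, the only item the consumer receives.
            queue.add(null);
         } catch (InterruptedException ex) {
            ex.printStackTrace();
         }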
      }
   }

   static class Consumer extends Thread {
      private final ItemQueue queue;
      
      public Consumer(ItemQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         
         try {
            //Keep removing items until the null sentinel arrives.
            while(true) {
               Object number = queue.remove();
               System.out.println("[Consumer]: " + number);

               if(number == null) {
                  return;
               }
            }
         } catch (InterruptedException ex) {
            ex.printStackTrace();
         }
      }
   }
}
This will produce the following result.

Output

[Producer]: 1
[Producer]: 2
[Producer]: 3
[Producer]: 4
[Producer]: 5
[Producer]: 6
[Producer]: 7
[Producer]: 8
[Producer]: 9
[Producer]: 10
[Producer]: 11
[Producer]: 12
[Consumer]: null
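
In the listing above the producer prints its numbers but enqueues only the null sentinel, so just one item ever passes through the queue. As a rough sketch of a variant in which every number is handed from producer to consumer, the example below uses a small buffer so that both await() calls are actually exercised. The BoundedBuffer and HandOffDemo classes, the notEmpty and notFull condition names, and the capacity of 3 are assumptions made for illustration, not part of the original program.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer, modelled on the ItemQueue class above.
class BoundedBuffer {
   private final Object[] items;
   private int count, putIndex, takeIndex;
   private final Lock lock = new ReentrantLock();
   private final Condition notEmpty = lock.newCondition();
   private final Condition notFull = lock.newCondition();

   BoundedBuffer(int capacity) {
      items = new Object[capacity];
   }

   // Blocks while the buffer is full, then stores the item.
   void put(Object item) throws InterruptedException {
      lock.lock();
      try {
         while (count == items.length) notFull.await();
         items[putIndex] = item;
         putIndex = (putIndex + 1) % items.length;
         count++;
         notEmpty.signal(); // one waiting consumer may proceed
      } finally {
         lock.unlock();
      }
   }

   // Blocks while the buffer is empty, then removes the oldest item.
   Object take() throws InterruptedException {
      lock.lock();
      try {
         while (count == 0) notEmpty.await();
         Object item = items[takeIndex];
         items[takeIndex] = null;
         takeIndex = (takeIndex + 1) % items.length;
         count--;
         notFull.signal(); // one waiting producer may proceed
         return item;
      } finally {
         lock.unlock();
      }
   }
}

class HandOffDemo {
   // Sends "1".."12" through the buffer and returns what the consumer received, in order.
   static List<Object> run() throws InterruptedException {
      BoundedBuffer buffer = new BoundedBuffer(3);
      List<Object> received = new ArrayList<>();
      Thread producer = new Thread(() -> {
         try {
            for (int i = 1; i <= 12; i++) buffer.put(String.valueOf(i));
            buffer.put(null); // null sentinel marks the end of the stream
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
      });
      Thread consumer = new Thread(() -> {
         try {
            Object item;
            while ((item = buffer.take()) != null) received.add(item);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
      });
      producer.start();
      consumer.start();
      producer.join();
      consumer.join();
      return received;
   }
}
```

Because the buffer is first-in, first-out and there is a single producer and a single consumer, the consumer receives the items in the order they were put, although the exact moments at which each thread blocks vary from run to run.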


Concurrent Navigable Map Interface

A java.util.concurrent.ConcurrentNavigableMap interface is a subinterface of ConcurrentMap interface, and supports NavigableMap operations...