Saturday, October 24, 2015

[Multithreading] Classic Problems - Reader and Writer, Producer and Consumer, Dining Philosopher

Concurrent and Distributed Computing in Java

Cracking code philosopher


       public void pickUp(){
   
                  void lock.trylock();
      }
      public void putDown(){
                 void lock.unlock();
     }
            if ( !left.pickUp() )
                     return fasle;
               if ( !right.pickUp() ){
                     left.putDown();
                     return fasle;
               }
// release the left chopstick if we can't pick up  the right one
               return true;


Each Chopstick calls lock.lock() when it is picked up
and lock.unlock() when it is put down.
/**
 * A chopstick shared by neighbouring philosophers, guarded by a lock.
 */
public class Chopstick {
        private final Lock lock;

        public Chopstick() {
            lock = new ReentrantLock();
        }

        /**
         * Attempts to pick up this chopstick without blocking.
         *
         * <p>Approach 1 (deadlock): calling {@code lock.lock()} here blocks until the
         * chopstick is free, so a philosopher can wait forever while holding another one.
         * <br>Approach 2 (good): {@code lock.tryLock()} returns immediately, which lets
         * the caller release what it already holds and retry.
         *
         * @return {@code true} if the chopstick was acquired, {@code false} if it is in use
         */
        public boolean pickUp() {
            return lock.tryLock();
        }

        /** Releases this chopstick; must be called by the thread that picked it up. */
        public void putDown() {
            lock.unlock();
        }
}

To prevent deadlocks, we can implement a strategy where a philosopher will put down
his left Chopstick if he is unable to obtain the right one
public class Philosopher extends Thread{
      private int bites = 10;
      private Chopstick left1;
      private Chopstick right2;
      public Philosopher (Chopstick left1, Chopstick right2)
      {
               this.left1= left1; this.right2= right2;
      }

      public void run ()
      {
                 for (int i =0; i  < bites ; i++)
                         eat();
      }

      public void eat()
      {           
                 # Approach 1 :deadlock
                 pickUp();
                  chew();
                  putDown();
                  # Approach 2 :GOOD
                  if ( pickUp() )
                  {
                          chew();
                          putDown();
                    }
      }
      public boolean pickUp()
      {
                 # Approach 1 :deadlock
              left1.pickUp();
               right2.pickUp();
                  # Approach 2 :GOOD
               if ( !left.pickUp() )
                     return fasle;
               if ( !right.pickUp() ){
                     left.putDown();
                     return fasle;
               }
// release the left chopstick if we can't pick up  the right one
               return true;
      }
      public void chew(){}
       publci void putDown(){
              left.putDown();
              right.putDown();
      }
}

Reference: cracking the coding interview




                synchonized (  forks[fork1]  ) {
                                synchonized ( forks[fork2] ){}
                             if ( i%2 == 0)
                                  pholosophers[i] = new Philosopher(i, fork2, fork1);
                             else
                                  pholosophers[i] = new Philosopher(i, fork1, fork2);
public class DiningPhilosophers
{
       // create shared object
       private Object[] forks;
       private Philosopher[] philosophers;

      public DiningPhilosophers ( int num )
      {
                 forks = new Obejct[num];
                 philosophers = new Philosophers[num];
                 for ( int i = 0 ; i < num ; i ++) {
                        forks[i ] = new Object();;
                              # Approach 1: deadlock
                              philosophers[i ] = new Philosopher( i,  i, (i+1)% num, );
                              # Approach 2: livelock, not fair
                              int fork1= i;       
                              int frok2 = (i+1)%num;
                             if ( i == 0)
                                 pholosophers[i] = new Philosopher(i, fork2, fork1);
                             else
                                 pholosophers[i] = nwe Philosopher(i, fork1, fork2);
                             #Approach 3 : fair
                             int fork1 = i;
                             int fork2 =( i+1)% num;
                             if ( i%2 == 0)
                                  pholosophers[i] = new Philosopher(i, fork2, fork1);
                             else
                                  pholosophers[i] = new Philosopher(i, fork1, fork2);
                 }
      }

      public void startEating () throws InterrupedException
      {
                  for (int i = 0; i < philosophers.length;i++)
                  pholosophers[0].start();
                  philosophers[0].join(); // startEating execute after thread[0] finished the task
                  // Suspend the main thread until the first philosopher stops eating, which will never happen  -- this keep simulation running indefinitely
      }

      public static void main (String[] args)
      {
               try {
                          DiningPholosophers d = new DiningPholosophers(5);
                          d.startEating();
               } catch (  ItnerruptedExcepiton e) {
               }
     }
}
INSIDE philosophers
/**
 * A philosopher thread that repeatedly locks two forks, always in the order
 * (fork1, fork2) given at construction time.
 */
public class Philosopher extends Thread {

     private int id;
     private int fork1;
     private int fork2;
     private Object[] forks;

     /**
      * @param id    number of this philosopher
      * @param fork1 index in {@code forks} of the fork to lock first
      * @param fork2 index in {@code forks} of the fork to lock second
      * @param forks shared array of fork lock objects
      */
     public Philosopher( int id, int fork1, int fork2, Object[] forks )
     {
          this.id = id; this.fork1 = fork1; this.fork2 = fork2; this.forks = forks;
     }

     /** Announces the forks in use, then locks and releases them forever. */
     @Override
     public void run ()
     {
            System.out.println(" Ready to eat using forks" + fork1 + "and" + fork2);
            while ( true )
            {
                        synchronized ( forks[fork1] ) {
                                synchronized ( forks[fork2] ) {
                                        // both forks held: eating would happen here
                                }
                        }
            }
     }
}
Reference : programming interview exposed

producer and consumer

With the introduction of the BlockingQueue data structure in Java 5, this is now much simpler, because BlockingQueue provides this control implicitly through its blocking methods put() and take().

BlockingQueue's put() method blocks if the queue is full (in the case of a bounded queue), and take() blocks if the queue is empty.

BlockingQueue is an interface, and Java 5 provides different implementations such as ArrayBlockingQueue and LinkedBlockingQueue. Both keep their elements in FIFO order; while ArrayBlockingQueue is bounded in nature, LinkedBlockingQueue is optionally bounded.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern
{
         public static void main (String[] args)
         {
                    // createing shared object
                    BlockingQueue sharedQueue = new LinkedBlockingQueue();

                   // creating producer and consumer Thread
                    Thread prodThread = new Thread ( new Producer(sharedQueue));
                    Thread consThread = new Thread(  new Consumer(sharedQueue));

                   // starting producer and consumer thread
                   prodThread().start();
                   consThread().start();
         }
}
/**
 * Puts the integers 0..9 onto the shared queue, blocking whenever the queue is full.
 */
class Producer implements Runnable
{
         private final BlockingQueue<Integer> sharedQueue;

         public Producer(BlockingQueue<Integer> sharedQueue)
         {
              this.sharedQueue = sharedQueue;
         }

        @Override
        public void run()
        {
                for (int i = 0; i < 10; i++)
                {
                         try {
                                  sharedQueue.put(i);
                         } catch (InterruptedException e) {
                                  Logger.getLogger( Producer.class.getName() ).log( Level.SEVERE, null, e );
                                  // Restore the flag and stop producing: interruption is a request to finish.
                                  Thread.currentThread().interrupt();
                                  return;
                         }
                }
        }
}
/**
 * Takes items from the shared queue forever, blocking whenever the queue is empty.
 * The loop ends only when the consuming thread is interrupted.
 */
class Consumer implements Runnable
{
         private final BlockingQueue<Integer> sharedQueue;

         public Consumer(BlockingQueue<Integer> sharedQueue)
         {
                this.sharedQueue = sharedQueue;
         }

         @Override
         public void run()
         {
                    while ( true )
                    {
                           try {
                                   // The taken item is discarded; a real consumer would process it here.
                                   sharedQueue.take();
                           } catch (InterruptedException e) {
                                   Logger.getLogger( Consumer.class.getName() ).log( Level.SEVERE, null, e );
                                   // Restore the flag and leave the loop so the thread can terminate.
                                   Thread.currentThread().interrupt();
                                   return;
                           }
                    }
         }
}

The classical way is to use the wait and notify methods to communicate between the Producer and Consumer threads, blocking each of them on its own condition: a full queue for the producer and an empty queue for the consumer.

public class ProducerConsumerWaitAdnNotify{

         public static void main (String[] args)
         {
                    // creating shared object
                   IntBuffer b  = new IntBuffer();
                   
                   // creating Prodcuer and Consumer Thread
                  Producer p  = new Producer(b);
                  Consumer c = new Consumer(b);

                  // starting producer and consumer thread
                 p.start();
                 c.start();

         }
}
public class Producer extends Thread
{
          private IntBuffer buffer;
          public Producer ( IntBuffer buffer)
              this.buffer = buffer;

         public void run()
         {
                 Random r = new Random();
                 while( true )
                {
                        int num  = r.nextInt();
                        buffer.add( num );
                   
                 }
         }
}
publci class Consumer extends Thread
{
          private IntBuffer buffer;
          public Consumer (IntBuffer buffer)
             this.buffer = buffer;
          
         public void run()
         {
                    while( true )
                    {
                            int num = buffer.remove();
                    }
         }
}
/**
 * Fixed-capacity buffer of ints shared between producer and consumer threads.
 * Values come back in last-in, first-out order. {@code add} blocks while the
 * buffer is full and {@code remove} blocks while it is empty.
 */
public class IntBuffer
{
        // number of stored values; also the slot the next add() writes to
        private int index;
        private int[] buffer = new int[8];

        /**
         * Stores a value, waiting while the buffer is full. Called from Producer.
         *
         * @param num the value to store
         */
        public synchronized void add(int num)
        {
                 boolean interrupted = false;
                 // Full only when every slot is used, i.e. index == buffer.length.
                 while ( index == buffer.length )
                 {
                          try
                          {
                                   wait();
                          }
                          catch (InterruptedException e)
                          {
                                   // Keep waiting, but remember the interrupt for the caller.
                                   interrupted = true;
                          }
                 }
                 buffer[index++] = num;
                 notifyAll();
                 if ( interrupted )
                 {
                          Thread.currentThread().interrupt();
                 }
        }

        /**
         * Removes the most recently added value, waiting while the buffer is empty.
         * Called from Consumer.
         *
         * @return the removed value
         */
        public synchronized int remove()
        {
                 boolean interrupted = false;
                 while ( index == 0 )
                 {
                          try
                          {
                                   wait();
                          }
                          catch (InterruptedException e)
                          {
                                   // Keep waiting, but remember the interrupt for the caller.
                                   interrupted = true;
                          }
                 }
                 int ret = buffer[--index];
                 notifyAll();
                 if ( interrupted )
                 {
                          Thread.currentThread().interrupt();
                 }
                 return ret;
        }
}
Reference: programming interview exposed

readers and writers Problem

semaphore as counter which can be incremented or decremented
binary semaphore protect the access to a SINGLE shared resource, so the internal counter of the semaphore can only take the values 1 or 0
     semaphore = new Semaphore(1);

  1. First, you acquire the semaphore, with the acquire() method.
  2. Then, you do the necessary operations with the shared resource.
  3. Finally, release the semaphore with the release() method.
It decreases the value to 0 with the P(S) operation when the resource is in use, and then releases it with the V(S) operation by increasing it back to 1 when the resource is freed
# Approach 2 : using semaphore
class ReaderWrtier {
     // shared object/data
     int numReaders =0;
     BinarySemaphore mutex = new BinarySemaohre( true);
     BinarySemaphore wlock = new BinsraySemaphore(true);

    public void startRead (){
           mutex.P(); // 1→ 0
           numReaders++;
           if ( numReaders == 1 ) wlock.P(); // CANNOT WRITE, since someone id reading
           nutex.V(); // 0→ 1
    }
    public void endRead(){
          mutex.P();
          numReaders--;
          if ( numReaders == 0 ) wlock.V(); // CAN WRITE
          mutex.V();
    }
    public void startWrite(){
          wlock.P();
    }
    public void endWrite(){
          wlock.V();
    }
}

/**
 * Binary semaphore built on the object's monitor: {@code value == true}
 * means the semaphore is available.
 */
public class BinarySemaphore {
   boolean value;

   BinarySemaphore(boolean initValue) {
       value = initValue;
   }

   /**
    * Acquires the semaphore, waiting until it is available. An interrupt that
    * arrives while waiting does not abort the wait; the thread's interrupt
    * status is set again before returning.
    */
   public synchronized void P() {
       boolean interrupted = false;
       while (!value) {
           try {
               wait(); // join the set of threads blocked on this semaphore
           } catch (InterruptedException e) {
               interrupted = true;
           }
       }
       value = false;
       if (interrupted) {
           Thread.currentThread().interrupt();
       }
   }

   /** Releases the semaphore and wakes one waiting thread, if any. */
   public synchronized void V() {
       value = true;
       notify();
   }
}

One Writer but multiple Readers

# Approach 1 : using monitor

/**
 * Readers/writers coordination using this object's monitor: many readers may
 * read at the same time, while writing happens with the monitor held and only
 * once no reader is active.
 */
class ReaderWrtier{

        // shared object/data
        private int numReaders = 0;
        private int numWriters = 0;

        /** Waits until no writer is registered, then registers the caller as a reader. */
        private synchronized void prepareToRead()
        {
                  boolean interrupted = false;
                  while ( numWriters > 0 ) {
                        interrupted |= waitOnce();
                  }
                  numReaders++;
                  if ( interrupted ) {
                        Thread.currentThread().interrupt();
                  }
        }

        /** Unregisters a reader; the last reader out wakes the waiting threads. */
        public synchronized void doneReading()
        {
                  numReaders--;
                  if ( numReaders == 0 ) {
                        // notifyAll: readers and writers wait on the same monitor
                        notifyAll();
                  }
        }

        public void someReadMethod()
        {
                  // the read itself is not synchronized => multiple readers
                  prepareToRead();
                  try {
                        // do the reading...
                  } finally {
                        doneReading();
                  }
        }

        /** Registers the caller as a writer, then waits until no reader is active. */
        private synchronized void prepareToWrite()
        {
                  boolean interrupted = false;
                  numWriters++;
                  while ( numReaders != 0 ) {
                        interrupted |= waitOnce();
                  }
                  if ( interrupted ) {
                        Thread.currentThread().interrupt();
                  }
        }

        /** Unregisters a writer and wakes the waiting threads. */
        public synchronized void doneWriting()
        {
                  numWriters--;
                  notifyAll();
        }

        public synchronized void someWriteMethod()
        {
                  // synchronized => only one writer
                  prepareToWrite();
                  try {
                        // do the writing
                  } finally {
                        doneWriting();
                  }
        }

        /**
         * Waits once on this monitor, which the caller must already hold.
         *
         * @return {@code true} if the wait ended because of an interrupt
         */
        private boolean waitOnce()
        {
                  try {
                        wait();
                        return false;
                  } catch (InterruptedException e) {
                        return true;
                  }
        }
}


No comments:

Post a Comment