Multiple threads modifying shared buffer [Producer/Consumer Relationship]
http://www.csharpfriends.com
by: mosessaur

Introduction to the Producer/Consumer Relationship Problem:

In a Producer/Consumer relationship, the Producer generates data and the Consumer uses that data. In a multithreaded Producer/Consumer relationship, a Producer thread calls a Produce method to generate data and place it into a shared region of memory, called the shared buffer. A Consumer thread calls a Consume method to read that data. If the Producer, waiting to write the next item to the buffer, determines that the Consumer has not yet read the previous item, the Producer thread should call Wait; otherwise, the Consumer never sees the previous item, and that item is lost because the Producer overwrites it.

When the Consumer thread reads data from the buffer, it should call Pulse to allow a waiting Producer to produce the next item. If the Consumer finds the buffer empty, or finds that the previous item has already been read, it should call Wait; otherwise, the Consumer might read nothing from the buffer or process a previous item more than once.

When the Producer places the next item into the buffer, it should call Pulse to allow the Consumer thread to proceed.

That is how the Producer/Consumer relationship should work. Now let's examine an algorithm that implements it using the Monitor class. We can call this a binary solution; we'll see why later.

Note: this is just an algorithm, not real C# code, but I am using the names of some C# classes and their methods:

var Buffer
int occupiedCount
Proc Produce(var x)
{
    Monitor.Enter(Buffer)
        while(occupiedCount == 1)
        {
            //No more space
            //Wait for the Consumer to consume the data and decrement occupiedCount,
            //then re-check the condition after waking up
            Monitor.Wait(Buffer)
        }
        Buffer = x
        //Incrementing occupiedCount means that the data has been produced and
        //is ready to read
        occupiedCount = occupiedCount + 1

        //Tells the waiting Consumer thread (if there is one) that there is a
        //change in Buffer
        Monitor.Pulse(Buffer)
    Monitor.Exit(Buffer) //Now the Consumer thread can access Buffer
}

Proc Consume()
{
    var BufferCopy
    Monitor.Enter(Buffer)
        while(occupiedCount == 0)
        {
            //No data to read
            //Wait for the Producer to produce data and increment occupiedCount,
            //then re-check the condition after waking up
            Monitor.Wait(Buffer)
        }
        BufferCopy = Buffer //Save the original Buffer data
        //Decrementing occupiedCount means that the data has been consumed
        occupiedCount = occupiedCount - 1

        //Tells the waiting Producer thread (if there is one) that there is a
        //change in Buffer
        Monitor.Pulse(Buffer)
    Monitor.Exit(Buffer)
    return BufferCopy
}
I call this a binary solution because occupiedCount takes only one of two values, 0 or 1, so you could make it a boolean. The shared buffer can store only a single item. We can lift that limit by reworking the problem with a bounded buffer (a buffer array). I attached the bounded-buffer solution to this tutorial, but I'll leave it to you to examine the code; it is well commented.
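
To make the bounded-buffer idea concrete, here is a minimal sketch. It is not the code from the attached download: the names BoundedBuffer, Put, Take and BoundedBufferDemo are invented for this illustration, and it assumes a C# compiler with lambda support. It uses a circular array, re-checks the condition in a while loop, and calls PulseAll because producers and consumers wait on the same object.

```csharp
using System;
using System.Threading;

// Hypothetical class for illustration; not the class from the attached download.
public class BoundedBuffer
{
    private readonly int[] slots;                 // circular storage
    private readonly object sync = new object();  // private lock object
    private int readIndex = 0;                    // next slot to consume
    private int writeIndex = 0;                   // next slot to fill
    private int occupiedCount = 0;                // number of unread items

    public BoundedBuffer(int capacity)
    {
        slots = new int[capacity];
    }

    public void Put(int value)
    {
        lock (sync)   // lock wraps Monitor.Enter/Monitor.Exit
        {
            // Wait while every slot is occupied; re-check after each wake-up.
            while (occupiedCount == slots.Length)
                Monitor.Wait(sync);

            slots[writeIndex] = value;
            writeIndex = (writeIndex + 1) % slots.Length;
            occupiedCount++;

            // Producers and consumers share one wait queue, so wake them all.
            Monitor.PulseAll(sync);
        }
    }

    public int Take()
    {
        lock (sync)
        {
            // Wait while there is nothing to read.
            while (occupiedCount == 0)
                Monitor.Wait(sync);

            int value = slots[readIndex];
            readIndex = (readIndex + 1) % slots.Length;
            occupiedCount--;

            Monitor.PulseAll(sync);
            return value;
        }
    }
}

public static class BoundedBufferDemo
{
    // Produces 1..4 on one thread and sums them on another.
    public static int Run()
    {
        BoundedBuffer buffer = new BoundedBuffer(2);
        int sum = 0;

        Thread producer = new Thread(() =>
        {
            for (int i = 1; i <= 4; i++) buffer.Put(i);
        });
        Thread consumer = new Thread(() =>
        {
            for (int i = 1; i <= 4; i++) sum += buffer.Take();
        });

        producer.Start();
        consumer.Start();
        producer.Join();
        consumer.Join();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine("Sum: " + Run());   // prints "Sum: 10"
    }
}
```

With a capacity of 2 the producer can run up to two items ahead of the consumer before it has to wait, which is the practical difference from the single-slot version.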

Producer/Consumer Relationship Program:

The classes I built in this tutorial are:

The Producer class, which implements the Producer controller method, Produce.

Example 1:
public class Producer
{
    //code..
    public void Produce()
    {
        //Controller method which will be passed to the Thread constructor
    }
}
The Consumer class, which implements the Consumer controller method, Consume.

Example 2:
public class Consumer
{
    //code..
    public void Consume()
    {
        //Controller method which will be passed to the Thread constructor
    }
}
Note: controller methods must be void and take no arguments; we'll see why later.

The HoldResourceSynchronized class, which holds the object shared by the Producer and Consumer and performs mutual exclusion and synchronization on that object.

Classes used from the FCL:

Thread class: creates a thread for the Consumer and for the Producer. The constructor of this class receives a ThreadStart delegate to be executed (the controller method). ThreadStart returns void and takes no arguments, which is why we build the controller methods as void and without arguments.

Example 3:

Thread m_ProducerThread = new Thread(new ThreadStart(producer.Produce));
m_ProducerThread.Name = "Producer";

Thread m_ConsumerThread = new Thread(new ThreadStart(consumer.Consume));
m_ConsumerThread.Name = "Consumer";
Note: add this at the top of the file: using System.Threading;
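
Because ThreadStart takes no arguments and returns nothing, a controller method usually gets its input from fields set in the constructor and leaves its output in the object. The following sketch shows that pattern on its own; SquareWorker and ThreadStartDemo are hypothetical names made up for this example.

```csharp
using System;
using System.Threading;

// Hypothetical worker: input arrives through the constructor,
// output is left in a property for the caller to read.
public class SquareWorker
{
    private readonly int input;
    private int result;

    public SquareWorker(int input)
    {
        this.input = input;
    }

    public int Result
    {
        get { return result; }
    }

    // Matches the ThreadStart signature: returns void, takes no arguments.
    public void Run()
    {
        result = input * input;
    }
}

public static class ThreadStartDemo
{
    public static int Run()
    {
        SquareWorker worker = new SquareWorker(7);
        Thread thread = new Thread(new ThreadStart(worker.Run));
        thread.Name = "Worker";
        thread.Start();
        thread.Join();          // wait for the thread before reading the result
        return worker.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints 49
    }
}
```

This is the same idea the Producer and Consumer classes rely on when they receive the shared resource through their constructors.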

Monitor class: this class has methods that perform mutual exclusion and synchronization, such as:

Enter, Exit, Wait, Pulse, PulseAll, etc.

We'll see how those methods work in the HoldResourceSynchronized class to handle the Buffer object.

About HoldResourceSynchronized Class:

using System.Threading;

public class HoldResourceSynchronized
{
    private int intBuffer = -1;            //Shared object
    private int intOccupiedBuffCount = 0;  //Number of occupied buffers (0 or 1)

    //The output delegate that the form passes to the constructor is omitted here
    public HoldResourceSynchronized()
    {
    }

    public int Buffer
    {
        get
        {
            //Just like the consumer procedure in the algorithm

            //Lock this object while getting the value from the buffer (Consumer)
            Monitor.Enter(this);

            //While there is no data to read, wait [WaitSleepJoin state];
            //the condition is re-checked after every wake-up
            while(intOccupiedBuffCount == 0)
            {
                //do some output registering here
                Monitor.Wait(this);
            }

            //Just consumed a value, so decrement the number of occupied buffers
            --intOccupiedBuffCount;

            //Return a waiting thread (if there is one) to the Started state
            Monitor.Pulse(this);

            /*
             * Get a copy of intBuffer before releasing the lock.
             * The producer could be assigned the processor immediately after
             * the Monitor is released and before the return statement executes.
             * In that case the producer would assign a new value to intBuffer
             * before the return statement returns the value to the consumer.
             * Making a copy of the original buffer solves this issue.
             */
            int intBufferCopy = intBuffer;

            //Release the object
            Monitor.Exit(this);

            return intBufferCopy;
        }
        set
        {
            //Just like the producer procedure in the algorithm

            //Lock this object while setting the value in the buffer (Producer)
            Monitor.Enter(this);

            //While there is no empty location, wait [WaitSleepJoin state];
            //the condition is re-checked after every wake-up
            while(intOccupiedBuffCount == 1)
            {
                //do some output registering here
                Monitor.Wait(this);
            }
            intBuffer = value;

            //Just produced a value, so increment the number of occupied buffers
            ++intOccupiedBuffCount;

            //Return a waiting thread (if there is one) to the Started state
            Monitor.Pulse(this);

            Monitor.Exit(this);
        }
    }
}
When you call the Enter method, you lock the object, meaning no other thread can acquire the lock on it until you call the Exit method to release it. But what about the Wait and Pulse methods? The Wait method releases the lock on an object so that other threads can lock and access the object. The calling thread blocks while another thread accesses the object, and it must reacquire the lock before Wait returns. Pulse signals notify waiting threads about changes to an object's state: Pulse signals a single waiting thread, while PulseAll signals all of them. The signal tells a waiting thread that the state of the locked object has changed and that the owner of the lock is ready to release it. The waiting thread is placed in the object's ready queue so that it may eventually receive the lock for the object. Once the thread has the lock, it can check the new state of the object to see whether the required state has been reached.
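
The handshake described above can be tried in isolation. This is a small sketch, not part of the tutorial's program: WaitPulseDemo and its variables are hypothetical, and it assumes a C# compiler with lambda support. One thread waits for a flag; the main thread enters the same lock, changes the state and pulses. It works in either scheduling order because the waiter checks the flag before calling Wait.

```csharp
using System;
using System.Threading;

// Hypothetical demo class for illustration only.
public static class WaitPulseDemo
{
    public static string Run()
    {
        object sync = new object();
        bool ready = false;     // the state the waiting thread cares about
        string log = "";        // only touched while holding the lock

        Thread waiter = new Thread(() =>
        {
            lock (sync)
            {
                // Wait releases the lock, so the main thread can enter
                // its own lock block while this thread is blocked here.
                while (!ready)
                    Monitor.Wait(sync);

                // Wait has reacquired the lock; the state can be trusted.
                log += "waiter saw ready;";
            }
        });
        waiter.Start();

        lock (sync)
        {
            ready = true;
            log += "main set ready;";
            // Move one waiting thread (if any) to the ready queue.
            Monitor.Pulse(sync);
        }   // the lock is released here, letting the waiter continue

        waiter.Join();
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "main set ready;waiter saw ready;"
    }
}
```

Note that Pulse does not hand over the lock by itself: the waiter continues only after the pulsing thread leaves its lock block.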

Note: the lock keyword does the job of the Enter and Exit methods, meaning you can replace Enter and Exit with a lock block.

Example:
Monitor.Enter(this);
//Critical section code...
Monitor.Exit(this);

is equivalent to this:
lock(this)
{
    //Critical section code...
}
The main difference between them is this: when an exception occurs in the method before Exit can be called and that exception is not caught, the method could terminate without calling Exit. If so, the lock is not released. With the lock keyword, the lock is released anyway. There is a solution if you want to use the Enter and Exit methods: put the critical section in a try block and call Exit in the finally block.
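
A minimal sketch of the try/finally approach follows. SafeCounter and TryFinallyDemo are hypothetical names used only here, and the failure is simulated. Since a Monitor lock is reentrant for the thread that owns it, the demo checks from a second thread that the lock really was released after the exception.

```csharp
using System;
using System.Threading;

// Hypothetical class used only for this illustration.
public class SafeCounter
{
    private readonly object sync = new object();
    private int count = 0;

    public void Increment(bool fail)
    {
        Monitor.Enter(sync);
        try
        {
            // Critical section: may throw before reaching the end.
            if (fail)
                throw new InvalidOperationException("simulated failure");
            count++;
        }
        finally
        {
            // Runs on both the normal and the exceptional path.
            Monitor.Exit(sync);
        }
    }

    public int Count
    {
        get { lock (sync) { return count; } }
    }
}

public static class TryFinallyDemo
{
    public static bool Run()
    {
        SafeCounter counter = new SafeCounter();
        try
        {
            counter.Increment(true);
        }
        catch (InvalidOperationException)
        {
            // Expected: the simulated failure.
        }

        // A different thread can only get in if the lock was released.
        Thread other = new Thread(() => counter.Increment(false));
        other.Start();
        bool finished = other.Join(2000);
        return finished && counter.Count == 1;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "True"
    }
}
```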

Produce Method in Producer Class:

public void Produce()
{
    //Produce integers from 1 to 4
    for(int i=1; i<=4; i++)
    {
        //m_randSleepTime is a Random object that provides a random delay;
        //we don't know which is faster, the Producer or the Consumer.
        //The same is done in the Consumer's Consume method.
        Thread.Sleep(m_randSleepTime.Next(1,3000));

        //m_SharedResource is the HoldResourceSynchronized reference passed
        //to the Producer constructor;
        //the same object is passed to the Consumer constructor.
        m_SharedResource.Buffer = i;
    }
    //output messages here to display the end of the thread.
}

Consume Method in Consumer Class:

public void Consume()
{
    int sum = 0;
    //Consume the 4 integers produced by the Producer
    for(int i=1; i<=4; i++)
    {
        Thread.Sleep(m_randSleepTime.Next(1,3000));
        sum += m_SharedResource.Buffer;
    }
    //output messages here to display the end of the thread.
}
Note: open the source code of those classes to see their constructors.

Starting Threads:

Here is the code for the form constructor:
public frmProducerConsumer()
{
    InitializeComponent();

    Random rand = new Random();

    //OutputMessage is a public delegate; I use it to display output
    //in text boxes.
    //If I want to change the output control, all I need to do
    //is update the OutMessageHandle method.
        
    m_SharedRes = new HoldResourceSynchronized( 
                              new OutputMessage(OutMessageHandle));
    Producer producer = new Producer(m_SharedRes,rand, 
                               new OutputMessage(OutMessageHandle));
    Consumer consumer = new Consumer(m_SharedRes,rand, 
                                   new OutputMessage(OutMessageHandle));

    m_ProducerThread = new Thread(new ThreadStart(producer.Produce));
    m_ProducerThread.Name = "Producer";

    m_ConsumerThread = new Thread(new ThreadStart(consumer.Consume));
    m_ConsumerThread.Name = "Consumer";
}
This is the handler for the Start button, which starts the threads:
private void btnStart_Click(object sender, System.EventArgs e)
{
    m_ProducerThread.Start();
    m_ConsumerThread.Start();
    btnStart.Enabled = false;
}
That was all about the Producer/Consumer.

More on the Thread Class

The Thread class provides the Suspend method, which suspends a thread or, if the thread is already suspended, has no effect. This method throws ThreadStateException when the thread has not been started or is dead. There is also the Resume method, which resumes a thread that has been suspended; it throws ThreadStateException when the thread has not been started, is dead, or is not in the suspended state. Both methods have been marked obsolete since .NET Framework 2.0, so prefer coordinating threads through Monitor. The CurrentThread property gets the currently running thread, and the ThreadState property gets a value containing the states of the current thread, such as Unstarted, Running, Suspended, etc.
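
The two properties can be observed with a short sketch like the one below. ThreadStateDemo is a hypothetical name, and the sketch deliberately leaves out Suspend and Resume because they are obsolete. It only reads the state before the thread starts and after it has finished, since those two moments are not subject to timing.

```csharp
using System;
using System.Threading;

// Hypothetical demo class for illustration only.
public static class ThreadStateDemo
{
    public static string Run()
    {
        Thread worker = new Thread(() => Thread.Sleep(100));
        worker.Name = "Worker";

        // Not started yet.
        string before = worker.ThreadState.ToString();

        worker.Start();
        worker.Join();      // block until the worker has finished

        // The thread has run to completion.
        string after = worker.ThreadState.ToString();

        return before + "," + after;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "Unstarted,Stopped"

        // CurrentThread is the thread executing this line, not the worker.
        Console.WriteLine(Thread.CurrentThread.ThreadState);   // prints "Running"
    }
}
```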

Finishing:

You can check the source code, test the program, and examine the results. You can also try to build the bounded-buffer (circular-buffer) Producer/Consumer, which is also attached to this tutorial.

Resource:

System.Threading Namespace on MSDN: ms-help://MS.NETFrameworkSDK/cpref/html/frlrfsystemthreading.htm

Download the code

MosessaurProducerConsumer.zip


Copyright © 2003 - 2005 CSharpFriends.com. All Rights Reserved.