Skip to main content

Bonus - Producer-Consumer

As a classic problem that can be discussed in relation to parallelism, we have the producer-consumer problem. The problem can be described as follows: we have two types of threads, producers and consumers, communicating through a buffer with a limited number of elements. Producers add values to the buffer, while consumers remove them. When accessing the buffer, we want it to be accessed exclusively to maintain its integrity. Producers should wait when the buffer is full and they can't produce anymore, while consumers should wait when there's nothing in the buffer to consume.

For this, we can use the Monitor class in C#, which provides several static methods. Enter(object) acquires an exclusive lock on an object, and Exit(object) releases it. Wait(object) releases the lock and blocks the calling thread until another thread signals that the state of the locked object has changed; the lock is re-acquired before Wait returns. Pulse(object) notifies a thread waiting on the object that its state has changed, so that the waiting thread becomes ready to run again once the lock is available.

One possible solution to this problem could look like this.

/// <summary>
/// Bounded first-in, first-out buffer of integers used by the
/// producer-consumer example.
/// </summary>
/// <remarks>
/// The class performs no synchronization of its own. Code that shares one
/// instance between threads has to serialize access itself.
/// </remarks>
public class LimitedBuffer
{
    // Backing queue that stores the buffered values in arrival order.
    private readonly Queue<int> _items;

    /// <summary>Maximum number of elements the buffer may hold.</summary>
    public int MaxSize { get; }

    /// <summary>Number of elements currently stored in the buffer.</summary>
    public int Length => _items.Count;

    /// <summary>Creates an empty buffer with the given capacity.</summary>
    /// <param name="maxSize">Maximum number of elements; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxSize"/> is zero or negative.
    /// </exception>
    public LimitedBuffer(int maxSize)
    {
        // A zero capacity can never accept an element and a negative one would
        // never trip the "full" check, so both are rejected up front.
        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize), maxSize, "Buffer size must be positive.");
        }

        _items = new();
        MaxSize = maxSize;
    }

    /// <summary>Appends an element to the end of the buffer.</summary>
    /// <param name="item">Value to store.</param>
    /// <exception cref="InvalidOperationException">The buffer is already full.</exception>
    public void Add(int item)
    {
        // ">=" rather than "==" so the limit still holds if the count ever overshoots.
        if (Length >= MaxSize)
        {
            throw new InvalidOperationException("Buffer limit reached!");
        }

        _items.Enqueue(item);
    }

    /// <summary>Removes and returns the oldest element in the buffer.</summary>
    /// <returns>The element that has been stored the longest.</returns>
    /// <exception cref="InvalidOperationException">The buffer is empty.</exception>
    public int Get()
    {
        if (Length == 0)
        {
            throw new InvalidOperationException("No elements found!");
        }

        return _items.Dequeue();
    }
}

/// <summary>
/// Producer side of the producer-consumer example: generates random integers
/// and appends them to a shared <see cref="LimitedBuffer"/>.
/// </summary>
public class Producer
{
    // Upper bound, in milliseconds, on a single Monitor.Wait call. A blocked
    // producer wakes up at least this often to re-check the cancellation token.
    private const int WaitPollMilliseconds = 100;

    // Pause, in seconds, after each produced value (makes the run easy to follow).
    private readonly int _speed;
    private readonly LimitedBuffer _buffer;

    // One generator per producer instead of one per call; it is only used
    // while the buffer lock is held, so access to it is serialized.
    private readonly Random _random = new();

    /// <summary>Creates a producer that writes to <paramref name="buffer"/>.</summary>
    /// <param name="buffer">Shared buffer; it also serves as the monitor lock object.</param>
    /// <param name="speed">Delay in seconds after each produced value.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public Producer(LimitedBuffer buffer, int speed)
    {
        _buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
        _speed = speed;
    }

    /// <summary>
    /// Produces a single value: waits until the buffer has room, adds a random
    /// integer, wakes the waiting threads and then pauses for the configured delay.
    /// </summary>
    /// <param name="cancellationToken">
    /// Stops the wait for free space and cancels the trailing delay.
    /// </param>
    /// <returns>
    /// A task that completes after the delay, or immediately without producing
    /// when cancellation was requested before a slot became available.
    /// </returns>
    public async Task Produce(CancellationToken cancellationToken = default)
    {
        // "lock" expands to Monitor.Enter/Exit inside try/finally, so the
        // monitor is released even if something below throws.
        lock (_buffer)
        {
            // Re-check the condition after every wake-up: another producer may
            // have taken the free slot, or the wait may simply have timed out.
            while (true)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return;
                }

                if (_buffer.Length < _buffer.MaxSize)
                {
                    break;
                }

                // Wait releases the lock while blocked and re-acquires it before
                // returning. The timeout keeps cancellation observable even when
                // no other thread ever pulses the monitor again.
                Monitor.Wait(_buffer, WaitPollMilliseconds);
            }

            var item = _random.Next();
            Console.WriteLine("Producing value {0}", item);
            _buffer.Add(item);

            // Producers and consumers wait on the same monitor, so wake every
            // waiter; a single Pulse could be handed to another producer.
            Monitor.PulseAll(_buffer);
        }

        // The pause happens outside the lock so other threads can use the buffer.
        await Task.Delay(new TimeSpan(0, 0, _speed), cancellationToken);
    }
}

/// <summary>
/// Consumer side of the producer-consumer example: removes values from a
/// shared <see cref="LimitedBuffer"/> and prints them.
/// </summary>
public class Consumer
{
    // Upper bound, in milliseconds, on a single Monitor.Wait call. A blocked
    // consumer wakes up at least this often to re-check the cancellation token.
    private const int WaitPollMilliseconds = 100;

    // Pause, in seconds, after each consumed value (makes the run easy to follow).
    private readonly int _speed;
    private readonly LimitedBuffer _buffer;

    /// <summary>Creates a consumer that reads from <paramref name="buffer"/>.</summary>
    /// <param name="buffer">Shared buffer; it also serves as the monitor lock object.</param>
    /// <param name="speed">Delay in seconds after each consumed value.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public Consumer(LimitedBuffer buffer, int speed)
    {
        _buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
        _speed = speed;
    }

    /// <summary>
    /// Consumes a single value: waits until the buffer is non-empty, removes and
    /// prints the oldest value, wakes the waiting threads and then pauses for
    /// the configured delay.
    /// </summary>
    /// <param name="cancellationToken">
    /// Stops the wait for data and cancels the trailing delay.
    /// </param>
    /// <returns>
    /// A task that completes after the delay, or immediately without consuming
    /// when cancellation was requested before a value became available.
    /// </returns>
    public async Task Consume(CancellationToken cancellationToken = default)
    {
        // "lock" expands to Monitor.Enter/Exit inside try/finally, so the
        // monitor is released even if something below throws.
        lock (_buffer)
        {
            // Re-check the condition after every wake-up: another consumer may
            // have taken the value, or the wait may simply have timed out.
            while (true)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return;
                }

                if (_buffer.Length != 0)
                {
                    break;
                }

                // Wait releases the lock while blocked and re-acquires it before
                // returning. The timeout keeps cancellation observable even when
                // no other thread ever pulses the monitor again.
                Monitor.Wait(_buffer, WaitPollMilliseconds);
            }

            Console.WriteLine("Consuming value {0}", _buffer.Get());

            // Producers and consumers wait on the same monitor, so wake every
            // waiter; a single Pulse could be handed to another consumer.
            Monitor.PulseAll(_buffer);
        }

        // The pause happens outside the lock so other threads can use the buffer.
        await Task.Delay(new TimeSpan(0, 0, _speed), cancellationToken);
    }
}

/// <summary>
/// Drives a producer: keeps calling <c>Produce</c> until cancellation is requested.
/// </summary>
/// <param name="producer">Producer to run.</param>
/// <param name="cancellationToken">Checked before each round and passed on to the producer.</param>
private static async Task RunProducer(Producer producer, CancellationToken cancellationToken = default)
{
    for (;;)
    {
        // Stop before starting another round once cancellation has been requested.
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        await producer.Produce(cancellationToken);
    }
}

/// <summary>
/// Drives a consumer: keeps calling <c>Consume</c> until cancellation is requested.
/// </summary>
/// <param name="consumer">Consumer to run.</param>
/// <param name="cancellationToken">Checked before each round and passed on to the consumer.</param>
private static async Task RunConsumer(Consumer consumer, CancellationToken cancellationToken = default)
{
    for (;;)
    {
        // Stop before starting another round once cancellation has been requested.
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        await consumer.Consume(cancellationToken);
    }
}

/// <summary>
/// Demo entry point: starts two producers and three consumers on one shared
/// five-slot buffer, requests cancellation after 30 seconds and then waits
/// for every task in turn.
/// </summary>
private static void RunProducerConsumer()
{
    var cancellationTokenSource = new CancellationTokenSource();
    var token = cancellationTokenSource.Token;
    var buffer = new LimitedBuffer(5);

    Console.WriteLine("Starting producers and consumers!");

    // Each worker gets its own Producer/Consumer instance, created inside the
    // scheduled delegate; all of them share the same buffer and token.
    Task StartProducer() => Task.Run(() => RunProducer(new Producer(buffer, 2), token), token);
    Task StartConsumer() => Task.Run(() => RunConsumer(new Consumer(buffer, 5), token), token);

    Task[] tasks =
    {
        StartProducer(),
        StartProducer(),
        StartConsumer(),
        StartConsumer(),
        StartConsumer()
    };

    // Request cancellation 30 seconds from now.
    cancellationTokenSource.CancelAfter(30 * 1000);

    for (var index = 0; index < tasks.Length; index++)
    {
        try
        {
            // The wait itself observes the token, so it is abandoned with an
            // OperationCanceledException once cancellation has been requested.
            tasks[index].Wait(token);
        }
        catch (OperationCanceledException)
        {
            Console.Error.WriteLine("The task was canceled!");
        }
    }

    Console.WriteLine("Producers and consumers finished!");
    Console.WriteLine("---------------");
}