Sari la conținutul principal

Bonus - producator-consumator

O problema clasica ce poate fi discutata in legatura cu paralelismul este problema producator-consumator. Problema se pune in felul urmator: avem doua tipuri de thread-uri, producatori si consumatori, care comunica printr-un buffer cu un numar limitat de elemente; producatorii pun valori in buffer, iar consumatorii le scot. Cand se acceseaza buffer-ul vrem ca accesul sa fie exclusiv, pentru ca structura acestuia sa ramana integra, ca producatorii sa astepte cand buffer-ul este plin si nu mai pot produce, iar consumatorii sa astepte cand nu este nimic de consumat in buffer.

Pentru asta putem folosi clasa Monitor din C#, care are mai multe metode: Enter(object), prin care putem face lock pe un obiect; Exit(object), pentru a face unlock; Wait(object), prin care thread-ul curent elibereaza lock-ul si este suspendat pana cand este semnalat ca s-a schimbat starea zonei de memorie; si Pulse(object), prin care se semnaleaza unui alt fir de executie aflat in asteptare ca s-a schimbat starea zonei de memorie, pentru ca acesta sa fie pus inapoi pe procesor.

O solutie la aceasta problema poate arata astfel.

/// <summary>
/// Bounded FIFO buffer of integers used as the shared storage in the
/// producer-consumer sample.
/// </summary>
/// <remarks>
/// The class does no locking of its own; callers that share an instance
/// between threads must serialize access themselves.
/// </remarks>
public class LimitedBuffer
{
    // Backing queue that holds the buffered values in insertion order.
    private readonly Queue<int> _items;

    /// <summary>Maximum number of elements the buffer may hold.</summary>
    public int MaxSize { get; }

    /// <summary>Number of elements currently stored in the buffer.</summary>
    public int Length => _items.Count;

    /// <summary>Creates an empty buffer with the given capacity.</summary>
    /// <param name="maxSize">Maximum number of elements; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxSize"/> is negative.
    /// </exception>
    public LimitedBuffer(int maxSize)
    {
        // A negative capacity would make the "is full" check unreachable and
        // silently turn the buffer into an unbounded queue.
        if (maxSize < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize), maxSize, "Buffer size cannot be negative!");
        }

        _items = new();
        MaxSize = maxSize;
    }

    /// <summary>Appends an element at the end of the buffer.</summary>
    /// <param name="item">Value to store.</param>
    /// <exception cref="InvalidOperationException">The buffer is already full.</exception>
    public void Add(int item)
    {
        // ">=" rather than "==" so the limit holds even if the count ever
        // ends up above the capacity.
        if (Length >= MaxSize)
        {
            throw new InvalidOperationException("Buffer limit reached!");
        }

        _items.Enqueue(item);
    }

    /// <summary>Removes and returns the oldest element in the buffer.</summary>
    /// <returns>The value that was added first among those still stored.</returns>
    /// <exception cref="InvalidOperationException">The buffer is empty.</exception>
    public int Get()
    {
        if (Length == 0)
        {
            throw new InvalidOperationException("No elements found!");
        }

        return _items.Dequeue();
    }
}

/// <summary>
/// Producer side of the sample: generates random values and stores them in
/// the shared buffer.
/// </summary>
/// <remarks>
/// The buffer instance itself is used as the monitor object, so every access
/// to it happens while holding its lock.
/// </remarks>
public class Producer
{
    // Upper bound on a single blocked wait, so that a cancellation request is
    // noticed even when nobody signals the monitor any more.
    private static readonly TimeSpan WaitPollInterval = TimeSpan.FromMilliseconds(250);

    // Pause after each produced value, in seconds, so the run can be followed.
    private readonly int _speed;
    private readonly LimitedBuffer _buffer;
    // One generator per producer instead of a new one for every value.
    private readonly Random _random = new();

    /// <summary>Creates a producer bound to a shared buffer.</summary>
    /// <param name="buffer">Buffer to fill; also used as the monitor object.</param>
    /// <param name="speed">Delay after each produced value, in seconds.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public Producer(LimitedBuffer buffer, int speed)
    {
        _buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
        _speed = speed;
    }

    /// <summary>
    /// Produces a single value, waiting while the buffer is full, then pauses
    /// for the configured delay.
    /// </summary>
    /// <param name="cancellationToken">
    /// Token that stops the wait for free space and cancels the final delay.
    /// </param>
    /// <returns>
    /// A task that completes after the delay, or right away without producing
    /// if cancellation was requested before a value could be stored.
    /// </returns>
    public async Task Produce(CancellationToken cancellationToken = default)
    {
        // "lock" guarantees the monitor is released even if something inside throws.
        lock (_buffer)
        {
            // Re-check the state after every wake-up: being signalled only
            // means the buffer changed, not that there is room in it.
            while (true)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return;
                }

                if (_buffer.Length < _buffer.MaxSize)
                {
                    break;
                }

                // Releases the lock while blocked; the timeout bounds how long
                // a cancellation request can go unnoticed.
                Monitor.Wait(_buffer, WaitPollInterval);
            }

            var item = _random.Next();
            Console.WriteLine("Producing value {0}", item);
            _buffer.Add(item);

            // Producers and consumers wait on the same monitor, so wake every
            // waiter; a single Pulse could land on another producer and leave
            // the consumers asleep.
            Monitor.PulseAll(_buffer);
        }

        // The delay happens outside the lock so other threads can use the buffer.
        await Task.Delay(TimeSpan.FromSeconds(_speed), cancellationToken);
    }
}

/// <summary>
/// Consumer side of the sample: takes values out of the shared buffer and
/// prints them.
/// </summary>
/// <remarks>
/// The buffer instance itself is used as the monitor object, so every access
/// to it happens while holding its lock.
/// </remarks>
public class Consumer
{
    // Upper bound on a single blocked wait, so that a cancellation request is
    // noticed even when nobody signals the monitor any more.
    private static readonly TimeSpan WaitPollInterval = TimeSpan.FromMilliseconds(250);

    // Pause after each consumed value, in seconds, so the run can be followed.
    private readonly int _speed;
    private readonly LimitedBuffer _buffer;

    /// <summary>Creates a consumer bound to a shared buffer.</summary>
    /// <param name="buffer">Buffer to drain; also used as the monitor object.</param>
    /// <param name="speed">Delay after each consumed value, in seconds.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public Consumer(LimitedBuffer buffer, int speed)
    {
        _buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
        _speed = speed;
    }

    /// <summary>
    /// Consumes a single value, waiting while the buffer is empty, then pauses
    /// for the configured delay.
    /// </summary>
    /// <param name="cancellationToken">
    /// Token that stops the wait for data and cancels the final delay.
    /// </param>
    /// <returns>
    /// A task that completes after the delay, or right away without consuming
    /// if cancellation was requested before a value could be taken.
    /// </returns>
    public async Task Consume(CancellationToken cancellationToken = default)
    {
        // "lock" guarantees the monitor is released even if something inside throws.
        lock (_buffer)
        {
            // Re-check the state after every wake-up: being signalled only
            // means the buffer changed, not that it holds data.
            while (true)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return;
                }

                if (_buffer.Length != 0)
                {
                    break;
                }

                // Releases the lock while blocked; the timeout bounds how long
                // a cancellation request can go unnoticed.
                Monitor.Wait(_buffer, WaitPollInterval);
            }

            var item = _buffer.Get();
            Console.WriteLine("Consuming value {0}", item);

            // Producers and consumers wait on the same monitor, so wake every
            // waiter; a single Pulse could land on another consumer and leave
            // the producers asleep.
            Monitor.PulseAll(_buffer);
        }

        // The delay happens outside the lock so other threads can use the buffer.
        await Task.Delay(TimeSpan.FromSeconds(_speed), cancellationToken);
    }
}

/// <summary>
/// Keeps asking the producer for one value after another until cancellation
/// is requested.
/// </summary>
/// <param name="producer">Producer to drive.</param>
/// <param name="cancellationToken">Token that ends the loop.</param>
private static async Task RunProducer(Producer producer, CancellationToken cancellationToken = default)
{
    while (true)
    {
        // Stop before starting another production round.
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        await producer.Produce(cancellationToken);
    }
}

/// <summary>
/// Keeps asking the consumer to take one value after another until
/// cancellation is requested.
/// </summary>
/// <param name="consumer">Consumer to drive.</param>
/// <param name="cancellationToken">Token that ends the loop.</param>
private static async Task RunConsumer(Consumer consumer, CancellationToken cancellationToken = default)
{
    while (true)
    {
        // Stop before starting another consumption round.
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        await consumer.Consume(cancellationToken);
    }
}

/// <summary>
/// Runs the producer-consumer demo: two producers and three consumers share
/// a buffer of five elements and are cancelled after thirty seconds.
/// </summary>
private static void RunProducerConsumer()
{
    var buffer = new LimitedBuffer(5);
    // Disposing the source releases the timer created by CancelAfter.
    using var cancellationTokenSource = new CancellationTokenSource();
    var cancellationToken = cancellationTokenSource.Token;

    Console.WriteLine("Starting producers and consumers!");

    // Producers pause 2 seconds per value, consumers 5 seconds per value.
    var tasks = new[] {
        Task.Run(() => RunProducer(new(buffer, 2), cancellationToken), cancellationToken),
        Task.Run(() => RunProducer(new(buffer, 2), cancellationToken), cancellationToken),
        Task.Run(() => RunConsumer(new(buffer, 5), cancellationToken), cancellationToken),
        Task.Run(() => RunConsumer(new(buffer, 5), cancellationToken), cancellationToken),
        Task.Run(() => RunConsumer(new(buffer, 5), cancellationToken), cancellationToken)
    };

    cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(30));

    foreach (var task in tasks)
    {
        try
        {
            // Passing the token bounds the wait by the cancellation itself,
            // so a worker that is still blocked cannot hang this method.
            task.Wait(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.Error.WriteLine("The task was canceled!");
        }
    }

    Console.WriteLine("Producers and consumers finished!");
    Console.WriteLine("---------------");
}