Sari la conținutul principal

Operatii I/O peste retea

Pentru a demonstra cum se pot folosi stream-uri peste retea vom incerca sa facem o aplicatie client-server de chat. Clientul se conecteaza la server iar ambii pot schimba mesaje.

In primul rand, trebuie mentionat ca in retea circula doar date binare, nu se poate asuma ca se va transmite text la trimitere si primire. Datele care circula in retea sunt mereu incapsulate intr-un format binar intr-o unitate de transfer peste un protocol, protocolul de comunicatie este doar metoda prestabilita de catre participanti de a se intelege pe formatul datelor transmise si modul de interactionare. Unitatea de transfer a datelor, adesea numit pachet, contine un header care sa descrie cateva informatii despre datele ce trebuie transmise si mecanisme de verificare a integritatii datelor.

Pentru exemplul nostru vom transmite un mesaj text incadrat in forma binara intr-un pachet de forma ||||<lungime_mesaj>||||<mesaj> unde "|" este un caracter delimitator. Este de mentionat ca lungimea mesajului va fi scrisa in network byte order, adica in ordine de big-endian, istoric s-a folosit ordinea de big-endian pentru a transmite date in retea pentru ca puteau comunica atat calculatoare de tip big-endian cat si de tip little-endian si s-a ales aceasta. Daca se transmite asa cum este in memorie o valoare de tip int de pe o arhitectura pe alta va ajunge in alta ordine sa fie citit si poate conduce la probleme, deci trebuie convertit numarul intr-o anumita ordine. Aici se poate folosi clasa IPAddress pentru conversie.

In exemplul ce urmeaza se vor folosi conexiuni TCP pentru transmitere de date de la un client care se conecteaza la o adreasa IP pe un anumit port unde un proces la distanta sta si asteapta conexiuni. Clientul va crea direct un TcpClient care expune un NetworkStream prin care putem schimba mesaje. Serverul se va lega de interfetele de retea a masinii si va astepta dupa conexiuni, cand primeste una va putea sa genereze si el un TcpClient care reprezinta clientul nou conectat.

// Facem o abstractizare pentru canalul de comunicatie.
public abstract class AbstractChannel
{
// Descriem in constante cum trebuie sa arate header-ul mesajului trimis in bytes.
private const int HeaderStartSize = 4;
private const int HeaderEndSize = 4;
// In header vrem sa punem lungimea mesajului pe care vrem sa-l trimite.
private const int HeaderPayloadLengthSize = sizeof(int);
private const int HeaderSize = HeaderStartSize + HeaderPayloadLengthSize + HeaderEndSize;
// Folosim un caracter special ca sa identificam unde este header-ul si ca nu este malformat.
private static readonly byte StartCharacter = Convert.ToByte('|');
// Legatura TCP va fi stabilita de clasele care mostenesc canalul si il facem abstract.
protected abstract TcpClient? Client { get; set; }
public abstract void Stop();

// Daca trimitem de mai multe ori o eroare o punem intr-o proprietate.
private static Exception MakeNotConnectedException => new InvalidOperationException("The client is not connected!");
private static Exception MakeConnectionAbortedException => new("The client is not connected!");

// Header-ul mesajului va arata ca ||||<messageSize>|||| si il cream aici.
private static byte[] MakeHeader(int messageSize)
{
// Cream header-ul ca array si il populam cum ne dorim.
var header = new byte[HeaderSize];
Array.Fill(header, StartCharacter, 0, HeaderStartSize);
Array.Fill(header, StartCharacter, HeaderStartSize + HeaderPayloadLengthSize, HeaderEndSize);
// Ca sa evitam probleme daca trimitem un int de pe big-endian pe little-endian transfomam numarul in network byte order.
messageSize = IPAddress.HostToNetworkOrder(messageSize);
Array.Copy(BitConverter.GetBytes(messageSize), 0, header, HeaderStartSize, HeaderPayloadLengthSize);

return header;
}

// Verificam daca header-ul este integru.
public static bool IsValidHeader(byte[] header)
{
// Putem sa scoatem ca interval parti din header ca sa le verificam.
return header[..HeaderStartSize].All(e => e == StartCharacter) &&
header[(HeaderStartSize + HeaderPayloadLengthSize)..].All(e => e == StartCharacter);
}

// Daca primim un header trebuie sa descifram lungimea mesajului din acesta.
public static int GetSizeFromHeader(byte[] header)
{
// Pentru ca am trimis in network byte order vrem sa transformam inapoi lungimea pentru arhitecura curenta.
var messageSize = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(header, HeaderStartSize));

return messageSize;
}

// In retea sunt trimise date binare, nu text, si trebuie sa trasnformam totul in bytes si inapoi.
public static byte[] MakePayload(string message) => Encoding.ASCII.GetBytes(message);
public static string GetPayload(byte[] buffer) => Encoding.ASCII.GetString(buffer);

// Avem nevoie sa trimitem mesajul pe retea si vom folosi stream-ul de la client.
public void SendMessage(string message)
{
if (Client == null)
{
throw MakeNotConnectedException;
}

// Doar trebuie sa folosim stream-ul si sa scriem header-ul si mesajul ca bytes.
Client.GetStream().Write(MakeHeader(message.Length));
Client.GetStream().Write(MakePayload(message));
}

// La primire este mai coplicat pentru ca nu ne putem baza sa primim mereu date.
public void ReceiveBytes(byte[] buffer, int size)
{
if (Client == null)
{
throw MakeNotConnectedException;
}

var bytesReceived = 0;

// Ne asteptam sa primim un numar de bytes si facem o bucla sa ii primim.
do
{
// Putem primi o parte din date si trebuie sa avem evidenta ca sa-i primim pe toti
bytesReceived += Client.GetStream().Read(buffer, bytesReceived, size - bytesReceived);

// Daca am primit 0 bytes insemna ca conectiunea s-a inchis.
if (bytesReceived == 0)
{
Stop();
throw MakeConnectionAbortedException;
}
} while (bytesReceived < size);
}

// La fel ca la trimitre, prima data primim header-ul apoi incercam sa primim mesajul.
public string ReceiveMessage()
{
if (Client == null)
{
throw new InvalidOperationException("The client is not connected!");
}

// Stim cum arata header-ul si asteptam sa primim lungimea sa fixa.
var headerBuffer = new byte[HeaderSize];
ReceiveBytes(headerBuffer, HeaderSize);

if (!IsValidHeader(headerBuffer))
{
throw MakeNotConnectedException;
}

// Dupa ce incercam sa extragem lungimea mesajului din header putem astepta sa primim mesajul.
var messageSize = GetSizeFromHeader(headerBuffer);
var payloadBuffer = new byte[messageSize];

ReceiveBytes(payloadBuffer, messageSize);

return GetPayload(payloadBuffer);
}

/* Declaram bucla principala pentru cititul de la tastatura si trimiterea mesajeloe
* aici pentru ca e convenabil alfel o declaram in alta parte.
*/
public Task SendMessagesAsync()
{
return Task.Run(() =>
{
while (true)
{
try
{
var message = Console.ReadLine();

if (Client?.Connected != true)
{
break;
}

if (message == "exit")
{
Stop();
break;
}

if (string.IsNullOrWhiteSpace(message))
{
continue;
}

SendMessage(message);
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
break;
}
}
});
}

// Declaram si bucla pentru primire.
public Task ReceivedMessagesAsync()
{
return Task.Run(() =>
{
while (true)
{
try
{
Console.WriteLine("Received: {0}", ReceiveMessage());
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
break;
}
}
});
}
}

// Pentru emitator vom crea un client si va fi disposable.
public sealed class ChatClient : AbstractChannel, IDisposable
{
/* Trebuie sa stim pentru crearea clientului adresa IP si
* portul la care sa se conecteze pentru acea adresa.
*/
private readonly string _hostname;
private readonly int _port;
// Avem in vedere sa cream si clientul de TCP.
protected override TcpClient? Client { get; set; }

public ChatClient(string hostname, int port)
{
_hostname = hostname;
_port = port;
}

/* Cream la conectare un nou client care se conecteaza pe adresa
* si port la serverul de pe statia la distanta.
*/
public void Connect()
{
Client = new(_hostname, _port);
}

// La deconectare inchidem clientul si ii facem dispose.
public override void Stop()
{
Client?.Close();
Client?.Dispose();
Client = null;
}

// Trebuie la fel ca la receptor sa implementam Dispose pentru eliberarea resurselor.
public void Dispose()
{
Stop();
}
}

// Pentru receptor vom crea un server si va fi disposable.
public sealed class ChatServer : AbstractChannel, IDisposable
{
/* Avem nevoie de un port ca sa stim pe adresa IP curenta
* ce proces primeste o anumita conectiune, clientul se va
* conecta pe acest port.
*/
private readonly int _port;
/* Ca orice server va asculta sa vina conectiuni din partea clientilor
* si avem nevoie de acest obiect ca sa primim conectiuni.
*/
private TcpListener? _listener;
protected override TcpClient? Client { get; set; }

public ChatServer(int port)
{
_port = port;
}

// Aici pornim si asteptam pentru o coneciune din partea unui client.
public void StartAndAccept()
{
/* Ca sa asteptam facem bind la port pe interfetele statiei curente
* IPAddress.Any corespunde la 0.0.0.0 ca sa asteptam conectiuni de
* pe oricare interfata.
*/
_listener = new(IPAddress.Any, _port);
// Lasa sa putem accepta maxim o coneciune.
_listener.Start(1);
// Asteptam sa primim o conectiune TCP si salvam clientul.
Client = _listener.AcceptTcpClient();
}

// Ca sa oprim serverul facem dispose la obiectele disposabble si oprim listener-ul.
public override void Stop()
{
Client?.Close();
Client?.Dispose();
_listener?.Stop();

Client = null;
_listener = null;
}

// Pentru ca avem resurse ce trebuie eliberate implementam Dispose.
public void Dispose()
{
Stop();
}
}

// Pentru server
public class Program
{
private const int BindPort = 8989;

public static void Main()
{
/* Cream receptorul si folosim cuvantul cheie "using" ca sa
* faca automat Dispose la iesirea din scope.
*/
using var receiver = new ChatServer(BindPort);

Console.WriteLine("Starting the receiver!");

try
{
// Pornim si asteptam conectiuni de la un client.
receiver.StartAndAccept();
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
return;
}

Console.WriteLine("Receiver ready to chat!");

/* Pornim si cele doua bucle de send si receive asincron ca
* sa putem face trimitere si primire in acelasi timp.
*/
var sendTask = receiver.SendMessagesAsync();
var receiveTask = receiver.ReceivedMessagesAsync();

sendTask.Wait();
receiveTask.Wait();
}
}

// Pentru client
public class Program
{
private const string HostAddress = "localhost";
private const int HostPort = 8989;

public static void Main()
{
/* Cream emitatorul si folosim cuvantul cheie "using" ca sa
* faca automat Dispose la iesirea din scope.
*/
using var sender = new ChatClient(HostAddress, HostPort);

Console.WriteLine("Starting the sender!");

try
{
// Pornim clientul si incercam sa ne conectam la server.
sender.Connect();
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
return;
}

Console.WriteLine("Sender ready to chat!");

/* Pornim si cele doua bucle de send si receive asincron ca
* sa putem face trimitere si primire in acelasi timp.
*/
var sendTask = sender.SendMessagesAsync();
var receiveTask = sender.ReceivedMessagesAsync();

sendTask.Wait();
receiveTask.Wait();
}
}

In exemplul dat este important de mentionat ca mai ales in comunicatia peste retea nu ne putem baza nici ca datele la finalul transmiterii sunt integre, adica n-au fost alterate, nici ca conectiunea nu poate sa fie intrerupta. Aici pot fi multe cazuri de situatii de eroare pe care le-am omis pentru a simplifica exemplul dar in practica daca lucrati cu IO de retea trebuie sa le aveti in vedere. Pentru a intelege mai bine problemele ce pot aparea in comunicatia peste retea este bine de urmarit un curs care trateaza doar acest subiect iar ca recomandare de carte ar fi COMPUTER NETWORKS 5th Edition; ANDREW S. TANENBAUM, DAVID J. WETHERALL.

Exercitii

Modificati exemplul dat sa apeleze functiile de citire si scriere asincrone folosind CancellationToken pentru oprirea mai usoara a clientului si a serverului.