.NET: Instrumente pentru lucrul cu multithreading și asincronie. Partea 1

Public articolul original despre Habr, a cărui traducere este postată în corporate postare pe blog.

Necesitatea de a face ceva asincron, fără a aștepta rezultatul aici și acum, sau de a împărți o muncă mare între mai multe unități care îl efectuează, a existat înainte de apariția computerelor. Odată cu apariția lor, această nevoie a devenit foarte tangibilă. Acum, în 2019, scriu acest articol pe un laptop cu un procesor Intel Core cu 8 nuclee, pe care rulează mai mult de o sută de procese în paralel și chiar mai multe fire. În apropiere se află un telefon ușor ponosit, cumpărat acum câțiva ani, are la bord un procesor cu 8 nuclee. Resursele tematice sunt pline de articole și videoclipuri în care autorii lor admiră smartphone-urile emblematice din acest an care dispun de procesoare cu 16 nuclee. MS Azure oferă o mașină virtuală cu un procesor de 20 de nuclee și 128 TB RAM pentru mai puțin de 2 USD/oră. Din păcate, este imposibil să extragi maximul și să valorifici această putere fără a putea gestiona interacțiunea firelor.

terminologie

Proces - Obiect OS, spațiu de adrese izolat, conține fire de execuție.
Fir - un obiect OS, cea mai mică unitate de execuție, parte a unui proces, firele de execuție partajează memoria și alte resurse între ele în cadrul unui proces.
Multifunctional - Proprietatea sistemului de operare, capacitatea de a rula mai multe procese simultan
Multi-core - o proprietate a procesorului, capacitatea de a folosi mai multe nuclee pentru prelucrarea datelor
Multiprocesare - o proprietate a unui computer, capacitatea de a lucra simultan cu mai multe procesoare fizic
Multithreading — o proprietate a unui proces, capacitatea de a distribui procesarea datelor între mai multe fire.
Paralelism - efectuarea mai multor actiuni fizic simultan pe unitatea de timp
Asincronie — executarea unei operațiuni fără a aștepta finalizarea acestei prelucrări; rezultatul execuției poate fi procesat ulterior.

metaforă

Nu toate definițiile sunt bune și unele au nevoie de explicații suplimentare, așa că voi adăuga o metaforă despre gătirea micului dejun la terminologia introdusă oficial. Gătitul micul dejun în această metaforă este un proces.

În timp ce pregăteam micul dejun dimineața, am (Procesor) Vin la bucătărie (calculator). am 2 maini (Miezuri). Există o serie de dispozitive în bucătărie (IO): cuptor, ceainic, prăjitor de pâine, frigider. Dau drumul la gaz, pun o tigaie peste ea si turnam ulei in ea fara sa astept sa se incinga (asincron, Non-Blocking-IO-Wait), scot ouăle din frigider și le sparg într-o farfurie, apoi le bat cu o mână (Firul #1), și al doilea (Firul #2) ținând farfuria (Shared Resource). Acum aș vrea să pornesc ceainic, dar nu am suficiente mâini (Thread Starvation) În acest timp, tigaia se încălzește (Prelucrarea rezultatului) în care torn ce am bătut. Mă întind după ibric și îl aprind și mă uit prostesc cum fierbe apa în el (Blocare-IO-Așteptați), deși în acest timp ar fi putut spăla farfuria în care a biciuit omleta.

Am gatit o omleta folosind doar 2 maini, si nu mai am, dar in acelasi timp, in momentul baterii omletei au avut loc 3 operatii deodata: baterea omletei, tinerea farfurii, incalzirea tigaii. CPU este cea mai rapidă parte a computerului, IO este ceea ce este cel mai adesea totul încetinește, așa că adesea o soluție eficientă este să ocupi CPU cu ceva în timp ce primești date de la IO.

Continuând metafora:

  • Dacă în procesul de pregătire a unei omlete, aș încerca și eu să schimb hainele, acesta ar fi un exemplu de multitasking. O nuanță importantă: computerele sunt mult mai bune la asta decât oamenii.
  • O bucătărie cu mai mulți bucătari, de exemplu într-un restaurant - un computer multi-core.
  • Multe restaurante într-un food court într-un centru comercial - centru de date

Instrumente .NET

.NET este bun la lucrul cu fire de execuție, ca și cu multe alte lucruri. Cu fiecare versiune nouă, introduce din ce în ce mai multe instrumente noi pentru lucrul cu ele, noi straturi de abstractizare peste firele de operare. Când lucrează cu construcția abstracțiilor, dezvoltatorii de cadre folosesc o abordare care lasă oportunitatea, atunci când utilizează o abstracție de nivel înalt, de a coborî unul sau mai multe niveluri mai jos. Cel mai adesea acest lucru nu este necesar, de fapt, deschide ușa pentru a vă împușca în picior cu o pușcă, dar uneori, în cazuri rare, poate fi singura modalitate de a rezolva o problemă care nu este rezolvată la nivelul actual de abstractizare. .

Prin instrumente, mă refer atât la interfețele de programare a aplicațiilor (API) furnizate de framework și de pachetele terțe, cât și la soluții software întregi care simplifică căutarea oricăror probleme legate de codul multi-threaded.

Pornirea unui thread

Clasa Thread este cea mai de bază clasă din .NET pentru lucrul cu fire. Constructorul acceptă unul dintre cei doi delegați:

  • ThreadStart — Fără parametri
  • ParametrizedThreadStart - cu un parametru de tip obiect.

Delegatul va fi executat în firul nou creat după apelarea metodei Start.Dacă un delegat de tip ParametrizedThreadStart a fost trecut la constructor, atunci un obiect trebuie să fie trecut la metoda Start. Acest mecanism este necesar pentru a transfera orice informație locală în flux. Este de remarcat faptul că crearea unui thread este o operațiune costisitoare, iar firul în sine este un obiect greu, cel puțin pentru că alocă 1MB de memorie pe stivă și necesită interacțiunea cu API-ul OS.

new Thread(...).Start(...);

Clasa ThreadPool reprezintă conceptul de pool. În .NET, grupul de fire este o piesă de inginerie, iar dezvoltatorii de la Microsoft au depus mult efort pentru a se asigura că funcționează optim într-o mare varietate de scenarii.

Concept general:

Din momentul în care pornește aplicația, creează mai multe fire în rezervă în fundal și oferă posibilitatea de a le prelua pentru utilizare. Dacă firele de execuție sunt utilizate frecvent și în număr mare, grupul se extinde pentru a satisface nevoile apelantului. Când nu există fire libere în pool la momentul potrivit, fie va aștepta ca unul dintre fire să revină, fie va crea unul nou. Rezultă că pool-ul de fire este excelent pentru unele acțiuni pe termen scurt și prost potrivit pentru operațiuni care rulează ca servicii pe toată durata operațiunii aplicației.

Pentru a utiliza un fir din pool, există o metodă QueueUserWorkItem care acceptă un delegat de tip WaitCallback, care are aceeași semnătură ca ParametrizedThreadStart, iar parametrul transmis îndeplinește aceeași funcție.

ThreadPool.QueueUserWorkItem(...);

Metoda pool-ului de fire mai puțin cunoscută RegisterWaitForSingleObject este utilizată pentru a organiza operațiuni de IO neblocante. Delegatul trecut la această metodă va fi apelat atunci când WaitHandle transmis metodei este „Released”.

ThreadPool.RegisterWaitForSingleObject(...)

.NET are un cronometru de execuție și diferă de cronometrele WinForms/WPF prin faptul că handlerul său va fi apelat pe un fir de execuție preluat din pool.

System.Threading.Timer

Există, de asemenea, o modalitate destul de exotică de a trimite un delegat pentru execuție la un fir din pool - metoda BeginInvoke.

DelegateInstance.BeginInvoke

Aș dori să mă opresc pe scurt asupra funcției la care pot fi numite multe dintre metodele de mai sus - CreateThread din Kernel32.dll Win32 API. Există o modalitate, datorită mecanismului metodelor externe, de a apela această funcție. Am văzut un astfel de apel o singură dată într-un exemplu teribil de cod moștenit, iar motivația autorului care a făcut exact acest lucru rămâne încă un mister pentru mine.

Kernel32.dll CreateThread

Vizualizarea și depanarea thread-urilor

Threadurile create de dvs., toate componentele terțe și pool-ul .NET pot fi vizualizate în fereastra Threads din Visual Studio. Această fereastră va afișa informații despre fir numai atunci când aplicația este în curs de depanare și în modul Break. Aici puteți vizualiza în mod convenabil numele stivei și prioritățile fiecărui thread și puteți comuta depanarea la un anumit thread. Folosind proprietatea Priority a clasei Thread, puteți seta prioritatea unui fir, pe care OC și CLR o vor percepe ca o recomandare atunci când împarte timpul procesorului între fire.

.NET: Instrumente pentru lucrul cu multithreading și asincronie. Partea 1

Biblioteca paralelă de sarcini

Task Parallel Library (TPL) a fost introdusă în .NET 4.0. Acum este standardul și instrumentul principal pentru lucrul cu asincronie. Orice cod care folosește o abordare mai veche este considerat moștenit. Unitatea de bază a TPL este clasa Task din spațiul de nume System.Threading.Tasks. O sarcină este o abstractizare peste un fir. Cu noua versiune a limbajului C#, avem o modalitate elegantă de a lucra cu Tasks - operatori asincron/aștepți. Aceste concepte au făcut posibilă scrierea codului asincron ca și cum ar fi simplu și sincron, acest lucru a făcut posibil chiar și pentru persoanele cu puțină înțelegere a funcționării interne a firelor de execuție să scrie aplicații care le folosesc, aplicații care nu se blochează atunci când efectuează operațiuni lungi. Utilizarea async/wait este un subiect pentru unul sau chiar mai multe articole, dar voi încerca să aflu esența în câteva propoziții:

  • asincron este un modificator al unei metode care returnează Task sau void
  • și await este un operator de așteptare a sarcinii care nu blochează.

Încă o dată: operatorul await, în cazul general (există excepții), va elibera mai departe firul de execuție curent, iar când Sarcina își termină execuția și firul (de fapt, ar fi mai corect să spunem contextul). , dar mai multe despre asta mai târziu) va continua executarea metodei în continuare. În interiorul .NET, acest mecanism este implementat în același mod ca yield return, când metoda scrisă se transformă într-o clasă întreagă, care este o mașină de stări și poate fi executată în bucăți separate în funcție de aceste stări. Oricine este interesat poate scrie orice cod simplu folosind asynс/wait, compila și vizualiza ansamblul folosind JetBrains dotPeek cu codul generat de compilator activat.

Să ne uităm la opțiunile pentru lansarea și utilizarea Task. În exemplul de cod de mai jos, creăm o sarcină nouă care nu face nimic util (Thread.Sleep (10000)), dar în viața reală, aceasta ar trebui să fie o muncă complexă, care necesită mult CPU.

using TCO = System.Threading.Tasks.TaskCreationOptions;

public static async void VoidAsyncMethod() {
    var cancellationSource = new CancellationTokenSource();

    await Task.Factory.StartNew(
        // Code of action will be executed on other context
        () => Thread.Sleep(10000),
        cancellationSource.Token,
        TCO.LongRunning | TCO.AttachedToParent | TCO.PreferFairness,
        scheduler
    );

    //  Code after await will be executed on captured context
}

O sarcină este creată cu un număr de opțiuni:

  • LongRunning este un indiciu că sarcina nu va fi finalizată rapid, ceea ce înseamnă că ar putea merita să luați în considerare să nu luați un fir din pool, ci să creați unul separat pentru această sarcină pentru a nu dăuna altora.
  • AttachedToParent - Sarcinile pot fi aranjate într-o ierarhie. Dacă a fost folosită această opțiune, atunci Sarcina poate fi într-o stare în care ea însăși a fost finalizată și așteaptă execuția copiilor săi.
  • PreferFairness - înseamnă că ar fi mai bine să executați Sarcinile trimise pentru execuție mai devreme înainte de cele trimise mai târziu. Dar aceasta este doar o recomandare și rezultatele nu sunt garantate.

Al doilea parametru transmis metodei este CancellationToken. Pentru a gestiona corect anularea unei operațiuni după ce aceasta a început, codul executat trebuie să fie completat cu verificări pentru starea CancellationToken. Dacă nu există verificări, atunci metoda Cancel apelată pe obiectul CancellationTokenSource va putea opri execuția Sarcinii numai înainte de a începe.

Ultimul parametru este un obiect de planificare de tip TaskScheduler. Această clasă și descendenții ei sunt concepute pentru a controla strategiile de distribuire a sarcinilor pe fire de execuție; în mod implicit, sarcina va fi executată pe un fir aleatoriu din grup.

Operatorul await este aplicat sarcinii create, ceea ce înseamnă că codul scris după el, dacă există unul, va fi executat în același context (adesea asta înseamnă pe același fir) ca și codul înainte de await.

Metoda este marcată ca async void, ceea ce înseamnă că poate folosi operatorul await, dar codul de apelare nu va putea aștepta execuția. Dacă o astfel de caracteristică este necesară, atunci metoda trebuie să returneze Task. Metodele marcate ca async void sunt destul de comune: de regulă, acestea sunt handlere de evenimente sau alte metode care funcționează pe principiul foc și uitare. Dacă trebuie nu numai să oferiți posibilitatea de a aștepta până la sfârșitul execuției, ci și să returnați rezultatul, atunci trebuie să utilizați Task.

Pe Task-ul pe care metoda StartNew a returnat-o, precum și pe oricare alta, puteți apela metoda ConfigureAwait cu parametrul fals, apoi execuția după await va continua nu pe contextul capturat, ci pe unul arbitrar. Acest lucru ar trebui făcut întotdeauna atunci când contextul de execuție nu este important pentru codul după așteptare. Aceasta este, de asemenea, o recomandare de la MS atunci când scrieți codul care va fi livrat ambalat într-o bibliotecă.

Să ne oprim puțin mai mult asupra modului în care puteți aștepta finalizarea unei sarcini. Mai jos este un exemplu de cod, cu comentarii despre când așteptarea este făcută condiționat bine și când este făcută condiționat în mod prost.

public static async void AnotherMethod() {

    int result = await AsyncMethod(); // good

    result = AsyncMethod().Result; // bad

    AsyncMethod().Wait(); // bad

    IEnumerable<Task> tasks = new Task[] {
        AsyncMethod(), OtherAsyncMethod()
    };

    await Task.WhenAll(tasks); // good
    await Task.WhenAny(tasks); // good

    Task.WaitAll(tasks.ToArray()); // bad
}

În primul exemplu, așteptăm ca sarcina să se finalizeze fără a bloca firul de apelare; vom reveni la procesarea rezultatului doar atunci când acesta este deja acolo; până atunci, firul de apelare este lăsat pe propriile dispozitive.

În a doua opțiune, blocăm firul de apel până când rezultatul metodei este calculat. Acest lucru este rău nu numai pentru că am ocupat un fir, o resursă atât de valoroasă a programului, cu o simplă inactivitate, ci și pentru că dacă codul metodei pe care o apelăm îl conține așteaptă, iar contextul de sincronizare necesită revenirea la firul de apel după așteaptă, atunci vom obține un blocaj: firul apelant așteaptă ca rezultatul metodei asincrone să fie calculat, metoda asincronă încearcă în zadar să-și continue execuția în firul apelant.

Un alt dezavantaj al acestei abordări este gestionarea complicată a erorilor. Faptul este că erorile din codul asincron atunci când se utilizează async/wait sunt foarte ușor de gestionat - se comportă la fel ca și cum codul ar fi sincron. În timp ce dacă aplicăm exorcismul de așteptare sincron unei sarcini, excepția originală se transformă într-o AggregateException, de exemplu. Pentru a gestiona excepția, va trebui să examinați tipul InnerException și să scrieți un lanț if în interiorul unui bloc catch sau să utilizați catch when construct, în loc de lanțul de blocuri catch care este mai familiar în lumea C#.

Cel de-al treilea și ultimul exemple sunt, de asemenea, marcate ca rău din același motiv și conțin toate aceleași probleme.

Metodele WhenAny și WhenAll sunt extrem de convenabile pentru așteptarea unui grup de Sarcini; ele împachetează un grup de Sarcini într-una, care se va declanșa fie atunci când o Sarcină din grup este declanșată pentru prima dată, fie când toate și-au încheiat execuția.

Oprirea firelor

Din diverse motive, poate fi necesar să opriți fluxul după ce acesta a început. Există o serie de moduri de a face acest lucru. Clasa Thread are două metode denumite corespunzător: avorta и Întrerupe. Primul nu este foarte recomandat pentru utilizare, deoarece după apelarea acestuia în orice moment aleatoriu, în timpul procesării oricărei instrucțiuni, se va arunca o excepție ThreadAbortedException. Nu vă așteptați ca o astfel de excepție să fie aruncată atunci când incrementați orice variabilă întreagă, nu? Și atunci când utilizați această metodă, aceasta este o situație foarte reală. Dacă trebuie să împiedicați CLR să genereze o astfel de excepție într-o anumită secțiune de cod, o puteți include în apeluri Thread.BeginCriticalRegion, Thread.EndCriticalRegion. Orice cod scris într-un bloc finally este învelit în astfel de apeluri. Din acest motiv, în profunzimile codului cadru puteți găsi blocuri cu o încercare goală, dar nu cu un final gol. Microsoft descurajează atât de mult această metodă încât nu a inclus-o în .net core.

Metoda de întrerupere funcționează mai previzibil. Poate întrerupe firul cu o excepție ThreadInterruptedException numai în acele momente când firul este în stare de așteptare. Intră în această stare în timp ce se suspendă în timp ce așteaptă WaitHandle, blocare sau după apelarea Thread.Sleep.

Ambele opțiuni descrise mai sus sunt proaste din cauza impredictibilității lor. Soluția este folosirea unei structuri Token de anulare si clasa CancellationTokenSource. Ideea este aceasta: este creată o instanță a clasei CancellationTokenSource și numai cel care o deține poate opri operația apelând metoda Anulare. Numai CancellationToken este transmis operației în sine. Proprietarii de CancellationToken nu pot anula singuri operațiunea, ci pot verifica doar dacă operațiunea a fost anulată. Există o proprietate booleană pentru aceasta IsCancellationRequested si metoda ThrowIfCancelRequested. Acesta din urmă va face o excepție TaskCancelledException dacă metoda Cancel a fost apelată pe instanța CancellationToken care este parrodata. Și aceasta este metoda pe care o recomand să o folosesc. Aceasta este o îmbunătățire față de opțiunile anterioare prin obținerea controlului total asupra momentului în care o operație de excepție poate fi anulată.

Cea mai brutală opțiune pentru oprirea unui thread este apelarea funcției Win32 API TerminateThread. Comportamentul CLR după apelarea acestei funcții poate fi imprevizibil. Pe MSDN se scrie următoarele despre această funcție: „TerminateThread este o funcție periculoasă care ar trebui folosită numai în cazurile cele mai extreme. „

Conversia API-ului moștenit în Task Based folosind metoda FromAsync

Dacă aveți norocul să lucrați la un proiect care a început după introducerea sarcinilor și a încetat să provoace groază liniștită pentru majoritatea dezvoltatorilor, atunci nu va trebui să aveți de-a face cu multe API-uri vechi, atât cu cele terțe, cât și cu cele ale echipei dvs. a torturat în trecut. Din fericire, echipa .NET Framework a avut grijă de noi, deși poate scopul era să avem grijă de noi. Oricum ar fi, .NET are o serie de instrumente pentru convertirea fără durere a codului scris în abordări vechi de programare asincronă la cel nou. Una dintre ele este metoda FromAsync a TaskFactory. În exemplul de cod de mai jos, împachetez vechile metode asincrone ale clasei WebRequest într-o sarcină folosind această metodă.

object state = null;
WebRequest wr = WebRequest.CreateHttp("http://github.com");
await Task.Factory.FromAsync(
    wr.BeginGetResponse,
    we.EndGetResponse
);

Acesta este doar un exemplu și este puțin probabil să fiți nevoit să faceți acest lucru cu tipurile încorporate, dar orice proiect vechi este pur și simplu plin de metode BeginDoSomething care returnează metodele IAsyncResult și EndDoSomething care îl primesc.

Convertiți API-ul moștenit în Task Based folosind clasa TaskCompletionSource

Un alt instrument important de luat în considerare este clasa TaskCompletionSource. În ceea ce privește funcțiile, scopul și principiul de funcționare, poate amintește oarecum de metoda RegisterWaitForSingleObject a clasei ThreadPool, despre care am scris mai sus. Folosind această clasă, puteți include ușor și convenabil vechile API-uri asincrone în Tasks.

Veți spune că am vorbit deja despre metoda FromAsync a clasei TaskFactory destinată acestor scopuri. Aici va trebui să ne amintim întreaga istorie a dezvoltării modelelor asincrone în .net pe care Microsoft le-a oferit în ultimii 15 ani: înainte de Task-Based Asynchronous Pattern (TAP), a existat Asynchronous Programming Pattern (APP), care era despre metode ÎncepeFă ceva care se întoarce IAsyncResult si metode SfârşitFaceți ceva care o acceptă și pentru moștenirea acestor ani, metoda FromAsync este pur și simplu perfectă, dar de-a lungul timpului, a fost înlocuită cu modelul asincron bazat pe evenimente (SI AP), care presupunea că un eveniment va fi declanșat la finalizarea operației asincrone.

TaskCompletionSource este perfect pentru împachetarea sarcinilor și a API-urilor vechi construite în jurul modelului de eveniment. Esența muncii sale este următoarea: un obiect din această clasă are o proprietate publică de tip Task, a cărei stare poate fi controlată prin metodele SetResult, SetException etc. ale clasei TaskCompletionSource. În locurile în care operatorul await a fost aplicat acestei sarcini, acesta va fi executat sau eșuează cu o excepție, în funcție de metoda aplicată la TaskCompletionSource. Dacă încă nu este clar, să ne uităm la acest exemplu de cod, în care un vechi API EAP este înfășurat într-o sarcină folosind o sursă TaskCompletionSource: atunci când evenimentul se declanșează, sarcina va fi plasată în starea Completed și metoda care a aplicat operatorul await la această Sarcină își va relua execuția după ce a primit obiectul rezultat.

public static Task<Result> DoAsync(this SomeApiInstance someApiObj) {

    var completionSource = new TaskCompletionSource<Result>();
    someApiObj.Done += 
        result => completionSource.SetResult(result);
    someApiObj.Do();

    result completionSource.Task;
}

TaskCompletionSource Sfaturi și trucuri

Încheierea API-urilor vechi nu este tot ceea ce se poate face folosind TaskCompletionSource. Utilizarea acestei clase deschide o posibilitate interesantă de a proiecta diverse API-uri pe sarcini care nu ocupă fire. Și fluxul, așa cum ne amintim, este o resursă costisitoare, iar numărul lor este limitat (în principal de cantitatea de RAM). Această limitare poate fi atinsă cu ușurință prin dezvoltarea, de exemplu, a unei aplicații web încărcate cu o logică de afaceri complexă. Să luăm în considerare posibilitățile despre care vorbesc atunci când implementez un astfel de truc precum Long-Polling.

Pe scurt, esența trucului este următoarea: trebuie să primiți informații de la API despre unele evenimente care au loc pe partea sa, în timp ce API-ul, din anumite motive, nu poate raporta evenimentul, ci poate returna doar starea. Un exemplu dintre acestea sunt toate API-urile construite pe HTTP înainte de vremurile WebSocket sau când era imposibil din anumite motive să utilizați această tehnologie. Clientul poate cere serverului HTTP. Serverul HTTP nu poate iniția el însuși comunicarea cu clientul. O soluție simplă este să interoghezi serverul folosind un temporizator, dar acest lucru creează o încărcare suplimentară pe server și o întârziere suplimentară în medie TimerInterval / 2. Pentru a ocoli acest lucru, a fost inventat un truc numit Long Polling, care implică întârzierea răspunsului de la server până când expiră Timeout sau va avea loc un eveniment. Dacă evenimentul a avut loc, atunci acesta este procesat, dacă nu, atunci cererea este trimisă din nou.

while(!eventOccures && !timeoutExceeded)  {

  CheckTimout();
  CheckEvent();
  Thread.Sleep(1);
}

Dar o astfel de soluție se va dovedi teribilă de îndată ce numărul clienților care așteaptă evenimentul crește, pentru că... Fiecare astfel de client ocupă un fir întreg în așteptarea unui eveniment. Da, și primim o întârziere suplimentară de 1 ms atunci când evenimentul este declanșat, cel mai adesea acest lucru nu este semnificativ, dar de ce să facem software-ul mai rău decât poate fi? Dacă eliminăm Thread.Sleep(1), atunci degeaba vom încărca un nucleu de procesor 100% inactiv, rotindu-se într-un ciclu inutil. Folosind TaskCompletionSource, puteți reface cu ușurință acest cod și puteți rezolva toate problemele identificate mai sus:

class LongPollingApi {

    private Dictionary<int, TaskCompletionSource<Msg>> tasks;

    public async Task<Msg> AcceptMessageAsync(int userId, int duration) {

        var cs = new TaskCompletionSource<Msg>();
        tasks[userId] = cs;
        await Task.WhenAny(Task.Delay(duration), cs.Task);
        return cs.Task.IsCompleted ? cs.Task.Result : null;
    }

    public void SendMessage(int userId, Msg m) {

        if (tasks.TryGetValue(userId, out var completionSource))
            completionSource.SetResult(m);
    }
}

Acest cod nu este pregătit pentru producție, ci doar o demonstrație. Pentru a-l folosi în cazuri reale, trebuie, de asemenea, cel puțin să gestionați situația când un mesaj sosește într-un moment în care nimeni nu se așteaptă la el: în acest caz, metoda AsseptMessageAsync ar trebui să returneze o sarcină deja finalizată. Dacă acesta este cel mai frecvent caz, atunci vă puteți gândi la utilizarea ValueTask.

Când primim o solicitare pentru un mesaj, creăm și plasăm o TaskCompletionSource în dicționar, apoi așteptăm ce se întâmplă mai întâi: intervalul de timp specificat expiră sau este primit un mesaj.

ValueTask: de ce și cum

Operatorii async/wait, precum operatorul yield return, generează o mașină de stări din metodă, iar aceasta este crearea unui nou obiect, care aproape întotdeauna nu este important, dar în cazuri rare poate crea o problemă. Acest caz poate fi o metodă care se numește cu adevărat des, vorbim despre zeci și sute de mii de apeluri pe secundă. Dacă o astfel de metodă este scrisă în așa fel încât, în cele mai multe cazuri, returnează un rezultat ocolind toate metodele de așteptare, atunci .NET oferă un instrument pentru a optimiza acest lucru - structura ValueTask. Pentru a fi clar, să ne uităm la un exemplu de utilizare: există un cache la care mergem foarte des. Există niște valori în el și apoi pur și simplu le returnăm; dacă nu, atunci mergem la niște IO lente pentru a le obține. Vreau să fac aceasta din urmă asincron, ceea ce înseamnă că întreaga metodă se dovedește a fi asincronă. Astfel, modalitatea evidentă de a scrie metoda este următoarea:

public async Task<string> GetById(int id) {

    if (cache.TryGetValue(id, out string val))
        return val;
    return await RequestById(id);
}

Din cauza dorinței de a optimiza puțin și a unei ușoare frici de ceea ce va genera Roslyn la compilarea acestui cod, puteți rescrie acest exemplu după cum urmează:

public Task<string> GetById(int id) {

    if (cache.TryGetValue(id, out string val))
        return Task.FromResult(val);
    return RequestById(id);
}

Într-adevăr, soluția optimă în acest caz ar fi optimizarea hot-path-ului, și anume, obținerea unei valori din dicționar fără alocări inutile și încărcare pe GC, în timp ce în acele cazuri rare când mai trebuie să mergem la IO pentru date. , totul va rămâne un plus/minus vechiul mod:

public ValueTask<string> GetById(int id) {

    if (cache.TryGetValue(id, out string val))
        return new ValueTask<string>(val);
    return new ValueTask<string>(RequestById(id));
}

Să aruncăm o privire mai atentă la această bucată de cod: dacă există o valoare în cache, creăm o structură, altfel sarcina reală va fi înfășurată într-una cu sens. Codului de apelare nu îi pasă în ce cale a fost executat acest cod: ValueTask, din punct de vedere al sintaxei C#, se va comporta la fel ca o sarcină obișnuită în acest caz.

TaskSchedulers: gestionarea strategiilor de lansare a sarcinilor

Următorul API pe care aș dori să-l iau în considerare este clasa task Scheduler și derivatele sale. Am menționat deja mai sus că TPL are capacitatea de a gestiona strategii de distribuire a sarcinilor pe fire. Astfel de strategii sunt definite în descendenții clasei TaskScheduler. Aproape orice strategie de care ai putea avea nevoie poate fi găsită în bibliotecă. ParallelExtensionsExtras, dezvoltat de Microsoft, dar nu face parte din .NET, dar este furnizat ca pachet Nuget. Să ne uităm pe scurt la unele dintre ele:

  • CurrentThreadTaskScheduler — execută Sarcini pe firul curent
  • LimitedConcurrencyLevelTaskScheduler — limitează numărul de sarcini executate simultan de parametrul N, care este acceptat în constructor
  • OrderedTaskScheduler — este definit ca LimitedConcurrencyLevelTaskScheduler(1), astfel încât sarcinile vor fi executate secvenţial.
  • WorkStealingTaskScheduler - unelte furtul de muncă abordare a distribuirii sarcinilor. În esență, este un ThreadPool separat. Rezolvă problema că în .NET ThreadPool este o clasă statică, una pentru toate aplicațiile, ceea ce înseamnă că supraîncărcarea sau utilizarea incorectă a acesteia într-o parte a programului poate duce la efecte secundare în alta. În plus, este extrem de dificil de înțeles cauza unor astfel de defecte. Acea. Poate fi necesar să folosiți WorkStealingTaskSchedulers separat în părți ale programului în care utilizarea ThreadPool poate fi agresivă și imprevizibilă.
  • QueuedTaskScheduler — vă permite să efectuați sarcini în conformitate cu regulile de coadă de prioritate
  • ThreadPerTaskScheduler — creează un fir separat pentru fiecare Sarcină care este executată pe acesta. Poate fi util pentru sarcini care durează un timp imprevizibil de lung pentru a fi finalizate.

Există un bun detaliat articol despre TaskSchedulers pe blogul Microsoft.

Pentru o depanare convenabilă a tot ceea ce are legătură cu Sarcini, Visual Studio are o fereastră Sarcini. În această fereastră puteți vedea starea curentă a sarcinii și puteți sări la linia de cod care se execută în prezent.

.NET: Instrumente pentru lucrul cu multithreading și asincronie. Partea 1

PLinq și clasa Parallel

Pe lângă Sarcini și tot ce s-a spus despre ele, mai există două instrumente interesante în .NET: PLinq (Linq2Parallel) și clasa Parallel. Primul promite execuția paralelă a tuturor operațiunilor Linq pe mai multe fire. Numărul de fire poate fi configurat folosind metoda de extensie WithDegreeOfParallelism. Din păcate, cel mai adesea PLinq în modul său implicit nu are suficiente informații despre elementele interne ale sursei dvs. de date pentru a oferi un câștig semnificativ de viteză, pe de altă parte, costul încercării este foarte mic: trebuie doar să apelați metoda AsParallel înainte lanțul de metode Linq și executați teste de performanță. Mai mult, este posibil să transmiteți informații suplimentare către PLinq despre natura sursei dvs. de date folosind mecanismul Partiții. Puteți citi mai multe aici и aici.

Clasa statică Parallel oferă metode de iterare printr-o colecție Foreach în paralel, de executare a unei bucle For și de executare a mai multor delegați în paralel Invoke. Execuția firului curent va fi oprită până la finalizarea calculelor. Numărul de fire poate fi configurat prin trecerea ParallelOptions ca ultim argument. De asemenea, puteți specifica TaskScheduler și CancellationToken folosind opțiuni.

Constatări

Când am început să scriu acest articol pe baza materialelor raportului meu și a informațiilor pe care le-am colectat în timpul muncii mele după el, nu mă așteptam să fie atât de mult. Acum, când editorul de text în care scriu acest articol îmi va spune cu reproș că pagina 15 a trecut, voi rezuma rezultatele intermediare. Alte trucuri, API-uri, instrumente vizuale și capcane vor fi tratate în următorul articol.

Concluzii:

  • Trebuie să cunoașteți instrumentele de lucru cu fire, asincronie și paralelism pentru a utiliza resursele PC-urilor moderne.
  • .NET are multe instrumente diferite pentru aceste scopuri
  • Nu toate au apărut simultan, așa că deseori le puteți găsi pe cele vechi, cu toate acestea, există modalități de a converti vechile API-uri fără prea mult efort.
  • Lucrul cu fire de execuție în .NET este reprezentat de clasele Thread și ThreadPool
  • Metodele Thread.Abort, Thread.Interrupt și Win32 API TerminateThread sunt periculoase și nu sunt recomandate pentru utilizare. În schimb, este mai bine să utilizați mecanismul CancellationToken
  • Fluxul este o resursă valoroasă și aprovizionarea sa este limitată. Ar trebui evitate situațiile în care firele sunt ocupate în așteptarea evenimentelor. Pentru aceasta este convenabil să folosiți clasa TaskCompletionSource
  • Cele mai puternice și avansate instrumente .NET pentru lucrul cu paralelism și asincronie sunt Tasks.
  • Operatorii c# async/wait implementează conceptul de așteptare non-blocante
  • Puteți controla distribuția sarcinilor pe fire folosind clasele derivate din TaskScheduler
  • Structura ValueTask poate fi utilă în optimizarea căilor calde și a traficului de memorie
  • Ferestrele Tasks și Threads ale Visual Studio oferă o mulțime de informații utile pentru depanarea codului multi-thread sau asincron
  • PLinq este un instrument grozav, dar este posibil să nu aibă suficiente informații despre sursa dvs. de date, dar acest lucru poate fi rezolvat folosind mecanismul de partiționare
  • Pentru a fi continuat ...

Sursa: www.habr.com

Adauga un comentariu