Elemente de bază ale aplicațiilor distribuite. Prima abordare

Elemente de bază ale aplicațiilor distribuite. Prima abordare

În trecut articol Am examinat bazele teoretice ale arhitecturii reactive. Este timpul să vorbim despre fluxuri de date, modalități de implementare a sistemelor reactive Erlang/Elixir și modele de mesagerie în ele:

  • Cere raspuns
  • Cerere-Răspuns fragmentat
  • Răspuns cu Solicitare
  • Publicare-abonare
  • Publicare-abonare inversată
  • Distribuirea sarcinilor

SOA, MSA și mesagerie

SOA, MSA sunt arhitecturi de sistem care definesc regulile pentru construirea sistemelor, în timp ce mesageria oferă primitive pentru implementarea lor.

Nu vreau să promovez cutare sau cutare arhitectură de sistem. Sunt pentru utilizarea celor mai eficiente și utile practici pentru un anumit proiect și afacere. Indiferent de paradigma pe care o alegem, este mai bine să creăm blocuri de sistem cu un ochi pe calea Unix: componente cu conectivitate minimă, responsabile pentru entitățile individuale. Metodele API efectuează cele mai simple acțiuni posibile cu entități.

Mesageria este, după cum sugerează și numele, un broker de mesaje. Scopul său principal este de a primi și trimite mesaje. Este responsabil pentru interfețele pentru transmiterea informațiilor, formarea canalelor logice pentru transmiterea informațiilor în cadrul sistemului, rutare și echilibrare, precum și tratarea defecțiunilor la nivel de sistem.
Mesajele pe care le dezvoltăm nu încearcă să concureze sau să înlocuiască rabbitmq. Principalele sale caracteristici:

  • Distributie.
    Punctele de schimb pot fi create pe toate nodurile clusterului, cât mai aproape de codul care le folosește.
  • Simplitate.
    Concentrați-vă pe minimizarea codului standard și ușurința în utilizare.
  • Performanță mai bună.
    Nu încercăm să repetăm ​​funcționalitatea rabbitmq, ci evidențiem doar stratul arhitectural și de transport, pe care îl încadram în OTP cât mai simplu posibil, minimizând costurile.
  • Flexibilitate.
    Fiecare serviciu poate combina mai multe șabloane de schimb.
  • Reziliență prin proiectare.
  • Scalabilitate.
    Mesageria crește odată cu aplicația. Pe măsură ce sarcina crește, puteți muta punctele de schimb pe mașini individuale.

Notă. În ceea ce privește organizarea codului, meta-proiectele sunt potrivite pentru sisteme complexe Erlang/Elixir. Tot codul de proiect este situat într-un singur depozit - un proiect umbrelă. În același timp, microserviciile sunt izolate maxim și efectuează operațiuni simple care sunt responsabile pentru o entitate separată. Cu această abordare, este ușor să mențineți API-ul întregului sistem, este ușor să faceți modificări, este convenabil să scrieți teste de unitate și de integrare.

Componentele sistemului interacționează direct sau prin intermediul unui broker. Din perspectiva mesageriei, fiecare serviciu are mai multe faze de viață:

  • Inițializarea serviciului.
    În această etapă, procesul și dependențele care execută serviciul sunt configurate și lansate.
  • Crearea unui punct de schimb.
    Serviciul poate folosi un punct de schimb static specificat în configurația nodului sau poate crea puncte de schimb în mod dinamic.
  • Înregistrarea serviciului.
    Pentru ca serviciul să poată servi cererile, acesta trebuie înregistrat la punctul de schimb.
  • Funcționare normală.
    Serviciul produce muncă utilă.
  • Finalizarea lucrării.
    Există 2 tipuri de oprire posibile: normală și de urgență. În timpul funcționării normale, serviciul este deconectat de la punctul de schimb și se oprește. În situații de urgență, mesageria execută unul dintre scripturile de failover.

Pare destul de complicat, dar codul nu este chiar atât de înfricoșător. Exemple de coduri cu comentarii vor fi date în analiza șabloanelor puțin mai târziu.

Platforme de tranzacţionare

Punctul de schimb este un proces de mesagerie care implementează logica interacțiunii cu componentele din șablonul de mesagerie. În toate exemplele prezentate mai jos, componentele interacționează prin puncte de schimb, a căror combinație formează mesaje.

Modele de schimb de mesaje (MEP)

La nivel global, modelele de schimb pot fi împărțite în două sensuri și unidirecționale. Primele implică un răspuns la un mesaj primit, cele din urmă nu. Un exemplu clasic de model bidirecțional în arhitectura client-server este modelul Cerere-răspuns. Să ne uităm la șablon și la modificările acestuia.

Cerere-răspuns sau RPC

RPC este folosit atunci când trebuie să primim un răspuns de la un alt proces. Acest proces poate rula pe același nod sau situat pe un continent diferit. Mai jos este o diagramă a interacțiunii dintre client și server prin mesagerie.

Elemente de bază ale aplicațiilor distribuite. Prima abordare

Deoarece mesageria este complet asincronă, pentru client schimbul este împărțit în 2 faze:

  1. Se trimite cererea

    messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess).

    schimb ‒ denumirea unică a punctului de schimb
    ResponseMatchingTag ‒ etichetă locală pentru procesarea răspunsului. De exemplu, în cazul trimiterii mai multor solicitări identice aparținând unor utilizatori diferiți.
    CerereDefiniție - organismul cererii
    HandlerProcess ‒ PID-ul handlerului. Acest proces va primi un răspuns de la server.

  2. Procesarea răspunsului

    handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag,message = ResponsePayload}, State)

    Sarcină utilă de răspuns - răspunsul serverului.

Pentru server, procesul constă și din 2 faze:

  1. Inițializarea punctului de schimb
  2. Procesarea cererilor primite

Să ilustrăm acest șablon cu cod. Să presupunem că trebuie să implementăm un serviciu simplu care să ofere o singură metodă exactă de timp.

Cod server

Să definim API-ul serviciului în api.hrl:

%% =====================================================
%%  entities
%% =====================================================
-record(time, {
  unixtime :: non_neg_integer(),
  datetime :: binary()
}).

-record(time_error, {
  code :: non_neg_integer(),
  error :: term()
}).

%% =====================================================
%%  methods
%% =====================================================
-record(time_req, {
  opts :: term()
}).
-record(time_resp, {
  result :: #time{} | #time_error{}
}).

Să definim controlerul de serviciu în time_controller.erl

%% В примере показан только значимый код. Вставив его в шаблон gen_server можно получить рабочий сервис.

%% инициализация gen_server
init(Args) ->
  %% подключение к точке обмена
  messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self())
  {ok, #{}}.

%% обработка события потери связи с точкой обмена. Это же событие приходит, если точка обмена еще не запустилась.
handle_info(#exchange_die{exchange = ?EXCHANGE}, State) ->
  erlang:send(self(), monitor_exchange),
  {noreply, State};

%% обработка API
handle_info(#time_req{opts = _Opts}, State) ->
  messaging:response_once(Client, #time_resp{
result = #time{ unixtime = time_utils:unixtime(now()), datetime = time_utils:iso8601_fmt(now())}
  });
  {noreply, State};

%% завершение работы gen_server
terminate(_Reason, _State) ->
  messaging:demonitor_exchange(req_resp, ?EXCHANGE, default, self()),
  ok.

Cod client

Pentru a trimite o solicitare către serviciu, puteți apela API-ul de solicitare de mesagerie oriunde în client:

case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of
    ok -> ok;
    _ -> %% repeat or fail logic
end

Într-un sistem distribuit, configurația componentelor poate fi foarte diferită și, în momentul solicitării, este posibil ca mesajele să nu pornească încă sau controlorul de serviciu nu va fi pregătit să dea serviciul cererii. Prin urmare, trebuie să verificăm răspunsul la mesagerie și să gestionăm cazul de eșec.
După trimiterea cu succes, clientul va primi un răspuns sau o eroare de la serviciu.
Să ne ocupăm de ambele cazuri în handle_info:

handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time{unixtime = Utime}}}, State) ->
  ?debugVal(Utime),
  {noreply, State};

handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time_error{code = ErrorCode}}}, State) ->
  ?debugVal({error, ErrorCode}),
  {noreply, State};

Cerere-Răspuns fragmentat

Cel mai bine este să evitați trimiterea de mesaje uriașe. De aceasta depinde capacitatea de răspuns și funcționarea stabilă a întregului sistem. Dacă răspunsul la o interogare ocupă multă memorie, atunci împărțirea lui în părți este obligatorie.

Elemente de bază ale aplicațiilor distribuite. Prima abordare

Permiteți-mi să vă dau câteva exemple de astfel de cazuri:

  • Componentele fac schimb de date binare, cum ar fi fișiere. Împărțirea răspunsului în părți mici vă ajută să lucrați eficient cu fișiere de orice dimensiune și să evitați depășirea memoriei.
  • Listări. De exemplu, trebuie să selectăm toate înregistrările dintr-un tabel imens din baza de date și să le transferăm într-o altă componentă.

Eu numesc aceste răspunsuri locomotivă. În orice caz, 1024 de mesaje de 1 MB sunt mai bune decât un singur mesaj de 1 GB.

În clusterul Erlang, obținem un beneficiu suplimentar - reducerea sarcinii asupra punctului de schimb și a rețelei, deoarece răspunsurile sunt trimise imediat destinatarului, ocolind punctul de schimb.

Răspuns cu Solicitare

Aceasta este o modificare destul de rară a modelului RPC pentru construirea sistemelor de dialog.

Elemente de bază ale aplicațiilor distribuite. Prima abordare

Publicare-abonare (arborele de distribuție a datelor)

Sistemele bazate pe evenimente le livrează consumatorilor de îndată ce datele sunt gata. Astfel, sistemele sunt mai predispuse la un model push decât la un model pull sau poll. Această caracteristică vă permite să evitați irosirea resurselor prin solicitarea și așteptarea constantă a datelor.
Figura prezintă procesul de distribuire a unui mesaj către consumatorii abonați la un anumit subiect.

Elemente de bază ale aplicațiilor distribuite. Prima abordare

Exemple clasice de utilizare a acestui tipar sunt distribuția statului: lumea jocului în jocurile pe calculator, datele pieței despre schimburi, informații utile în fluxurile de date.

Să ne uităm la codul de abonat:

init(_Args) ->
  %% подписываемся на обменник, ключ = key
  messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
  {ok, #{}}.

handle_info(#exchange_die{exchange = ?SUBSCRIPTION}, State) ->
  %% если точка обмена недоступна, то пытаемся переподключиться
  messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
  {noreply, State};

%% обрабатываем пришедшие сообщения
handle_info(#'$msg'{exchange = ?SUBSCRIPTION, message = Msg}, State) ->
  ?debugVal(Msg),
  {noreply, State};

%% при остановке потребителя - отключаемся от точки обмена
terminate(_Reason, _State) ->
  messaging:unsubscribe(?SUBSCRIPTION, key, tag, self()),
  ok.

Sursa poate apela funcția pentru a publica un mesaj în orice loc convenabil:

messaging:publish_message(Exchange, Key, Message).

schimb - numele punctului de schimb,
Cheie - cheie de rutare
Mesaj - sarcina utila

Publicare-abonare inversată

Elemente de bază ale aplicațiilor distribuite. Prima abordare

Prin extinderea pub-sub, puteți obține un model convenabil pentru înregistrare. Setul de surse și consumatori poate fi complet diferit. Figura prezintă un caz cu un consumator și mai multe surse.

Model de distribuție a sarcinilor

Aproape fiecare proiect implică sarcini de procesare amânată, cum ar fi generarea de rapoarte, livrarea de notificări și preluarea datelor de la sisteme terțe. Debitul sistemului care realizează aceste sarcini poate fi ușor de scalat prin adăugarea de handler. Tot ce ne rămâne este să formăm un grup de procesoare și să distribuim uniform sarcinile între ele.

Să ne uităm la situațiile care apar folosind exemplul celor 3 handleri. Chiar și în etapa de distribuție a sarcinilor, se pune problema echității distribuției și a depășirii operatorilor. Distribuția „round-robin” va fi responsabilă de corectitudine, iar pentru a evita o situație de debordare a operatorilor, vom introduce o restricție prefetch_limit. În condiții tranzitorii prefetch_limit va împiedica un singur handler să primească toate sarcinile.

Mesageria gestionează cozile și prioritatea de procesare. Procesoarele primesc sarcini pe măsură ce ajung. Sarcina se poate finaliza cu succes sau eșuează:

  • messaging:ack(Tack) - apelat dacă mesajul este procesat cu succes
  • messaging:nack(Tack) - sunat in toate situatiile de urgenta. Odată ce sarcina este returnată, mesageria o va transmite altui handler.

Elemente de bază ale aplicațiilor distribuite. Prima abordare

Să presupunem că a avut loc o defecțiune complexă în timpul procesării a trei sarcini: procesorul 1, după primirea sarcinii, s-a prăbușit fără a avea timp să raporteze nimic la punctul de schimb. În acest caz, punctul de schimb va transfera sarcina unui alt handler după expirarea timpului de confirmare. Din anumite motive, handlerul 3 a abandonat sarcina și a trimis nack; ca urmare, sarcina a fost transferată și unui alt handler care a finalizat-o cu succes.

Rezumat preliminar

Am acoperit blocurile de bază ale sistemelor distribuite și am obținut o înțelegere de bază a utilizării lor în Erlang/Elixir.

Combinând modele de bază, puteți construi paradigme complexe pentru a rezolva problemele emergente.

În partea finală a seriei, vom analiza problemele generale de organizare a serviciilor, rutare și echilibrare și, de asemenea, vom vorbi despre partea practică a scalabilității și toleranței la erori a sistemelor.

Sfârșitul celei de-a doua părți.

fotografie Marius Christensen
Ilustrații pregătite folosind websequencediagrams.com

Sursa: www.habr.com

Adauga un comentariu