W tym artykule przyjrzymy się tajnikom reaktora we/wy i jego działaniu, napiszemy implementację w mniej niż 200 liniach kodu i sprawimy, że prosty serwer HTTP będzie przetwarzał ponad 40 milionów żądań/min.
Przedmowa
Artykuł został napisany, aby pomóc zrozumieć działanie reaktora I/O, a co za tym idzie, zrozumieć ryzyko związane z jego użytkowaniem.
Do zrozumienia artykułu wymagana jest znajomość podstaw. Język C i pewne doświadczenie w tworzeniu aplikacji sieciowych.
Cały kod jest napisany w języku C ściśle według (uwaga: długi plik PDF) w standardzie C11 dla Linuksa i dostępne na GitHub.
Dlaczego to zrobić?
Wraz z rosnącą popularnością Internetu serwery WWW zaczęły obsługiwać dużą liczbę połączeń jednocześnie, dlatego wypróbowano dwa podejścia: blokowanie wejść/wyjść na dużej liczbie wątków systemu operacyjnego i nieblokowanie wejść/wyjść w połączeniu z system powiadamiania o zdarzeniach, zwany także „selektorem systemu” (epoka/kolejka/IOCP/itp).
Pierwsze podejście polegało na utworzeniu nowego wątku systemu operacyjnego dla każdego połączenia przychodzącego. Jego wadą jest słaba skalowalność: system operacyjny będzie musiał wdrożyć wiele przejścia kontekstowe и wywołania systemowe. Są to kosztowne operacje i mogą prowadzić do braku wolnej pamięci RAM przy imponującej liczbie połączeń.
Zmodyfikowana wersja podkreśla stała liczba wątków (pula wątków), zapobiegając w ten sposób przerwaniu wykonywania przez system, ale jednocześnie wprowadzając nowy problem: jeśli pula wątków jest aktualnie blokowana przez długie operacje odczytu, to inne gniazda, które są już w stanie odbierać dane, nie będą mogły Zrób tak.
Drugie podejście wykorzystuje system powiadamiania o zdarzeniach (selektor systemu) dostarczony przez system operacyjny. W artykule omówiono najpopularniejszy typ selektora systemu, bazujący na alertach (zdarzeniach, powiadomieniach) o gotowości do operacji we/wy, a nie na powiadomienia o ich zakończeniu. Uproszczony przykład jego zastosowania można przedstawić na poniższym schemacie blokowym:
Różnica między tymi podejściami jest następująca:
Blokowanie operacji we/wy wstrzymać przepływ użytkowników aż dodopóki system operacyjny nie będzie poprawny defragmentacje przychodzące Pakiety IP do strumienia bajtów (TCP, odbieranie danych) lub w wewnętrznych buforach zapisu nie będzie wystarczającej ilości miejsca do późniejszego przesłania NIC (wysyłanie danych).
Selektor systemu nadgodziny powiadamia program, że OS już zdefragmentowane pakiety IP (TCP, odbiór danych) lub wystarczająca ilość miejsca w wewnętrznych buforach zapisu już dostępne (wysyłanie danych).
Podsumowując, rezerwowanie wątku systemu operacyjnego dla każdego wejścia/wyjścia jest stratą mocy obliczeniowej, ponieważ w rzeczywistości wątki nie wykonują pożytecznej pracy (stąd określenie „przerwanie programowe”). Selektor systemu rozwiązuje ten problem, umożliwiając programowi użytkownika znacznie bardziej ekonomiczne wykorzystanie zasobów procesora.
Model reaktora we/wy
Reaktor we/wy pełni rolę warstwy pomiędzy selektorem systemu a kodem użytkownika. Zasadę jego działania opisuje poniższy schemat blokowy:
Przypominam, że zdarzenie to powiadomienie, że dane gniazdo może wykonać nieblokującą operację wejścia/wyjścia.
Procedura obsługi zdarzeń to funkcja wywoływana przez reaktor we/wy po odebraniu zdarzenia, który następnie wykonuje nieblokującą operację we/wy.
Należy zauważyć, że reaktor we/wy jest z definicji jednowątkowy, ale nic nie stoi na przeszkodzie, aby tę koncepcję wykorzystać w środowisku wielowątkowym w stosunku 1 wątek: 1 reaktor, recyklingując w ten sposób wszystkie rdzenie procesora.
realizacja
Interfejs publiczny umieścimy w pliku reactor.h, i wdrożenie - w reactor.c. reactor.h składać się będzie z następujących ogłoszeń:
Pokaż deklaracje w reaktorze.h
typedef struct reactor Reactor;
/*
* Указатель на функцию, которая будет вызываться I/O реактором при поступлении
* события от системного селектора.
*/
typedef void (*Callback)(void *arg, int fd, uint32_t events);
/*
* Возвращает `NULL` в случае ошибки, не-`NULL` указатель на `Reactor` в
* противном случае.
*/
Reactor *reactor_new(void);
/*
* Освобождает системный селектор, все зарегистрированные сокеты в данный момент
* времени и сам I/O реактор.
*
* Следующие функции возвращают -1 в случае ошибки, 0 в случае успеха.
*/
int reactor_destroy(Reactor *reactor);
int reactor_register(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg);
int reactor_deregister(const Reactor *reactor, int fd);
int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg);
/*
* Запускает цикл событий с тайм-аутом `timeout`.
*
* Эта функция передаст управление вызывающему коду если отведённое время вышло
* или/и при отсутствии зарегистрированных сокетов.
*/
int reactor_run(const Reactor *reactor, time_t timeout);
Struktura reaktora we/wy składa się z: deskryptor pliku selektor epoka и tablice mieszająceGHashTable, który mapuje każde gniazdo na CallbackData (struktura procedury obsługi zdarzeń i argument użytkownika).
Należy pamiętać, że włączyliśmy możliwość obsługi niekompletny typ według indeksu. W reactor.h deklarujemy strukturę reactora w reactor.c definiujemy je, uniemożliwiając w ten sposób użytkownikowi wyraźną zmianę jego pól. To jeden z wzorów ukrywanie danych, co zwięźle pasuje do semantyki C.
funkcje reactor_register, reactor_deregister и reactor_reregister zaktualizuj listę interesujących gniazd i odpowiednich procedur obsługi zdarzeń w selektorze systemu i tabeli mieszającej.
Po przechwyceniu zdarzenia przez reaktor I/O za pomocą deskryptora fd, wywołuje odpowiednią procedurę obsługi zdarzenia, do której przekazuje fd, bitowa maska wygenerowane zdarzenia i wskaźnik użytkownika do void.
Pokaż funkcję reaktor_run().
int reactor_run(const Reactor *reactor, time_t timeout) {
int result;
struct epoll_event *events;
if ((events = calloc(MAX_EVENTS, sizeof(*events))) == NULL)
abort();
time_t start = time(NULL);
while (true) {
time_t passed = time(NULL) - start;
int nfds =
epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, timeout - passed);
switch (nfds) {
// Ошибка
case -1:
perror("epoll_wait");
result = -1;
goto cleanup;
// Время вышло
case 0:
result = 0;
goto cleanup;
// Успешная операция
default:
// Вызвать обработчиков событий
for (int i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
CallbackData *callback =
g_hash_table_lookup(reactor->table, &fd);
callback->callback(callback->arg, fd, events[i].events);
}
}
}
cleanup:
free(events);
return result;
}
Podsumowując, łańcuch wywołań funkcji w kodzie użytkownika będzie miał następującą formę:
Serwer jednowątkowy
Aby przetestować reaktor I/O pod dużym obciążeniem, napiszemy prosty serwer WWW HTTP, który odpowie obrazem na każde żądanie.
Krótkie odniesienie do protokołu HTTP
HTTP - taki jest protokół poziom aplikacji, używany głównie do interakcji serwer-przeglądarka.
HTTP można łatwo wykorzystać transport protokół TCP, wysyłanie i odbieranie wiadomości w określonym formacie specyfikacja.
CRLF jest ciągiem dwóch znaków: r и n, oddzielając pierwszą linię żądania, nagłówki i dane.
<КОМАНДА> - jeden z CONNECT, DELETE, GET, HEAD, OPTIONS, PATCH, POST, PUT, TRACE. Przeglądarka wyśle polecenie do naszego serwera GET, co oznacza „Wyślij mi zawartość pliku”.
<URI> - jednolity identyfikator zasobu. Na przykład, jeśli URI = /index.html, następnie klient żąda strony głównej witryny.
<ВЕРСИЯ HTTP> — wersja protokołu HTTP w formacie HTTP/X.Y. Obecnie najczęściej używaną wersją jest HTTP/1.1.
<ЗАГОЛОВОК N> to para klucz-wartość w formacie <КЛЮЧ>: <ЗНАЧЕНИЕ>, przesłane do serwera w celu dalszej analizy.
<ДАННЫЕ> — dane wymagane przez serwer do wykonania operacji. Często jest to proste JSON lub dowolnym innym formacie.
<КОД СТАТУСА> jest liczbą reprezentującą wynik operacji. Nasz serwer zawsze zwróci status 200 (pomyślna operacja).
<ОПИСАНИЕ СТАТУСА> — ciąg reprezentujący kod stanu. Dla kodu stanu 200 jest to OK.
<ЗАГОЛОВОК N> — nagłówek w tym samym formacie, co w żądaniu. Tytuły zwrócimy Content-Length (rozmiar pliku) i Content-Type: text/html (zwracany typ danych).
<ДАННЫЕ> — dane żądane przez użytkownika. W naszym przypadku jest to ścieżka do obrazu w HTML.
plik http_server.c (serwer jednowątkowy) zawiera plik common.h, który zawiera następujące prototypy funkcji:
Pokaż wspólne prototypy funkcji.h
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов принять новое соединение.
*/
static void on_accept(void *arg, int fd, uint32_t events);
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов отправить HTTP ответ.
*/
static void on_send(void *arg, int fd, uint32_t events);
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов принять часть HTTP запроса.
*/
static void on_recv(void *arg, int fd, uint32_t events);
/*
* Переводит входящее соединение в неблокирующий режим.
*/
static void set_nonblocking(int fd);
/*
* Печатает переданные аргументы в stderr и выходит из процесса с
* кодом `EXIT_FAILURE`.
*/
static noreturn void fail(const char *format, ...);
/*
* Возвращает файловый дескриптор сокета, способного принимать новые
* TCP соединения.
*/
static int new_server(bool reuse_port);
Opisano także makro funkcjonalne SAFE_CALL() i funkcja jest zdefiniowana fail(). Makro porównuje wartość wyrażenia z błędem i jeśli warunek jest prawdziwy, wywołuje funkcję fail():
#define SAFE_CALL(call, error)
do {
if ((call) == error) {
fail("%s", #call);
}
} while (false)
Funkcja fail() wypisuje przekazane argumenty do terminala (np printf()) i kończy program kodem EXIT_FAILURE:
Funkcja new_server() zwraca deskryptor pliku gniazda „serwer” utworzonego przez wywołania systemowe socket(), bind() и listen() i zdolny do przyjmowania połączeń przychodzących w trybie nieblokującym.
Należy pamiętać, że gniazdo jest początkowo tworzone w trybie nieblokującym przy użyciu flagi SOCK_NONBLOCKtak, że w funkcji on_accept() (czytaj więcej) wywołanie systemowe accept() nie zatrzymał wykonywania wątku.
jeśli reuse_port równa się true, wówczas ta funkcja skonfiguruje gniazdo z opcją SO_REUSEPORT Poprzez setsockopt()aby używać tego samego portu w środowisku wielowątkowym (patrz sekcja „Serwer wielowątkowy”).
Obsługa zdarzeń on_accept() wywoływana po wygenerowaniu zdarzenia przez system operacyjny EPOLLIN, co w tym przypadku oznacza, że nowe połączenie może zostać zaakceptowane. on_accept() akceptuje nowe połączenie, przełącza je w tryb nieblokujący i rejestruje się w procedurze obsługi zdarzeń on_recv() w reaktorze I/O.
Obsługa zdarzeń on_recv() wywoływana po wygenerowaniu zdarzenia przez system operacyjny EPOLLIN, w tym przypadku oznacza to, że połączenie zostało zarejestrowane on_accept(), gotowy do odbioru danych.
on_recv() odczytuje dane z połączenia do momentu całkowitego odebrania żądania HTTP, po czym rejestruje procedurę obsługi on_send() aby wysłać odpowiedź HTTP. Jeśli klient zerwie połączenie, gniazdo zostanie wyrejestrowane i zamknięte za pomocą close().
Pokaż funkcję on_recv()
static void on_recv(void *arg, int fd, uint32_t events) {
RequestBuffer *buffer = arg;
// Принимаем входные данные до тех пор, что recv возвратит 0 или ошибку
ssize_t nread;
while ((nread = recv(fd, buffer->data + buffer->size,
REQUEST_BUFFER_CAPACITY - buffer->size, 0)) > 0)
buffer->size += nread;
// Клиент оборвал соединение
if (nread == 0) {
SAFE_CALL(reactor_deregister(reactor, fd), -1);
SAFE_CALL(close(fd), -1);
request_buffer_destroy(buffer);
return;
}
// read вернул ошибку, отличную от ошибки, при которой вызов заблокирует
// поток
if (errno != EAGAIN && errno != EWOULDBLOCK) {
request_buffer_destroy(buffer);
fail("read");
}
// Получен полный HTTP запрос от клиента. Теперь регистрируем обработчика
// событий для отправки данных
if (request_buffer_is_complete(buffer)) {
request_buffer_clear(buffer);
SAFE_CALL(reactor_reregister(reactor, fd, EPOLLOUT, on_send, buffer),
-1);
}
}
Obsługa zdarzeń on_send() wywoływana po wygenerowaniu zdarzenia przez system operacyjny EPOLLOUT, co oznacza, że połączenie zostało zarejestrowane on_recv(), gotowy do wysłania danych. Ta funkcja wysyła do klienta odpowiedź HTTP zawierającą kod HTML z obrazem, a następnie ponownie zmienia procedurę obsługi zdarzeń na on_recv().
I na koniec w pliku http_server.c, w działaniu main() tworzymy reaktor we/wy za pomocą reactor_new(), utwórz gniazdo serwera i zarejestruj je, uruchom reaktor za pomocą reactor_run() dokładnie przez jedną minutę, a następnie zwalniamy zasoby i wychodzimy z programu.
Sprawdźmy, czy wszystko działa zgodnie z oczekiwaniami. Kompilacja (chmod a+x compile.sh && ./compile.sh w katalogu głównym projektu) i uruchom samodzielnie napisany serwer, otwórz http://127.0.0.1:18470 w przeglądarce i zobacz, czego się spodziewaliśmy:
Zmierzmy wydajność serwera jednowątkowego. Otwórzmy dwa terminale: w jednym pojedziemy ./http_serverw innym - praca. Po minucie w drugim terminalu zostaną wyświetlone następujące statystyki:
Nasz jednowątkowy serwer był w stanie obsłużyć ponad 11 milionów żądań na minutę pochodzących ze 100 połączeń. Wynik nie jest zły, ale czy można go poprawić?
Serwer wielowątkowy
Jak wspomniano powyżej, reaktor we/wy można utworzyć w oddzielnych wątkach, wykorzystując w ten sposób wszystkie rdzenie procesora. Zastosujmy to podejście w praktyce:
Należy pamiętać, że argument funkcji new_server() adwokaci true. Oznacza to, że przypisujemy opcję do gniazda serwera SO_REUSEPORTużywać go w środowisku wielowątkowym. Możesz przeczytać więcej szczegółów tutaj.
Liczba żądań przetworzonych w ciągu 1 minuty wzrosła ~3.28 razy! Ale do okrągłej liczby zabrakło nam tylko około XNUMX milionów, więc spróbujmy to naprawić.
Najpierw przyjrzyjmy się wygenerowanym statystykom perf:
Korzystanie z powinowactwa procesora, kompilacja z -march=native, PGO, wzrost liczby trafień pamięć podręczną, zwiększyć MAX_EVENTS I użyć EPOLLET nie dał znaczącego wzrostu wydajności. Ale co się stanie, jeśli zwiększysz liczbę jednoczesnych połączeń?
Uzyskano pożądany wynik, a wraz z nim ciekawy wykres pokazujący zależność liczby przetworzonych żądań w ciągu 1 minuty od liczby połączeń:
Widzimy, że po kilkuset połączeniach liczba przetworzonych żądań dla obu serwerów gwałtownie spada (w wersji wielowątkowej jest to bardziej zauważalne). Czy jest to związane z implementacją stosu TCP/IP w systemie Linux? Zapraszam do pisania w komentarzach swoich założeń odnośnie takiego zachowania wykresu oraz optymalizacji dla opcji wielowątkowych i jednowątkowych.
jak odnotowany w komentarzach ten test wydajności nie pokazuje zachowania reaktora I/O pod rzeczywistym obciążeniem, ponieważ prawie zawsze serwer współdziała z bazą danych, generuje logi, używa kryptografii TLS itp., w wyniku czego obciążenie staje się nierównomierne (dynamiczne). Testy wraz z komponentami innych firm zostaną przeprowadzone w artykule o proactorze I/O.
Wady reaktora I/O
Musisz zrozumieć, że reaktor we/wy nie jest pozbawiony wad, a mianowicie:
Używanie reaktora we/wy w środowisku wielowątkowym jest nieco trudniejsze, ponieważ będziesz musiał ręcznie zarządzać przepływami.
Praktyka pokazuje, że w większości przypadków obciążenie jest nierównomierne, co może prowadzić do tego, że jeden wątek będzie logowany, podczas gdy inny będzie zajęty pracą.
Jeśli jedna procedura obsługi zdarzeń zablokuje wątek, selektor systemu również zostanie zablokowany, co może prowadzić do trudnych do znalezienia błędów.
Rozwiązuje te problemy Proaktor wejść/wyjść, który często ma harmonogram, który równomiernie rozdziela obciążenie na pulę wątków, a także ma wygodniejszy interfejs API. Porozmawiamy o tym później, w moim innym artykule.
wniosek
W tym miejscu nasza podróż od teorii prosto do wydechu profilera dobiegła końca.
Nie powinieneś się nad tym rozwodzić, ponieważ istnieje wiele innych, równie interesujących podejść do pisania oprogramowania sieciowego o różnym poziomie wygody i szybkości. Ciekawe, moim zdaniem, linki podano poniżej.