導入
この記事では、I/O リアクターの詳細とその仕組みを説明し、200 行未満のコードで実装を記述し、毎分 40 万リクエストを超える単純な HTTP サーバーを作成します。
序文
- この記事は、I/O リアクターの機能を理解し、それを使用する際のリスクを理解するのに役立つように書かれています。
- 記事を理解するには基本的な知識が必要です。
C言語 ネットワーク アプリケーション開発の経験があること。 - すべてのコードは厳密に (注意: 長い PDF)
C11規格に準拠 Linux 用、で利用可能GitHubの .
なぜそれを行いますか?
インターネットの普及に伴い、Web サーバーは多数の接続を同時に処理する必要が生じたため、多数の OS スレッドで I/O をブロックする方法と、OS スレッドとノンブロッキング I/O を組み合わせた 2 つのアプローチが試みられました。 「システム セレクター」とも呼ばれるイベント通知システム (
最初のアプローチでは、受信接続ごとに新しい OS スレッドを作成します。欠点はスケーラビリティが低いことです。オペレーティング システムは多くの機能を実装する必要があります。
修正されたバージョンのハイライト
2 番目のアプローチでは、
これらのアプローチの違いは次のとおりです。
- I/O 操作のブロック つるす ユーザーフロー までOSが正常になるまで
デフラグ 入ってくるIPパケット バイトストリームへ(TCP 、データを受信中)、または内部書き込みバッファーにその後の送信に使用できる十分なスペースがありません。NIC (データの送信)。 - システムセレクター 時間とともに OS が次のことをプログラムに通知します。 すでに 最適化された IP パケット (TCP、データ受信)、または内部書き込みバッファーに十分なスペースがある すでに 利用可能(データ送信)。
要約すると、実際にはスレッドは有益な作業を行っていないため、I/O ごとに OS スレッドを予約することは計算能力の無駄です (したがって、この用語は
I/Oリアクトルモデル
I/O リアクターは、システム セレクターとユーザー コードの間の層として機能します。その動作原理は次のブロック図で説明されています。
- イベントとは、特定のソケットがノンブロッキング I/O 操作を実行できることを通知するものであることを思い出してください。
- イベント ハンドラーは、イベントの受信時に I/O リアクターによって呼び出される関数であり、ノンブロッキング I/O 操作を実行します。
I/O リアクターは定義上シングルスレッドであることに注意することが重要ですが、このコンセプトを 1 スレッド: 1 リアクターの比率でマルチスレッド環境で使用することを妨げるものはなく、それによってすべての CPU コアがリサイクルされます。
具現化
パブリックインターフェースをファイルに配置します reactor.h
reactor.c
reactor.h
以下の発表で構成されます。
宣言をreactor.hに表示する
typedef struct reactor Reactor;
/*
* Указатель на функцию, которая будет вызываться I/O реактором при поступлении
* события от системного селектора.
*/
typedef void (*Callback)(void *arg, int fd, uint32_t events);
/*
* Возвращает `NULL` в случае ошибки, не-`NULL` указатель на `Reactor` в
* противном случае.
*/
Reactor *reactor_new(void);
/*
* Освобождает системный селектор, все зарегистрированные сокеты в данный момент
* времени и сам I/O реактор.
*
* Следующие функции возвращают -1 в случае ошибки, 0 в случае успеха.
*/
int reactor_destroy(Reactor *reactor);
int reactor_register(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg);
int reactor_deregister(const Reactor *reactor, int fd);
int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg);
/*
* Запускает цикл событий с тайм-аутом `timeout`.
*
* Эта функция передаст управление вызывающему коду если отведённое время вышло
* или/и при отсутствии зарегистрированных сокетов.
*/
int reactor_run(const Reactor *reactor, time_t timeout);
I/O リアクターの構造は次のもので構成されます。 GHashTable
CallbackData
(イベント ハンドラーの構造とそのユーザー引数)。
Reactor と CallbackData を表示
struct reactor {
int epoll_fd;
GHashTable *table; // (int, CallbackData)
};
typedef struct {
Callback callback;
void *arg;
} CallbackData;
を処理できるようになりましたのでご注意ください。 reactor.h
構造を宣言します reactor
とで reactor.c
これを定義することで、ユーザーがフィールドを明示的に変更できないようにします。これもパターンの一つです
機能 reactor_register
, reactor_deregister
и reactor_reregister
システム セレクターとハッシュ テーブル内の対象ソケットと対応するイベント ハンドラーのリストを更新します。
登録機能を表示する
#define REACTOR_CTL(reactor, op, fd, interest)
if (epoll_ctl(reactor->epoll_fd, op, fd,
&(struct epoll_event){.events = interest,
.data = {.fd = fd}}) == -1) {
perror("epoll_ctl");
return -1;
}
int reactor_register(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg) {
REACTOR_CTL(reactor, EPOLL_CTL_ADD, fd, interest)
g_hash_table_insert(reactor->table, int_in_heap(fd),
callback_data_new(callback, callback_arg));
return 0;
}
int reactor_deregister(const Reactor *reactor, int fd) {
REACTOR_CTL(reactor, EPOLL_CTL_DEL, fd, 0)
g_hash_table_remove(reactor->table, &fd);
return 0;
}
int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg) {
REACTOR_CTL(reactor, EPOLL_CTL_MOD, fd, interest)
g_hash_table_insert(reactor->table, int_in_heap(fd),
callback_data_new(callback, callback_arg));
return 0;
}
I/O リアクターが記述子を使用してイベントをインターセプトした後 fd
、対応するイベント ハンドラーを呼び出し、それに渡されます。 fd
, void
.
リアクター_run() 関数を表示
int reactor_run(const Reactor *reactor, time_t timeout) {
int result;
struct epoll_event *events;
if ((events = calloc(MAX_EVENTS, sizeof(*events))) == NULL)
abort();
time_t start = time(NULL);
while (true) {
time_t passed = time(NULL) - start;
int nfds =
epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, timeout - passed);
switch (nfds) {
// Ошибка
case -1:
perror("epoll_wait");
result = -1;
goto cleanup;
// Время вышло
case 0:
result = 0;
goto cleanup;
// Успешная операция
default:
// Вызвать обработчиков событий
for (int i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
CallbackData *callback =
g_hash_table_lookup(reactor->table, &fd);
callback->callback(callback->arg, fd, events[i].events);
}
}
}
cleanup:
free(events);
return result;
}
要約すると、ユーザー コード内の関数呼び出しのチェーンは次の形式になります。
シングルスレッドサーバー
高負荷下で I/O リアクターをテストするために、イメージを使用してあらゆるリクエストに応答する単純な HTTP Web サーバーを作成します。
HTTP プロトコルのクイック リファレンス
HTTP は簡単に使用できます
リクエストフォーマット
<КОМАНДА> <URI> <ВЕРСИЯ HTTP>CRLF
<ЗАГОЛОВОК 1>CRLF
<ЗАГОЛОВОК 2>CRLF
<ЗАГОЛОВОК N>CRLF CRLF
<ДАННЫЕ>
CRLF
は 2 つの文字のシーケンスです。r
иn
、リクエストの最初の行、ヘッダー、データを区切ります。<КОМАНДА>
- の一つCONNECT
,DELETE
,GET
,HEAD
,OPTIONS
,PATCH
,POST
,PUT
,TRACE
。ブラウザはサーバーにコマンドを送信しますGET
, 「ファイルの内容を送ってください」という意味です。<URI>
-統一リソース識別子 。たとえば、URI =/index.html
、その後、クライアントはサイトのメイン ページをリクエストします。<ВЕРСИЯ HTTP>
— 形式の HTTP プロトコルのバージョンHTTP/X.Y
。現在最も一般的に使用されているバージョンは、HTTP/1.1
.<ЗАГОЛОВОК N>
次の形式のキーと値のペアです。<КЛЮЧ>: <ЗНАЧЕНИЕ>
、さらなる分析のためにサーバーに送信されます。<ДАННЫЕ>
— サーバーが操作を実行するために必要なデータ。多くの場合それは単純ですJSONの または他の形式。
応答フォーマット
<ВЕРСИЯ HTTP> <КОД СТАТУСА> <ОПИСАНИЕ СТАТУСА>CRLF
<ЗАГОЛОВОК 1>CRLF
<ЗАГОЛОВОК 2>CRLF
<ЗАГОЛОВОК N>CRLF CRLF
<ДАННЫЕ>
<КОД СТАТУСА>
演算の結果を表す数値です。私たちのサーバーは常にステータス 200 (正常な操作) を返します。<ОПИСАНИЕ СТАТУСА>
— ステータス コードの文字列表現。ステータスコード 200 の場合、これは次のようになります。OK
.<ЗАГОЛОВОК N>
— リクエストと同じ形式のヘッダー。タイトルは返却させていただきますContent-Length
(ファイルサイズ)とContent-Type: text/html
(戻りデータ型)。<ДАННЫЕ>
— ユーザーが要求したデータ。私たちの場合、これは次の画像へのパスです。HTML .
ファイル http_server.c
common.h
common.h で関数プロトタイプを表示
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов принять новое соединение.
*/
static void on_accept(void *arg, int fd, uint32_t events);
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов отправить HTTP ответ.
*/
static void on_send(void *arg, int fd, uint32_t events);
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов принять часть HTTP запроса.
*/
static void on_recv(void *arg, int fd, uint32_t events);
/*
* Переводит входящее соединение в неблокирующий режим.
*/
static void set_nonblocking(int fd);
/*
* Печатает переданные аргументы в stderr и выходит из процесса с
* кодом `EXIT_FAILURE`.
*/
static noreturn void fail(const char *format, ...);
/*
* Возвращает файловый дескриптор сокета, способного принимать новые
* TCP соединения.
*/
static int new_server(bool reuse_port);
機能マクロも解説 SAFE_CALL()
そして関数が定義されています fail()
。マクロは式の値とエラーを比較し、条件が true の場合は関数を呼び出します。 fail()
:
#define SAFE_CALL(call, error)
do {
if ((call) == error) {
fail("%s", #call);
}
} while (false)
機能 fail()
渡された引数を端末に出力します(例: printf()
EXIT_FAILURE
:
static noreturn void fail(const char *format, ...) {
va_list args;
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
fprintf(stderr, ": %sn", strerror(errno));
exit(EXIT_FAILURE);
}
機能 new_server()
システムコールによって作成された「サーバー」ソケットのファイル記述子を返します。 socket()
bind()
listen()
new_server() 関数を表示
static int new_server(bool reuse_port) {
int fd;
SAFE_CALL((fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP)),
-1);
if (reuse_port) {
SAFE_CALL(
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int)),
-1);
}
struct sockaddr_in addr = {.sin_family = AF_INET,
.sin_port = htons(SERVER_PORT),
.sin_addr = {.s_addr = inet_addr(SERVER_IPV4)},
.sin_zero = {0}};
SAFE_CALL(bind(fd, (struct sockaddr *)&addr, sizeof(addr)), -1);
SAFE_CALL(listen(fd, SERVER_BACKLOG), -1);
return fd;
}
- ソケットは最初、フラグを使用して非ブロッキング モードで作成されることに注意してください。
SOCK_NONBLOCK
したがって、関数内でon_accept()
(続きを読む) システムコールaccept()
スレッドの実行を停止しませんでした。 - もし
reuse_port
等しいtrue
、その後、この関数はオプションを使用してソケットを構成します を通してSO_REUSEPORT
マルチスレッド環境で同じポートを使用します (セクション「マルチスレッドサーバー」を参照)。setsockopt()
イベントハンドラ on_accept()
OSがイベントを生成した後に呼び出されます EPOLLIN
, この場合は、新しい接続を受け入れることができることを意味します。 on_accept()
新しい接続を受け入れ、それをノンブロッキング モードに切り替え、イベント ハンドラーに登録します。 on_recv()
I/O リアクター内。
on_accept() 関数を表示
static void on_accept(void *arg, int fd, uint32_t events) {
int incoming_conn;
SAFE_CALL((incoming_conn = accept(fd, NULL, NULL)), -1);
set_nonblocking(incoming_conn);
SAFE_CALL(reactor_register(reactor, incoming_conn, EPOLLIN, on_recv,
request_buffer_new()),
-1);
}
イベントハンドラ on_recv()
OSがイベントを生成した後に呼び出されます EPOLLIN
、この場合、接続が登録されたことを意味します on_accept()
、データを受信する準備ができています。
on_recv()
HTTP リクエストが完全に受信されるまで接続からデータを読み取り、その後ハンドラーを登録します on_send()
HTTP 応答を送信します。クライアントが接続を切断すると、ソケットは登録解除され、次を使用して閉じられます。 close()
関数 on_recv() を表示
static void on_recv(void *arg, int fd, uint32_t events) {
RequestBuffer *buffer = arg;
// Принимаем входные данные до тех пор, что recv возвратит 0 или ошибку
ssize_t nread;
while ((nread = recv(fd, buffer->data + buffer->size,
REQUEST_BUFFER_CAPACITY - buffer->size, 0)) > 0)
buffer->size += nread;
// Клиент оборвал соединение
if (nread == 0) {
SAFE_CALL(reactor_deregister(reactor, fd), -1);
SAFE_CALL(close(fd), -1);
request_buffer_destroy(buffer);
return;
}
// read вернул ошибку, отличную от ошибки, при которой вызов заблокирует
// поток
if (errno != EAGAIN && errno != EWOULDBLOCK) {
request_buffer_destroy(buffer);
fail("read");
}
// Получен полный HTTP запрос от клиента. Теперь регистрируем обработчика
// событий для отправки данных
if (request_buffer_is_complete(buffer)) {
request_buffer_clear(buffer);
SAFE_CALL(reactor_reregister(reactor, fd, EPOLLOUT, on_send, buffer),
-1);
}
}
イベントハンドラ on_send()
OSがイベントを生成した後に呼び出されます EPOLLOUT
、接続が登録されたことを意味します on_recv()
、データを送信する準備ができました。この関数は、画像を含む HTML を含む HTTP 応答をクライアントに送信し、イベント ハンドラーを元の状態に戻します。 on_recv()
.
on_send() 関数を表示
static void on_send(void *arg, int fd, uint32_t events) {
const char *content = "<img "
"src="https://habrastorage.org/webt/oh/wl/23/"
"ohwl23va3b-dioerobq_mbx4xaw.jpeg">";
char response[1024];
sprintf(response,
"HTTP/1.1 200 OK" CRLF "Content-Length: %zd" CRLF "Content-Type: "
"text/html" DOUBLE_CRLF "%s",
strlen(content), content);
SAFE_CALL(send(fd, response, strlen(response), 0), -1);
SAFE_CALL(reactor_reregister(reactor, fd, EPOLLIN, on_recv, arg), -1);
}
そして最後に、ファイル内に http_server.c
、関数内 main()
次を使用して I/O リアクターを作成します reactor_new()
、サーバーソケットを作成して登録し、次を使用してリアクターを起動します reactor_run()
ちょうど 1 分間続けた後、リソースを解放してプログラムを終了します。
http_server.c を表示する
#include "reactor.h"
static Reactor *reactor;
#include "common.h"
int main(void) {
SAFE_CALL((reactor = reactor_new()), NULL);
SAFE_CALL(
reactor_register(reactor, new_server(false), EPOLLIN, on_accept, NULL),
-1);
SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1);
SAFE_CALL(reactor_destroy(reactor), -1);
}
すべてが期待どおりに動作していることを確認してみましょう。コンパイル中 (chmod a+x compile.sh && ./compile.sh
プロジェクトルート内) を作成し、自己作成サーバーを起動し、開きます
性能測定
私の車の仕様を見せてください
$ screenfetch
MMMMMMMMMMMMMMMMMMMMMMMMMmds+. OS: Mint 19.1 tessa
MMm----::-://////////////oymNMd+` Kernel: x86_64 Linux 4.15.0-20-generic
MMd /++ -sNMd: Uptime: 2h 34m
MMNso/` dMM `.::-. .-::.` .hMN: Packages: 2217
ddddMMh dMM :hNMNMNhNMNMNh: `NMm Shell: bash 4.4.20
NMm dMM .NMN/-+MMM+-/NMN` dMM Resolution: 1920x1080
NMm dMM -MMm `MMM dMM. dMM DE: Cinnamon 4.0.10
NMm dMM -MMm `MMM dMM. dMM WM: Muffin
NMm dMM .mmd `mmm yMM. dMM WM Theme: Mint-Y-Dark (Mint-Y)
NMm dMM` ..` ... ydm. dMM GTK Theme: Mint-Y [GTK2/3]
hMM- +MMd/-------...-:sdds dMM Icon Theme: Mint-Y
-NMm- :hNMNNNmdddddddddy/` dMM Font: Noto Sans 9
-dMNs-``-::::-------.`` dMM CPU: Intel Core i7-6700 @ 8x 4GHz [52.0°C]
`/dMNmy+/:-------------:/yMMM GPU: NV136
./ydNMMMMMMMMMMMMMMMMMMMMM RAM: 2544MiB / 7926MiB
.MMMMMMMMMMMMMMMMMMM
シングルスレッドサーバーのパフォーマンスを測定してみましょう。 2 つのターミナルを開いてみましょう。1 つのターミナルで実行します。 ./http_server
、別の -
$ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 493.52us 76.70us 17.31ms 89.57%
Req/Sec 24.37k 1.81k 29.34k 68.13%
11657769 requests in 1.00m, 1.60GB read
Requests/sec: 193974.70
Transfer/sec: 27.19MB
当社のシングルスレッド サーバーは、11 の接続から発生する 100 分あたり XNUMX 万を超えるリクエストを処理できました。悪くない結果ですが、改善できるでしょうか?
マルチスレッドサーバー
前述したように、I/O リアクターは別個のスレッドで作成できるため、すべての CPU コアを利用できます。このアプローチを実践してみましょう。
http_server_multithreaded.c を表示する
#include "reactor.h"
static Reactor *reactor;
#pragma omp threadprivate(reactor)
#include "common.h"
int main(void) {
#pragma omp parallel
{
SAFE_CALL((reactor = reactor_new()), NULL);
SAFE_CALL(reactor_register(reactor, new_server(true), EPOLLIN,
on_accept, NULL),
-1);
SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1);
SAFE_CALL(reactor_destroy(reactor), -1);
}
}
今ではどのスレッドでも
static Reactor *reactor;
#pragma omp threadprivate(reactor)
関数の引数に注意してください。 new_server()
支持者 true
。これは、オプションをサーバーソケットに割り当てることを意味します SO_REUSEPORT
XNUMX回目の実行
次に、マルチスレッド サーバーのパフォーマンスを測定してみましょう。
$ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.14ms 2.53ms 40.73ms 89.98%
Req/Sec 79.98k 18.07k 154.64k 78.65%
38208400 requests in 1.00m, 5.23GB read
Requests/sec: 635876.41
Transfer/sec: 89.14MB
1 分間に処理されるリクエストの数が最大 3.28 倍に増加しました。しかし、ラウンド数にはわずか約 XNUMX 万足りなかったので、それを修正してみましょう。
まず、生成された統計を見てみましょう
$ sudo perf stat -B -e task-clock,context-switches,cpu-migrations,page-faults,cycles,instructions,branches,branch-misses,cache-misses ./http_server_multithreaded
Performance counter stats for './http_server_multithreaded':
242446,314933 task-clock (msec) # 4,000 CPUs utilized
1 813 074 context-switches # 0,007 M/sec
4 689 cpu-migrations # 0,019 K/sec
254 page-faults # 0,001 K/sec
895 324 830 170 cycles # 3,693 GHz
621 378 066 808 instructions # 0,69 insn per cycle
119 926 709 370 branches # 494,653 M/sec
3 227 095 669 branch-misses # 2,69% of all branches
808 664 cache-misses
60,604330670 seconds time elapsed
-march=native
, MAX_EVENTS
そして使う EPOLLET
パフォーマンスの大幅な向上はありませんでした。しかし、同時接続数を増やすとどうなるでしょうか?
352 の同時接続の統計:
$ wrk -c352 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
8 threads and 352 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 2.12ms 3.79ms 68.23ms 87.49%
Req/Sec 83.78k 12.69k 169.81k 83.59%
40006142 requests in 1.00m, 5.48GB read
Requests/sec: 665789.26
Transfer/sec: 93.34MB
望ましい結果が得られ、1 分間に処理されたリクエストの数と接続数の依存性を示す興味深いグラフが得られました。
数百回の接続の後、両方のサーバーで処理されたリクエストの数が急激に減少していることがわかります (マルチスレッド バージョンではこれがより顕著です)。これは Linux TCP/IP スタックの実装に関連していますか?グラフのこの動作と、マルチスレッドおよびシングルスレッドのオプションの最適化に関する仮定をコメントに自由に書き込んでください。
Как
I/Oリアクターのデメリット
I/O リアクターには次のような欠点がないわけではないことを理解する必要があります。
- マルチスレッド環境で I/O リアクターを使用するのは多少難しくなります。フローを手動で管理する必要があります。
- 実際にやってみると、ほとんどの場合、負荷は不均一であり、あるスレッドがログを記録している間に、別のスレッドが作業でビジー状態になる可能性があります。
- 1 つのイベント ハンドラーがスレッドをブロックすると、システム セレクター自体もブロックされるため、見つけにくいバグが発生する可能性があります。
これらの問題を解決します
まとめ
これで、理論からプロファイラーのエグゼクティブへの私たちの旅は終わりました。
さまざまなレベルの利便性と速度を備えたネットワーク ソフトウェアを作成するための同様に興味深いアプローチが他にもたくさんあるため、これにこだわる必要はありません。私の意見では興味深いので、リンクを以下に示します。
我々は再び会うまで!
興味深いプロジェクト
他に何を読むべきですか?
https://linux.die.net/man/7/socket https://stackoverflow.com/questions/1050222/what-is-the-difference-between-concurrency-and-parallelism http://www.kegel.com/c10k.html https://kernel.dk/io_uring.pdf https://aturon.github.io/blog/2016/09/07/futures-design/ https://tokio.rs/blog/2019-10-scheduler/ https://www.artima.com/articles/io_design_patterns.html https://habr.com/en/post/183832/
出所: habr.com