介紹
Node.js的 Tor的 傳輸 鉻 Memcached的 - ...
在本文中,我們將了解 I/O 反應器的詳細情況及其工作原理,用不到 200 行程式碼編寫一個實現,並使一個簡單的 HTTP 伺服器每分鐘處理超過 40 萬個請求。
前言
- 撰寫本文的目的是幫助了解 I/O 反應器的功能,從而了解使用它時的風險。
- 理解本文需要基礎知識。
C語言 以及一些網頁應用程式開發經驗。 - 所有代碼均嚴格遵守(注意:PDF 較長)
符合C11標準 適用於 Linux,可用於GitHub上 .
為什麼這樣做?
隨著互聯網的日益普及,Web伺服器開始需要同時處理大量連接,因此嘗試了兩種方法:大量作業系統執行緒上的阻塞I/O和與非阻塞I/O結合的非阻塞I/O。事件通知系統,也稱為“系統選擇器”(
第一種方法涉及為每個傳入連接建立一個新的作業系統執行緒。 它的缺點是可擴展性差:作業系統必須實現許多
修改後的版本亮點
第二種方法使用
這些方法之間的差異如下:
- 阻塞 I/O 操作 暫停 用戶流量 直到直到作業系統正常
碎片整理 傳入IP封包 到位元組流(TCP ,接收資料),或內部寫入緩衝區中沒有足夠的可用空間用於後續發送NIC (發送資料)。 - 系統選擇器 隨著時間的推移 通知程式作業系統 已經 經過碎片整理的 IP 封包(TCP、資料接收)或內部寫入緩衝區有足夠的空間 已經 可用(發送資料)。
總而言之,為每個 I/O 保留一個作業系統執行緒是對運算能力的浪費,因為實際上,執行緒並沒有做有用的工作(因此術語
I/O反應器模型
I/O 反應器可作為系統選擇器和使用者代碼之間的層。 其工作原理用下面的框圖描述:
- 讓我提醒您,事件是某個套接字能夠執行非阻塞 I/O 操作的通知。
- 事件處理程序是 I/O 反應器在接收事件時呼叫的函數,然後執行非阻塞 I/O 操作。
值得注意的是,I/O 反應器根據定義是單執行緒的,但沒有什麼可以阻止該概念以 1 執行緒:1 反應器的比例在多執行緒環境中使用,從而回收所有 CPU 核心。
履行
我們將把公共介面放在一個文件中 reactor.h
reactor.c
reactor.h
將包括以下公告:
在reactor.h中顯示聲明
typedef struct reactor Reactor;
/*
* Указатель на функцию, которая будет вызываться I/O реактором при поступлении
* события от системного селектора.
*/
typedef void (*Callback)(void *arg, int fd, uint32_t events);
/*
* Возвращает `NULL` в случае ошибки, не-`NULL` указатель на `Reactor` в
* противном случае.
*/
Reactor *reactor_new(void);
/*
* Освобождает системный селектор, все зарегистрированные сокеты в данный момент
* времени и сам I/O реактор.
*
* Следующие функции возвращают -1 в случае ошибки, 0 в случае успеха.
*/
int reactor_destroy(Reactor *reactor);
int reactor_register(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg);
int reactor_deregister(const Reactor *reactor, int fd);
int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg);
/*
* Запускает цикл событий с тайм-аутом `timeout`.
*
* Эта функция передаст управление вызывающему коду если отведённое время вышло
* или/и при отсутствии зарегистрированных сокетов.
*/
int reactor_run(const Reactor *reactor, time_t timeout);
I/O反應器結構包括 GHashTable
CallbackData
(事件處理程序的結構及其使用者參數)。
顯示 Reactor 和 CallbackData
struct reactor {
int epoll_fd;
GHashTable *table; // (int, CallbackData)
};
typedef struct {
Callback callback;
void *arg;
} CallbackData;
請注意,我們已經啟用了處理的能力 reactor.h
我們聲明結構 reactor
,並在 reactor.c
我們定義它,從而防止用戶明確更改其欄位。 這是圖案之一
功能 reactor_register
, reactor_deregister
и reactor_reregister
更新系統選擇器和雜湊表中感興趣的套接字和對應事件處理程序的清單。
顯示註冊功能
#define REACTOR_CTL(reactor, op, fd, interest)
if (epoll_ctl(reactor->epoll_fd, op, fd,
&(struct epoll_event){.events = interest,
.data = {.fd = fd}}) == -1) {
perror("epoll_ctl");
return -1;
}
int reactor_register(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg) {
REACTOR_CTL(reactor, EPOLL_CTL_ADD, fd, interest)
g_hash_table_insert(reactor->table, int_in_heap(fd),
callback_data_new(callback, callback_arg));
return 0;
}
int reactor_deregister(const Reactor *reactor, int fd) {
REACTOR_CTL(reactor, EPOLL_CTL_DEL, fd, 0)
g_hash_table_remove(reactor->table, &fd);
return 0;
}
int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest,
Callback callback, void *callback_arg) {
REACTOR_CTL(reactor, EPOLL_CTL_MOD, fd, interest)
g_hash_table_insert(reactor->table, int_in_heap(fd),
callback_data_new(callback, callback_arg));
return 0;
}
I/O 反應器攔截帶有描述符的事件後 fd
,它調用相應的事件處理程序,並將其傳遞給該事件處理程序 fd
, void
.
顯示reactor_run()函數
int reactor_run(const Reactor *reactor, time_t timeout) {
int result;
struct epoll_event *events;
if ((events = calloc(MAX_EVENTS, sizeof(*events))) == NULL)
abort();
time_t start = time(NULL);
while (true) {
time_t passed = time(NULL) - start;
int nfds =
epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, timeout - passed);
switch (nfds) {
// Ошибка
case -1:
perror("epoll_wait");
result = -1;
goto cleanup;
// Время вышло
case 0:
result = 0;
goto cleanup;
// Успешная операция
default:
// Вызвать обработчиков событий
for (int i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
CallbackData *callback =
g_hash_table_lookup(reactor->table, &fd);
callback->callback(callback->arg, fd, events[i].events);
}
}
}
cleanup:
free(events);
return result;
}
總而言之,使用者程式碼中的函數呼叫鏈將採用以下形式:
單線程伺服器
為了測試高負載下的 I/O 反應器,我們將編寫一個簡單的 HTTP Web 伺服器,用圖像回應任何請求。
HTTP 協定快速參考
HTTP 可以很容易地使用
請求格式
<КОМАНДА> <URI> <ВЕРСИЯ HTTP>CRLF
<ЗАГОЛОВОК 1>CRLF
<ЗАГОЛОВОК 2>CRLF
<ЗАГОЛОВОК N>CRLF CRLF
<ДАННЫЕ>
CRLF
是兩個字元的序列:r
иn
,分隔請求的第一行、標頭和資料。<КОМАНДА>
- 其中之一CONNECT
,DELETE
,GET
,HEAD
,OPTIONS
,PATCH
,POST
,PUT
,TRACE
。 瀏覽器將向我們的伺服器發送命令GET
,意思是“將文件的內容發送給我。”<URI>
-統一資源標識符 。 例如,如果 URI =/index.html
,然後客戶端請求網站的主頁。<ВЕРСИЯ HTTP>
— HTTP 協定的版本,格式為HTTP/X.Y
。 目前最常用的版本是HTTP/1.1
.<ЗАГОЛОВОК N>
是格式為的鍵值對<КЛЮЧ>: <ЗНАЧЕНИЕ>
,發送到伺服器進行進一步分析。<ДАННЫЕ>
— 伺服器執行操作所需的資料。 很多時候很簡單JSON 或任何其他格式。
回應格式
<ВЕРСИЯ HTTP> <КОД СТАТУСА> <ОПИСАНИЕ СТАТУСА>CRLF
<ЗАГОЛОВОК 1>CRLF
<ЗАГОЛОВОК 2>CRLF
<ЗАГОЛОВОК N>CRLF CRLF
<ДАННЫЕ>
<КОД СТАТУСА>
是表示運算結果的數字。 我們的伺服器將始終返回狀態 200(操作成功)。<ОПИСАНИЕ СТАТУСА>
— 狀態代碼的字串表示形式。 對於狀態代碼 200,這是OK
.<ЗАГОЛОВОК N>
— 標頭的格式與請求中的格式相同。 我們將歸還標題Content-Length
(檔案大小)和Content-Type: text/html
(傳回資料類型)。<ДАННЫЕ>
— 用戶要求的資料。 在我們的例子中,這是圖像的路徑HTML .
文件 http_server.c
common.h
在 common.h 中顯示函數原型
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов принять новое соединение.
*/
static void on_accept(void *arg, int fd, uint32_t events);
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов отправить HTTP ответ.
*/
static void on_send(void *arg, int fd, uint32_t events);
/*
* Обработчик событий, который вызовется после того, как сокет будет
* готов принять часть HTTP запроса.
*/
static void on_recv(void *arg, int fd, uint32_t events);
/*
* Переводит входящее соединение в неблокирующий режим.
*/
static void set_nonblocking(int fd);
/*
* Печатает переданные аргументы в stderr и выходит из процесса с
* кодом `EXIT_FAILURE`.
*/
static noreturn void fail(const char *format, ...);
/*
* Возвращает файловый дескриптор сокета, способного принимать новые
* TCP соединения.
*/
static int new_server(bool reuse_port);
功能宏也有描述 SAFE_CALL()
並且函數被定義 fail()
。 巨集將表達式的值與錯誤進行比較,如果條件為真,則呼叫函數 fail()
:
#define SAFE_CALL(call, error)
do {
if ((call) == error) {
fail("%s", #call);
}
} while (false)
功能 fail()
將傳遞的參數列印到終端(例如 printf()
EXIT_FAILURE
:
static noreturn void fail(const char *format, ...) {
va_list args;
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
fprintf(stderr, ": %sn", strerror(errno));
exit(EXIT_FAILURE);
}
功能 new_server()
傳回由系統呼叫建立的「伺服器」套接字的檔案描述符 socket()
bind()
listen()
顯示 new_server() 函數
static int new_server(bool reuse_port) {
int fd;
SAFE_CALL((fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP)),
-1);
if (reuse_port) {
SAFE_CALL(
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int)),
-1);
}
struct sockaddr_in addr = {.sin_family = AF_INET,
.sin_port = htons(SERVER_PORT),
.sin_addr = {.s_addr = inet_addr(SERVER_IPV4)},
.sin_zero = {0}};
SAFE_CALL(bind(fd, (struct sockaddr *)&addr, sizeof(addr)), -1);
SAFE_CALL(listen(fd, SERVER_BACKLOG), -1);
return fd;
}
- 請注意,套接字最初是使用標誌以非阻塞模式建立的
SOCK_NONBLOCK
這樣在函數中on_accept()
(了解更多)系統調用accept()
並沒有停止線程的執行。 - 如果
reuse_port
等於true
,那麼這個函數將使用選項來配置套接字 通過SO_REUSEPORT
在多執行緒環境中使用相同的連接埠(請參閱“多執行緒伺服器”部分)。setsockopt()
事件處理程序 on_accept()
作業系統產生事件後調用 EPOLLIN
,在這種情況下意味著可以接受新連線。 on_accept()
接受新連接,將其切換到非阻塞模式並向事件處理程序註冊 on_recv()
在 I/O 反應器中。
顯示 on_accept() 函數
static void on_accept(void *arg, int fd, uint32_t events) {
int incoming_conn;
SAFE_CALL((incoming_conn = accept(fd, NULL, NULL)), -1);
set_nonblocking(incoming_conn);
SAFE_CALL(reactor_register(reactor, incoming_conn, EPOLLIN, on_recv,
request_buffer_new()),
-1);
}
事件處理程序 on_recv()
作業系統產生事件後調用 EPOLLIN
,在這種情況下意味著連線已註冊 on_accept()
,準備接收資料。
on_recv()
從連線中讀取數據,直到完全接收到 HTTP 請求,然後註冊一個處理程序 on_send()
發送 HTTP 回應。 如果客戶端中斷連接,則套接字將取消註冊並關閉 close()
顯示函數 on_recv()
static void on_recv(void *arg, int fd, uint32_t events) {
RequestBuffer *buffer = arg;
// Принимаем входные данные до тех пор, что recv возвратит 0 или ошибку
ssize_t nread;
while ((nread = recv(fd, buffer->data + buffer->size,
REQUEST_BUFFER_CAPACITY - buffer->size, 0)) > 0)
buffer->size += nread;
// Клиент оборвал соединение
if (nread == 0) {
SAFE_CALL(reactor_deregister(reactor, fd), -1);
SAFE_CALL(close(fd), -1);
request_buffer_destroy(buffer);
return;
}
// read вернул ошибку, отличную от ошибки, при которой вызов заблокирует
// поток
if (errno != EAGAIN && errno != EWOULDBLOCK) {
request_buffer_destroy(buffer);
fail("read");
}
// Получен полный HTTP запрос от клиента. Теперь регистрируем обработчика
// событий для отправки данных
if (request_buffer_is_complete(buffer)) {
request_buffer_clear(buffer);
SAFE_CALL(reactor_reregister(reactor, fd, EPOLLOUT, on_send, buffer),
-1);
}
}
事件處理程序 on_send()
作業系統產生事件後調用 EPOLLOUT
,表示連線已註冊 on_recv()
,準備發送資料。 此函數將包含帶有圖像的 HTML 的 HTTP 回應傳送到客戶端,然後將事件處理程序變更回 on_recv()
.
顯示 on_send() 函數
static void on_send(void *arg, int fd, uint32_t events) {
const char *content = "<img "
"src="https://habrastorage.org/webt/oh/wl/23/"
"ohwl23va3b-dioerobq_mbx4xaw.jpeg">";
char response[1024];
sprintf(response,
"HTTP/1.1 200 OK" CRLF "Content-Length: %zd" CRLF "Content-Type: "
"text/html" DOUBLE_CRLF "%s",
strlen(content), content);
SAFE_CALL(send(fd, response, strlen(response), 0), -1);
SAFE_CALL(reactor_reregister(reactor, fd, EPOLLIN, on_recv, arg), -1);
}
最後,在文件中 http_server.c
,在函數中 main()
我們使用以下方法建立一個 I/O 反應器 reactor_new()
,建立一個伺服器套接字並註冊它,使用啟動反應器 reactor_run()
恰好一分鐘,然後我們釋放資源並退出程序。
顯示http_server.c
#include "reactor.h"
static Reactor *reactor;
#include "common.h"
int main(void) {
SAFE_CALL((reactor = reactor_new()), NULL);
SAFE_CALL(
reactor_register(reactor, new_server(false), EPOLLIN, on_accept, NULL),
-1);
SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1);
SAFE_CALL(reactor_destroy(reactor), -1);
}
讓我們檢查一切是否按預期工作。 編譯(chmod a+x compile.sh && ./compile.sh
在專案根目錄下)並啟動自己編寫的伺服器,打開
績效衡量
顯示我的汽車規格
$ screenfetch
MMMMMMMMMMMMMMMMMMMMMMMMMmds+. OS: Mint 19.1 tessa
MMm----::-://////////////oymNMd+` Kernel: x86_64 Linux 4.15.0-20-generic
MMd /++ -sNMd: Uptime: 2h 34m
MMNso/` dMM `.::-. .-::.` .hMN: Packages: 2217
ddddMMh dMM :hNMNMNhNMNMNh: `NMm Shell: bash 4.4.20
NMm dMM .NMN/-+MMM+-/NMN` dMM Resolution: 1920x1080
NMm dMM -MMm `MMM dMM. dMM DE: Cinnamon 4.0.10
NMm dMM -MMm `MMM dMM. dMM WM: Muffin
NMm dMM .mmd `mmm yMM. dMM WM Theme: Mint-Y-Dark (Mint-Y)
NMm dMM` ..` ... ydm. dMM GTK Theme: Mint-Y [GTK2/3]
hMM- +MMd/-------...-:sdds dMM Icon Theme: Mint-Y
-NMm- :hNMNNNmdddddddddy/` dMM Font: Noto Sans 9
-dMNs-``-::::-------.`` dMM CPU: Intel Core i7-6700 @ 8x 4GHz [52.0°C]
`/dMNmy+/:-------------:/yMMM GPU: NV136
./ydNMMMMMMMMMMMMMMMMMMMMM RAM: 2544MiB / 7926MiB
.MMMMMMMMMMMMMMMMMMM
讓我們測量一下單線程伺服器的效能。 讓我們打開兩個終端:在一個終端機中我們將運行 ./http_server
,在不同的 -
$ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 493.52us 76.70us 17.31ms 89.57%
Req/Sec 24.37k 1.81k 29.34k 68.13%
11657769 requests in 1.00m, 1.60GB read
Requests/sec: 193974.70
Transfer/sec: 27.19MB
我們的單線程伺服器每分鐘能夠處理來自 11 個連接的超過 100 萬個請求。 結果還不錯,但是可以改進嗎?
多執行緒伺服器
如上所述,I/O 反應器可以在單獨的執行緒中創建,從而利用所有 CPU 核心。 讓我們將這種方法付諸實現:
顯示http_server_multithreaded.c
#include "reactor.h"
static Reactor *reactor;
#pragma omp threadprivate(reactor)
#include "common.h"
int main(void) {
#pragma omp parallel
{
SAFE_CALL((reactor = reactor_new()), NULL);
SAFE_CALL(reactor_register(reactor, new_server(true), EPOLLIN,
on_accept, NULL),
-1);
SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1);
SAFE_CALL(reactor_destroy(reactor), -1);
}
}
現在每個線程
static Reactor *reactor;
#pragma omp threadprivate(reactor)
請注意函數參數 new_server()
行為 true
。 這意味著我們將選項分配給伺服器套接字 SO_REUSEPORT
第二次運行
現在讓我們測量多線程伺服器的效能:
$ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
8 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.14ms 2.53ms 40.73ms 89.98%
Req/Sec 79.98k 18.07k 154.64k 78.65%
38208400 requests in 1.00m, 5.23GB read
Requests/sec: 635876.41
Transfer/sec: 89.14MB
1分鐘處理的請求數增加了~3.28倍! 但我們只差 XNUMX 萬左右,所以讓我們試著解決這個問題。
首先我們來看看產生的統計數據
$ sudo perf stat -B -e task-clock,context-switches,cpu-migrations,page-faults,cycles,instructions,branches,branch-misses,cache-misses ./http_server_multithreaded
Performance counter stats for './http_server_multithreaded':
242446,314933 task-clock (msec) # 4,000 CPUs utilized
1 813 074 context-switches # 0,007 M/sec
4 689 cpu-migrations # 0,019 K/sec
254 page-faults # 0,001 K/sec
895 324 830 170 cycles # 3,693 GHz
621 378 066 808 instructions # 0,69 insn per cycle
119 926 709 370 branches # 494,653 M/sec
3 227 095 669 branch-misses # 2,69% of all branches
808 664 cache-misses
60,604330670 seconds time elapsed
-march=native
, MAX_EVENTS
並使用 EPOLLET
並沒有為性能帶來顯著的提升。 但是如果增加同時連線的數量會發生什麼事?
352個同時連接的統計:
$ wrk -c352 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
8 threads and 352 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 2.12ms 3.79ms 68.23ms 87.49%
Req/Sec 83.78k 12.69k 169.81k 83.59%
40006142 requests in 1.00m, 5.48GB read
Requests/sec: 665789.26
Transfer/sec: 93.34MB
獲得了所需的結果,並顯示了一個有趣的圖表,顯示 1 分鐘內處理的請求數量與連接數量的依賴關係:
我們看到,在數百個連線之後,兩台伺服器處理的請求數量急劇下降(在多執行緒版本中,這一點更為明顯)。 這與 Linux TCP/IP 堆疊實作有關嗎? 請隨意在評論中寫下您對圖形的這種行為以及多線程和單線程選項的優化的假設。
如
I/O反應器的缺點
您需要了解 I/O 反應器並非沒有缺點,即:
- 在多執行緒環境中使用 I/O 反應器有些困難,因為您必須手動管理流量。
- 實踐表明,大多數情況下負載不均勻,這可能會導致一個執行緒在記錄日誌,而另一個執行緒忙於工作。
- 如果一個事件處理程序阻塞了一個線程,那麼系統選擇器本身也會阻塞,這可能會導致難以發現的錯誤。
解決這些問題
結論
我們從理論直接進入分析儀排氣的旅程到此結束。
您不應該糾纏於此,因為還有許多其他同樣有趣的方法來編寫具有不同程度的便利性和速度的網路軟體。 在我看來,有趣的是,下面給出了連結。
直到我們再次見面!
有趣的項目
我還該讀什麼?
https://linux.die.net/man/7/socket https://stackoverflow.com/questions/1050222/what-is-the-difference-between-concurrency-and-parallelism http://www.kegel.com/c10k.html https://kernel.dk/io_uring.pdf https://aturon.github.io/blog/2016/09/07/futures-design/ https://tokio.rs/blog/2019-10-scheduler/ https://www.artima.com/articles/io_design_patterns.html https://habr.com/en/post/183832/
來源: www.habr.com