Full-featured bare-C I/O reactor

Introduction

An I/O reactor (a single-threaded event loop) is a pattern for writing high-load software, used in many popular solutions, nginx and Node.js among them.

In this article, we will look at the ins and outs of the I/O reactor and how it works, write an implementation in less than 200 lines of code, and make a simple HTTP server that processes over 40 million requests per minute.

Foreword

  • The article was written to help understand how an I/O reactor functions, and therefore the risks of using one.
  • Understanding the article requires a basic knowledge of the C language and some experience in network application development.
  • All code is written strictly to the C11 standard (caution: long PDF) for Linux and is available on GitHub.

Why do it?

With the growing popularity of the Internet, web servers needed to handle a large number of connections simultaneously, so two approaches were tried: blocking I/O on a large number of OS threads, and non-blocking I/O combined with an event notification system, also called a "system selector" (epoll/kqueue/IOCP/etc.).

The first approach creates a new OS thread for each incoming connection. Its disadvantage is poor scalability: the operating system has to perform many context switches and system calls. These are expensive operations and can exhaust free RAM with an impressive number of connections.

A modified version of this approach allocates a fixed number of threads (a thread pool), thereby preventing the system from aborting execution, but it introduces a new problem: if the pool is currently blocked by long read operations, other sockets that could already receive data are unable to do so.

The second approach uses an event notification system (system selector) provided by the OS. This article discusses the most common type of system selector, based on alerts (events, notifications) about readiness for I/O operations rather than notifications of their completion. A simplified example of its use can be represented by the following block diagram:

[Block diagram: simplified use of a system selector]

The difference between these approaches is as follows:

  • A blocking I/O operation suspends the user thread until the OS has reassembled incoming IP packets into a byte stream (TCP, receiving data) or until enough space is freed in the internal write buffers for subsequent sending via the NIC (sending data).
  • A system selector instead notifies the program that the OS has already reassembled the IP packets (TCP, receiving data) or that enough space is already available in the internal write buffers (sending data).

To sum it up, reserving an OS thread for each I/O operation is a waste of computing power, because in reality the threads spend most of their time idle rather than doing useful work. The system selector solves this problem, letting the user program spend CPU resources far more economically.
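The contrast is easy to show in code. Below is a minimal sketch (mine, not from the article's sources) of reading from a non-blocking socket: instead of suspending the thread, recv() immediately reports "not ready yet" through EAGAIN/EWOULDBLOCK, and the selector later tells us when a retry is worthwhile:

#include <errno.h>
#include <stdbool.h>
#include <sys/socket.h>
#include <sys/types.h>

// Try to read from a non-blocking socket. On a blocking socket the same
// recv() call would suspend the thread until the OS had assembled the data.
static bool try_read(int fd, char *buf, size_t len, ssize_t *nread) {
    *nread = recv(fd, buf, len, 0);
    if (*nread == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return false; // nothing to read yet; wait for the selector's event
    return true;      // got data (> 0), peer closed (0), or a real error (-1)
}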

I/O reactor model

The I/O reactor acts as a layer between the system selector and the user code. The principle of its operation is described by the following block diagram:

[Block diagram: operation of the I/O reactor]

  • Let me remind you that an event is a notification that a certain socket is able to perform a non-blocking I/O operation.
  • An event handler is a function, called by the I/O reactor when it receives an event, that then performs the appropriate non-blocking I/O operation.

It is important to note that the I/O reactor is by definition single-threaded, but nothing stops the concept from being used in a multi-threaded environment at a ratio of 1 thread : 1 reactor, thereby utilizing all CPU cores.

Implementation

We will place the public interface in the file reactor.h and the implementation in reactor.c. reactor.h consists of the following declarations:

Show declarations in reactor.h

typedef struct reactor Reactor;

/*
 * Pointer to a function that will be called by the I/O reactor when an
 * event arrives from the system selector.
 */
typedef void (*Callback)(void *arg, int fd, uint32_t events);

/*
 * Returns `NULL` on error, or a non-`NULL` pointer to a `Reactor`
 * otherwise.
 */
Reactor *reactor_new(void);

/*
 * Frees the system selector, all currently registered sockets, and the
 * I/O reactor itself.
 *
 * The following functions return -1 on error, 0 on success.
 */
int reactor_destroy(Reactor *reactor);

int reactor_register(const Reactor *reactor, int fd, uint32_t interest,
                     Callback callback, void *callback_arg);
int reactor_deregister(const Reactor *reactor, int fd);
int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest,
                       Callback callback, void *callback_arg);

/*
 * Starts the event loop with the timeout `timeout`.
 *
 * This function returns control to the calling code when the allotted
 * time runs out and/or when there are no registered sockets left.
 */
int reactor_run(const Reactor *reactor, time_t timeout);

The I/O reactor structure consists of an epoll file descriptor (the system selector) and a GHashTable hash table, which maps each socket to a CallbackData (a structure holding an event handler and the user argument for it).

Show Reactor and CallbackData

struct reactor {
    int epoll_fd;
    GHashTable *table; // (int, CallbackData)
};

typedef struct {
    Callback callback;
    void *arg;
} CallbackData;

Note that we have enabled the ability to work with an incomplete type through a pointer: in reactor.h we declare the structure reactor, and in reactor.c we define it, thereby preventing the user from explicitly changing its fields. This is one of the data-hiding patterns that fits succinctly into C semantics.
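In other words, user code that includes only reactor.h can hold and pass around a Reactor *, but cannot reach inside it:

#include "reactor.h"

int main(void) {
    Reactor *reactor = reactor_new(); // fine: works through the opaque pointer
    if (reactor == NULL)
        return 1;
    // reactor->epoll_fd = -1;       // compile error: 'struct reactor' is an
    //                               // incomplete type in this translation unit
    return reactor_destroy(reactor);
}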

The functions reactor_register, reactor_deregister, and reactor_reregister update the set of sockets of interest and the corresponding event handlers in the system selector and the hash table.

Show registration functions

#define REACTOR_CTL(reactor, op, fd, interest)                                 \
    if (epoll_ctl(reactor->epoll_fd, op, fd,                                   \
                  &(struct epoll_event){.events = interest,                    \
                                        .data = {.fd = fd}}) == -1) {          \
        perror("epoll_ctl");                                                   \
        return -1;                                                             \
    }

int reactor_register(const Reactor *reactor, int fd, uint32_t interest,
                     Callback callback, void *callback_arg) {
    REACTOR_CTL(reactor, EPOLL_CTL_ADD, fd, interest)
    g_hash_table_insert(reactor->table, int_in_heap(fd),
                        callback_data_new(callback, callback_arg));
    return 0;
}

int reactor_deregister(const Reactor *reactor, int fd) {
    REACTOR_CTL(reactor, EPOLL_CTL_DEL, fd, 0)
    g_hash_table_remove(reactor->table, &fd);
    return 0;
}

int reactor_reregister(const Reactor *reactor, int fd, uint32_t interest,
                       Callback callback, void *callback_arg) {
    REACTOR_CTL(reactor, EPOLL_CTL_MOD, fd, interest)
    g_hash_table_insert(reactor->table, int_in_heap(fd),
                        callback_data_new(callback, callback_arg));
    return 0;
}
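The helpers int_in_heap() and callback_data_new() used above, as well as reactor_new() and reactor_destroy(), are not shown in the article (they live in the GitHub sources). A minimal sketch consistent with the GLib calls above might look as follows, assuming reactor.c includes <stdlib.h>, <unistd.h>, <sys/epoll.h>, and <glib.h>; treat the details as my assumptions rather than the repository's exact code:

// Copies a descriptor to the heap so the hash table can own the key.
static int *int_in_heap(int value) {
    int *ptr = malloc(sizeof(*ptr));
    if (ptr == NULL)
        abort();
    *ptr = value;
    return ptr;
}

// Packs an event handler together with its user argument.
static CallbackData *callback_data_new(Callback callback, void *arg) {
    CallbackData *data = malloc(sizeof(*data));
    if (data == NULL)
        abort();
    data->callback = callback;
    data->arg = arg;
    return data;
}

// Closes one registered socket; used when destroying the reactor.
static void close_socket(gpointer key, gpointer value, gpointer user_data) {
    close(*(int *)key);
}

Reactor *reactor_new(void) {
    Reactor *reactor = malloc(sizeof(*reactor));
    if (reactor == NULL)
        return NULL;

    if ((reactor->epoll_fd = epoll_create1(0)) == -1) {
        free(reactor);
        return NULL;
    }

    // g_int_hash/g_int_equal treat keys as pointers to int; g_free releases
    // keys and values automatically when entries are removed or replaced.
    reactor->table =
        g_hash_table_new_full(g_int_hash, g_int_equal, g_free, g_free);
    return reactor;
}

int reactor_destroy(Reactor *reactor) {
    if (close(reactor->epoll_fd) == -1)
        return -1;
    g_hash_table_foreach(reactor->table, close_socket, NULL);
    g_hash_table_destroy(reactor->table);
    free(reactor);
    return 0;
}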

After the I/O reactor intercepts an event for the descriptor fd, it calls the corresponding event handler, passing it fd, the bit mask of the generated events, and the user's void pointer.

Show reactor_run() function

int reactor_run(const Reactor *reactor, time_t timeout) {
    int result;
    struct epoll_event *events;
    if ((events = calloc(MAX_EVENTS, sizeof(*events))) == NULL)
        abort();

    time_t start = time(NULL);

    while (true) {
        time_t passed = time(NULL) - start;
        int nfds =
            epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, timeout - passed);

        switch (nfds) {
        // Error
        case -1:
            perror("epoll_wait");
            result = -1;
            goto cleanup;
        // Time is up
        case 0:
            result = 0;
            goto cleanup;
        // Successful operation
        default:
            // Call the event handlers
            for (int i = 0; i < nfds; i++) {
                int fd = events[i].data.fd;

                CallbackData *callback =
                    g_hash_table_lookup(reactor->table, &fd);
                callback->callback(callback->arg, fd, events[i].events);
            }
        }
    }

cleanup:
    free(events);
    return result;
}

To summarize, the chain of function calls in user code will take the following form:

[Diagram: the chain of function calls in user code]

Single-threaded server

In order to test the I/O reactor under high load, we will write a simple HTTP web server that responds to any request with an image.

A quick reference to the HTTP protocol

HTTP is an application-level protocol, primarily used for server-browser interaction.

HTTP is most easily used over the TCP transport protocol, sending and receiving messages in a format defined by the specification.

Request format

<COMMAND> <URI> <HTTP VERSION>CRLF
<HEADER 1>CRLF
<HEADER 2>CRLF
<HEADER N>CRLF CRLF
<DATA>

  • CRLF is a sequence of two characters, \r and \n, separating the first line of the request, the headers, and the data.
  • <COMMAND> is one of CONNECT, DELETE, GET, HEAD, OPTIONS, PATCH, POST, PUT, TRACE. The browser will send our server the GET command, meaning "Send me the contents of the file."
  • <URI> is a uniform resource identifier. For example, if URI = /index.html, the client requests the site's main page.
  • <HTTP VERSION> is the version of the HTTP protocol in the format HTTP/X.Y. The most commonly used version today is HTTP/1.1.
  • <HEADER N> is a key-value pair in the format <KEY>: <VALUE>, sent to the server for further analysis.
  • <DATA> is the data the server needs to perform the operation. Often it is simply JSON or another format.

Response format

<HTTP VERSION> <STATUS CODE> <STATUS DESCRIPTION>CRLF
<HEADER 1>CRLF
<HEADER 2>CRLF
<HEADER N>CRLF CRLF
<DATA>

  • <STATUS CODE> is a number representing the result of the operation. Our server will always return status 200 (successful operation).
  • <STATUS DESCRIPTION> is the string representation of the status code. For status code 200 this is OK.
  • <HEADER N> is a header in the same format as in the request. We will return the headers Content-Length (file size) and Content-Type: text/html (the type of the returned data).
  • <DATA> is the data requested by the user. In our case, it is the path to the image in HTML.
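For example, a browser's exchange with our server will look roughly like this (headers abridged, CRLF line endings implied, Content-Length counted for this particular body):

GET / HTTP/1.1
Host: 127.0.0.1:18470
Connection: keep-alive

HTTP/1.1 200 OK
Content-Length: 83
Content-Type: text/html

<img src="https://habrastorage.org/webt/oh/wl/23/ohwl23va3b-dioerobq_mbx4xaw.jpeg">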

The file http_server.c (the single-threaded server) includes the file common.h, which contains the following function prototypes:

Show function prototypes in common.h

/*
 * The event handler that is called after a socket becomes
 * ready to accept a new connection.
 */
static void on_accept(void *arg, int fd, uint32_t events);

/*
 * The event handler that is called after a socket becomes
 * ready to send an HTTP response.
 */
static void on_send(void *arg, int fd, uint32_t events);

/*
 * The event handler that is called after a socket becomes
 * ready to receive part of an HTTP request.
 */
static void on_recv(void *arg, int fd, uint32_t events);

/*
 * Switches an incoming connection to non-blocking mode.
 */
static void set_nonblocking(int fd);

/*
 * Prints the passed arguments to stderr and exits the process with
 * code `EXIT_FAILURE`.
 */
static noreturn void fail(const char *format, ...);

/*
 * Returns the file descriptor of a socket capable of accepting new
 * TCP connections.
 */
static int new_server(bool reuse_port);

common.h also describes the function-like macro SAFE_CALL() and defines the function fail(). The macro compares the value of an expression with the error code and, if the condition is true, calls fail():

#define SAFE_CALL(call, error)                                                 \
    do {                                                                       \
        if ((call) == error) {                                                 \
            fail("%s", #call);                                                 \
        }                                                                      \
    } while (false)

The function fail() prints the passed arguments to the terminal (like printf()) and terminates the program with the code EXIT_FAILURE:

static noreturn void fail(const char *format, ...) {
    va_list args;
    va_start(args, format);
    vfprintf(stderr, format, args);
    va_end(args);
    fprintf(stderr, ": %s\n", strerror(errno));
    exit(EXIT_FAILURE);
}

The function new_server() returns the file descriptor of the "server" socket, created with the socket(), bind(), and listen() system calls and capable of accepting incoming connections in non-blocking mode.

Show new_server() function

static int new_server(bool reuse_port) {
    int fd;
    SAFE_CALL((fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP)),
              -1);

    if (reuse_port) {
        SAFE_CALL(
            setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &(int){1}, sizeof(int)),
            -1);
    }

    struct sockaddr_in addr = {.sin_family = AF_INET,
                               .sin_port = htons(SERVER_PORT),
                               .sin_addr = {.s_addr = inet_addr(SERVER_IPV4)},
                               .sin_zero = {0}};

    SAFE_CALL(bind(fd, (struct sockaddr *)&addr, sizeof(addr)), -1);
    SAFE_CALL(listen(fd, SERVER_BACKLOG), -1);
    return fd;
}

  • Note that the socket is initially created in non-blocking mode using the SOCK_NONBLOCK flag, so that in on_accept() (discussed below) the accept() system call does not block the thread. A sketch of set_nonblocking(), which switches accepted connections to the same mode, follows this list.
  • If reuse_port is true, the function configures the socket with the SO_REUSEPORT option via setsockopt() in order to use the same port in a multi-threaded environment (see the section "Multithreaded server").
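set_nonblocking() is declared in common.h, but its body is not shown in the article; a typical implementation (a sketch, assuming the usual fcntl() approach) adds O_NONBLOCK to the descriptor's flags:

#include <fcntl.h>

static void set_nonblocking(int fd) {
    int flags;
    SAFE_CALL((flags = fcntl(fd, F_GETFL, 0)), -1);
    SAFE_CALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK), -1);
}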

The event handler on_accept() is called when the OS generates an EPOLLIN event, in this case meaning that a new connection can be accepted. on_accept() accepts the new connection, switches it to non-blocking mode, and registers it with the on_recv() event handler in the I/O reactor.

Show on_accept() function

static void on_accept(void *arg, int fd, uint32_t events) {
    int incoming_conn;
    SAFE_CALL((incoming_conn = accept(fd, NULL, NULL)), -1);
    set_nonblocking(incoming_conn);
    SAFE_CALL(reactor_register(reactor, incoming_conn, EPOLLIN, on_recv,
                               request_buffer_new()),
              -1);
}

The event handler on_recv() is called when the OS generates an EPOLLIN event, in this case meaning that the connection registered in on_accept() is ready to receive data.

on_recv() reads data from the connection until the HTTP request is fully received, then registers the on_send() handler to send the HTTP response. If the client breaks the connection, the socket is deregistered and closed with close().

Show on_recv() function

static void on_recv(void *arg, int fd, uint32_t events) {
    RequestBuffer *buffer = arg;

    // Read incoming data until recv() returns 0 or an error
    ssize_t nread;
    while ((nread = recv(fd, buffer->data + buffer->size,
                         REQUEST_BUFFER_CAPACITY - buffer->size, 0)) > 0)
        buffer->size += nread;

    // The client broke the connection
    if (nread == 0) {
        SAFE_CALL(reactor_deregister(reactor, fd), -1);
        SAFE_CALL(close(fd), -1);
        request_buffer_destroy(buffer);
        return;
    }

    // recv returned an error other than the one meaning that the call
    // would have blocked the thread
    if (errno != EAGAIN && errno != EWOULDBLOCK) {
        request_buffer_destroy(buffer);
        fail("read");
    }

    // A complete HTTP request has been received from the client. Now
    // register the event handler for sending data
    if (request_buffer_is_complete(buffer)) {
        request_buffer_clear(buffer);
        SAFE_CALL(reactor_reregister(reactor, fd, EPOLLOUT, on_send, buffer),
                  -1);
    }
}
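The RequestBuffer type and its helpers are also left to the GitHub sources. A plausible sketch consistent with how on_accept() and on_recv() use them (the fixed capacity and the end-of-headers completeness check are my assumptions):

#include <string.h>

typedef struct {
    char data[REQUEST_BUFFER_CAPACITY];
    size_t size;
} RequestBuffer;

static RequestBuffer *request_buffer_new(void) {
    RequestBuffer *buffer = malloc(sizeof(*buffer));
    if (buffer == NULL)
        abort();
    buffer->size = 0;
    return buffer;
}

// A bodyless request is complete once the empty line (CRLF CRLF) that
// terminates the headers has arrived.
static bool request_buffer_is_complete(const RequestBuffer *buffer) {
    return buffer->size >= 4 &&
           memcmp(buffer->data + buffer->size - 4, "\r\n\r\n", 4) == 0;
}

static void request_buffer_clear(RequestBuffer *buffer) { buffer->size = 0; }

static void request_buffer_destroy(RequestBuffer *buffer) { free(buffer); }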

The event handler on_send() is called when the OS generates an EPOLLOUT event, meaning that the connection registered in on_recv() is ready to send data. This function sends the client an HTTP response containing HTML with an image, and then changes the event handler back to on_recv().

Show on_send() function

static void on_send(void *arg, int fd, uint32_t events) {
    const char *content = "<img "
                          "src=\"https://habrastorage.org/webt/oh/wl/23/"
                          "ohwl23va3b-dioerobq_mbx4xaw.jpeg\">";
    char response[1024];
    sprintf(response,
            "HTTP/1.1 200 OK" CRLF "Content-Length: %zu" CRLF "Content-Type: "
            "text/html" DOUBLE_CRLF "%s",
            strlen(content), content);

    SAFE_CALL(send(fd, response, strlen(response), 0), -1);
    SAFE_CALL(reactor_reregister(reactor, fd, EPOLLIN, on_recv, arg), -1);
}
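The constants and macros referenced throughout (CRLF, DOUBLE_CRLF, SERVER_*, MAX_EVENTS, REQUEST_BUFFER_CAPACITY) are likewise defined in the sources. Plausible values, with everything except the address, the port, and the one-minute timeout being my guesses:

#define SERVER_IPV4 "127.0.0.1"
#define SERVER_PORT 18470
#define SERVER_BACKLOG 1024               // assumption
#define SERVER_TIMEOUT_MILLIS (60 * 1000) // the server runs for one minute
#define MAX_EVENTS 1024                   // assumption
#define REQUEST_BUFFER_CAPACITY 1024      // assumption

#define CRLF "\r\n"
#define DOUBLE_CRLF CRLF CRLF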

And finally, in the file http_server.c, in main(), we create an I/O reactor with reactor_new(), create and register the server socket, run the reactor with reactor_run() for exactly one minute, and then release the resources and exit the program.

Show http_server.c

#include "reactor.h"

static Reactor *reactor;

#include "common.h"

int main(void) {
    SAFE_CALL((reactor = reactor_new()), NULL);
    SAFE_CALL(
        reactor_register(reactor, new_server(false), EPOLLIN, on_accept, NULL),
        -1);
    SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1);
    SAFE_CALL(reactor_destroy(reactor), -1);
}
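If you would rather not rely on the repository's compile.sh, a manual build along these lines should work (assuming GLib is installed and discoverable via pkg-config; the actual script may pass different flags):

$ gcc -std=c11 -O2 reactor.c http_server.c $(pkg-config --cflags --libs glib-2.0) -o http_server
$ gcc -std=c11 -O2 -fopenmp reactor.c http_server_multithreaded.c $(pkg-config --cflags --libs glib-2.0) -o http_server_multithreaded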

Let's check that everything works as expected. Compile (chmod a+x compile.sh && ./compile.sh in the project root) and launch the server, open http://127.0.0.1:18470 in a browser, and see what we expected:

[Screenshot: the image served by the server at http://127.0.0.1:18470]

Performance measurement

Show my machine specifications

$ screenfetch
 MMMMMMMMMMMMMMMMMMMMMMMMMmds+.        OS: Mint 19.1 tessa
 MMm----::-://////////////oymNMd+`     Kernel: x86_64 Linux 4.15.0-20-generic
 MMd      /++                -sNMd:    Uptime: 2h 34m
 MMNso/`  dMM    `.::-. .-::.` .hMN:   Packages: 2217
 ddddMMh  dMM   :hNMNMNhNMNMNh: `NMm   Shell: bash 4.4.20
     NMm  dMM  .NMN/-+MMM+-/NMN` dMM   Resolution: 1920x1080
     NMm  dMM  -MMm  `MMM   dMM. dMM   DE: Cinnamon 4.0.10
     NMm  dMM  -MMm  `MMM   dMM. dMM   WM: Muffin
     NMm  dMM  .mmd  `mmm   yMM. dMM   WM Theme: Mint-Y-Dark (Mint-Y)
     NMm  dMM`  ..`   ...   ydm. dMM   GTK Theme: Mint-Y [GTK2/3]
     hMM- +MMd/-------...-:sdds  dMM   Icon Theme: Mint-Y
     -NMm- :hNMNNNmdddddddddy/`  dMM   Font: Noto Sans 9
      -dMNs-``-::::-------.``    dMM   CPU: Intel Core i7-6700 @ 8x 4GHz [52.0°C]
       `/dMNmy+/:-------------:/yMMM   GPU: NV136
          ./ydNMMMMMMMMMMMMMMMMMMMMM   RAM: 2544MiB / 7926MiB
             .MMMMMMMMMMMMMMMMMMM

Let's measure the performance of the single-threaded server. Let's open two terminals: in one we'll run ./http_server, in the other, wrk. After a minute, the following statistics will be displayed in the second terminal:

$ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
  8 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   493.52us   76.70us  17.31ms   89.57%
    Req/Sec    24.37k     1.81k   29.34k    68.13%
  11657769 requests in 1.00m, 1.60GB read
Requests/sec: 193974.70
Transfer/sec:     27.19MB

Our single-threaded server processed over 11 million requests per minute coming from 100 connections. Not a bad result, but can it be improved?

Multithreaded server

As mentioned above, an I/O reactor can be created in each of several threads, thereby utilizing all CPU cores. Let's put this approach into practice:

Show http_server_multithreaded.c

#include "reactor.h"

static Reactor *reactor;
#pragma omp threadprivate(reactor)

#include "common.h"

int main(void) {
#pragma omp parallel
    {
        SAFE_CALL((reactor = reactor_new()), NULL);
        SAFE_CALL(reactor_register(reactor, new_server(true), EPOLLIN,
                                   on_accept, NULL),
                  -1);
        SAFE_CALL(reactor_run(reactor, SERVER_TIMEOUT_MILLIS), -1);
        SAFE_CALL(reactor_destroy(reactor), -1);
    }
}

Now every thread owns its own reactor:

static Reactor *reactor;
#pragma omp threadprivate(reactor)

Note that the argument passed to new_server() is true. This means that we set the SO_REUSEPORT option on the server socket in order to use the same port in a multi-threaded environment. You can read more details here.

Second run

Now let's measure the performance of a multi-threaded server:

$ wrk -c100 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
  8 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.14ms    2.53ms  40.73ms   89.98%
    Req/Sec    79.98k    18.07k  154.64k    78.65%
  38208400 requests in 1.00m, 5.23GB read
Requests/sec: 635876.41
Transfer/sec:     89.14MB

The number of requests processed in 1 minute increased by ~3.28 times! But we fell about two million short of a round number, so let's try to fix that.

First, let's look at the statistics gathered by perf:

$ sudo perf stat -B -e task-clock,context-switches,cpu-migrations,page-faults,cycles,instructions,branches,branch-misses,cache-misses ./http_server_multithreaded

 Performance counter stats for './http_server_multithreaded':

     242446,314933      task-clock (msec)         #    4,000 CPUs utilized          
         1 813 074      context-switches          #    0,007 M/sec                  
             4 689      cpu-migrations            #    0,019 K/sec                  
               254      page-faults               #    0,001 K/sec                  
   895 324 830 170      cycles                    #    3,693 GHz                    
   621 378 066 808      instructions              #    0,69  insn per cycle         
   119 926 709 370      branches                  #  494,653 M/sec                  
     3 227 095 669      branch-misses             #    2,69% of all branches        
           808 664      cache-misses                                                

      60,604330670 seconds time elapsed

Using CPU affinity, compiling with -march=native, PGO, increasing the number of cache hits, increasing MAX_EVENTS, and using EPOLLET did not give a significant increase in performance. But what happens if we increase the number of simultaneous connections?

Statistics for 352 simultaneous connections:

$ wrk -c352 -d1m -t8 http://127.0.0.1:18470 -H "Host: 127.0.0.1:18470" -H "Accept-Language: en-US,en;q=0.5" -H "Connection: keep-alive"
Running 1m test @ http://127.0.0.1:18470
  8 threads and 352 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     2.12ms    3.79ms  68.23ms   87.49%
    Req/Sec    83.78k    12.69k  169.81k    83.59%
  40006142 requests in 1.00m, 5.48GB read
Requests/sec: 665789.26
Transfer/sec:     93.34MB

The desired result was obtained, and with it an interesting graph showing the dependence of the number of processed requests in 1 minute on the number of connections:

[Graph: requests processed per minute versus number of connections, for both servers]

We see that after a couple of hundred connections, the number of processed requests for both servers drops sharply (more noticeably in the multi-threaded version). Is this related to the Linux TCP/IP stack implementation? Feel free to write your hypotheses about this behavior of the graph, and optimizations for the multi-threaded and single-threaded variants, in the comments.

As noted in the comments, this performance test does not show the I/O reactor's behavior under real loads, because the server almost always interacts with a database, writes logs, uses cryptography with TLS, etc., which makes the load non-uniform (dynamic). Tests with third-party components will be carried out in the article about the I/O proactor.

Disadvantages of the I/O reactor

You need to understand that the I/O reactor is not without its drawbacks, namely:

  • Using an I/O reactor in a multi-threaded environment is somewhat harder, because you have to manage the threads manually.
  • Practice shows that in most cases the load is non-uniform, which can leave one thread idle while another is saturated with work.
  • If one event handler blocks the thread, the system selector itself blocks with it, which can lead to hard-to-find bugs.

These problems are solved by the I/O proactor, which often comes with a scheduler that evenly distributes the load across a thread pool, as well as a more convenient API. We will talk about it later, in another article.

Conclusion

This is where our journey from theory straight into the profiler's exhaust comes to an end.

Don't stop here, because there are many other equally interesting approaches to writing network software, with different levels of convenience and speed. Links that I find interesting are given below.

Until next time!

Interesting projects

What else should I read?

Source: habr.com
