В этой статье описана реализация конвейеров в ядре Unix. Я был несколько разочарован, что недавняя статья под названием «
О чём речь?
Конвейеры — «вероятно, самое важное изобретение в Unix» — это определяющая характеристика лежащей в основе Unix философии объединения воедино маленьких программ, а также знакомая надпись в командной строке:
$ echo hello | wc -c
6
Эта функциональность зависит от предоставляемого ядром системного вызова pipe
, который описан на страницах документации
Конвейеры обеспечивают однонаправленный канал межпроцессного взаимодействия. У конвейера есть вход (write end) и выход (read end). Данные, записанные во вход конвейера, могут быть считаны на выходе.
Конвейер создаётся с помощью вызова
pipe(2)
, который возвращает два файловых дескриптора: один ссылается на вход конвейера, второй на выход.
Результаты трассировки приведённой выше команды демонстрируют создание конвейера и поток данных через него из одного процесса в другой:
$ strace -qf -e execve,pipe,dup2,read,write
sh -c 'echo hello | wc -c'
execve("/bin/sh", ["sh", "-c", "echo hello | wc -c"], …)
pipe([3, 4]) = 0
[pid 2604795] dup2(4, 1) = 1
[pid 2604795] write(1, "hellon", 6) = 6
[pid 2604796] dup2(3, 0) = 0
[pid 2604796] execve("/usr/bin/wc", ["wc", "-c"], …)
[pid 2604796] read(0, "hellon", 16384) = 6
[pid 2604796] write(1, "6n", 2) = 2
Родительский процесс вызывает pipe()
, чтобы получить подключённые файловые дескрипторы. Один дочерний процесс записывает в один дескриптор, а другой процесс считывает те же данные из другого дескриптора. Оболочка с помощью dup2 «переименовывает» дескрипторы 3 и 4, чтобы они соответствовали stdin и stdout.
Без конвейеров оболочке пришлось бы записывать результат одного процесса в файл и передавать его другому процессу, чтобы тот прочитал данные из файла. В результате мы тратили бы больше ресурсов и место на диске. Однако конвейеры хороши не только тем, что позволяют избежать использования временных файлов:
Если процесс пытается прочитать из пустого конвейера, тогда
read(2)
заблокирует до тех пор, пока данные не станут доступны. Если процесс попытается записать в заполненный конвейер, тогдаwrite(2)
заблокирует до тех пор, пока из конвейера не будет считано достаточно данных для выполнения записи.
Как и POSIX-требование, это важное свойство: запись в конвейер вплоть до PIPE_BUF
байтов (как минимум 512) должна быть атомарной, чтобы процессы могли взаимодействовать друг с другом через конвейер так, как обычные файлы (не предоставляющие таких гарантий) не могут.
При использовании обычного файла процесс может записать в него все свои выходные данные и передать другому процессу. Или процессы могут действовать в режиме жёсткого распараллеливания, с помощью внешнего сигнального механизма (вроде семафора) сообщая друг другу о завершении записи или чтения. Конвейеры избавляют нас от всех этих хлопот.
Что мы ищем?
Объясню на пальцах, чтобы вам было легче представить, как может работать конвейер. Вам понадобится выделить в памяти буфер и какое-то состояние. Понадобятся функции для добавления и удаления данных из буфера. Понадобится какое-то средство, чтобы вызывать функции в ходе операций чтения и записи в файловые дескрипторы. И понадобятся блокировки, чтобы реализовать описанное выше специальное поведение.
Теперь мы готовы допросить при ярком свете ламп исходный код ядра, чтобы подтвердить или опровергнуть нашу смутную мысленную модель. Но всегда будьте готовы к неожиданностям.
Где мы ищем?
Я не знаю, где лежит мой экземпляр известной книги «
Блуждание по архивам TUHS сродни посещению музея. Мы можем взглянуть на нашу общую историю, и я испытываю уважение к многолетним усилиям по восстановлению всех этих материалов бит за битом со старых кассет и распечаток. И остро осознаю те фрагменты, которые ещё отсутствуют.
Удовлетворив своё любопытство в части древней истории конвейеров, для сравнения можем посмотреть на современные ядра.
Кстати, pipe
является системным вызовом номер 42 в таблице sysent[]
. Совпадение?
Традиционные ядра Unix (1970–1974)
Я не нашёл никаких следов pipe(2)
ни в
TUHS утверждает, что
Третья редакция Unix была последней версией с ядром, написанным на ассемблере, но при этом первой версией с конвейерами. В течение 1973-го велись работы по улучшению третьей редакции, ядро переписали на С, и так появилась четвёртая редакция Unix.
Один из читателей нашёл скан документа, в котором Даг МакИлрой предложил идею «соединения программ по принципу садового шланга».
В книге Брайана Кернигана «
Когда появилась Unix, моё увлечение корутинами заставило меня попросить автора ОС, Кена Томпсона, позволить данным, записанным в какой-то процесс, идти не только на устройство, но и на выход к другому процессу. Кен решил, что это возможно. Однако, как минималист, он хотел, чтобы каждая системная функция играла значительную роль. Действительно ли прямая запись между процессами имеет большое преимущество по сравнению с записью в промежуточный файл? И только когда я внёс конкретное предложение с броским названием «конвейер» и описанием синтаксиса взаимодействия процессов, Кен, наконец-то, воскликнул: «Я сделаю это!».
И сделал. Одним судьбоносным вечером Кен изменил ядро и оболочку, исправил несколько стандартных программ, стандартизировав их процедуру принятия входных данных (которые могут поступать из конвейера), а также поменял имена файлов. На следующий день конвейеры начали очень широко применять в приложениях. К концу недели секретарши с их помощью отправляли на принтер документы из текстовых редакторов. Чуть позднее Кен заменил оригинальный API и синтаксис для оболочки использования конвейеров на более чистые соглашения, которые с тех пор и применяются.
К сожалению, исходный код ядра третьей редакции Unix утерян. И хотя у нас есть написанный на С исходный код ядра
У нас есть текст документации по pipe(2)
из обоих релизов, поэтому можно начать с поиска в документации pipe(2)
написан на ассемблере и возвращает только один файловый дескриптор, но уже предоставляет ожидаемую основную функциональность:
Системный вызов pipe создаёт механизм ввода вывода, который называется конвейером. Возвращаемый файловый дескриптор можно использовать для операций чтения и записи. Когда в конвейер что-то записывается, то буферизуется до 504 байтов данных, после чего процесс записи приостанавливается. При чтении из конвейера буферизированные данные забираются.
К следующему году ядро было переписано на С, а pipe(fildes)
»:
Системный вызов pipe создаёт механизм ввода вывода, который называется конвейером. Возвращаемые файловые дескрипторы можно использовать в операциях чтения и записи. Когда что-то записывается в конвейер, то используется дескриптор, возвращаемый в r1 (соотв. fildes[1]), то буферизуется до 4096 байтов данных, после чего процесс записи приостанавливается. При чтении из конвейера дескриптор, возвращаемый в r0 (соотв. fildes[0]), забирает данные.
Предполагается, что после определения конвейера два (или более) взаимодействующих процесса (созданных последующими вызовами fork) будут передавать данные из конвейера с помощью вызовов read и write.
В оболочке есть синтаксис для определения линейного массива процессов, соединённых посредством конвейера.
Вызовы на чтение из пустого конвейера (не содержащего буферизированных данных), имеющего лишь один конец (закрыты все записывающие файловые дескрипторы), возвращают «конец файла». Вызовы на запись в аналогичной ситуации игнорируются.
Самая ранняя
Шестая редакция Unix (1975)
Начинаем читать исходный код Unix
Многие годы книга Lions была единственным документом по ядру Unix, доступным вне стен Bell Labs. Хотя лицензия шестой редакции позволяла преподавателям использовать её исходный код, однако лицензия седьмой редакции исключила эту возможность, поэтому книга распространялась в виде нелегальных машинописных копий.
Сегодня можно купить репринтный экземпляр книги, на обложке которой изображены студенты у копировального аппарата. А благодаря Уоррену Туми (который запустил проект TUHS) вы можете скачать
Больше 15 лет назад я набрал копию исходного кода, приведённого в Lions, потому что мне не нравилось качество моей копии с неизвестного количества других копий. TUHS ещё не существовало, и у меня не было доступа к старым исходникам. Но в 1988-м я нашёл старую ленту с 9 дорожками, на которой была резервная копия из компьютера PDP11. Трудно было понять, работает ли она, но там было неповреждённое дерево /usr/src/, в котором большинство файлов были помечены 1979-м годом, что уже тогда выглядело древностью. Это была седьмая редакция или её производная PWB, как я считал.
Я взял находку за основу и вручную отредактировал исходники до состояния шестой редакции. Часть кода осталась такой же, часть пришлось слегка подредактировать, поменяв современный токен += на устаревший =+. Что-то просто удалил, а что-то пришлось полностью переписать, но не слишком много.
И сегодня мы можем в онлайне читать на TUHS исходный код шестой редакции из
Кстати, на первый взгляд, главной особенностью С-кода до периода Кернигана и Ричи является его краткость. Не так часто мне удаётся вставлять фрагменты кода без обширного редактирования, чтобы он соответствовал относительно узкой области отображения на моём сайте.
В начале
/*
* Max allowable buffering per pipe.
* This is also the max size of the
* file created to implement the pipe.
* If this size is bigger than 4096,
* pipes will be implemented in LARG
* files, which is probably not good.
*/
#define PIPSIZ 4096
Размер буфера не менялся со времён четвёртой редакции. Но здесь мы безо всякой публичной документации видим, что когда-то конвейеры использовали файлы в качестве запасного хранилища!
Что касается LARG-файлов, то они соответствуют
Вот настоящий системный вызов pipe
:
/*
* The sys-pipe entry.
* Allocate an inode on the root device.
* Allocate 2 file structures.
* Put it all together with flags.
*/
pipe()
{
register *ip, *rf, *wf;
int r;
ip = ialloc(rootdev);
if(ip == NULL)
return;
rf = falloc();
if(rf == NULL) {
iput(ip);
return;
}
r = u.u_ar0[R0];
wf = falloc();
if(wf == NULL) {
rf->f_count = 0;
u.u_ofile[r] = NULL;
iput(ip);
return;
}
u.u_ar0[R1] = u.u_ar0[R0]; /* wf's fd */
u.u_ar0[R0] = r; /* rf's fd */
wf->f_flag = FWRITE|FPIPE;
wf->f_inode = ip;
rf->f_flag = FREAD|FPIPE;
rf->f_inode = ip;
ip->i_count = 2;
ip->i_flag = IACC|IUPD;
ip->i_mode = IALLOC;
}
В комментарии ясно описано, что тут происходит. Но разобраться в коде не так просто, отчасти из-за того, как с помощью «R0
и R1
передаются параметры системных вызовов и возвращаемые значения.
Попробуем с помощью
pipe()
должен через R0
и R1
возвращать номера файловых дескрипторов для чтения и записи. falloc()
возвращает указатель на файловую структуру, но также «возвращает» через u.u_ar0[R0]
и файловый дескриптор. То есть код сохраняет в r
файловый дескриптор для чтения и присваивает дескриптор для записи прямо из u.u_ar0[R0]
после второго вызова falloc()
.
Флаг FPIPE
, который мы задали при создании конвейера, управляет поведением функции
/*
* common code for read and write calls:
* check permissions, set base, count, and offset,
* and switch out to readi, writei, or pipe code.
*/
rdwr(mode)
{
register *fp, m;
m = mode;
fp = getf(u.u_ar0[R0]);
/* … */
if(fp->f_flag&FPIPE) {
if(m==FREAD)
readp(fp); else
writep(fp);
}
/* … */
}
Затем функция readp()
в pipe.c
считывает данные из конвейера. Но проследить реализацию лучше начиная с writep()
. Повторюсь, код усложнился из-за особенностей соглашения о передаче аргументов, но некоторые подробности можно опустить.
writep(fp)
{
register *rp, *ip, c;
rp = fp;
ip = rp->f_inode;
c = u.u_count;
loop:
/* If all done, return. */
plock(ip);
if(c == 0) {
prele(ip);
u.u_count = 0;
return;
}
/*
* If there are not both read and write sides of the
* pipe active, return error and signal too.
*/
if(ip->i_count < 2) {
prele(ip);
u.u_error = EPIPE;
psignal(u.u_procp, SIGPIPE);
return;
}
/*
* If the pipe is full, wait for reads to deplete
* and truncate it.
*/
if(ip->i_size1 == PIPSIZ) {
ip->i_mode =| IWRITE;
prele(ip);
sleep(ip+1, PPIPE);
goto loop;
}
/* Write what is possible and loop back. */
u.u_offset[0] = 0;
u.u_offset[1] = ip->i_size1;
u.u_count = min(c, PIPSIZ-u.u_offset[1]);
c =- u.u_count;
writei(ip);
prele(ip);
if(ip->i_mode&IREAD) {
ip->i_mode =& ~IREAD;
wakeup(ip+2);
}
goto loop;
}
На вход конвейера мы хотим записать байты u.u_count
. Сначала потребуем заблокировать индексный дескриптор (см. ниже plock
/prele
).
Затем проверяем счётчик ссылок inode. Пока оба конца конвейера остаются открытыми, счётчик должен быть равен 2. Мы придерживаем одну ссылку (из rp->f_inode
), так что если счётчик будет меньше 2, то это должно означать, что читающий процесс закрыл свой конец конвейера. Иными словами, мы пытаемся писать в закрытый конвейер, а это является ошибкой. Впервые код ошибки EPIPE
и сигнал SIGPIPE
появились в шестой редакции Unix.
Но даже если конвейер открыт, он может быть заполнен. В этом случае мы снимаем блокировку и идём спать в надежде, что другой процесс прочитает из конвейера и освободит в нём достаточно места. Проснувшись, мы возвращаемся к началу, опять вешаем блокировку и запускаем новый цикл записи.
Если в конвейере достаточно свободного места, то мы записываем в него данные с помощью i_size1
у inode’а (при пустом конвейере может быть равен 0) указывает на конец данных, которые в нём уже содержатся. Если места для записи достаточно, мы можем заполнить конвейер от i_size1
до PIPESIZ
. Затем снимаем блокировку и пытаемся пробудить любой процесс, который ждёт возможности прочитать из конвейера. Возвращаемся к началу, чтобы посмотреть, удалось ли записать столько байтов, сколько нам было нужно. Если не удалось, то начинаем новый цикл записи.
Обычно параметр i_mode
у inode’а используется для хранения разрешений r
, w
и x
. Но в случае с конвейерами мы сигнализируем об ожидании каким-то процессом записи или чтения с помощью битов IREAD
и IWRITE
соответственно. Процесс задаёт флаг и вызывает sleep()
, и ожидается, что будущем какой-то другой процесс вызовет wakeup()
.
Настоящее волшебство происходит в sleep()
и wakeup()
. Они реализованы в
/*
* Give up the processor till a wakeup occurs
* on chan, at which time the process
* enters the scheduling queue at priority pri.
* The most important effect of pri is that when
* pri<0 a signal cannot disturb the sleep;
* if pri>=0 signals will be processed.
* Callers of this routine must be prepared for
* premature return, and check that the reason for
* sleeping has gone away.
*/
sleep(chan, pri) /* … */
/*
* Wake up all processes sleeping on chan.
*/
wakeup(chan) /* … */
Процесс, который вызывает sleep()
для определённого канала, может быть позднее разбужен другим процессом, который вызовет wakeup()
для того же канала. writep()
и readp()
координируют свои действия посредством таких парных вызовов. Обратите внимание, что pipe.c
всегда отдаёт приоритет PPIPE
при вызове sleep()
, поэтому все sleep()
могут прерываться по сигналу.
Теперь у нас есть всё, чтобы разобраться в функции readp()
:
readp(fp)
int *fp;
{
register *rp, *ip;
rp = fp;
ip = rp->f_inode;
loop:
/* Very conservative locking. */
plock(ip);
/*
* If the head (read) has caught up with
* the tail (write), reset both to 0.
*/
if(rp->f_offset[1] == ip->i_size1) {
if(rp->f_offset[1] != 0) {
rp->f_offset[1] = 0;
ip->i_size1 = 0;
if(ip->i_mode&IWRITE) {
ip->i_mode =& ~IWRITE;
wakeup(ip+1);
}
}
/*
* If there are not both reader and
* writer active, return without
* satisfying read.
*/
prele(ip);
if(ip->i_count < 2)
return;
ip->i_mode =| IREAD;
sleep(ip+2, PPIPE);
goto loop;
}
/* Read and return */
u.u_offset[0] = 0;
u.u_offset[1] = rp->f_offset[1];
readi(ip);
rp->f_offset[1] = u.u_offset[1];
prele(ip);
}
Возможно, вам будет проще читать эту функцию снизу вверх. Ветка «read and return» обычно используется, когда в конвейере есть какие-то данные. В этом случае мы с помощью f_offset
чтения, а затем обновляем значение соответствующего смещения.
При последующем чтении конвейер будет пустым, если смещение чтения достигло значения i_size1
у inode’а. Мы сбрасываем позицию на 0 и пытаемся пробудить любой процесс, который хочет записать в конвейер. Мы знаем, что когда конвейер будет полон, writep()
заснёт на ip+1
. А теперь, когда конвейер пуст, мы можем пробудить его, чтобы он возобновил свой цикл записи.
Если читать нечего, то readp()
может задать флаг IREAD
и заснуть на ip+2
. Мы знаем, что его пробудит writep()
, когда запишет в конвейер какие-нибудь данные.
Комментарии к u
» мы можем обращаться с ними как с обычными функциями ввода-вывода, которые берут файл, позицию, буфер в памяти, и подсчитывают количество байтов для чтения или записи.
/*
* Read the file corresponding to
* the inode pointed at by the argument.
* The actual read arguments are found
* in the variables:
* u_base core address for destination
* u_offset byte offset in file
* u_count number of bytes to read
* u_segflg read to kernel/user
*/
readi(aip)
struct inode *aip;
/* … */
/*
* Write the file corresponding to
* the inode pointed at by the argument.
* The actual write arguments are found
* in the variables:
* u_base core address for source
* u_offset byte offset in file
* u_count number of bytes to write
* u_segflg write to kernel/user
*/
writei(aip)
struct inode *aip;
/* … */
Что касается «консервативной» блокировки, то readp()
и writep()
блокируют inode до тех пор, пока не закончат работу или не получат результат (то есть вызовут wakeup
). plock()
и prele()
работают просто: с помощью другого набора вызовов sleep
и wakeup
позволяют нам пробуждать любой процесс, которому нужна блокировка, которую мы только что сняли:
/*
* Lock a pipe.
* If its already locked, set the WANT bit and sleep.
*/
plock(ip)
int *ip;
{
register *rp;
rp = ip;
while(rp->i_flag&ILOCK) {
rp->i_flag =| IWANT;
sleep(rp, PPIPE);
}
rp->i_flag =| ILOCK;
}
/*
* Unlock a pipe.
* If WANT bit is on, wakeup.
* This routine is also used to unlock inodes in general.
*/
prele(ip)
int *ip;
{
register *rp;
rp = ip;
rp->i_flag =& ~ILOCK;
if(rp->i_flag&IWANT) {
rp->i_flag =& ~IWANT;
wakeup(rp);
}
}
Сначала я не мог понять, почему readp()
не вызывает prele(ip)
до вызова wakeup(ip+1)
. Первое, что writep()
вызывает в своём цикле, это plock(ip)
, который приводит к взаимоблокировке, если readp()
ещё не снял свой блок, поэтому код каким-то образом должен работать правильно. Если посмотреть на wakeup()
, то становится понятно, что он только помечает спящий процесс как готовый к исполнению, чтобы в будущем sched()
действительно запустила его. Так что readp()
вызывает wakeup()
, снимает блокировку, задаёт IREAD
и вызывает sleep(ip+2)
— всё это до того, как writep()
возобновляет цикл.
На этом описание конвейеров в шестой редакции закончено. Простой код, далеко идущие последствия.
Xv6, простое Unix-образное ядро
На создание ядра
В коде содержится понятная и продуманная реализация pipealloc()
:
#define PIPESIZE 512
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
int
pipealloc(struct file **f0, struct file **f1)
{
struct pipe *p;
p = 0;
*f0 = *f1 = 0;
if((*f0 = filealloc()) == 0 || (*f1 = filealloc()) == 0)
goto bad;
if((p = (struct pipe*)kalloc()) == 0)
goto bad;
p->readopen = 1;
p->writeopen = 1;
p->nwrite = 0;
p->nread = 0;
initlock(&p->lock, "pipe");
(*f0)->type = FD_PIPE;
(*f0)->readable = 1;
(*f0)->writable = 0;
(*f0)->pipe = p;
(*f1)->type = FD_PIPE;
(*f1)->readable = 0;
(*f1)->writable = 1;
(*f1)->pipe = p;
return 0;
bad:
if(p)
kfree((char*)p);
if(*f0)
fileclose(*f0);
if(*f1)
fileclose(*f1);
return -1;
}
pipealloc()
задаёт состояние все остальной реализации, которая включает в себя функции piperead()
, pipewrite()
и pipeclose()
. Фактический системный вызов sys_pipe
является обёрткой, реализованной в
Linux 0.01
Можно найти исходный код Linux 0.01. Будет поучительно изучить реализацию конвейеров в его fs
/pipe.c
. Здесь для представления конвейера используется inode, но сам конвейер написан на современном C. Если вы продрались через код шестой редакции, то здесь вы не испытаете трудностей. Так выглядит функция write_pipe()
:
int write_pipe(struct m_inode * inode, char * buf, int count)
{
char * b=buf;
wake_up(&inode->i_wait);
if (inode->i_count != 2) { /* no readers */
current->signal |= (1<<(SIGPIPE-1));
return -1;
}
while (count-->0) {
while (PIPE_FULL(*inode)) {
wake_up(&inode->i_wait);
if (inode->i_count != 2) {
current->signal |= (1<<(SIGPIPE-1));
return b-buf;
}
sleep_on(&inode->i_wait);
}
((char *)inode->i_size)[PIPE_HEAD(*inode)] =
get_fs_byte(b++);
INC_PIPE( PIPE_HEAD(*inode) );
wake_up(&inode->i_wait);
}
wake_up(&inode->i_wait);
return b-buf;
}
Даже не глядя на определения структур можно разобраться, как счётчик ссылок inode используется для проверки, приводит ли операция записи к SIGPIPE
. Помимо побайтовой работы эту функцию легко сопоставить с вышеописанными идеями. Даже логика sleep_on
/wake_up
не выглядит такой чужеродной.
Современные ядра Linux, FreeBSD, NetBSD, OpenBSD
Я быстро пробежался по некоторым современным ядрам. Ни в одном из них уже нет реализации с использованием диска (не удивительно). В Linux своя собственная реализация. И хотя три современных BSD-ядра содержат реализации на основе кода, который был написан Джоном Дайсоном, за прошедшие годы они стали слишком сильно отличаться друг от друга.
Чтобы читать fs
/pipe.c
(на Linux) или sys
/kern
/sys_pipe.c
(на *BSD), требуется настоящая самоотдача. Сегодня в коде важны производительность и поддержка таких функций, как векторные и асинхронные операции ввода-вывода. А подробности выделения памяти, блокировок и конфигурации ядра — всё это сильно разнится. Это не то, что нужно вузам для вводного курса по операционным системам.
В любом случае, мне было интересно раскопать несколько старинных паттернов (например, генерирование SIGPIPE
и возврат EPIPE
при записи в закрытый конвейер) во всех этих, таких разных, современных ядрах. Вероятно, я никогда не увижу вживую компьютер PDP-11, но ещё есть чему поучиться на коде, который был написан за несколько лет до моего рождения.
Написанная Диви Капуром в 2011-м году статья «
Источник: habr.com