这篇文章描述了管道在 Unix 内核中的实现。 最近一篇题为“
我们在说什么?
管道“可能是 Unix 中最重要的发明”——这是 Unix 将小程序组合在一起的基本哲学的一个定义特征,以及熟悉的命令行口号:
$ echo hello | wc -c
6
此功能取决于内核提供的系统调用 pipe
,在文档页面上进行了描述
管道为进程间通信提供了一种单向通道。 管道有一个输入端(写端)和一个输出端(读端)。 写入管道输入的数据可以在输出处读取。
管道是通过调用创建的
pipe(2)
,它返回两个文件描述符:一个指向管道的输入,第二个指向输出。
上述命令的跟踪输出显示了管道的创建以及从一个进程到另一个进程的数据流:
$ strace -qf -e execve,pipe,dup2,read,write
sh -c 'echo hello | wc -c'
execve("/bin/sh", ["sh", "-c", "echo hello | wc -c"], …)
pipe([3, 4]) = 0
[pid 2604795] dup2(4, 1) = 1
[pid 2604795] write(1, "hellon", 6) = 6
[pid 2604796] dup2(3, 0) = 0
[pid 2604796] execve("/usr/bin/wc", ["wc", "-c"], …)
[pid 2604796] read(0, "hellon", 16384) = 6
[pid 2604796] write(1, "6n", 2) = 2
父进程调用 pipe()
获取附加的文件描述符。 一个子进程写入一个描述符,另一个进程从另一个描述符读取相同的数据。 shell 使用 dup2“重命名”描述符 3 和 4 以匹配 stdin 和 stdout。
如果没有管道,shell 将不得不将一个进程的输出写入文件并将其通过管道传输到另一个进程以从文件中读取数据。 结果,我们会浪费更多的资源和磁盘空间。 然而,管道不仅仅适用于避免临时文件:
如果一个进程试图从一个空的管道中读取,那么
read(2)
将阻塞直到数据可用。 如果一个进程试图写入一个完整的管道,那么write(2)
将阻塞,直到从管道中读取了足够的数据以完成写入。
与 POSIX 要求一样,这是一个重要的属性:写入管道最多 PIPE_BUF
字节(至少 512)必须是原子的,以便进程可以通过管道以普通文件(不提供此类保证)无法实现的方式相互通信。
使用常规文件,进程可以将其所有输出写入其中并将其传递给另一个进程。 或者进程可以在硬并行模式下运行,使用外部信号机制(如信号量)相互通知写入或读取的完成。 输送机使我们免于所有这些麻烦。
我们在找什么?
我将亲手解释,让您更容易想象传送带的工作原理。 您将需要在内存中分配一个缓冲区和一些状态。 您将需要函数来向缓冲区添加和删除数据。 在对文件描述符进行读写操作期间,您将需要一些工具来调用函数。 并且需要锁来实现上述特殊行为。
我们现在准备在明亮的灯光下审视内核的源代码,以确认或反驳我们模糊的心智模型。 但始终要为意外做好准备。
我们在看哪里?
我不知道我那本名著在哪儿。
漫步在 TUHS 档案中就像参观博物馆一样。 我们可以看看我们共同的历史,我很尊重多年来从旧磁带和打印输出中一点一点地恢复所有这些材料的努力。 我敏锐地意识到那些仍然缺失的碎片。
满足了我们对管道古代历史的好奇之后,我们可以看看现代的核心来进行比较。
顺便说一下, pipe
是表中的系统调用号42 sysent[]
。 巧合?
传统的 Unix 内核 (1970–1974)
我没有找到任何踪迹 pipe(2)
既不在
TUHS 声称
第三版 Unix 是最后一个使用汇编语言编写内核的版本,也是第一个使用管道的版本。 1973 年,第三版的改进工作正在进行,内核用 C 重写,第四版 Unix 诞生了。
一位读者找到了一份文件的扫描件,道格·麦克罗伊在其中提出了“像花园软管一样连接程序”的想法。
在 Brian Kernighan 的书中
当 Unix 出现时,我对协程的热情促使我向 OS 作者 Ken Thompson 提出要求,让写入某个进程的数据不仅可以到达设备,还可以到达另一个进程的出口。 肯认为这是可能的。 然而,作为一个极简主义者,他希望每个系统功能都能发挥重要作用。 进程之间直接写入真的比写入中间文件有很大优势吗? 并且只有当我提出了一个具体的建议,并给出了一个朗朗上口的名称“管道”,并描述了流程交互的语法时,Ken 才终于惊呼:“我会做的!”。
并且做到了。 一个决定性的夜晚,Ken 更改了内核和 shell,修复了几个标准程序以标准化它们接受输入(可能来自管道)的方式,并更改了文件名。 第二天,管道在应用中得到了非常广泛的应用。 到周末,秘书们用它们将文件从文字处理器发送到打印机。 稍后,Ken 将用于包装管道使用的原始 API 和语法替换为从那时起一直使用的更清晰的约定。
不幸的是,第三版 Unix 内核的源代码已经丢失。 虽然我们有用 C 编写的内核源代码
我们有文档文本 pipe(2)
来自两个版本,因此您可以从搜索文档开始 pipe(2)
用汇编语言编写,只返回一个文件描述符,但已经提供了预期的核心功能:
系统调用 管 创建称为管道的 I/O 机制。 返回的文件描述符可用于读写操作。 当有东西被写入管道时,它最多缓冲 504 字节的数据,之后写入过程被暂停。 从管道读取时,会获取缓冲的数据。
到次年,内核已用 C 重写,并且 pipe(fildes)
»:
系统调用 管 创建称为管道的 I/O 机制。 返回的文件描述符可用于读写操作。 当向管道写入内容时,将使用 r1 (resp.fildes[1]) 中返回的描述符,缓冲最多 4096 字节的数据,之后暂停写入过程。 从管道读取时,返回到 r0 (resp.fildes[0]) 的描述符获取数据。
假设一旦定义了管道,两个(或更多)交互进程(由后续调用创建 叉) 将使用调用从管道传递数据 读 и 写.
shell 具有用于定义通过管道连接的线性进程数组的语法。
调用从只有一端(所有写入文件描述符关闭)的空管道(不包含缓冲数据)读取返回“文件结尾”。 忽略类似情况下的写入调用。
最早的
Unix 第六版 (1975)
开始阅读 Unix 源代码
多年来书 狮子 是贝尔实验室以外唯一可用的关于 Unix 内核的文档。 虽然第六版许可允许教师使用其源代码,但第七版许可排除了这种可能性,因此该书以非法打印副本分发。
今天,您可以购买这本书的重印本,封面描绘的是复印机旁的学生。 感谢 Warren Toomey(TUHS 项目的发起人),您可以下载
15 多年前,我输入了一份提供的源代码副本 狮子因为我不喜欢未知数量的其他副本中我的副本的质量。 TUHS 还不存在,我也无法访问旧资源。 但在 1988 年,我发现了一盘有 9 条轨道的旧磁带,它有一台 PDP11 计算机的备份。 很难知道它是否有效,但有一个完好无损的 /usr/src/ 树,其中大部分文件都标记为 1979,即使在那时看起来也很古老。 我想那是第七版,或者是 PWB 的衍生版本。
我以查找为基础,手动编辑源代码到第六版的状态。 部分代码保持不变,部分代码需要稍微修改,将现代的 += 标记更改为过时的 =+。 有些东西被简单地删除了,有些东西必须完全重写,但不要太多。
今天我们可以在 TUHS 在线阅读第六版的源代码
顺便说一句,乍一看,Kernighan 和 Ritchie 时期之前的 C 代码的主要特征是它的 短. 我很少能够在不进行大量编辑以适应网站上相对狭窄的显示区域的情况下插入代码片段。
早
/*
* Max allowable buffering per pipe.
* This is also the max size of the
* file created to implement the pipe.
* If this size is bigger than 4096,
* pipes will be implemented in LARG
* files, which is probably not good.
*/
#define PIPSIZ 4096
自第四版以来缓冲区大小没有改变。 但是在这里我们看到,在没有任何公开文档的情况下,管道曾经使用文件作为后备存储!
至于 LARG 文件,它们对应于
这是真正的系统调用 pipe
:
/*
* The sys-pipe entry.
* Allocate an inode on the root device.
* Allocate 2 file structures.
* Put it all together with flags.
*/
pipe()
{
register *ip, *rf, *wf;
int r;
ip = ialloc(rootdev);
if(ip == NULL)
return;
rf = falloc();
if(rf == NULL) {
iput(ip);
return;
}
r = u.u_ar0[R0];
wf = falloc();
if(wf == NULL) {
rf->f_count = 0;
u.u_ofile[r] = NULL;
iput(ip);
return;
}
u.u_ar0[R1] = u.u_ar0[R0]; /* wf's fd */
u.u_ar0[R0] = r; /* rf's fd */
wf->f_flag = FWRITE|FPIPE;
wf->f_inode = ip;
rf->f_flag = FREAD|FPIPE;
rf->f_inode = ip;
ip->i_count = 2;
ip->i_flag = IACC|IUPD;
ip->i_mode = IALLOC;
}
评论清楚地描述了这里发生的事情。 但是理解代码并不那么容易,部分原因是“R0
и R1
传递系统调用参数和返回值。
让我们试试
pipe()
由于通过 R0
и R1
返回用于读取和写入的文件描述符编号。 falloc()
返回指向文件结构的指针,但也通过“返回” u.u_ar0[R0]
和一个文件描述符。 也就是说,代码存储在 r
用于读取的文件描述符,并分配一个用于直接写入的描述符 u.u_ar0[R0]
第二次通话后 falloc()
.
旗 FPIPE
,我们在创建管道时设置,控制函数的行为
/*
* common code for read and write calls:
* check permissions, set base, count, and offset,
* and switch out to readi, writei, or pipe code.
*/
rdwr(mode)
{
register *fp, m;
m = mode;
fp = getf(u.u_ar0[R0]);
/* … */
if(fp->f_flag&FPIPE) {
if(m==FREAD)
readp(fp); else
writep(fp);
}
/* … */
}
然后是函数 readp()
в pipe.c
从管道中读取数据。 但最好从以下位置开始跟踪实施 writep()
. 同样,由于参数传递约定的性质,代码变得更加复杂,但可以省略一些细节。
writep(fp)
{
register *rp, *ip, c;
rp = fp;
ip = rp->f_inode;
c = u.u_count;
loop:
/* If all done, return. */
plock(ip);
if(c == 0) {
prele(ip);
u.u_count = 0;
return;
}
/*
* If there are not both read and write sides of the
* pipe active, return error and signal too.
*/
if(ip->i_count < 2) {
prele(ip);
u.u_error = EPIPE;
psignal(u.u_procp, SIGPIPE);
return;
}
/*
* If the pipe is full, wait for reads to deplete
* and truncate it.
*/
if(ip->i_size1 == PIPSIZ) {
ip->i_mode =| IWRITE;
prele(ip);
sleep(ip+1, PPIPE);
goto loop;
}
/* Write what is possible and loop back. */
u.u_offset[0] = 0;
u.u_offset[1] = ip->i_size1;
u.u_count = min(c, PIPSIZ-u.u_offset[1]);
c =- u.u_count;
writei(ip);
prele(ip);
if(ip->i_mode&IREAD) {
ip->i_mode =& ~IREAD;
wakeup(ip+2);
}
goto loop;
}
我们想将字节写入管道输入 u.u_count
. 首先我们需要锁定索引节点(见下文 plock
/prele
).
然后我们检查 inode 引用计数。 只要管道的两端保持打开状态,计数器就应该为 2。我们保留一个链接(来自 rp->f_inode
),所以如果计数器小于 2,那么这应该意味着读取进程已经关闭了它的管道末端。 换句话说,我们试图写入一个封闭的管道,这是一个错误。 第一个错误代码 EPIPE
和信号 SIGPIPE
出现在 Unix 第六版中。
但即使传送带打开,它也可能已满。 在这种情况下,我们释放锁并进入睡眠状态,希望另一个进程将从管道中读取并释放足够的空间。 当我们醒来时,我们回到起点,再次挂掉锁,开始新的写周期。
如果管道中有足够的可用空间,那么我们使用 i_size1
inode'a(空管道可以等于 0)指向它已经包含的数据的末尾。 如果有足够的空间可以写入,我们可以从 i_size1
对 PIPESIZ
. 然后我们释放锁并尝试唤醒任何正在等待从管道读取的进程。 我们回到开头,看看我们是否设法写入了我们需要的字节数。 如果没有,那么我们开始一个新的记录周期。
通常参数 i_mode
inode用来存放权限 r
, w
и x
. 但在管道的情况下,我们用位表示某个进程正在等待写入或读取 IREAD
и IWRITE
分别。 该过程设置标志并调用 sleep()
, 预计将来其他进程会调用 wakeup()
.
真正的魔法发生在 sleep()
и wakeup()
. 它们是在
/*
* Give up the processor till a wakeup occurs
* on chan, at which time the process
* enters the scheduling queue at priority pri.
* The most important effect of pri is that when
* pri<0 a signal cannot disturb the sleep;
* if pri>=0 signals will be processed.
* Callers of this routine must be prepared for
* premature return, and check that the reason for
* sleeping has gone away.
*/
sleep(chan, pri) /* … */
/*
* Wake up all processes sleeping on chan.
*/
wakeup(chan) /* … */
调用的过程 sleep()
对于特定通道,稍后可能会被另一个进程唤醒,该进程将调用 wakeup()
对于同一个频道。 writep()
и readp()
通过这样的配对呼叫协调他们的行动。 注意 pipe.c
总是优先考虑 PPIPE
当被调用时 sleep()
, 所以所有 sleep()
可以被信号打断。
现在我们已经了解了该功能的所有内容 readp()
:
readp(fp)
int *fp;
{
register *rp, *ip;
rp = fp;
ip = rp->f_inode;
loop:
/* Very conservative locking. */
plock(ip);
/*
* If the head (read) has caught up with
* the tail (write), reset both to 0.
*/
if(rp->f_offset[1] == ip->i_size1) {
if(rp->f_offset[1] != 0) {
rp->f_offset[1] = 0;
ip->i_size1 = 0;
if(ip->i_mode&IWRITE) {
ip->i_mode =& ~IWRITE;
wakeup(ip+1);
}
}
/*
* If there are not both reader and
* writer active, return without
* satisfying read.
*/
prele(ip);
if(ip->i_count < 2)
return;
ip->i_mode =| IREAD;
sleep(ip+2, PPIPE);
goto loop;
}
/* Read and return */
u.u_offset[0] = 0;
u.u_offset[1] = rp->f_offset[1];
readi(ip);
rp->f_offset[1] = u.u_offset[1];
prele(ip);
}
您可能会发现从下到上阅读此函数更容易。 “读取并返回”分支通常在管道中有一些数据时使用。 在这种情况下,我们使用 f_offset
读取,然后更新相应偏移量的值。
在后续读取中,如果读取偏移量已达到,管道将为空 i_size1
在索引节点。 我们将位置重置为 0 并尝试唤醒任何想要写入管道的进程。 我们知道,当传送带装满时, writep()
睡着了 ip+1
. 现在管道是空的,我们可以唤醒它以恢复其写入周期。
如果没有什么可读的,那么 readp()
可以设置一个标志 IREAD
睡着了 ip+2
. 我们知道什么会唤醒他 writep()
当它向管道写入一些数据时。
评论 u
» 我们可以将它们视为常规 I/O 函数,这些函数获取文件、位置、内存中的缓冲区,并计算要读取或写入的字节数。
/*
* Read the file corresponding to
* the inode pointed at by the argument.
* The actual read arguments are found
* in the variables:
* u_base core address for destination
* u_offset byte offset in file
* u_count number of bytes to read
* u_segflg read to kernel/user
*/
readi(aip)
struct inode *aip;
/* … */
/*
* Write the file corresponding to
* the inode pointed at by the argument.
* The actual write arguments are found
* in the variables:
* u_base core address for source
* u_offset byte offset in file
* u_count number of bytes to write
* u_segflg write to kernel/user
*/
writei(aip)
struct inode *aip;
/* … */
至于“保守”封锁,那么 readp()
и writep()
阻塞 inode 直到它们完成或得到结果(也就是说,它们调用 wakeup
). plock()
и prele()
工作简单:使用一组不同的调用 sleep
и wakeup
允许我们唤醒任何需要我们刚刚释放的锁的进程:
/*
* Lock a pipe.
* If its already locked, set the WANT bit and sleep.
*/
plock(ip)
int *ip;
{
register *rp;
rp = ip;
while(rp->i_flag&ILOCK) {
rp->i_flag =| IWANT;
sleep(rp, PPIPE);
}
rp->i_flag =| ILOCK;
}
/*
* Unlock a pipe.
* If WANT bit is on, wakeup.
* This routine is also used to unlock inodes in general.
*/
prele(ip)
int *ip;
{
register *rp;
rp = ip;
rp->i_flag =& ~ILOCK;
if(rp->i_flag&IWANT) {
rp->i_flag =& ~IWANT;
wakeup(rp);
}
}
一开始我不明白为什么 readp()
不会导致 prele(ip)
通话前 wakeup(ip+1)
. 第一件事 writep()
在它的循环中调用,这个 plock(ip)
,这会导致死锁,如果 readp()
尚未删除其块,因此代码必须以某种方式正常工作。 如果你看 wakeup()
, 很明显它只将休眠进程标记为准备执行,以便将来 sched()
真的推出了。 所以 readp()
原因 wakeup()
, 解锁, 设置 IREAD
和电话 sleep(ip+2)
- 这一切之前 writep()
重新开始循环。
这完成了第六版中管道的描述。 简单的代码,深远的影响。
Xv6,一个简单的类 Unix 内核
创建一个核
该代码包含清晰而周到的实现 pipealloc()
:
#define PIPESIZE 512
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
int
pipealloc(struct file **f0, struct file **f1)
{
struct pipe *p;
p = 0;
*f0 = *f1 = 0;
if((*f0 = filealloc()) == 0 || (*f1 = filealloc()) == 0)
goto bad;
if((p = (struct pipe*)kalloc()) == 0)
goto bad;
p->readopen = 1;
p->writeopen = 1;
p->nwrite = 0;
p->nread = 0;
initlock(&p->lock, "pipe");
(*f0)->type = FD_PIPE;
(*f0)->readable = 1;
(*f0)->writable = 0;
(*f0)->pipe = p;
(*f1)->type = FD_PIPE;
(*f1)->readable = 0;
(*f1)->writable = 1;
(*f1)->pipe = p;
return 0;
bad:
if(p)
kfree((char*)p);
if(*f0)
fileclose(*f0);
if(*f1)
fileclose(*f1);
return -1;
}
pipealloc()
设置所有其余实现的状态,其中包括函数 piperead()
, pipewrite()
и pipeclose()
. 实际的系统调用 sys_pipe
是一个包装器实现
Linux 0.01
您可以找到 Linux 0.01 的源代码。 在他的研究中研究管道的实现将是有益的 fs
/pipe.c
. 在这里,一个 inode 用于表示管道,但管道本身是用现代 C 语言编写的。如果您已经通过第六版代码进行了破解,那么您在这里就不会有任何问题。 这就是函数的样子 write_pipe()
:
int write_pipe(struct m_inode * inode, char * buf, int count)
{
char * b=buf;
wake_up(&inode->i_wait);
if (inode->i_count != 2) { /* no readers */
current->signal |= (1<<(SIGPIPE-1));
return -1;
}
while (count-->0) {
while (PIPE_FULL(*inode)) {
wake_up(&inode->i_wait);
if (inode->i_count != 2) {
current->signal |= (1<<(SIGPIPE-1));
return b-buf;
}
sleep_on(&inode->i_wait);
}
((char *)inode->i_size)[PIPE_HEAD(*inode)] =
get_fs_byte(b++);
INC_PIPE( PIPE_HEAD(*inode) );
wake_up(&inode->i_wait);
}
wake_up(&inode->i_wait);
return b-buf;
}
即使不查看结构定义,您也可以弄清楚索引节点引用计数是如何用于检查写入操作是否导致 SIGPIPE
. 除了逐字节的工作,这个函数很容易和上面的思路进行比较。 偶逻辑 sleep_on
/wake_up
看起来不那么陌生。
现代 Linux 内核、FreeBSD、NetBSD、OpenBSD
我很快浏览了一些现代内核。 它们都没有基于磁盘的实现(不足为奇)。 Linux 有自己的实现。 尽管三个现代 BSD 内核包含基于 John Dyson 编写的代码的实现,但多年来它们彼此之间的差异太大了。
读书 fs
/pipe.c
(在 Linux 上)或 sys
/kern
/sys_pipe.c
(在 *BSD 上),它需要真正的奉献精神。 性能和对矢量和异步 I/O 等功能的支持在当今的代码中非常重要。 并且内存分配、锁和内核配置的细节都大不相同。 这不是大学操作系统入门课程所需要的。
无论如何,发掘一些旧模式对我来说很有趣(例如,生成 SIGPIPE
并返回 EPIPE
在所有这些如此不同的现代内核中写入封闭管道时。 我可能永远不会现场看到 PDP-11 计算机,但是从我出生前几年编写的代码中仍然可以学到很多东西。
迪维·卡普尔 (Divi Kapoor) 于 2011 年撰写的文章“
来源: habr.com