This article describes the implementation of pipes in the Unix kernel. I was somewhat disappointed that a recent article on pipes did not go into these details.
What is it?
Pipelines are "probably the most important invention in Unix" - a defining feature of Unix's underlying philosophy of putting together small programs, and the familiar command-line slogan:
$ echo hello | wc -c
6
This functionality depends on the kernel-provided system call pipe(2), which is described in its manual page.
A pipe provides a one-way channel for inter-process communication: it has an input (the write end) and an output (the read end), and data written to the input can be read back at the output.
A pipe is created by calling pipe(2), which returns two file descriptors: one referring to the pipe's input, the other to its output.
The trace output from the above command shows the creation of a pipeline and the flow of data through it from one process to another:
$ strace -qf -e execve,pipe,dup2,read,write sh -c 'echo hello | wc -c'
execve("/bin/sh", ["sh", "-c", "echo hello | wc -c"], β¦)
pipe([3, 4]) = 0
[pid 2604795] dup2(4, 1) = 1
[pid 2604795] write(1, "hello\n", 6) = 6
[pid 2604796] dup2(3, 0) = 0
[pid 2604796] execve("/usr/bin/wc", ["wc", "-c"], …)
[pid 2604796] read(0, "hello\n", 16384) = 6
[pid 2604796] write(1, "6\n", 2) = 2
The parent process calls pipe() to get a connected pair of file descriptors. One child process writes to one descriptor and the other child reads the same data from the other. The shell uses dup2 to "rename" descriptors 4 and 3 onto stdout and stdin respectively.
Without pipes, the shell would have to write the output of one process to a file and then have the other process read the data back from the file, wasting time and disk space. But pipes are good for more than just avoiding temporary files:
If a process tries to read from an empty pipe, read(2) will block until data is available. If a process tries to write to a full pipe, write(2) will block until enough data has been read from the pipe for the write to complete.
As POSIX requires, writes of up to PIPE_BUF bytes (at least 512) must be atomic. This is an important property: it lets processes communicate with each other through a pipe in a way that ordinary files (which provide no such guarantee) cannot.
With a regular file, a process would have to write all of its output before another process could read it; or the processes would have to run in lockstep, using an external signaling mechanism (such as a semaphore) to tell each other when a write or read has completed. Pipes spare us all that hassle.
What are we looking for?
Let me sketch roughly what a pipe implementation needs, to make it easier to picture. You need a buffer and some state in memory. You need functions to add data to and remove data from the buffer. You need some facility to invoke those functions during read and write operations on file descriptors. And you need locks to implement the special behavior described above.
We are now ready to interrogate the source code of the kernel under bright lamplight to confirm or disprove our vague mental model. But always be prepared for the unexpected.
Where are we looking?
I don't know where my copy of the famous book lies.
Wandering through the TUHS archives is like visiting a museum. We can look at our shared history and I have respect for the years of effort to recover all of this material bit by bit from old cassettes and printouts. And I am acutely aware of those fragments that are still missing.
Having satisfied our curiosity about the ancient history of pipes, we can look at modern kernels for comparison.
Incidentally, pipe is system call number 42 in the sysent[] table. Coincidence?
Traditional Unix kernels (1970–1974)
I found no trace of pipe(2) in either release.
TUHS claims that
The third edition of Unix was the last version whose kernel was written in assembler, but the first version with pipes. During 1973 the third edition was improved, the kernel was rewritten in C, and thus the fourth edition of Unix was born.
One reader found a scan of a document in which Doug McIlroy proposed the idea of "connecting programs like a garden hose."
In Brian Kernighan's book, McIlroy recalls:
When Unix came into being, my fascination with coroutines led me to ask its author, Ken Thompson, to allow data written by one process to go not only to a device but also to the input of another process. Ken thought it was possible. As a minimalist, however, he wanted every system feature to pull significant weight. Was writing directly between processes really a big advantage over writing to an intermediate file? Only when I made a concrete proposal with the catchy name "pipe" and a description of the syntax for connecting processes did Ken finally exclaim: "I'll do it!"
And he did. One fateful evening, Ken changed the kernel and the shell, fixed several standard programs to standardize how they accept input (which might now come from a pipe), and changed file names. The next day, pipes were in very wide use in applications. By the end of the week the secretaries were using them to send documents from word processors to the printer. Somewhat later, Ken replaced the original API and syntax for using pipes with the cleaner conventions that have been used ever since.
Unfortunately, the source code of the third edition Unix kernel has been lost. And although we have the fourth edition kernel source, written in C, the pipe(2) documentation survives from both releases, so we can start by comparing the two manual pages. The third edition pipe(2) was implemented in assembler and returned only one file descriptor, but already provided the expected core functionality:
The pipe system call creates an I/O mechanism called a pipe. The file descriptor returned can be used in both read and write operations. When the pipe is written, up to 504 bytes of data are buffered, after which the writing process is suspended. Reads on the pipe pick up the buffered data.
By the following year the kernel had been rewritten in C, and pipe(fildes) had acquired its modern look, returning a pair of descriptors:
The pipe system call creates an I/O mechanism called a pipe. The file descriptors returned can be used in read and write operations. When the pipe is written using the descriptor returned in r1 (resp. fildes[1]), up to 4096 bytes of data are buffered, after which the writing process is suspended. When the pipe is read using the descriptor returned in r0 (resp. fildes[0]), the buffered data is picked up.
It is assumed that after the pipe has been set up, two (or more) cooperating processes (created by subsequent fork calls) will pass data through the pipe with read and write calls.
The shell has a syntax for defining a linear array of processes connected by pipes.
Read calls on an empty pipe (no buffered data) with only one end (all write file descriptors closed) return "end of file". Write calls under similar conditions are ignored.
Earliest
Unix Sixth Edition (1975)
Starting to read Unix source code
For many years the Lions book was the only documentation of the Unix kernel available outside Bell Labs. Although the sixth edition license allowed teachers to use its source code, the seventh edition license excluded that possibility, so the book circulated in illegal typewritten copies.
Today you can buy a reprint copy of the book, the cover of which depicts students at the copier. And thanks to Warren Toomey (who started the TUHS project), you can download
Over 15 years ago I typed in a copy of the source code provided in Lions, because I was unhappy with the quality of my copy, itself a copy of an unknown number of other copies. TUHS did not exist yet, and I had no access to the old sources. But in 1988 I found an old 9-track tape holding a backup from a PDP-11. It was hard to tell whether it even worked, but it contained an intact /usr/src/ tree, most of whose files were dated 1979, which even then looked ancient. It was the seventh edition, or a PWB derivative, I thought.
I took that find as my starting point and hand-edited the sources back to their sixth edition state. Some of the code stayed the same; some needed light editing, changing the modern += token to the archaic =+. Some things were simply deleted, and some had to be rewritten entirely, but not too many.
And today we can read the source code of the sixth edition online at TUHS.
Incidentally, at first glance the most striking feature of this pre-K&R C code is its brevity. It is not often that I can insert code snippets without extensive editing to fit the relatively narrow display area of my site.
Early
/*
* Max allowable buffering per pipe.
* This is also the max size of the
* file created to implement the pipe.
* If this size is bigger than 4096,
* pipes will be implemented in LARG
* files, which is probably not good.
*/
#define PIPSIZ 4096
The buffer size has not changed since the fourth edition. But here we see something never publicly documented: pipes once used files as backing storage!
As for LARG files, they correspond to
Here is the actual pipe system call:
/*
* The sys-pipe entry.
* Allocate an inode on the root device.
* Allocate 2 file structures.
* Put it all together with flags.
*/
pipe()
{
register *ip, *rf, *wf;
int r;
ip = ialloc(rootdev);
if(ip == NULL)
return;
rf = falloc();
if(rf == NULL) {
iput(ip);
return;
}
r = u.u_ar0[R0];
wf = falloc();
if(wf == NULL) {
rf->f_count = 0;
u.u_ofile[r] = NULL;
iput(ip);
return;
}
u.u_ar0[R1] = u.u_ar0[R0]; /* wf's fd */
u.u_ar0[R0] = r; /* rf's fd */
wf->f_flag = FWRITE|FPIPE;
wf->f_inode = ip;
rf->f_flag = FREAD|FPIPE;
rf->f_inode = ip;
ip->i_count = 2;
ip->i_flag = IACC|IUPD;
ip->i_mode = IALLOC;
}
The comment describes clearly what happens here, but the code is not so easy to follow, partly because of how registers R0 and R1 carry system call parameters and return values.
Let's work through it. pipe() must return the read and write file descriptor numbers through R0 and R1 respectively. falloc() returns a pointer to a file structure, but also "returns" a file descriptor through u.u_ar0[R0]. So the code saves the read descriptor in r, and picks up the write descriptor directly from u.u_ar0[R0] after the second falloc() call.
The FPIPE flag, which we set when creating the pipe, controls the behavior of the rdwr() function:
/*
* common code for read and write calls:
* check permissions, set base, count, and offset,
* and switch out to readi, writei, or pipe code.
*/
rdwr(mode)
{
register *fp, m;
m = mode;
fp = getf(u.u_ar0[R0]);
/* β¦ */
if(fp->f_flag&FPIPE) {
if(m==FREAD)
readp(fp); else
writep(fp);
}
/* β¦ */
}
The readp() function in pipe.c then reads data from the pipe, but it is better to trace the implementation starting from writep(). Again, the code is complicated by the argument passing convention, but some details can be omitted.
writep(fp)
{
register *rp, *ip, c;
rp = fp;
ip = rp->f_inode;
c = u.u_count;
loop:
/* If all done, return. */
plock(ip);
if(c == 0) {
prele(ip);
u.u_count = 0;
return;
}
/*
* If there are not both read and write sides of the
* pipe active, return error and signal too.
*/
if(ip->i_count < 2) {
prele(ip);
u.u_error = EPIPE;
psignal(u.u_procp, SIGPIPE);
return;
}
/*
* If the pipe is full, wait for reads to deplete
* and truncate it.
*/
if(ip->i_size1 == PIPSIZ) {
ip->i_mode =| IWRITE;
prele(ip);
sleep(ip+1, PPIPE);
goto loop;
}
/* Write what is possible and loop back. */
u.u_offset[0] = 0;
u.u_offset[1] = ip->i_size1;
u.u_count = min(c, PIPSIZ-u.u_offset[1]);
c =- u.u_count;
writei(ip);
prele(ip);
if(ip->i_mode&IREAD) {
ip->i_mode =& ~IREAD;
wakeup(ip+2);
}
goto loop;
}
We want to write u.u_count bytes to the pipe. First we must lock the inode (see plock/prele below).
Then we check the inode's reference count. As long as both ends of the pipe remain open, the count should be 2. We hold one reference (through rp->f_inode), so a count of less than 2 must mean that the reading process has closed its end of the pipe. In other words, we are trying to write to a closed pipe, which is an error. The EPIPE error code and the SIGPIPE signal first appeared in the sixth edition of Unix.
But even if the pipe is open, it may be full. In that case we release the lock and go to sleep, hoping that another process will read from the pipe and free up room. When we wake, we jump back to the top, take the lock again, and start a new write cycle.
If the pipe has free space, we write into it with writei(). The inode's i_size1 field (0 for an empty pipe) points to the end of the data the pipe already holds, so if there is room we can write from i_size1 up to PIPSIZ. Then we release the lock and try to wake any process waiting to read from the pipe. Finally we jump back to the top to check whether we managed to write as many bytes as we needed; if not, we start another write cycle.
The inode's i_mode field normally stores the r, w and x permission bits. For pipes, though, we use the IREAD and IWRITE bits to signal that some process is waiting to read or write, respectively. A process sets the flag and calls sleep(), expecting that some other process will later call wakeup().
The real magic happens in sleep() and wakeup(), which are implemented elsewhere in the kernel:
/*
* Give up the processor till a wakeup occurs
* on chan, at which time the process
* enters the scheduling queue at priority pri.
* The most important effect of pri is that when
* pri<0 a signal cannot disturb the sleep;
* if pri>=0 signals will be processed.
* Callers of this routine must be prepared for
* premature return, and check that the reason for
* sleeping has gone away.
*/
sleep(chan, pri) /* β¦ */
/*
* Wake up all processes sleeping on chan.
*/
wakeup(chan) /* β¦ */
A process that calls sleep() on a particular channel may later be woken by another process calling wakeup() on the same channel. writep() and readp() coordinate with each other through such paired calls. Note that pipe.c always passes the priority PPIPE when calling sleep(), and since PPIPE is positive, every such sleep() can be interrupted by a signal.
Now we have everything we need to understand readp():
readp(fp)
int *fp;
{
register *rp, *ip;
rp = fp;
ip = rp->f_inode;
loop:
/* Very conservative locking. */
plock(ip);
/*
* If the head (read) has caught up with
* the tail (write), reset both to 0.
*/
if(rp->f_offset[1] == ip->i_size1) {
if(rp->f_offset[1] != 0) {
rp->f_offset[1] = 0;
ip->i_size1 = 0;
if(ip->i_mode&IWRITE) {
ip->i_mode =& ~IWRITE;
wakeup(ip+1);
}
}
/*
* If there are not both reader and
* writer active, return without
* satisfying read.
*/
prele(ip);
if(ip->i_count < 2)
return;
ip->i_mode =| IREAD;
sleep(ip+2, PPIPE);
goto loop;
}
/* Read and return */
u.u_offset[0] = 0;
u.u_offset[1] = rp->f_offset[1];
readi(ip);
rp->f_offset[1] = u.u_offset[1];
prele(ip);
}
It may be easier to read this function from the bottom up. The "read and return" branch is the usual path, taken when the pipe holds some data: we read from the current f_offset and then advance the offset by the number of bytes read.
On a subsequent read, the pipe will be empty if the read offset has caught up with the inode's i_size1. We reset the position to 0 and try to wake any process that wants to write to the pipe: we know that writep() sleeps on ip+1 when the pipe is full, and now that the pipe is empty we can wake it to resume its write cycle.
If there is nothing to read, readp() sets the IREAD flag and sleeps on ip+2, knowing that writep() will wake it when it writes some data into the pipe.
The comments on readi() and writei() tell us everything we need to know: with the arguments passed through the global u structure, we can treat them as ordinary I/O functions that take a file, a position, an in-memory buffer, and a count of bytes to read or write.
/*
* Read the file corresponding to
* the inode pointed at by the argument.
* The actual read arguments are found
* in the variables:
* u_base core address for destination
* u_offset byte offset in file
* u_count number of bytes to read
* u_segflg read to kernel/user
*/
readi(aip)
struct inode *aip;
/* β¦ */
/*
* Write the file corresponding to
* the inode pointed at by the argument.
* The actual write arguments are found
* in the variables:
* u_base core address for source
* u_offset byte offset in file
* u_count number of bytes to write
* u_segflg write to kernel/user
*/
writei(aip)
struct inode *aip;
/* β¦ */
As for "conservative" blocking, then readp()
ΠΈ writep()
lock inodes until they finish or get a result (i.e. call wakeup
). plock()
ΠΈ prele()
work simply: using a different set of calls sleep
ΠΈ wakeup
allow us to wake up any process that needs the lock we just released:
/*
* Lock a pipe.
* If its already locked, set the WANT bit and sleep.
*/
plock(ip)
int *ip;
{
register *rp;
rp = ip;
while(rp->i_flag&ILOCK) {
rp->i_flag =| IWANT;
sleep(rp, PPIPE);
}
rp->i_flag =| ILOCK;
}
/*
* Unlock a pipe.
* If WANT bit is on, wakeup.
* This routine is also used to unlock inodes in general.
*/
prele(ip)
int *ip;
{
register *rp;
rp = ip;
rp->i_flag =& ~ILOCK;
if(rp->i_flag&IWANT) {
rp->i_flag =& ~IWANT;
wakeup(rp);
}
}
At first I couldn't understand why readp() doesn't call prele(ip) before calling wakeup(ip+1). The first thing writep() does in its loop is call plock(ip), which would deadlock if readp() still held its lock, so the code must somehow be correct. A look at wakeup() makes it clear: it only marks the sleeping process as runnable, so that a future sched() actually runs it. Thus readp() calls wakeup(), releases the lock, sets IREAD and calls sleep(ip+2), all before writep() restarts its loop.
This completes the description of pipelines in the sixth edition. Simple code, far-reaching implications.
Xv6, a simple Unix-like kernel
The xv6 kernel was inspired by the sixth edition of Unix. It contains a clear and thoughtful implementation of pipes in pipealloc():
#define PIPESIZE 512
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
int
pipealloc(struct file **f0, struct file **f1)
{
struct pipe *p;
p = 0;
*f0 = *f1 = 0;
if((*f0 = filealloc()) == 0 || (*f1 = filealloc()) == 0)
goto bad;
if((p = (struct pipe*)kalloc()) == 0)
goto bad;
p->readopen = 1;
p->writeopen = 1;
p->nwrite = 0;
p->nread = 0;
initlock(&p->lock, "pipe");
(*f0)->type = FD_PIPE;
(*f0)->readable = 1;
(*f0)->writable = 0;
(*f0)->pipe = p;
(*f1)->type = FD_PIPE;
(*f1)->readable = 0;
(*f1)->writable = 1;
(*f1)->pipe = p;
return 0;
bad:
if(p)
kfree((char*)p);
if(*f0)
fileclose(*f0);
if(*f1)
fileclose(*f1);
return -1;
}
pipealloc() sets up the state for the rest of the implementation, which consists of piperead(), pipewrite() and pipeclose(). The actual system call, sys_pipe, is a thin wrapper implemented elsewhere.
Linux 0.01
The source code for Linux 0.01 is still available, and it is instructive to study the pipe implementation in its fs/pipe.c. An inode represents the pipe here too, but the code is written in modern C. If you hacked your way through the sixth edition code, you will have no trouble with this. Here is write_pipe():
int write_pipe(struct m_inode * inode, char * buf, int count)
{
char * b=buf;
wake_up(&inode->i_wait);
if (inode->i_count != 2) { /* no readers */
current->signal |= (1<<(SIGPIPE-1));
return -1;
}
while (count-->0) {
while (PIPE_FULL(*inode)) {
wake_up(&inode->i_wait);
if (inode->i_count != 2) {
current->signal |= (1<<(SIGPIPE-1));
return b-buf;
}
sleep_on(&inode->i_wait);
}
((char *)inode->i_size)[PIPE_HEAD(*inode)] =
get_fs_byte(b++);
INC_PIPE( PIPE_HEAD(*inode) );
wake_up(&inode->i_wait);
}
wake_up(&inode->i_wait);
return b-buf;
}
Even without looking at the struct definitions, you can see how the inode's reference count is used to check whether a write should raise SIGPIPE. Apart from working byte by byte, this function maps easily onto the ideas above; even the sleep_on/wake_up logic doesn't look alien.
Modern Linux Kernels, FreeBSD, NetBSD, OpenBSD
I skimmed some modern kernels. None of them uses a disk-backed implementation any more (unsurprisingly). Linux has its own implementation, and although the three modern BSD kernels contain implementations descended from code written by John Dyson, they have grown too different from each other over the years.
Reading fs/pipe.c (in Linux) or sys/kern/sys_pipe.c (in the *BSDs) takes real dedication. Today the code is dominated by performance concerns and support for features such as vectored and asynchronous I/O, and the details of memory allocation, locking and kernel configuration vary widely. None of it is what universities need for an introductory operating systems course.
In any case, it was fun for me to unearth a few old patterns (for example, generating SIGPIPE and returning EPIPE on a write to a closed pipe) in all of these very different modern kernels. I will probably never see a PDP-11 in person, but there is still a lot to learn from code written a few years before I was born.
Written by Divi Kapoor in 2011, the article "
Source: habr.com