Este artigo describe a implementación de pipelines no núcleo Unix. Decepcionoume un pouco que un artigo recente titulado "
De que estamos a falar?
Os pipelines, "probablemente o invento máis importante de Unix", son unha característica definitoria da filosofía subxacente de Unix de vincular pequenos programas, así como un sinal familiar na liña de comandos:
$ echo hello | wc -c
6
Esta funcionalidade depende da chamada do sistema proporcionada polo núcleo pipe
, que se describe nas páxinas de documentación
As canalizacións proporcionan unha canle unidireccional para a comunicación entre procesos. A canalización ten unha entrada (fin de escritura) e unha saída (fin de lectura). Os datos escritos na entrada da canalización pódense ler na saída.
A canalización créase mediante a chamada
pipe(2)
, que devolve dous descritores de ficheiros: un referido á entrada da canalización, o segundo á saída.
A saída de rastrexo do comando anterior mostra a creación da canalización e o fluxo de datos a través dela dun proceso a outro:
$ strace -qf -e execve,pipe,dup2,read,write
sh -c 'echo hello | wc -c'
execve("/bin/sh", ["sh", "-c", "echo hello | wc -c"], …)
pipe([3, 4]) = 0
[pid 2604795] dup2(4, 1) = 1
[pid 2604795] write(1, "hellon", 6) = 6
[pid 2604796] dup2(3, 0) = 0
[pid 2604796] execve("/usr/bin/wc", ["wc", "-c"], …)
[pid 2604796] read(0, "hellon", 16384) = 6
[pid 2604796] write(1, "6n", 2) = 2
O proceso pai chama pipe()
para obter descritores de ficheiros montados. Un proceso fillo escribe nun identificador e outro proceso le os mesmos datos doutro identificador. O shell usa dup2 para "renomear" os descritores 3 e 4 para que coincidan con stdin e stdout.
Sen tubos, o shell tería que escribir o resultado dun proceso nun ficheiro e pasalo a outro proceso para ler os datos do ficheiro. Como resultado, malgastaríamos máis recursos e espazo no disco. Non obstante, as canalizacións son boas non só porque permiten evitar o uso de ficheiros temporais:
Se un proceso está tentando ler desde unha canalización baleira, entón
read(2)
bloquearase ata que os datos estean dispoñibles. Se un proceso intenta escribir nunha canalización completa, entónwrite(2)
bloqueará ata que se leran datos suficientes da canalización para realizar a escritura.
Do mesmo xeito que o requisito POSIX, esta é unha propiedade importante: escribir na canalización ata PIPE_BUF
os bytes (polo menos 512) deben ser atómicos para que os procesos poidan comunicarse entre si a través da canalización dun xeito que os ficheiros normais (que non ofrecen tales garantías) non poden.
Cando se usa un ficheiro normal, un proceso pode escribir nel toda a súa saída e pasalo a outro proceso. Ou os procesos poden operar nun modo altamente paralelo, utilizando un mecanismo de sinalización externo (como un semáforo) para notificarse mutuamente cando se completa unha escritura ou lectura. As cintas transportadoras sálvanos de toda esta molestia.
Que buscamos?
Explicareino en termos sinxelos para que che sexa máis fácil imaxinar como pode funcionar un transportador. Necesitará asignar un búfer e algún estado na memoria. Necesitará funcións para engadir e eliminar datos do búfer. Necesitará algúns medios para chamar funcións durante as operacións de lectura e escritura nos descritores de ficheiros. E necesitarás bloqueos para implementar o comportamento especial descrito anteriormente.
Agora estamos preparados para interrogar o código fonte do núcleo baixo unha luz brillante para confirmar ou desmentir o noso vago modelo mental. Pero sempre estea preparado para o inesperado.
Onde buscamos?
Non sei onde está o meu exemplar do famoso libro "
Pasear polos arquivos TUHS é como visitar un museo. Podemos mirar a nosa historia compartida, e teño respecto polos moitos anos de esforzo por recuperar todo este material pouco a pouco a partir de cintas e estampas antigas. E son moi consciente deses fragmentos que aínda faltan.
Unha vez satisfeita a nosa curiosidade pola historia antiga dos transportadores, podemos mirar os núcleos modernos para comparalos.
By the way, pipe
é o número de chamada do sistema 42 na táboa sysent[]
. ¿Coincidencia?
Núcleos tradicionais de Unix (1970–1974)
Non atopei ningún rastro pipe(2)
nin en
TUHS afirma que
A 1973ª edición de Unix foi a última versión cun núcleo escrito en linguaxe ensamblador, pero tamén a primeira versión con pipelines. Durante XNUMX traballouse para mellorar a terceira edición, o núcleo foi reescrito en C, e así apareceu a cuarta edición de Unix.
Un lector atopou un escaneado dun documento no que Doug McIlroy propoñía a idea de "conectar programas como unha mangueira de xardín".
No libro de Brian Kernighan
Cando saíu Unix, a miña fascinación polas corrutinas levoume a pedirlle ao autor do SO, Ken Thompson, que permitise que os datos escritos nun proceso fosen non só para o dispositivo, senón tamén para saír a outro proceso. Ken decidiu que era posible. Non obstante, como minimalista, quería que cada función do sistema desempeñase un papel importante. Escribir directamente entre procesos é realmente unha gran vantaxe sobre escribir nun ficheiro intermedio? Foi só cando fixen unha proposta específica co nome pegadizo "pipeline" e unha descrición da sintaxe para a interacción entre procesos que Ken finalmente exclamou: "Eu fareino!"
E fixo. Unha noite fatídica, Ken cambiou o núcleo e o shell, arranxou varios programas estándar para estandarizar como aceptaban a entrada (que podería vir dunha canalización) e tamén cambiou os nomes dos ficheiros. Ao día seguinte, as canalizacións comezaron a utilizarse de forma moi ampla en aplicacións. A finais da semana, as secretarias utilizábanas para enviar documentos dos procesadores de textos á impresora. Un pouco máis tarde, Ken substituíu a API e a sintaxe orixinais para envolver o uso de canalizacións con convencións máis limpas, que se utilizaron desde entón.
Desafortunadamente, o código fonte da terceira edición do núcleo Unix perdeuse. E aínda que temos o código fonte do núcleo escrito en C
Temos documentación de texto para pipe(2)
das dúas versións, polo que podes comezar buscando na documentación pipe(2)
está escrito en linguaxe ensamblador e devolve só un descritor de ficheiro, pero xa proporciona a funcionalidade básica esperada:
Chamada do sistema tubo crea un mecanismo de entrada/saída chamado pipeline. O descritor de ficheiro devolto pódese usar para operacións de lectura e escritura. Cando se escribe algo na canalización, almacénanse ata 504 bytes de datos, despois do cal se suspende o proceso de escritura. Ao ler desde a canalización, quítanse os datos almacenados no búfer.
Ao ano seguinte, o núcleo fora reescrito en C e pipe(fildes)
"
Chamada do sistema tubo crea un mecanismo de entrada/saída chamado pipeline. Os descritores de ficheiros devoltos pódense usar en operacións de lectura e escritura. Cando se escribe algo na canalización, utilízase o identificador devolto en r1 (resp. fildes[1]), almacenado en búfer a 4096 bytes de datos, despois do cal se suspende o proceso de escritura. Ao ler desde a canalización, o identificador devolto a r0 (resp. fildes[0]) toma os datos.
Suponse que unha vez que se define unha canalización, dous (ou máis) procesos de comunicación (creados por chamadas posteriores a garfo) transferirá datos da canalización mediante chamadas ler и escribir.
O shell ten unha sintaxe para definir unha matriz lineal de procesos conectados por unha canalización.
As chamadas para ler desde unha canalización baleira (que non contén datos almacenados no búfer) que só ten un extremo (todos os descritores de ficheiros de escritura están pechados) devolven "fin do ficheiro". As chamadas para escribir nunha situación similar son ignoradas.
O máis cedo
Sexta edición de Unix (1975)
Comecemos a ler o código fonte de Unix
Durante moitos anos o libro Lions foi o único documento no núcleo Unix dispoñible fóra de Bell Labs. Aínda que a licenza da sexta edición permitía aos profesores utilizar o seu código fonte, a licenza da sétima edición excluía esta posibilidade, polo que o libro foi distribuído en forma de copias mecanografiadas ilegais.
Hoxe podedes mercar unha reimpresión do libro, cuxa portada mostra aos alumnos nunha fotocopiadora. E grazas a Warren Toomey (que iniciou o proxecto TUHS) podedes descargar
Hai máis de 15 anos, escribín unha copia do código fonte que figuraba Lions, porque non me gustou a calidade da miña copia dun número descoñecido doutras copias. TUHS aínda non existía e non tiña acceso ás fontes antigas. Pero en 1988, atopei unha cinta antiga de 9 pistas que contiña unha copia de seguridade dun ordenador PDP11. Era difícil saber se estaba funcionando, pero había unha árbore /usr/src/ intacta na que a maioría dos ficheiros tiñan a etiqueta do ano 1979, que aínda entón parecía antigo. Era a sétima edición ou o seu derivado PWB, como eu cría.
Tomei o achado como base e editei manualmente as fontes ata a sexta edición. Algúns do código permaneceron igual, pero algúns tiveron que ser editados lixeiramente, cambiando o token += moderno polo =+ obsoleto. Algunhas cousas foron simplemente borradas, e algunhas tiveron que ser reescritas completamente, pero non demasiado.
E hoxe podemos ler en liña en TUHS o código fonte da sexta edición de
Por certo, a primeira vista, a principal característica do código C antes do período de Kernighan e Ritchie é a súa brevidade. Non adoita ser capaz de inserir fragmentos de código sen unha edición extensa para axustar unha área de visualización relativamente estreita do meu sitio.
Cedo
/*
* Max allowable buffering per pipe.
* This is also the max size of the
* file created to implement the pipe.
* If this size is bigger than 4096,
* pipes will be implemented in LARG
* files, which is probably not good.
*/
#define PIPSIZ 4096
O tamaño do buffer non cambiou desde a cuarta edición. Pero aquí vemos, sen ningunha documentación pública, que as canalizacións usaron ficheiros como almacenamento de copia de seguridade.
En canto aos ficheiros LARG, corresponden a
Aquí está a chamada do sistema real pipe
:
/*
* The sys-pipe entry.
* Allocate an inode on the root device.
* Allocate 2 file structures.
* Put it all together with flags.
*/
pipe()
{
register *ip, *rf, *wf;
int r;
ip = ialloc(rootdev);
if(ip == NULL)
return;
rf = falloc();
if(rf == NULL) {
iput(ip);
return;
}
r = u.u_ar0[R0];
wf = falloc();
if(wf == NULL) {
rf->f_count = 0;
u.u_ofile[r] = NULL;
iput(ip);
return;
}
u.u_ar0[R1] = u.u_ar0[R0]; /* wf's fd */
u.u_ar0[R0] = r; /* rf's fd */
wf->f_flag = FWRITE|FPIPE;
wf->f_inode = ip;
rf->f_flag = FREAD|FPIPE;
rf->f_inode = ip;
ip->i_count = 2;
ip->i_flag = IACC|IUPD;
ip->i_mode = IALLOC;
}
O comentario describe claramente o que está a suceder aquí. Pero entender o código non é tan sinxelo, en parte pola forma "R0
и R1
pásanse os parámetros de chamada ao sistema e os valores de retorno.
Probemos con
pipe()
debe pasar R0
и R1
devolver os números do descritor do ficheiro para ler e escribir. falloc()
devolve un punteiro á estrutura do ficheiro, pero tamén "devolve" vía u.u_ar0[R0]
e un descritor de ficheiros. É dicir, o código garda r
descriptor de ficheiro para ler e asigna un descritor de ficheiro para escribir directamente desde u.u_ar0[R0]
despois da segunda convocatoria falloc()
.
Bandeira FPIPE
, que establecemos ao crear a canalización, controla o comportamento da función
/*
* common code for read and write calls:
* check permissions, set base, count, and offset,
* and switch out to readi, writei, or pipe code.
*/
rdwr(mode)
{
register *fp, m;
m = mode;
fp = getf(u.u_ar0[R0]);
/* … */
if(fp->f_flag&FPIPE) {
if(m==FREAD)
readp(fp); else
writep(fp);
}
/* … */
}
Despois a función readp()
в pipe.c
le datos da canalización. Pero é mellor rastrexar a implementación a partir de writep()
. De novo, o código volveuse máis complexo debido ás convencións de pasar argumentos, pero pódense omitir algúns detalles.
writep(fp)
{
register *rp, *ip, c;
rp = fp;
ip = rp->f_inode;
c = u.u_count;
loop:
/* If all done, return. */
plock(ip);
if(c == 0) {
prele(ip);
u.u_count = 0;
return;
}
/*
* If there are not both read and write sides of the
* pipe active, return error and signal too.
*/
if(ip->i_count < 2) {
prele(ip);
u.u_error = EPIPE;
psignal(u.u_procp, SIGPIPE);
return;
}
/*
* If the pipe is full, wait for reads to deplete
* and truncate it.
*/
if(ip->i_size1 == PIPSIZ) {
ip->i_mode =| IWRITE;
prele(ip);
sleep(ip+1, PPIPE);
goto loop;
}
/* Write what is possible and loop back. */
u.u_offset[0] = 0;
u.u_offset[1] = ip->i_size1;
u.u_count = min(c, PIPSIZ-u.u_offset[1]);
c =- u.u_count;
writei(ip);
prele(ip);
if(ip->i_mode&IREAD) {
ip->i_mode =& ~IREAD;
wakeup(ip+2);
}
goto loop;
}
Queremos escribir bytes na entrada da canalización u.u_count
. Primeiro necesitamos bloquear o inodo (ver a continuación plock
/prele
).
Despois comprobamos o contador de referencia do inodo. Mentres os dous extremos da canalización permanezan abertos, o contador debe ser igual a 2. Temos un enlace (de rp->f_inode
), polo que se o contador é inferior a 2, debe significar que o proceso de lectura pechou o seu extremo da canalización. Noutras palabras, estamos tentando escribir nunha canalización pechada e isto é un erro. Código de erro por primeira vez EPIPE
e sinal SIGPIPE
apareceu na sexta edición de Unix.
Pero aínda que o transportador estea aberto, pode estar cheo. Neste caso, soltamos o bloqueo e imos a durmir coa esperanza de que outro proceso lea da canalización e libere suficiente espazo nel. Despois de espertar, volvemos ao principio, colgamos de novo a pechadura e comezamos un novo ciclo de gravación.
Se hai suficiente espazo libre na canalización, escribimos datos nel usando i_size1
en inode (se a canalización está baleira pode ser igual a 0) indica o final dos datos que xa contén. Se hai espazo suficiente para gravar, podemos encher a canalización desde i_size1
para PIPESIZ
. A continuación, liberamos o bloqueo e tentamos espertar calquera proceso que estea esperando para ler desde a canalización. Volvemos ao principio para ver se eramos capaces de escribir tantos bytes como precisaramos. Se falla, comezamos un novo ciclo de gravación.
Normalmente o parámetro i_mode
inode úsase para almacenar permisos r
, w
и x
. Pero no caso das canalizacións, sinalamos que algún proceso está á espera dunha escritura ou lectura mediante bits IREAD
и IWRITE
respectivamente. O proceso establece a bandeira e chama sleep()
, e espérase que algún outro proceso no futuro cause wakeup()
.
A verdadeira maxia ocorre en sleep()
и wakeup()
. Están implementados en
/*
* Give up the processor till a wakeup occurs
* on chan, at which time the process
* enters the scheduling queue at priority pri.
* The most important effect of pri is that when
* pri<0 a signal cannot disturb the sleep;
* if pri>=0 signals will be processed.
* Callers of this routine must be prepared for
* premature return, and check that the reason for
* sleeping has gone away.
*/
sleep(chan, pri) /* … */
/*
* Wake up all processes sleeping on chan.
*/
wakeup(chan) /* … */
O proceso que provoca sleep()
para unha canle en particular, pode ser espertado máis tarde por outro proceso, o que provocará wakeup()
para a mesma canle. writep()
и readp()
coordinar as súas accións mediante chamadas pareadas. teña en conta que pipe.c
sempre dá prioridade PPIPE
cando se chama sleep()
, así que é todo sleep()
pode ser interrompido por un sinal.
Agora temos todo para entender a función readp()
:
readp(fp)
int *fp;
{
register *rp, *ip;
rp = fp;
ip = rp->f_inode;
loop:
/* Very conservative locking. */
plock(ip);
/*
* If the head (read) has caught up with
* the tail (write), reset both to 0.
*/
if(rp->f_offset[1] == ip->i_size1) {
if(rp->f_offset[1] != 0) {
rp->f_offset[1] = 0;
ip->i_size1 = 0;
if(ip->i_mode&IWRITE) {
ip->i_mode =& ~IWRITE;
wakeup(ip+1);
}
}
/*
* If there are not both reader and
* writer active, return without
* satisfying read.
*/
prele(ip);
if(ip->i_count < 2)
return;
ip->i_mode =| IREAD;
sleep(ip+2, PPIPE);
goto loop;
}
/* Read and return */
u.u_offset[0] = 0;
u.u_offset[1] = rp->f_offset[1];
readi(ip);
rp->f_offset[1] = u.u_offset[1];
prele(ip);
}
Pode que che resulte máis doado ler esta función de abaixo cara arriba. A rama "lectura e devolución" adoita utilizarse cando hai algúns datos na canalización. Neste caso, usamos f_offset
lectura e, a continuación, actualice o valor da compensación correspondente.
Nas lecturas posteriores, a canalización estará baleira se se alcanzou a compensación de lectura i_size1
no inodo. Restablecemos a posición a 0 e tentamos espertar calquera proceso que queira escribir na canalización. Sabemos que cando o transportador está cheo, writep()
quedará durmido ip+1
. E agora que a canalización está baleira, podemos espertala para retomar o seu ciclo de escritura.
Se non tes nada que ler, entón readp()
pode poñer unha bandeira IREAD
e adormecer ip+2
. Sabemos o que o espertará writep()
, cando escribe algúns datos na canalización.
Comentarios sobre u
"Podemos tratalos como funcións de E/S normais que toman un ficheiro, unha posición, un búfer na memoria e contan o número de bytes para ler ou escribir.
/*
* Read the file corresponding to
* the inode pointed at by the argument.
* The actual read arguments are found
* in the variables:
* u_base core address for destination
* u_offset byte offset in file
* u_count number of bytes to read
* u_segflg read to kernel/user
*/
readi(aip)
struct inode *aip;
/* … */
/*
* Write the file corresponding to
* the inode pointed at by the argument.
* The actual write arguments are found
* in the variables:
* u_base core address for source
* u_offset byte offset in file
* u_count number of bytes to write
* u_segflg write to kernel/user
*/
writei(aip)
struct inode *aip;
/* … */
En canto ao bloqueo "conservador", pois readp()
и writep()
bloquear o inodo ata que rematen o seu traballo ou reciban un resultado (é dicir, chame wakeup
). plock()
и prele()
traballar de forma sinxela: usando un conxunto diferente de chamadas sleep
и wakeup
permítenos espertar calquera proceso que necesite o bloqueo que acabamos de liberar:
/*
* Lock a pipe.
* If its already locked, set the WANT bit and sleep.
*/
plock(ip)
int *ip;
{
register *rp;
rp = ip;
while(rp->i_flag&ILOCK) {
rp->i_flag =| IWANT;
sleep(rp, PPIPE);
}
rp->i_flag =| ILOCK;
}
/*
* Unlock a pipe.
* If WANT bit is on, wakeup.
* This routine is also used to unlock inodes in general.
*/
prele(ip)
int *ip;
{
register *rp;
rp = ip;
rp->i_flag =& ~ILOCK;
if(rp->i_flag&IWANT) {
rp->i_flag =& ~IWANT;
wakeup(rp);
}
}
Ao principio non podía entender por que readp()
non provoca prele(ip)
antes da chamada wakeup(ip+1)
. O primeiro é writep()
provoca no seu ciclo, isto plock(ip)
, o que leva ao punto morto se readp()
aínda non eliminei o meu bloqueo, polo que de algunha maneira o código debe funcionar correctamente. Se miras wakeup()
, entón queda claro que só marca o proceso de durmir como listo para executarse, de xeito que no futuro sched()
realmente lanzouno. Entón readp()
causas wakeup()
, quita o bloqueo, pon IREAD
e chamadas sleep(ip+2)
- todo isto antes writep()
retoma o ciclo.
Complétase así a descrición dos transportadores na sexta edición. Código sinxelo, consecuencias de gran alcance.
Xv6, un núcleo simple tipo Unix
Para crear o núcleo
O código contén unha implementación clara e reflexiva pipealloc()
:
#define PIPESIZE 512
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
int
pipealloc(struct file **f0, struct file **f1)
{
struct pipe *p;
p = 0;
*f0 = *f1 = 0;
if((*f0 = filealloc()) == 0 || (*f1 = filealloc()) == 0)
goto bad;
if((p = (struct pipe*)kalloc()) == 0)
goto bad;
p->readopen = 1;
p->writeopen = 1;
p->nwrite = 0;
p->nread = 0;
initlock(&p->lock, "pipe");
(*f0)->type = FD_PIPE;
(*f0)->readable = 1;
(*f0)->writable = 0;
(*f0)->pipe = p;
(*f1)->type = FD_PIPE;
(*f1)->readable = 0;
(*f1)->writable = 1;
(*f1)->pipe = p;
return 0;
bad:
if(p)
kfree((char*)p);
if(*f0)
fileclose(*f0);
if(*f1)
fileclose(*f1);
return -1;
}
pipealloc()
establece o estado do resto da implementación, que inclúe as funcións piperead()
, pipewrite()
и pipeclose()
. Chamada do sistema real sys_pipe
é un envoltorio implementado en
Linux 0.01
Pódese atopar o código fonte de Linux 0.01. Será instructivo estudar a implantación de canalizacións no seu fs
/pipe.c
. Isto usa un inodo para representar a canalización, pero a canalización en si está escrita en C moderno. Se traballaches no código da 6ª edición, non terás ningún problema aquí. Este é o aspecto da función write_pipe()
:
int write_pipe(struct m_inode * inode, char * buf, int count)
{
char * b=buf;
wake_up(&inode->i_wait);
if (inode->i_count != 2) { /* no readers */
current->signal |= (1<<(SIGPIPE-1));
return -1;
}
while (count-->0) {
while (PIPE_FULL(*inode)) {
wake_up(&inode->i_wait);
if (inode->i_count != 2) {
current->signal |= (1<<(SIGPIPE-1));
return b-buf;
}
sleep_on(&inode->i_wait);
}
((char *)inode->i_size)[PIPE_HEAD(*inode)] =
get_fs_byte(b++);
INC_PIPE( PIPE_HEAD(*inode) );
wake_up(&inode->i_wait);
}
wake_up(&inode->i_wait);
return b-buf;
}
Sen sequera mirar as definicións da estrutura, pode descubrir como se usa o reconto de referencia do inodo para comprobar se unha operación de escritura resulta en SIGPIPE
. Ademais de traballar byte a byte, esta función é fácil de comparar coas ideas descritas anteriormente. Incluso lóxica sleep_on
/wake_up
non parece tan alleo.
Núcleos de Linux modernos, FreeBSD, NetBSD, OpenBSD
Axiña atravesei algúns núcleos modernos. Ningún deles ten unha implementación de disco máis (non é sorprendente). Linux ten a súa propia implementación. Aínda que os tres núcleos BSD modernos conteñen implementacións baseadas en código que foi escrito por John Dyson, co paso dos anos fixéronse demasiado diferentes entre si.
Ler fs
/pipe.c
(en Linux) ou sys
/kern
/sys_pipe.c
(en *BSD), é necesario dedicación real. O código de hoxe trata sobre o rendemento e compatibilidade con funcións como a E/S vectorial e asincrónica. E os detalles da asignación de memoria, bloqueos e configuración do núcleo varían moito. Isto non é o que necesitan as universidades para un curso de iniciación aos sistemas operativos.
De todos os xeitos, estaba interesado en desenterrar algúns patróns antigos (como xerar SIGPIPE
e volver EPIPE
ao escribir nunha canalización pechada) en todos estes núcleos modernos diferentes. Probablemente nunca vexa un ordenador PDP-11 na vida real, pero aínda hai moito que aprender do código que se escribiu anos antes de que eu nacera.
Un artigo escrito por Divi Kapoor en 2011:
Fonte: www.habr.com