Este artigo descreve a implementação de pipelines no kernel Unix. Fiquei um pouco desapontado com o fato de um artigo recente intitulado "
O que estamos falando?
Pipelines, “provavelmente a invenção mais importante do Unix”, são uma característica definidora da filosofia subjacente do Unix de vincular pequenos programas, bem como um sinal familiar na linha de comando:
$ echo hello | wc -c
6
Esta funcionalidade depende da chamada de sistema fornecida pelo kernel pipe
, que está descrito nas páginas de documentação
Pipelines fornecem um canal unidirecional para comunicação entre processos. O pipeline tem uma entrada (final de gravação) e uma saída (final de leitura). Os dados gravados na entrada do pipeline podem ser lidos na saída.
O pipeline é criado usando a chamada
pipe(2)
, que retorna dois descritores de arquivo: um referente à entrada do pipeline, o segundo à saída.
A saída de rastreamento do comando acima mostra a criação do pipeline e o fluxo de dados através dele de um processo para outro:
$ strace -qf -e execve,pipe,dup2,read,write
sh -c 'echo hello | wc -c'
execve("/bin/sh", ["sh", "-c", "echo hello | wc -c"], …)
pipe([3, 4]) = 0
[pid 2604795] dup2(4, 1) = 1
[pid 2604795] write(1, "hellon", 6) = 6
[pid 2604796] dup2(3, 0) = 0
[pid 2604796] execve("/usr/bin/wc", ["wc", "-c"], …)
[pid 2604796] read(0, "hellon", 16384) = 6
[pid 2604796] write(1, "6n", 2) = 2
O processo pai chama pipe()
para obter descritores de arquivo montados. Um processo filho grava em um identificador e outro processo lê os mesmos dados de outro identificador. O shell usa dup2 para "renomear" os descritores 3 e 4 para corresponder a stdin e stdout.
Sem pipes, o shell teria que gravar o resultado de um processo em um arquivo e passá-lo para outro processo para ler os dados do arquivo. Como resultado, desperdiçaríamos mais recursos e espaço em disco. No entanto, os pipelines são bons não apenas porque permitem evitar o uso de arquivos temporários:
Se um processo estiver tentando ler um pipeline vazio, então
read(2)
será bloqueado até que os dados estejam disponíveis. Se um processo tentar gravar em um pipeline completo, entãowrite(2)
será bloqueado até que dados suficientes sejam lidos no pipeline para executar a gravação.
Assim como o requisito POSIX, esta é uma propriedade importante: gravar no pipeline até PIPE_BUF
os bytes (pelo menos 512) devem ser atômicos para que os processos possam se comunicar entre si através do pipeline de uma forma que os arquivos regulares (que não fornecem tais garantias) não conseguem.
Ao usar um arquivo normal, um processo pode gravar toda a sua saída nele e passá-la para outro processo. Ou os processos podem operar em modo altamente paralelo, usando um mecanismo de sinalização externo (como um semáforo) para notificar uns aos outros quando uma gravação ou leitura for concluída. Os transportadores nos salvam de todo esse incômodo.
O que você está procurando?
Vou explicar em termos simples para que seja mais fácil para você imaginar como um transportador pode funcionar. Você precisará alocar um buffer e algum estado na memória. Você precisará de funções para adicionar e remover dados do buffer. Você precisará de alguns meios para chamar funções durante operações de leitura e gravação em descritores de arquivo. E você precisará de bloqueios para implementar o comportamento especial descrito acima.
Agora estamos prontos para interrogar o código-fonte do kernel sob a luz de uma lâmpada para confirmar ou refutar nosso vago modelo mental. Mas esteja sempre preparado para o inesperado.
Para onde estamos olhando?
Não sei onde está meu exemplar do famoso livro”
Perambular pelos arquivos do TUHS é como visitar um museu. Podemos olhar para a nossa história partilhada e tenho respeito pelos muitos anos de esforço para recuperar pouco a pouco todo este material a partir de fitas e impressões antigas. E estou perfeitamente consciente dos fragmentos que ainda faltam.
Tendo satisfeito a nossa curiosidade sobre a história antiga dos transportadores, podemos olhar para os grãos modernos para comparação.
By the way, pipe
é o número de chamada do sistema 42 na tabela sysent[]
. Coincidência?
Kernels Unix tradicionais (1970–1974)
Não encontrei nenhum vestígio pipe(2)
nem em
O TUHS afirma que
Unix 1973ª Edição foi a última versão com kernel escrito em linguagem assembly, mas também a primeira versão com pipelines. Durante XNUMX, foram realizados trabalhos de melhoria da terceira edição, o kernel foi reescrito em C e assim surgiu a quarta edição do Unix.
Um leitor encontrou a digitalização de um documento no qual Doug McIlroy propunha a ideia de “conectar programas como uma mangueira de jardim”.
No livro de Brian Kernighan
Quando o Unix foi lançado, meu fascínio por corrotinas me levou a pedir ao autor do sistema operacional, Ken Thompson, que permitisse que os dados gravados em um processo fossem não apenas para o dispositivo, mas também para outro processo. Ken decidiu que era possível. Contudo, como minimalista, ele queria que cada função do sistema desempenhasse um papel significativo. Escrever diretamente entre processos é realmente uma grande vantagem em relação à gravação em um arquivo intermediário? Foi só quando fiz uma proposta específica com o nome cativante “pipeline” e uma descrição da sintaxe para interação entre processos que Ken finalmente exclamou: “Eu farei isso!”
E fez. Em uma noite fatídica, Ken mudou o kernel e o shell, corrigiu vários programas padrão para padronizar como eles aceitavam entradas (que poderiam vir de um pipeline) e também mudou os nomes dos arquivos. No dia seguinte, os pipelines começaram a ser amplamente utilizados em aplicações. No final da semana, as secretárias já os utilizavam para enviar documentos dos processadores de texto para a impressora. Um pouco mais tarde, Ken substituiu a API e a sintaxe originais para envolver o uso de pipelines por convenções mais limpas, que têm sido usadas desde então.
Infelizmente, o código-fonte da terceira edição do kernel Unix foi perdido. E embora tenhamos o código-fonte do kernel escrito em C
Temos documentação de texto para pipe(2)
de ambas as versões, então você pode começar pesquisando a documentação pipe(2)
é escrito em linguagem assembly e retorna apenas um descritor de arquivo, mas já fornece a funcionalidade básica esperada:
Chamada do sistema tubo cria um mecanismo de entrada/saída chamado pipeline. O descritor de arquivo retornado pode ser usado para operações de leitura e gravação. Quando algo é gravado no pipeline, até 504 bytes de dados são armazenados em buffer, após os quais o processo de gravação é suspenso. Ao ler do pipeline, os dados armazenados em buffer são retirados.
No ano seguinte, o kernel foi reescrito em C, e pipe(fildes)
»:
Chamada do sistema tubo cria um mecanismo de entrada/saída chamado pipeline. Os descritores de arquivo retornados podem ser usados em operações de leitura e gravação. Quando algo é gravado no pipeline, o identificador retornado em r1 (resp. fildes[1]) é usado, armazenado em buffer para 4096 bytes de dados, após o qual o processo de gravação é suspenso. Ao ler do pipeline, o identificador retornado para r0 (resp. fildes[0]) pega os dados.
Supõe-se que uma vez definido um pipeline, dois (ou mais) processos de comunicação (criados por chamadas subsequentes para garfo) transferirá dados do pipeline usando chamadas ler и escrever.
O shell possui uma sintaxe para definir uma matriz linear de processos conectados por um pipeline.
Chamadas para leitura de um pipeline vazio (sem dados em buffer) que possui apenas um final (todos os descritores de arquivo de gravação estão fechados) retornam "fim do arquivo". As chamadas para escrever em uma situação semelhante são ignoradas.
Mais antigo
Sexta edição do Unix (1975)
Vamos começar a ler o código-fonte do Unix
Durante muitos anos o livro Lions foi o único documento sobre o kernel Unix disponível fora do Bell Labs. Embora a licença da sexta edição permitisse aos professores a utilização do seu código-fonte, a licença da sétima edição excluía essa possibilidade, pelo que o livro foi distribuído sob a forma de cópias datilografadas ilegais.
Hoje você pode comprar uma reimpressão do livro, cuja capa mostra os alunos em uma copiadora. E graças a Warren Toomey (que iniciou o projeto TUHS) você pode baixar
Há mais de 15 anos, digitei uma cópia do código-fonte fornecido em Lions, porque não gostei da qualidade da minha cópia de um número desconhecido de outras cópias. O TUHS ainda não existia e eu não tinha acesso às fontes antigas. Mas em 1988 encontrei uma fita antiga de 9 trilhas que continha um backup de um computador PDP11. Era difícil dizer se estava funcionando, mas havia uma árvore /usr/src/ intacta na qual a maioria dos arquivos estava rotulada com o ano de 1979, que mesmo naquela época parecia antigo. Foi a sétima edição ou seu derivado PWB, como eu acreditava.
Tomei a descoberta como base e editei manualmente as fontes para a sexta edição. Parte do código permaneceu o mesmo, mas parte teve que ser ligeiramente editada, alterando o token += moderno para o =+ desatualizado. Algumas coisas foram simplesmente excluídas e outras tiveram que ser completamente reescritas, mas não muito.
E hoje podemos ler online no TUHS o código fonte da sexta edição do
A propósito, à primeira vista, a principal característica do código C antes do período de Kernighan e Ritchie é a sua concisão. Não é sempre que consigo inserir trechos de código sem uma edição extensa para caber em uma área de exibição relativamente estreita em meu site.
Cedo
/*
* Max allowable buffering per pipe.
* This is also the max size of the
* file created to implement the pipe.
* If this size is bigger than 4096,
* pipes will be implemented in LARG
* files, which is probably not good.
*/
#define PIPSIZ 4096
O tamanho do buffer não mudou desde a quarta edição. Mas aqui vemos, sem qualquer documentação pública, que os pipelines já usaram arquivos como armazenamento de backup!
Quanto aos arquivos LARG, eles correspondem a
Aqui está a verdadeira chamada do sistema pipe
:
/*
* The sys-pipe entry.
* Allocate an inode on the root device.
* Allocate 2 file structures.
* Put it all together with flags.
*/
pipe()
{
register *ip, *rf, *wf;
int r;
ip = ialloc(rootdev);
if(ip == NULL)
return;
rf = falloc();
if(rf == NULL) {
iput(ip);
return;
}
r = u.u_ar0[R0];
wf = falloc();
if(wf == NULL) {
rf->f_count = 0;
u.u_ofile[r] = NULL;
iput(ip);
return;
}
u.u_ar0[R1] = u.u_ar0[R0]; /* wf's fd */
u.u_ar0[R0] = r; /* rf's fd */
wf->f_flag = FWRITE|FPIPE;
wf->f_inode = ip;
rf->f_flag = FREAD|FPIPE;
rf->f_inode = ip;
ip->i_count = 2;
ip->i_flag = IACC|IUPD;
ip->i_mode = IALLOC;
}
O comentário descreve claramente o que está acontecendo aqui. Mas entender o código não é tão fácil, em parte por causa da forma como "R0
и R1
parâmetros de chamada do sistema e valores de retorno são passados.
Vamos tentar com
pipe()
deve passar R0
и R1
retorna números de descritores de arquivo para leitura e gravação. falloc()
retorna um ponteiro para a estrutura do arquivo, mas também "retorna" via u.u_ar0[R0]
e um descritor de arquivo. Ou seja, o código salva em r
descritor de arquivo para leitura e atribui um descritor de arquivo para gravação diretamente de u.u_ar0[R0]
após a segunda chamada falloc()
.
Bandeira FPIPE
, que definimos ao criar o pipeline, controla o comportamento da função
/*
* common code for read and write calls:
* check permissions, set base, count, and offset,
* and switch out to readi, writei, or pipe code.
*/
rdwr(mode)
{
register *fp, m;
m = mode;
fp = getf(u.u_ar0[R0]);
/* … */
if(fp->f_flag&FPIPE) {
if(m==FREAD)
readp(fp); else
writep(fp);
}
/* … */
}
Então a função readp()
в pipe.c
lê dados do pipeline. Mas é melhor rastrear a implementação a partir de writep()
. Novamente, o código tornou-se mais complexo devido às convenções de passagem de argumentos, mas alguns detalhes podem ser omitidos.
writep(fp)
{
register *rp, *ip, c;
rp = fp;
ip = rp->f_inode;
c = u.u_count;
loop:
/* If all done, return. */
plock(ip);
if(c == 0) {
prele(ip);
u.u_count = 0;
return;
}
/*
* If there are not both read and write sides of the
* pipe active, return error and signal too.
*/
if(ip->i_count < 2) {
prele(ip);
u.u_error = EPIPE;
psignal(u.u_procp, SIGPIPE);
return;
}
/*
* If the pipe is full, wait for reads to deplete
* and truncate it.
*/
if(ip->i_size1 == PIPSIZ) {
ip->i_mode =| IWRITE;
prele(ip);
sleep(ip+1, PPIPE);
goto loop;
}
/* Write what is possible and loop back. */
u.u_offset[0] = 0;
u.u_offset[1] = ip->i_size1;
u.u_count = min(c, PIPSIZ-u.u_offset[1]);
c =- u.u_count;
writei(ip);
prele(ip);
if(ip->i_mode&IREAD) {
ip->i_mode =& ~IREAD;
wakeup(ip+2);
}
goto loop;
}
Queremos escrever bytes na entrada do pipeline u.u_count
. Primeiro precisamos bloquear o inode (veja abaixo plock
/prele
).
Em seguida, verificamos o contador de referência do inode. Enquanto ambas as extremidades do pipeline permanecerem abertas, o contador deverá ser igual a 2. Mantemos um link (de rp->f_inode
), portanto, se o contador for menor que 2, isso deve significar que o processo de leitura fechou o final do pipeline. Em outras palavras, estamos tentando gravar em um pipeline fechado e isso é um erro. Código de erro pela primeira vez EPIPE
e sinal SIGPIPE
apareceu na sexta edição do Unix.
Mas mesmo que o transportador esteja aberto, pode estar cheio. Nesse caso, liberamos o bloqueio e adormecemos na esperança de que outro processo leia o pipeline e libere espaço suficiente nele. Ao acordar, voltamos ao início, desligamos novamente a fechadura e iniciamos um novo ciclo de gravação.
Se houver espaço livre suficiente no pipeline, gravaremos dados nele usando i_size1
inode (se o pipeline estiver vazio, pode ser igual a 0) indica o fim dos dados que ele já contém. Se houver espaço de gravação suficiente, podemos preencher o pipeline de i_size1
para PIPESIZ
. Em seguida, liberamos o bloqueio e tentamos ativar qualquer processo que esteja aguardando para ser lido no pipeline. Voltamos ao início para ver se conseguimos escrever quantos bytes precisávamos. Se falhar, iniciamos um novo ciclo de gravação.
Geralmente o parâmetro i_mode
inode é usado para armazenar permissões r
, w
и x
. Mas no caso de pipelines, sinalizamos que algum processo está aguardando uma gravação ou leitura usando bits IREAD
и IWRITE
respectivamente. O processo define o sinalizador e chama sleep()
, e espera-se que algum outro processo no futuro cause wakeup()
.
A verdadeira magia acontece em sleep()
и wakeup()
. Eles são implementados em
/*
* Give up the processor till a wakeup occurs
* on chan, at which time the process
* enters the scheduling queue at priority pri.
* The most important effect of pri is that when
* pri<0 a signal cannot disturb the sleep;
* if pri>=0 signals will be processed.
* Callers of this routine must be prepared for
* premature return, and check that the reason for
* sleeping has gone away.
*/
sleep(chan, pri) /* … */
/*
* Wake up all processes sleeping on chan.
*/
wakeup(chan) /* … */
O processo que causa sleep()
para um canal específico, poderá mais tarde ser despertado por outro processo, o que causará wakeup()
para o mesmo canal. writep()
и readp()
coordenar suas ações por meio dessas chamadas emparelhadas. Observe que pipe.c
sempre dá prioridade PPIPE
ao ligar sleep()
, Então é isso sleep()
pode ser interrompido por um sinal.
Agora temos tudo para entender a função readp()
:
readp(fp)
int *fp;
{
register *rp, *ip;
rp = fp;
ip = rp->f_inode;
loop:
/* Very conservative locking. */
plock(ip);
/*
* If the head (read) has caught up with
* the tail (write), reset both to 0.
*/
if(rp->f_offset[1] == ip->i_size1) {
if(rp->f_offset[1] != 0) {
rp->f_offset[1] = 0;
ip->i_size1 = 0;
if(ip->i_mode&IWRITE) {
ip->i_mode =& ~IWRITE;
wakeup(ip+1);
}
}
/*
* If there are not both reader and
* writer active, return without
* satisfying read.
*/
prele(ip);
if(ip->i_count < 2)
return;
ip->i_mode =| IREAD;
sleep(ip+2, PPIPE);
goto loop;
}
/* Read and return */
u.u_offset[0] = 0;
u.u_offset[1] = rp->f_offset[1];
readi(ip);
rp->f_offset[1] = u.u_offset[1];
prele(ip);
}
Você pode achar mais fácil ler esta função de baixo para cima. A ramificação "ler e retornar" geralmente é usada quando há alguns dados no pipeline. Neste caso, usamos f_offset
leitura e, em seguida, atualize o valor do deslocamento correspondente.
Nas leituras subsequentes, o pipeline estará vazio se o deslocamento de leitura atingir i_size1
no inode. Redefinimos a posição para 0 e tentamos ativar qualquer processo que queira gravar no pipeline. Sabemos que quando o transportador está cheio, writep()
vai adormecer ip+1
. E agora que o pipeline está vazio, podemos ativá-lo para retomar seu ciclo de gravação.
Se você não tem nada para ler, então readp()
pode definir uma bandeira IREAD
e adormecer ip+2
. Nós sabemos o que vai acordá-lo writep()
, quando grava alguns dados no pipeline.
Comentários no u
"Podemos tratá-los como funções normais de E/S que pegam um arquivo, uma posição, um buffer na memória e contam o número de bytes para ler ou escrever.
/*
* Read the file corresponding to
* the inode pointed at by the argument.
* The actual read arguments are found
* in the variables:
* u_base core address for destination
* u_offset byte offset in file
* u_count number of bytes to read
* u_segflg read to kernel/user
*/
readi(aip)
struct inode *aip;
/* … */
/*
* Write the file corresponding to
* the inode pointed at by the argument.
* The actual write arguments are found
* in the variables:
* u_base core address for source
* u_offset byte offset in file
* u_count number of bytes to write
* u_segflg write to kernel/user
*/
writei(aip)
struct inode *aip;
/* … */
Quanto ao bloqueio “conservador”, então readp()
и writep()
bloquear o inode até que eles terminem seu trabalho ou recebam um resultado (ou seja, ligue wakeup
). plock()
и prele()
trabalhe de forma simples: usando um conjunto diferente de chamadas sleep
и wakeup
nos permite ativar qualquer processo que precise do bloqueio que acabamos de liberar:
/*
* Lock a pipe.
* If its already locked, set the WANT bit and sleep.
*/
plock(ip)
int *ip;
{
register *rp;
rp = ip;
while(rp->i_flag&ILOCK) {
rp->i_flag =| IWANT;
sleep(rp, PPIPE);
}
rp->i_flag =| ILOCK;
}
/*
* Unlock a pipe.
* If WANT bit is on, wakeup.
* This routine is also used to unlock inodes in general.
*/
prele(ip)
int *ip;
{
register *rp;
rp = ip;
rp->i_flag =& ~ILOCK;
if(rp->i_flag&IWANT) {
rp->i_flag =& ~IWANT;
wakeup(rp);
}
}
No começo eu não conseguia entender por que readp()
não causa prele(ip)
antes da chamada wakeup(ip+1)
. A primeira coisa é writep()
causa em seu ciclo, isso plock(ip)
, o que leva ao impasse se readp()
ainda não removi meu bloco, então de alguma forma o código deve funcionar corretamente. Se você olhar wakeup()
, então fica claro que ele apenas marca o processo adormecido como pronto para execução, para que no futuro sched()
realmente lançou. Então readp()
causa wakeup()
, remove o bloqueio, define IREAD
e chamadas sleep(ip+2)
- tudo isso antes writep()
retoma o ciclo.
Isto completa a descrição dos transportadores na sexta edição. Código simples, consequências de longo alcance.
Xv6, um kernel simples do tipo Unix
Para criar um núcleo
O código contém uma implementação clara e cuidadosa pipealloc()
:
#define PIPESIZE 512
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
int
pipealloc(struct file **f0, struct file **f1)
{
struct pipe *p;
p = 0;
*f0 = *f1 = 0;
if((*f0 = filealloc()) == 0 || (*f1 = filealloc()) == 0)
goto bad;
if((p = (struct pipe*)kalloc()) == 0)
goto bad;
p->readopen = 1;
p->writeopen = 1;
p->nwrite = 0;
p->nread = 0;
initlock(&p->lock, "pipe");
(*f0)->type = FD_PIPE;
(*f0)->readable = 1;
(*f0)->writable = 0;
(*f0)->pipe = p;
(*f1)->type = FD_PIPE;
(*f1)->readable = 0;
(*f1)->writable = 1;
(*f1)->pipe = p;
return 0;
bad:
if(p)
kfree((char*)p);
if(*f0)
fileclose(*f0);
if(*f1)
fileclose(*f1);
return -1;
}
pipealloc()
define o estado do resto da implementação, que inclui as funções piperead()
, pipewrite()
и pipeclose()
. Chamada de sistema real sys_pipe
é um wrapper implementado em
Linux 0.01
O código-fonte do Linux 0.01 pode ser encontrado. Será instrutivo estudar a implementação de pipelines em seu fs
/pipe.c
. Isso usa um inode para representar o pipeline, mas o pipeline em si é escrito em C moderno. Se você já trabalhou no código da XNUMXª edição, não terá problemas aqui. Esta é a aparência da função write_pipe()
:
int write_pipe(struct m_inode * inode, char * buf, int count)
{
char * b=buf;
wake_up(&inode->i_wait);
if (inode->i_count != 2) { /* no readers */
current->signal |= (1<<(SIGPIPE-1));
return -1;
}
while (count-->0) {
while (PIPE_FULL(*inode)) {
wake_up(&inode->i_wait);
if (inode->i_count != 2) {
current->signal |= (1<<(SIGPIPE-1));
return b-buf;
}
sleep_on(&inode->i_wait);
}
((char *)inode->i_size)[PIPE_HEAD(*inode)] =
get_fs_byte(b++);
INC_PIPE( PIPE_HEAD(*inode) );
wake_up(&inode->i_wait);
}
wake_up(&inode->i_wait);
return b-buf;
}
Sem sequer olhar para as definições de estrutura, você pode descobrir como a contagem de referência de inode é usada para verificar se uma operação de gravação resulta em SIGPIPE
. Além de funcionar byte a byte, esta função é fácil de comparar com as ideias descritas acima. Até lógica sleep_on
/wake_up
não parece tão estranho.
Kernels Linux modernos, FreeBSD, NetBSD, OpenBSD
Eu rapidamente examinei alguns kernels modernos. Nenhum deles tem mais implementação de disco (o que não é surpreendente). Linux tem sua própria implementação. Embora os três kernels BSD modernos contenham implementações baseadas no código escrito por John Dyson, com o passar dos anos eles se tornaram muito diferentes uns dos outros.
Ler fs
/pipe.c
(no Linux) ou sys
/kern
/sys_pipe.c
(no *BSD), é preciso muita dedicação. O código de hoje é sobre desempenho e suporte para recursos como E/S vetorial e assíncrona. E os detalhes de alocação de memória, bloqueios e configuração do kernel variam muito. Não é disso que as faculdades precisam para um curso introdutório a sistemas operacionais.
De qualquer forma, eu estava interessado em desenterrar alguns padrões antigos (como gerar SIGPIPE
e retorno EPIPE
ao escrever em um pipeline fechado) em todos esses diferentes kernels modernos. Provavelmente nunca verei um computador PDP-11 na vida real, mas ainda há muito a aprender com o código que foi escrito anos antes de eu nascer.
Um artigo escrito por Divi Kapoor em 2011:
Fonte: habr.com