GOSTIM: P2P F2F E2EE IM em uma noite com criptografia GOST

Ser um desenvolvedor PyGOST bibliotecas (primitivas criptográficas GOST em Python puro), muitas vezes recebo perguntas sobre como implementar as mensagens seguras mais simples no joelho. Muitas pessoas consideram a criptografia aplicada bastante simples, e chamar .encrypt() em uma cifra de bloco será suficiente para enviá-la com segurança por um canal de comunicação. Outros acreditam que a criptografia aplicada é o destino de poucos, e é aceitável que empresas ricas como a Telegram com matemáticos das olimpíadas não pode implementar protocolo seguro.

Tudo isso me levou a escrever este artigo para mostrar que implementar protocolos criptográficos e mensagens instantâneas seguras não é uma tarefa tão difícil. No entanto, não vale a pena inventar seus próprios protocolos de autenticação e de acordo de chaves.

GOSTIM: P2P F2F E2EE IM em uma noite com criptografia GOST
O artigo será escrito peer-to-peer, amigo para amigo, criptografado de ponta a ponta mensageiro instantâneo com SIGMA-I protocolo de autenticação e acordo de chave (com base no qual é implementado IKE IPsec), usando exclusivamente algoritmos criptográficos GOST, biblioteca PyGOST e biblioteca de codificação de mensagens ASN.1 PyDERASN (sobre o qual eu já escreveu antes). Um pré-requisito: deve ser tão simples que possa ser escrito do zero em uma noite (ou dia de trabalho), caso contrário não será mais um programa simples. Provavelmente contém erros, complicações desnecessárias, deficiências, além de ser meu primeiro programa usando a biblioteca asyncio.

Design de mensagens instantâneas

Primeiro, precisamos entender como será a nossa mensagem instantânea. Para simplificar, seja uma rede peer-to-peer, sem qualquer descoberta de participantes. Indicaremos pessoalmente qual endereço: porta de conexão para comunicação com o interlocutor.

Entendo que, neste momento, a suposição de que a comunicação direta está disponível entre dois computadores arbitrários é uma limitação significativa à aplicabilidade do IM na prática. Mas quanto mais os desenvolvedores implementarem todos os tipos de muletas de passagem NAT, mais tempo permaneceremos na Internet IPv4, com uma probabilidade deprimente de comunicação entre computadores arbitrários. Por quanto tempo você consegue tolerar a falta de IPv6 em casa e no trabalho?

Teremos uma rede amigo a amigo: todos os possíveis interlocutores deverão ser conhecidos previamente. Em primeiro lugar, isto simplifica muito tudo: apresentamos-nos, encontramos ou não encontramos o nome/chave, desligamos ou continuamos a trabalhar, conhecendo o interlocutor. Em segundo lugar, em geral, é seguro e elimina muitos ataques.

A interface de IM estará próxima das soluções clássicas projetos inúteis, que eu realmente gosto por seu minimalismo e filosofia Unix. O programa IM cria um diretório com três soquetes de domínio Unix para cada interlocutor:

  • in— nele ficam gravadas as mensagens enviadas ao interlocutor;
  • out - dele são lidas as mensagens recebidas do interlocutor;
  • estado - lendo-o, descobrimos se o interlocutor está conectado no momento, o endereço/porta da conexão.

Além disso, é criado um soquete conn, escrevendo a porta host na qual iniciamos uma conexão com o interlocutor remoto.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Essa abordagem permite que você faça implementações independentes de transporte de mensagens instantâneas e interface de usuário, porque não existe gosto e cor de ninguém, você não pode agradar a todos. Usando tmux e / ou multicauda, você pode obter uma interface de múltiplas janelas com destaque de sintaxe. E com a ajuda rl wrap você pode obter uma linha de entrada de mensagem compatível com GNU Readline.

Na verdade, projetos inúteis usam arquivos FIFO. Pessoalmente, eu não conseguia entender como trabalhar com arquivos de forma competitiva em assíncio sem um histórico escrito à mão de threads dedicados (eu uso a linguagem para essas coisas há muito tempo Go). Portanto, decidi me contentar com soquetes de domínio Unix. Infelizmente, isso torna impossível fazer echo 2001:470:dead::babe 6666 > conn. Eu resolvi esse problema usando socat: eco 2001:470:morto::babe 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.

O protocolo inseguro original

O TCP é utilizado como transporte: garante a entrega e seu pedido. O UDP não garante nenhum dos dois (o que seria útil quando a criptografia é usada), mas suporta SCTP Python não sai da caixa.

Infelizmente, no TCP não existe o conceito de mensagem, apenas um fluxo de bytes. Portanto, é necessário criar um formato de mensagens para que possam ser compartilhadas entre si neste tópico. Podemos concordar em usar o caractere de alimentação de linha. Para começar, tudo bem, mas assim que começarmos a criptografar nossas mensagens, esse caractere poderá aparecer em qualquer lugar do texto cifrado. Nas redes, portanto, são populares os protocolos que enviam primeiro o comprimento da mensagem em bytes. Por exemplo, o Python pronto para uso possui xdrlib, que permite trabalhar com um formato semelhante XDR.

Não trabalharemos de maneira correta e eficiente com a leitura TCP - simplificaremos o código. Lemos os dados do soquete em um loop infinito até decodificar a mensagem completa. JSON com XML também pode ser usado como formato para esta abordagem. Mas quando a criptografia é adicionada, os dados terão que ser assinados e autenticados - e isso exigirá uma representação idêntica de objetos byte por byte, que o JSON/XML não fornece (os resultados dos despejos podem variar).

XDR é adequado para esta tarefa, porém eu escolho ASN.1 com codificação DER e PyDERASN biblioteca, pois teremos em mãos objetos de alto nível com os quais muitas vezes é mais agradável e conveniente trabalhar. Ao contrário do sem esquema Bencode, Pacote de mensagens ou CBOR, o ASN.1 verificará automaticamente os dados em relação a um esquema codificado.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

A mensagem recebida será Msg: um texto MsgText (com um campo de texto por enquanto) ou uma mensagem de handshake MsgHandshake (que contém o nome do interlocutor). Agora parece complicado demais, mas esta é uma base para o futuro.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │MsgHandshake(IdA ) │ │───────── ────────>│ │ │ │MsgHandshake(IdB) │ │<─────────────────│ │ │ │ MsgText() │ │──── MsgText() │ │ │

IM sem criptografia

Como já disse, a biblioteca asyncio será usada para todas as operações de soquete. Vamos anunciar o que esperamos no lançamento:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Defina seu próprio nome (--nosso-nome Alice). Todos os interlocutores esperados são listados separados por vírgulas (—seus nomes bob,eve). Para cada um dos interlocutores é criado um diretório com soquetes Unix, bem como uma corrotina para cada estado de entrada, saída:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

As mensagens vindas do usuário do soquete in são enviadas para a fila IN_QUEUES:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

As mensagens provenientes dos interlocutores são enviadas para filas OUT_QUEUES, das quais os dados são gravados no soquete de saída:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Ao ler de um soquete de estado, o programa procura o endereço do interlocutor no dicionário PEER_ALIVE. Se ainda não houver conexão com o interlocutor, será escrita uma linha em branco.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Ao escrever um endereço em um soquete conn, a função “iniciador” de conexão é iniciada:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Vamos considerar o iniciador. Primeiro, obviamente, abre uma conexão com o host/porta especificado e envia uma mensagem de handshake com seu nome:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Em seguida, aguarda uma resposta do ponto remoto. Tenta decodificar a resposta recebida usando o esquema Msg ASN.1. Assumimos que a mensagem inteira será enviada em um segmento TCP e a receberemos atomicamente ao chamar .read(). Verificamos se recebemos a mensagem de handshake.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Verificamos se conhecemos o nome recebido do interlocutor. Caso contrário, interrompemos a conexão. Verificamos se já estabelecemos conexão com ele (o interlocutor deu novamente o comando para se conectar conosco) e fechamos. A fila IN_QUEUES contém strings Python com o texto da mensagem, mas tem um valor especial None que sinaliza à co-rotina msg_sender para parar de funcionar para que ela esqueça seu gravador associado à conexão TCP herdada.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender aceita mensagens de saída (enfileiradas em um soquete in), serializa-as em uma mensagem MsgText e as envia por uma conexão TCP. Ele pode quebrar a qualquer momento - nós interceptamos isso claramente.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

No final, o iniciador entra em um loop infinito de leitura de mensagens do soquete. Verifica se estas mensagens são mensagens de texto e as coloca na fila OUT_QUEUES, de onde serão enviadas para o soquete de saída do interlocutor correspondente. Por que você não pode simplesmente fazer .read() e decodificar a mensagem? Porque é possível que várias mensagens do usuário sejam agregadas no buffer do sistema operacional e enviadas em um segmento TCP. Podemos decodificar o primeiro e então parte do subsequente pode permanecer no buffer. Caso ocorra alguma situação anormal, fechamos a conexão TCP e paramos a corrotina msg_sender (enviando None para a fila OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Vamos voltar ao código principal. Após criar todas as corrotinas no momento da inicialização do programa, iniciamos o servidor TCP. Para cada conexão estabelecida, ele cria uma corrotina de resposta.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

respondedor é semelhante ao iniciador e espelha todas as mesmas ações, mas o loop infinito de leitura de mensagens começa imediatamente, para simplificar. Atualmente, o protocolo de handshake envia uma mensagem de cada lado, mas no futuro serão duas do iniciador da conexão, após as quais as mensagens de texto poderão ser enviadas imediatamente.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Protocolo seguro

É hora de proteger nossas comunicações. O que queremos dizer com segurança e o que queremos:

  • confidencialidade das mensagens transmitidas;
  • autenticidade e integridade das mensagens transmitidas – suas alterações devem ser detectadas;
  • proteção contra ataques de repetição - o fato de mensagens perdidas ou repetidas deve ser detectado (e decidimos encerrar a conexão);
  • identificação e autenticação de interlocutores através de chaves públicas pré-inseridas - já decidimos anteriormente que estávamos fazendo uma rede amigo para amigo. Somente após a autenticação entenderemos com quem estamos nos comunicando;
  • disponibilidade sigilo perfeito para a frente propriedades (PFS) - comprometer nossa chave de assinatura de longa duração não deve levar à capacidade de ler toda a correspondência anterior. A gravação do tráfego interceptado torna-se inútil;
  • validade/validade de mensagens (transporte e handshake) apenas dentro de uma sessão TCP. Não deverá ser possível inserir mensagens corretamente assinadas/autenticadas de outra sessão (mesmo com o mesmo interlocutor);
  • um observador passivo não deve ver identificadores de usuário, chaves públicas de longa duração transmitidas ou hashes deles. Um certo anonimato de um observador passivo.

Surpreendentemente, quase todo mundo quer ter esse mínimo em qualquer protocolo de handshake, e muito pouco do que foi dito acima é atendido em protocolos “caseiros”. Agora não vamos inventar nada de novo. Eu definitivamente recomendaria usar Estrutura de ruído para construir protocolos, mas vamos escolher algo mais simples.

Os dois protocolos mais populares são:

  • TLS - um protocolo muito complexo com um longo histórico de bugs, ombreiras, vulnerabilidades, pensamento pobre, complexidade e deficiências (no entanto, isso tem pouco a ver com o TLS 1.3). Mas não consideramos isso porque é muito complicado.
  • IPsec с IKE — não apresentam problemas criptográficos graves, embora também não sejam simples. Se você leu sobre IKEv1 e IKEv2, então a fonte deles é STS, protocolos ISO/IEC IS 9798-3 e SIGMA (SIGn-and-MAc) - simples o suficiente para implementar em uma noite.

O que há de bom no SIGMA, como o elo mais recente no desenvolvimento de protocolos STS/ISO? Ele atende a todos os nossos requisitos (incluindo “ocultar” identificadores de interlocutores) e não apresenta problemas criptográficos conhecidos. É minimalista – remover pelo menos um elemento da mensagem do protocolo levará à sua insegurança.

Vamos do protocolo caseiro mais simples ao SIGMA. A operação mais básica na qual estamos interessados ​​é acordo chave: uma função que gera o mesmo valor para ambos os participantes, que pode ser usada como uma chave simétrica. Sem entrar em detalhes: cada uma das partes gera um par de chaves efêmeras (usadas apenas em uma sessão) (chaves públicas e privadas), trocam chaves públicas, chamam a função de acordo, para cuja entrada passam sua chave privada e a pública chave do interlocutor.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ══════════╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚═ ════════ ═══════════╝ │ IdB, PubB │ ╔════════════════════╗ │ <───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚════════════════════╝ ─ ───┐ ╔════ ═══╧════════════╗ │ ║Chave = DH(PrvA, PubB)║ <───┘ ╚═══════╤═ ═══════ ════╝ │ │ │ │

Qualquer um pode intervir e substituir as chaves públicas pelas suas próprias - não há autenticação de interlocutores neste protocolo. Vamos adicionar uma assinatura com chaves de longa duração.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, sinal(S ignPrvA, (PubA)) │ ╔═ │──────────── ────────── ───────────>│ ║SignPrvA, SignP ubA = carregar()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════ ═══════ ═════════════╝ │IdB, PubB, sinal( SignPrvB, (PubB)) │ ╔══════════════ ═══════ ══════╗ │<─────── ─────────── ───────────── ──│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══ ════════ ══════════════ ══╝ ────┐ ╔ ════════════════ ═════╗ │ │ ║verificar( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚════════════════════ ═╝ │ │ │

Essa assinatura não funcionará, pois não está vinculada a uma sessão específica. Tais mensagens também são “adequadas” para sessões com outros participantes. Todo o contexto deve se inscrever. Isto obriga-nos a acrescentar também outra mensagem de A.

Além disso, é fundamental adicionar seu próprio identificador na assinatura, caso contrário podemos substituir IdXXX e assinar novamente a mensagem com a chave de outro interlocutor conhecido. Prevenir ataques de reflexão, é necessário que os elementos sob a assinatura estejam em locais claramente definidos de acordo com seu significado: se A assina (PubA, PubB), então B deve assinar (PubB, PubA). Isso também mostra a importância de escolher a estrutura e o formato dos dados serializados. Por exemplo, os conjuntos na codificação ASN.1 DER são classificados: SET OF(PubA, PubB) será idêntico a SET OF(PubB, PubA).

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │───────────────────── ────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═════ ═══════ ═══════════════╝ │IdB, PubB, sinal(SignPrvB, (IdB, PubA, PubB)) │ ╔══════════ ═════ ════════════╗ │<───────────────────────── ────────── ─────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═════════ ════════ ══════════╝ │ sinal(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════ ═ ═══╗ │─ ──────────────────────────────────────── ───>│ ║verificar(SignPubB, ...) ║ │ │ ║chave = dh (prva, PUBB) ║ │ │ │

No entanto, ainda não “provamos” que geramos a mesma chave compartilhada para esta sessão. Em princípio, podemos prescindir desta etapa - a primeira conexão de transporte será inválida, mas queremos que quando o handshake for concluído tenhamos certeza de que tudo está realmente acordado. No momento temos em mãos o protocolo ISO/IEC IS 9798-3.

Poderíamos assinar a própria chave gerada. Isso é perigoso, pois é possível que haja vazamentos no algoritmo de assinatura utilizado (até mesmo bits por assinatura, mas ainda assim vazamentos). É possível assinar um hash da chave de derivação, mas vazar até mesmo o hash da chave derivada pode ser valioso em um ataque de força bruta à função de derivação. SIGMA usa uma função MAC que autentica o ID do remetente.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │───────────────────── ────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚ ═══════ ════════════════════╝ │IdB, PubB, sign(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔════ ═══ │<───────────────── ────────── ─────────── ────────── ─│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ═════════════ ════════ ══╝ │ │ ╔════════════ ═════════╗ │ sinal(SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Chave = DH( PrvA, PubB) ║ │───────────────────── ── ─────────── ────────── ─────>│ ║verificar(Chave, IdB) ║ │ │ ║verificar(SignPubB, ...)║ │ │ ╚═══════════════ ═════ ═╝ │ │

Como otimização, alguns podem querer reutilizar suas chaves efêmeras (o que é, obviamente, lamentável para o PFS). Por exemplo, geramos um par de chaves, tentamos conectar, mas o TCP não estava disponível ou foi interrompido em algum lugar no meio do protocolo. É uma pena desperdiçar entropia e recursos de processador desperdiçados em um novo par. Portanto, apresentaremos o chamado cookie - um valor pseudo-aleatório que protegerá contra possíveis ataques de repetição aleatória ao reutilizar chaves públicas efêmeras. Devido à ligação entre o cookie e a chave pública efémera, a chave pública da parte oposta pode ser removida da assinatura como desnecessária.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CookieA │ ╔════════ ═══════════════════╗ │─────────────────── ────────── ──────────────────────────────────────── ─>│ ║SignPrvA, SignPubA = carregar( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═════════════════════════ ══╝ │IdB, PubB, CookieB , assinar(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔═══════════════════════════ ╗ │< ──────────────────────────────────────── ────────── ────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚══════ ═════════════════════╝ │ │ ╔══════════════ ═══════╗ │ sinal( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Chave = DH(PrvA, PubB) ║ │───────────────────── ─ ─ ──────────────────────────────────────── ───────>│ ║ verificar(Chave, IdB) ║ │ │ ║verificar(SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │

Finalmente, queremos obter a privacidade dos nossos interlocutores de um observador passivo. Para fazer isso, a SIGMA propõe primeiro trocar chaves efêmeras e desenvolver uma chave comum para criptografar mensagens de autenticação e identificação. SIGMA descreve duas opções:

  • SIGMA-I - protege o iniciador de ataques ativos, o respondente de ataques passivos: o iniciador autentica o respondente e se algo não corresponder, não fornece sua identificação. O arguido divulga a sua identificação caso seja iniciado com ele um protocolo ativo. O observador passivo não aprende nada;
    SIGMA-R - protege o respondente de ataques ativos, o iniciador de ataques passivos. Tudo é exatamente o contrário, mas neste protocolo já são transmitidas quatro mensagens de handshake.

    Escolhemos o SIGMA-I porque é mais parecido com o que esperamos de coisas familiares cliente-servidor: o cliente é reconhecido apenas pelo servidor autenticado e todos já conhecem o servidor. Além disso, é mais fácil de implementar devido ao menor número de mensagens de handshake. Tudo o que adicionamos ao protocolo é criptografar parte da mensagem e transferir o identificador A para a parte criptografada da última mensagem:

    PubA, CookieA │ ╔══════════ ═════════════════╗ │────── ────────── ───── ────────── ──────────────────────── ─────────── ───── ──────>│ ║SignPrvA , SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════ ═══════ ═════════ ════╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔═════ ═══════════════ ═══════╗ │<─────────────── ────────── ───── ────────── ║SignP rvB, SignPubB = load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚════ ═══════ ════════════════╝ │ │ ╔════════ ═══════════ ══╗ │ Enc((IdA, sinal( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Chave = DH(PrvA, PubB) ║ │───────────────────── ────────────────── ────────── ─────────── ─────────── ──────>│ ║verificar(Chave, IdB) ║ │ │ ║verificar( SignPubB, ...)║ │ │ ╚═══════════════ ════ ══╝ │ │
    
    • GOST R é usado para assinatura 34.10-2012 algoritmo com chaves de 256 bits.
    • Para gerar a chave pública, utiliza-se 34.10/2012/XNUMX VKO.
    • CMAC é usado como MAC. Tecnicamente, este é um modo especial de operação de cifra de bloco, descrito em GOST R 34.13-2015. Como função de criptografia para este modo - Gafanhoto (34.12-2015).
    • O hash de sua chave pública é utilizado como identificador do interlocutor. Usado como hash Stribog-256 (34.11/2012/256 XNUMX bits).

    Após o aperto de mão, chegaremos a um acordo sobre uma chave compartilhada. Podemos usá-lo para criptografia autenticada de mensagens de transporte. Essa parte é muito simples e difícil de errar: incrementamos o contador de mensagens, criptografamos a mensagem, autenticamos (MAC) o contador e o texto cifrado, enviamos. Ao receber uma mensagem, verificamos se o contador possui o valor esperado, autenticamos o texto cifrado com o contador e descriptografamos-o. Que chave devo usar para criptografar mensagens de handshake, transportar mensagens e como autenticá-las? Usar uma chave para todas essas tarefas é perigoso e imprudente. É necessário gerar chaves utilizando funções especializadas KDF (função de derivação de chave). Novamente, não vamos dividir os cabelos e inventar algo: HKDF é conhecido há muito tempo, bem pesquisado e não apresenta problemas conhecidos. Infelizmente, a biblioteca nativa do Python não possui essa função, então usamos hkdf saco de plástico. HKDF usa internamente HMAC, que por sua vez usa uma função hash. Um exemplo de implementação em Python na página da Wikipedia leva apenas algumas linhas de código. Assim como no caso de 34.10/2012/256, usaremos Stribog-XNUMX como função hash. A saída de nossa função de acordo de chave será chamada de chave de sessão, a partir da qual serão geradas as simétricas ausentes:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Estruturas/Esquemas

    Vejamos quais estruturas ASN.1 temos agora para transmitir todos esses dados:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS é o que será assinado. HandshakeTBE - o que será criptografado. Chamo sua atenção para o campo ukm em MsgHandshake1. 34.10 VKO, para uma randomização ainda maior das chaves geradas, inclui o parâmetro UKM (material de codificação do usuário) - apenas entropia adicional.

    Adicionando criptografia ao código

    Consideremos apenas as alterações feitas no código original, já que o framework permaneceu o mesmo (na verdade, a implementação final foi escrita primeiro e depois toda a criptografia foi cortada dela).

    Como a autenticação e identificação dos interlocutores serão feitas por meio de chaves públicas, eles agora precisam ser armazenados em algum lugar por muito tempo. Para simplificar, usamos JSON assim:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    our - nosso par de chaves, chaves hexadecimais privadas e públicas. their — nomes dos interlocutores e suas chaves públicas. Vamos alterar os argumentos da linha de comando e adicionar pós-processamento de dados JSON:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    A chave privada do algoritmo 34.10 é um número aleatório. Tamanho de 256 bits para curvas elípticas de 256 bits. PyGOST não funciona com um conjunto de bytes, mas com grandes números, então nossa chave privada (urandom(32)) precisa ser convertida em um número usando gost3410.prv_unmarshal(). A chave pública é determinada deterministicamente a partir da chave privada usando gost3410.public_key(). A chave pública 34.10 consiste em dois números grandes que também precisam ser convertidos em uma sequência de bytes para facilitar o armazenamento e a transmissão usando gost3410.pub_marshal().

    Depois de ler o arquivo JSON, as chaves públicas precisam ser convertidas novamente usando gost3410.pub_unmarshal(). Como receberemos os identificadores dos interlocutores em forma de hash da chave pública, eles podem ser imediatamente calculados com antecedência e colocados em um dicionário para busca rápida. O hash Stribog-256 é gost34112012256.GOST34112012256(), que satisfaz totalmente a interface hashlib das funções hash.

    Como a rotina do iniciador mudou? Tudo está de acordo com o esquema de handshake: geramos um cookie (128 bits é suficiente), um par de chaves efêmero 34.10, que será usado para a função de acordo de chave VKO.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • esperamos por uma resposta e decodificamos a mensagem de mensagem recebida;
    • certifique-se de obter o handshake1;
    • decodificar a chave pública efêmera da parte oposta e calcular a chave de sessão;
    • Geramos chaves simétricas necessárias para processar a parte TBE da mensagem.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM é um número de 64 bits (urandom(8)), que também requer desserialização de sua representação de bytes usando gost3410_vko.ukm_unmarshal(). A função VKO para 34.10/2012/256 de 3410 bits é gost34102012256_vko.kek_XNUMX() (KEK - chave de criptografia).

    A chave de sessão gerada já é uma sequência de bytes pseudoaleatórios de 256 bits. Portanto, pode ser usado imediatamente em funções HKDF. Como GOST34112012256 satisfaz a interface hashlib, ele pode ser usado imediatamente na classe Hkdf. Não especificamos o salt (o primeiro argumento do Hkdf), pois a chave gerada, devido à efemeridade dos pares de chaves participantes, será diferente para cada sessão e já contém entropia suficiente. kdf.expand() por padrão já produz as chaves de 256 bits necessárias para o Grasshopper posteriormente.

    A seguir, as partes TBE e TBS da mensagem recebida são verificadas:

    • o MAC sobre o texto cifrado recebido é calculado e verificado;
    • o texto cifrado é descriptografado;
    • A estrutura TBE é decodificada;
    • dele é retirado o identificador do interlocutor e verificado se ele é conhecido por nós;
    • O MAC sobre este identificador é calculado e verificado;
    • é verificada a assinatura na estrutura do TBS, que inclui o cookie de ambas as partes e a chave pública efêmera da parte oposta. A assinatura é verificada pela chave de assinatura de longa duração do interlocutor.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Como escrevi acima, 34.13/2015/XNUMX descreve vários bloquear modos de operação de cifra a partir de 34.12/2015/3413. Entre eles existe um modo de geração de inserções de imitação e cálculos MAC. No PyGOST é gost34.12.mac(). Este modo requer a passagem da função de criptografia (recebimento e retorno de um bloco de dados), do tamanho do bloco de criptografia e, de fato, dos próprios dados. Por que você não pode codificar o tamanho do bloco de criptografia? 2015/128/64 descreve não apenas a cifra Grasshopper de XNUMX bits, mas também a cifra de XNUMX bits Magma - um GOST 28147-89 ligeiramente modificado, criado na KGB e ainda possui um dos mais altos limites de segurança.

    Kuznechik é inicializado chamando gost.3412.GOST3412Kuznechik(key) e retorna um objeto com métodos .encrypt()/.decrypt() adequados para passar para funções 34.13. O MAC é calculado da seguinte forma: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, texto cifrado). Para comparar o MAC calculado e recebido, você não pode usar a comparação usual (==) de strings de bytes, pois esta operação perde tempo de comparação, o que, no caso geral, pode levar a vulnerabilidades fatais como BEAST ataques ao TLS. Python tem uma função especial, hmac.compare_digest, para isso.

    A função de cifra de bloco só pode criptografar um bloco de dados. Para um número maior, e mesmo não múltiplo do comprimento, é necessário utilizar o modo de criptografia. 34.13-2015 descreve o seguinte: BCE, CTR, OFB, CBC, CFB. Cada um tem suas próprias áreas de aplicação e características aceitáveis. Infelizmente ainda não temos padronização modos de criptografia autenticados (como CCM, OCB, GCM e similares) - somos forçados a pelo menos adicionar MAC nós mesmos. eu escolho modo contador (CTR): não requer preenchimento para o tamanho do bloco, pode ser paralelizado, usa apenas a função de criptografia, pode ser usado com segurança para criptografar um grande número de mensagens (ao contrário do CBC, que apresenta colisões de forma relativamente rápida).

    Assim como .mac(), .ctr() recebe entrada semelhante: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). É necessário especificar um vetor de inicialização que tenha exatamente metade do comprimento do bloco de criptografia. Se nossa chave de criptografia for usada apenas para criptografar uma mensagem (embora de vários blocos), então é seguro definir um vetor de inicialização zero. Para criptografar mensagens de handshake, usamos uma chave separada a cada vez.

    Verificar a assinatura gost3410.verify() é trivial: passamos a curva elíptica dentro da qual estamos trabalhando (simplesmente registramos em nosso protocolo GOSTIM), a chave pública do signatário (não esqueça que esta deve ser uma tupla de dois números grandes, e não uma string de bytes), hash de 34.11/2012/XNUMX e a própria assinatura.

    A seguir, no iniciador preparamos e enviamos uma mensagem de handshake para handshake2, realizando as mesmas ações que fizemos durante a verificação, só que simetricamente: assinando nossas chaves em vez de verificar, etc...

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Quando a sessão é estabelecida, são geradas chaves de transporte (uma chave separada para criptografia, para autenticação, para cada uma das partes), e o Grasshopper é inicializado para descriptografar e verificar o MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    A corrotina msg_sender agora criptografa mensagens antes de enviá-las em uma conexão TCP. Cada mensagem tem um nonce crescente monotonicamente, que também é o vetor de inicialização quando criptografado no modo contador. É garantido que cada mensagem e bloco de mensagem tenha um valor de contador diferente.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    As mensagens recebidas são processadas pela corrotina msg_receiver, que trata da autenticação e descriptografia:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    Conclusão

    GOSTIM destina-se a ser utilizado exclusivamente para fins educacionais (uma vez que não é abrangido por testes, pelo menos)! O código fonte do programa pode ser baixado aqui (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM é completamente software grátis, distribuído nos termos GPLv3 +.

    Sergey Matveev, cifrapunkmembro Fundação SPO, desenvolvedor Python/Go, especialista-chefe FSUE "STC "Atlas".

Fonte: habr.com

Adicionar um comentário