GOSTIM: P2P F2F E2EE IM nunha noite con criptografía GOST

Ser un programador PyGOST bibliotecas (primitivas criptográficas GOST en puro Python), a miúdo recibo preguntas sobre como implementar a mensaxería segura máis sinxela no xeonllo. Moitas persoas consideran que a criptografía aplicada é bastante sinxela, e chamar a .encrypt() nun cifrado de bloque será suficiente para envialo de forma segura a través dunha canle de comunicación. Outros cren que a criptografía aplicada é o destino duns poucos, e é aceptable que empresas ricas como Telegram con matemáticos olímpicos. non pode implementar protocolo seguro.

Todo isto levoume a escribir este artigo para demostrar que implementar protocolos criptográficos e IM segura non é unha tarefa tan difícil. Non obstante, non paga a pena inventar os teus propios protocolos de autenticación e acordo de claves.

GOSTIM: P2P F2F E2EE IM nunha noite con criptografía GOST
O artigo escribirá peer-to-peer, amigo a amigo, cifrado de extremo a extremo mensaxería instantánea con SIGMA-I protocolo de autenticación e acordo de claves (en base ao cal se implementa IPsec IKE), utilizando exclusivamente algoritmos criptográficos GOST biblioteca PyGOST e biblioteca de codificación de mensaxes ASN.1 PyDERASN (sobre o que xa escribiu antes). Un requisito previo: debe ser tan sinxelo que se poida escribir desde cero nunha noite (ou día laboral), se non, xa non é un programa sinxelo. Probablemente teña erros, complicacións innecesarias, deficiencias, ademais este é o meu primeiro programa que utiliza a biblioteca asyncio.

Deseño de IM

En primeiro lugar, necesitamos entender como será o noso IM. Por simplicidade, deixe que sexa unha rede peer-to-peer, sen que se descubra ningún participante. Indicaremos persoalmente a que enderezo: porto conectarse para comunicarse co interlocutor.

Entendo que, neste momento, a suposición de que a comunicación directa está dispoñible entre dous ordenadores arbitrarios é unha limitación significativa na aplicabilidade da MI na práctica. Pero cantos máis desenvolvedores implementen todo tipo de muletas NAT-traversal, máis tempo permaneceremos na Internet IPv4, cunha probabilidade deprimente de comunicación entre ordenadores arbitrarios. Canto tempo podes tolerar a falta de IPv6 na casa e no traballo?

Teremos unha rede de amigo a amigo: hai que coñecer previamente todos os posibles interlocutores. En primeiro lugar, isto simplifícase moito todo: presentámonos, atopamos ou non o nome/chave, desconectamos ou seguimos traballando, coñecendo o interlocutor. En segundo lugar, en xeral, é seguro e elimina moitos ataques.

A interface de IM estará próxima ás solucións clásicas proxectos descabellados, que me gustan moito polo seu minimalismo e a súa filosofía Unix-way. O programa de MI crea un directorio con tres sockets de dominio Unix para cada interlocutor:

  • en—n rexístranse as mensaxes enviadas ao interlocutor;
  • fóra - as mensaxes recibidas do interlocutor lense del;
  • estado: ao ler nel, descubrimos se o interlocutor está conectado actualmente, o enderezo/porto de conexión.

Ademais, créase un socket de conexión, escribindo o porto host no que iniciamos unha conexión co interlocutor remoto.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Este enfoque permítelle facer implementacións independentes de transporte de mensaxería instantánea e interface de usuario, porque non hai ningún amigo, non pode agradar a todos. Usando tmux e / ou multicola, podes obter unha interface multi-xanela con resaltado de sintaxe. E coa axuda rlwrap pode obter unha liña de entrada de mensaxes compatible con GNU Readline.

De feito, os proxectos sen mamar usan ficheiros FIFO. Persoalmente, non podía entender como traballar con ficheiros de forma competitiva en asíncio sen un fondo escrito a man a partir de fíos dedicados (estiven usando a linguaxe para tales cousas durante moito tempo Go). Polo tanto, decidín conformarme con sockets de dominio Unix. Desafortunadamente, isto fai imposible facer echo 2001:470:dead::babe 6666 > conn. Resolvín este problema usando socat: echo 2001:470:dead::babe 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.

O protocolo orixinal inseguro

TCP utilízase como transporte: garante a entrega e o seu pedido. UDP non garante ningún dos dous (o que sería útil cando se usa a criptografía), senón soporte SCTP Python non sae da caixa.

Desafortunadamente, en TCP non hai ningún concepto de mensaxe, só un fluxo de bytes. Polo tanto, é necesario elaborar un formato para as mensaxes para que se poidan compartir entre eles neste fío. Podemos aceptar usar o carácter de avance de liña. Para comezar, está ben, pero unha vez que comezamos a cifrar as nosas mensaxes, este carácter pode aparecer en calquera lugar do texto cifrado. Nas redes, polo tanto, son populares os protocolos que envían primeiro a lonxitude da mensaxe en bytes. Por exemplo, Python ten xdrlib, o que che permite traballar cun formato similar XDR.

Non traballaremos de forma correcta e eficiente coa lectura TCP: simplificaremos o código. Lemos os datos do socket nun bucle interminable ata decodificar a mensaxe completa. JSON con XML tamén se pode usar como formato para este enfoque. Pero cando se engade a criptografía, os datos terán que estar asinados e autenticados, e isto requirirá unha representación idéntica de byte por byte dos obxectos, que JSON/XML non proporciona (os resultados dos volcados poden variar).

XDR é axeitado para esta tarefa, porén escollín ASN.1 con codificación DER e PyDERASN biblioteca, xa que teremos a man obxectos de alto nivel cos que moitas veces resulta máis agradable e cómodo traballar. A diferenza do sen esquema bencode, Paquete de mensaxes ou CBOR, ASN.1 comprobará automaticamente os datos contra un esquema codificado.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

A mensaxe recibida será Msg: ben un texto MsgText (cun ​​campo de texto polo momento) ou unha mensaxe de apretón de mans MsgHandshake (que contén o nome do interlocutor). Agora parece demasiado complicado, pero esta é unha base para o futuro.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐ │PeerA│ │PeerB│ └──┬──┘ └───└─────│ ) │ │───────── ────────>│ │ │ │MsgHandshake(IdB) │ │<────────────────────│ │ │ MsgText() │ │──── MsgText() │ │ │

IM sen criptografía

Como xa dixen, a biblioteca asyncio empregarase para todas as operacións de socket. Anunciamos o que esperamos no lanzamento:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Establece o teu propio nome (--o noso nome alice). Todos os interlocutores esperados aparecen separados por comas (-os seus nomes bob,eve). Para cada un dos interlocutores, créase un directorio con sockets Unix, así como unha corrutina para cada estado de entrada, saída e estado:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

As mensaxes procedentes do usuario desde o socket de entrada envíanse á cola IN_QUEUES:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

As mensaxes procedentes dos interlocutores envíanse a OUT_QUEUES filas, desde as que se escriben os datos no socket de saída:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Ao ler desde un socket de estado, o programa busca o enderezo do interlocutor no dicionario PEER_ALIVE. Se aínda non hai conexión co interlocutor, escríbese unha liña baleira.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Ao escribir un enderezo nun socket de conexión, lánzase a función "iniciador" de conexión:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Consideremos o iniciador. Primeiro, obviamente, abre unha conexión co host/porto especificado e envía unha mensaxe de apretón de mans co seu nome:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Despois, agarda unha resposta da parte remota. Tenta decodificar a resposta entrante mediante o esquema Msg ASN.1. Supoñemos que toda a mensaxe enviarase nun segmento TCP e recibirémola atómicamente cando chamemos a .read(). Comprobamos que recibimos a mensaxe de apretón de mans.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Comprobamos que o nome recibido do interlocutor é coñecido por nós. Se non, rompemos a conexión. Comprobamos se xa establecemos unha conexión con el (o interlocutor deu de novo a orde de conectarnos) e pechamos. A cola IN_QUEUES contén cadeas de Python co texto da mensaxe, pero ten un valor especial de None que indica que a corrutina msg_sender deixe de funcionar para que se esqueza do seu escritor asociado á conexión TCP herdada.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender acepta mensaxes de saída (en cola dun socket interno), serialízaas nunha mensaxe MsgText e envíaas a través dunha conexión TCP. Pode romper en calquera momento - interceptamos isto claramente.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

Ao final, o iniciador entra nun bucle infinito de lectura de mensaxes desde o socket. Comproba se estas mensaxes son mensaxes de texto e colócaas na cola OUT_QUEUES, desde a que se enviarán á toma de saída do interlocutor correspondente. Por que non podes simplemente facer .read() e decodificar a mensaxe? Porque é posible que varias mensaxes do usuario sexan agregadas no búfer do sistema operativo e enviadas nun segmento TCP. Podemos decodificar o primeiro e, a continuación, parte do seguinte pode permanecer no búfer. En caso de calquera situación anormal, pechamos a conexión TCP e paramos a corrutina msg_sender (enviando None á cola OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Volvamos ao código principal. Despois de crear todas as corrutinas no momento en que se inicia o programa, iniciamos o servidor TCP. Para cada conexión establecida, crea unha corrutina de resposta.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

responder é semellante ao iniciador e reflicte todas as mesmas accións, pero o bucle infinito de lectura de mensaxes comeza inmediatamente, para simplificar. Actualmente, o protocolo de apretón de mans envía unha mensaxe de cada lado, pero no futuro haberá dúas do iniciador da conexión, despois das cales pódense enviar mensaxes de texto inmediatamente.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Protocolo seguro

É hora de protexer as nosas comunicacións. Que entendemos por seguridade e que queremos:

  • confidencialidade das mensaxes transmitidas;
  • autenticidade e integridade das mensaxes transmitidas: os seus cambios deben ser detectados;
  • protección contra ataques de repetición: hai que detectar o feito de faltar ou repetir mensaxes (e decidimos finalizar a conexión);
  • identificación e autenticación de interlocutores mediante chaves públicas preintroducidas -xa decidimos antes que facíamos unha rede de amigo a amigo-. Só despois da autenticación entenderemos con quen nos estamos comunicando;
  • dispoñibilidade perfecto segredo directo propiedades (PFS): comprometer a nosa chave de sinatura de longa duración non debería levar á posibilidade de ler toda a correspondencia anterior. A gravación do tráfico interceptado faise inútil;
  • validez/validez das mensaxes (transporte e apretón de mans) só nunha sesión TCP. Non debería ser posible inserir mensaxes correctamente asinadas/autenticadas doutra sesión (mesmo co mesmo interlocutor);
  • un observador pasivo non debería ver nin os identificadores de usuario, as claves públicas de longa duración transmitidas nin os hash deles. Certo anonimato dun observador pasivo.

Sorprendentemente, case todo o mundo quere ter este mínimo en calquera protocolo de apretón de mans, e moi pouco do anterior se cumpre finalmente para os protocolos "na casa". Agora non imos inventar nada novo. Definitivamente recomendaría usar Marco de ruído para construír protocolos, pero elixamos algo máis sinxelo.

Os dous protocolos máis populares son:

  • TLS - un protocolo moi complexo cunha longa historia de erros, xambas, vulnerabilidades, mal pensamento, complexidade e deficiencias (non obstante, isto ten pouco que ver con TLS 1.3). Pero non o consideramos porque é demasiado complicado.
  • IPsec с Ike — non teñen problemas criptográficos graves, aínda que tampouco son sinxelos. Se le sobre IKEv1 e IKEv2, entón a súa fonte é STS, ISO/IEC IS 9798-3 e protocolos SIGMA (SIGn-and-MAc) - o suficientemente sinxelos para implementar nunha noite.

Que ten de bo SIGMA, como a última ligazón no desenvolvemento de protocolos STS/ISO? Cumpre todos os nosos requisitos (incluíndo "ocultar" os identificadores do interlocutor) e non ten problemas criptográficos coñecidos. É minimalista: eliminar polo menos un elemento da mensaxe do protocolo provocará a súa inseguridade.

Imos do protocolo máis sinxelo de cultivo propio a SIGMA. A operación máis básica que nos interesa é acordo clave: unha función que dá a ambos os participantes o mesmo valor, que se pode usar como clave simétrica. Sen entrar en detalles: cada unha das partes xera un par de claves efémero (utilizado só dentro dunha sesión) (chaves públicas e privadas), intercambia claves públicas, chama a función de acordo, a cuxa entrada pasan a súa clave privada e a pública. clave do interlocutor.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐────────│ ╔══════════ ══════════╗ │───────────────>│ ║PrvA, Pu()───────────>│ ║PrvA, ═ ════════ ═══════════╝ │ IdB, PubB │ ╔═════════════════════════════════ <───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚═════════════════════════════════ ───┐ ╔════ ═══╧════════════╗ │ ║Clave = DH(PrvA, PubB)║ <───┘═══════════ ═ ═══════ ════╝ │ │ │ │

Calquera pode saltar polo medio e substituír as claves públicas polas súas propias: neste protocolo non hai autenticación de interlocutores. Engademos unha sinatura con claves de longa duración.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐ │PeerA│ │PeerB│ └──┬──┘ └──┘ └────── sinal, A│ SignPrvA, (PubA)) │ ╔═ │──────────── ────────── ─────────────────── ────────────────────Asinar bA = carga ()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════ ═══════ ═════════════════════B , sinal(SignPrvB, (PubB)) │ ╔══════════════ ═══════ ══════════════╔───╔╔╔╔╔ ─────────── ───────────── ──│ ║SignPrvB, SignPubB = carga ( )║ │ │ ║PrvB, PubB = ║││ DHgen ═════════ ══════════════ ══╝ ────┐ ╔ ═════════════════════════ ═════╗ │ │ ║verificar( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚═════════════════════════════ ══╝ │ │ │

Tal sinatura non funcionará, xa que non está vinculada a unha sesión específica. Estas mensaxes tamén son "adecuadas" para sesións con outros participantes. Debe subscribirse todo o contexto. Isto obríganos a engadir tamén outra mensaxe de A.

Ademais, é fundamental engadir o teu propio identificador baixo a sinatura, xa que, en caso contrario, podemos substituír IdXXX e volver a asinar a mensaxe coa chave doutro interlocutor coñecido. Para evitar ataques de reflexión, é necesario que os elementos baixo a sinatura estean en lugares claramente definidos segundo o seu significado: se A asina (PubA, PubB), entón B debe asinar (PubB, PubA). Isto tamén fala da importancia de escoller a estrutura e o formato dos datos serializados. Por exemplo, os conxuntos na codificación ASN.1 DER están ordenados: SET OF(PubA, PubB) será idéntico a SET OF(PubB, PubA).

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐────────│ ╔══════════ ═════════════════╗ │──────────────────────────── ────────── ─────────────>│ ║SignPrvA, SignPubA = cargar()║ │ │ ║PrvA, PubA = DHgen() ═ₕ═ₕ═══ₕ ═ ═══════ ═══════════════╝ │IdB, PubB, signo(SignPrvB, (IdB, PubA, PubB)) │ ╔═══════════ ═════ ════════════╗ │<─────────────────────────────── ────────── ─────────│ ║SignPrvB, SignPubB = carga ()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ║ │ │ ══════════════ ════════ ══════════╝ │ signo(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════════════ ════╗ │─ ─────────────────────────────────────────── ───>│ ║verificar(SignPubB, ...) ║ │ │ ║key = dh (prva, PUBB) ║ │ │ │

Non obstante, aínda non "probamos" que xeramos a mesma clave compartida para esta sesión. En principio, podemos prescindir deste paso: a primeira conexión de transporte non será válida, pero queremos que, cando se complete o apretón de mans, esteamos seguros de que todo estea realmente acordado. Polo momento temos a man o protocolo ISO/IEC IS 9798-3.

Poderiamos asinar a propia chave xerada. Isto é perigoso, xa que é posible que haxa fugas no algoritmo de sinatura utilizado (aínda que sexa bits por sinatura, pero aínda así). É posible asinar un hash da clave de derivación, pero filtrar mesmo o hash da clave derivada pode ser valioso nun ataque de forza bruta á función de derivación. SIGMA usa unha función MAC que autentica o ID do remitente.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐────────│ ╔══════════ ═════════════════╗ │──────────────────────────── ────────── ──────────────────>│ ║SignPrvA, SignPubA = carga ()║ │ │ ───── ╚ ═══════ ════════════════════╝ │IdB, PubB, signo(SignPrvB, (PubA, PubB═)), MAC│I══════ ═══ │<───────────────── ────────── ──────────── ────────── ─│ ║SignPrvB, SignPubB = cargar()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ════ ══════════════ ═════════ ══╝ │ │ ╔════════════ ═════════╗ ══╗ │ signo MAC(IbA, Pub, Pub) ║Chave = DH( PrvA, PubB) ║ │───────────────────── ── ───────────── ────────── ─────>│ ║verificar(Clave, IdB) ║ │ │ ║verificar(SignPubB, ...)║ │ │ ╚════════════════════════ ═════ ═╝ │ │

Como optimización, algúns poden querer reutilizar as súas claves efémeras (o que é, por suposto, desafortunado para PFS). Por exemplo, xeramos un par de claves, tentamos conectarnos, pero TCP non estaba dispoñible ou interrompeuse nalgún lugar no medio do protocolo. É unha mágoa desperdiciar a entropía e os recursos do procesador desperdiciados nun novo par. Por iso, introduciremos a chamada cookie, un valor pseudoaleatorio que protexerá contra posibles ataques de repetición aleatorios cando se reutilicen chaves públicas efémeras. Debido á vinculación entre a cookie e a clave pública efémera, a clave pública da parte contraria pode eliminarse da sinatura por ser innecesaria.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐ │PeerA│ │PeerB│ └──┬──┘ └──┘ └───────│ Cookie │ ╔════════ ═══════════════════╗ │──────────────────────── ────────── ─────────────────────────────────────────── ─>│ ║SignPrvA, SignPubA = cargar( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚══════════════════════════════════ ══╝ │IdB, PubB, CookieB , signo(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔══════════════════════════════════════ ╗ │< ─────────────────────────────────────────── ────────── ────────────────────│ ║SignPrvB, SignPubB = cargar()║ │ ──────│ ║SignPrvB, SignPubB = cargar()║ │ ────│ DH⑕) =vB││ │ ╚══════ ═════════════════════╝ │ │ ╔════════════════════ ═══════╗ │ signo( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │─────────────────────────── ── ─────────────────────────────────────────── ───────>│ ║ verificar(Clave, IdB) ║ │ │ ║verificar(SignPubB, ...)║ │ │ ╚════════════════════════════════ │

Finalmente, queremos obter a privacidade dos nosos interlocutores dun observador pasivo. Para iso, SIGMA propón intercambiar primeiro claves efémeras e desenvolver unha clave común na que cifrar as mensaxes de autenticación e identificación. SIGMA describe dúas opcións:

  • SIGMA-I - protexe o iniciador dos ataques activos, o respondedor dos pasivos: o iniciador autentica o respondedor e, se algo non coincide, entón non dá a súa identificación. O acusado entrega a súa identificación se se inicia un protocolo activo con el. O observador pasivo non aprende nada;
    SIGMA-R - protexe o respondedor dos ataques activos, o iniciador dos pasivos. Todo é exactamente o contrario, pero neste protocolo xa se transmiten catro mensaxes de apretón de mans.

    Escollemos SIGMA-I xa que é máis parecido ao que esperamos das cousas familiares cliente-servidor: o cliente só é recoñecido polo servidor autenticado e todos xa coñecen o servidor. Ademais, é máis fácil de implementar debido a menos mensaxes de apretón de mans. O único que engadimos ao protocolo é cifrar parte da mensaxe e transferir o identificador A á parte cifrada da última mensaxe:

    PubA, CookieA │ ╔══════════ ════════════════════╀╔╔╔╔╔╔╔╔╔ ────────── ───── ────────── ────────────────────────── ─────────── ───── ──────>│ ║SignPrvA , SignPubA = cargar()║ │ │ ║PrvA, PubA = DHgen() ║ ═════│ ════════ ═════════ ════╝ │ PubB, CookieB, Enc((IdB, signo(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB)══════════════════ ═══════════════ ═══════╗ │<────────────────── ────────── ───── ────────── ║SignP rvB, SignPubB = cargar()║ │ │ ║ PrvB, PubB = DHgen ││═ₕ ││ ════════ ════════════════╝ │ │ ╔════════ ══════════════ ══╗ │ Enc((IdA, signo( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │───────────────────────── ─ ────────────────── ────────── ( SignPubB, ...)───────────── ( SignPubB, ...)═│ ═══════ ══════════╝ │ │
    
    • GOST R úsase para a sinatura 34.10-2012 algoritmo con claves de 256 bits.
    • Para xerar a clave pública, utilízase 34.10-2012 VKO.
    • CMAC úsase como MAC. Tecnicamente, este é un modo especial de operación dun cifrado de bloques, descrito en GOST R 34.13-2015. Como función de cifrado para este modo − Saltón (34.12-2015).
    • O hash da súa chave pública úsase como identificador do interlocutor. Usado como hash Stribog-256 (34.11/2012/256 XNUMX bits).

    Despois do apretón de mans, acordaremos unha chave compartida. Podemos usalo para o cifrado autenticado de mensaxes de transporte. Esta parte é moi sinxela e difícil de cometer un erro: incrementamos o contador de mensaxes, ciframos a mensaxe, autenticamos (MAC) o contador e o texto cifrado, enviamos. Ao recibir unha mensaxe, comprobamos que o contador ten o valor esperado, autenticamos o texto cifrado co contador e o desciframos. Que chave debo usar para cifrar mensaxes de enlace, transportar mensaxes e como autenticalas? Usar unha tecla para todas estas tarefas é perigoso e imprudente. É necesario xerar claves mediante funcións especializadas KDF (función de derivación de clave). Unha vez máis, non imos partir os pelos e inventemos algo: HKDF coñécese desde hai moito tempo, está ben investigado e non ten problemas coñecidos. Desafortunadamente, a biblioteca nativa de Python non ten esta función, polo que usamos hkdf bolsa de plástico. HKDF usa internamente HMAC, que á súa vez usa unha función hash. Un exemplo de implementación en Python na páxina de Wikipedia leva só unhas poucas liñas de código. Como no caso do 34.10/2012/256, usaremos Stribog-XNUMX como función hash. A saída da nosa función de acordo de clave chamarase clave de sesión, a partir da cal se xerarán as simétricas que faltan:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Estruturas/Esquemas

    Vexamos que estruturas ASN.1 temos agora para transmitir todos estes datos:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS é o que se asinará. HandshakeTBE: o que se cifrará. Chamo a súa atención sobre o campo ukm en MsgHandshake1. 34.10 VKO, para unha maior aleatorización das claves xeradas, inclúe o parámetro UKM (material de claves de usuario), só entropía adicional.

    Engadir criptografía ao código

    Consideremos só os cambios realizados no código orixinal, xa que o marco seguía sendo o mesmo (de feito, primeiro escribiuse a implementación final e, a continuación, corrouse toda a criptografía).

    Dado que a autenticación e identificación dos interlocutores realizarase mediante claves públicas, agora é necesario gardarlas nalgún lugar durante moito tempo. Para simplificar, usamos JSON así:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    noso - o noso par de claves, chaves privadas e públicas hexadecimais. os seus — nomes dos interlocutores e as súas claves públicas. Imos cambiar os argumentos da liña de comandos e engadir o post-procesamento dos datos JSON:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    A clave privada do algoritmo 34.10 é un número aleatorio. Tamaño de 256 bits para curvas elípticas de 256 bits. PyGOST non funciona cun conxunto de bytes, senón con grandes números, polo que a nosa clave privada (urandom(32)) debe converterse nun número usando gost3410.prv_unmarshal(). A chave pública determínase de forma determinista a partir da clave privada mediante gost3410.public_key(). A chave pública 34.10 son dous números grandes que tamén deben converterse nunha secuencia de bytes para facilitar o almacenamento e a transmisión mediante gost3410.pub_marshal().

    Despois de ler o ficheiro JSON, as chaves públicas deben converterse de novo usando gost3410.pub_unmarshal(). Dado que recibiremos os identificadores dos interlocutores en forma de hash da chave pública, pódense calcular inmediatamente con antelación e colocalos nun dicionario para unha busca rápida. O hash Stribog-256 é gost34112012256.GOST34112012256(), que satisface totalmente a interface hashlib das funcións hash.

    Como cambiou a corrutina do iniciador? Todo é segundo o esquema de apretón de mans: xeramos unha cookie (de 128 bits é suficiente), un par de claves efémero 34.10, que se utilizará para a función de acordo de claves VKO.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • agardamos unha resposta e decodificamos a mensaxe de Msg entrante;
    • asegúrate de recibir o apretón de mans1;
    • decodificar a clave pública efémera da parte contraria e calcular a clave de sesión;
    • Xeramos as claves simétricas necesarias para procesar a parte TBE da mensaxe.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM é un número de 64 bits (urandom(8)), que tamén require a deserialización da súa representación de bytes usando gost3410_vko.ukm_unmarshal(). A función VKO para o 34.10/2012/256 de 3410 bits é gost34102012256_vko.kek_XNUMX() (KEK - clave de cifrado).

    A clave de sesión xerada xa é unha secuencia de bytes pseudoaleatorias de 256 bits. Polo tanto, pódese usar inmediatamente nas funcións HKDF. Dado que GOST34112012256 satisface a interface hashlib, pódese usar inmediatamente na clase Hkdf. Non especificamos o sal (o primeiro argumento de Hkdf), xa que a clave xerada, debido á efémeridade dos pares de claves participantes, será diferente para cada sesión e xa contén suficiente entropía. kdf.expand() xa produce as claves de 256 bits necesarias para Grasshopper máis adiante.

    A continuación, compróbanse as partes TBE e TBS da mensaxe entrante:

    • calcúlase e compróbase o MAC sobre o texto cifrado entrante;
    • o texto cifrado está descifrado;
    • a estrutura TBE está decodificada;
    • sácaselle o identificador do interlocutor e compróbase se é coñecido por nós;
    • O MAC sobre este identificador calcúlase e compróbase;
    • verifícase a sinatura sobre a estrutura de TBS, que inclúe a cookie de ambas as partes e a clave pública efémera da parte contraria. A sinatura é verificada pola clave de sinatura de longa duración do interlocutor.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Como escribín anteriormente, o 34.13/2015/XNUMX describe varios modos operativos de cifrado de bloques dende o 34.12/2015/3413. Entre eles hai un modo para xerar insercións de imitación e cálculos MAC. En PyGOST isto é gost34.12.mac(). Este modo require pasar a función de cifrado (recibir e devolver un bloque de datos), o tamaño do bloque de cifrado e, de feito, os propios datos. Por que non pode codificar o tamaño do bloque de cifrado? 2015/128/64 describe non só o cifrado Grasshopper de XNUMX bits, senón tamén o de XNUMX bits. Magma - un GOST 28147-89 lixeiramente modificado, creado no KGB e aínda ten un dos limiares de seguridade máis altos.

    Kuznechik iníciase chamando a gost.3412.GOST3412Kuznechik(clave) e devolve un obxecto con métodos .encrypt()/.decrypt() axeitados para pasar a funcións 34.13. O MAC calcúlase do seguinte xeito: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, texto cifrado). Para comparar o MAC calculado e recibido, non pode usar a comparación habitual (==) de cadeas de bytes, xa que esta operación perde tempo de comparación, o que, no caso xeral, pode provocar vulnerabilidades mortais como Beast ataques a TLS. Python ten unha función especial, hmac.compare_digest, para iso.

    A función de cifrado de bloque só pode cifrar un bloque de datos. Para un número maior, e aínda non un múltiplo da lonxitude, é necesario utilizar o modo de cifrado. 34.13-2015 describe o seguinte: ECB, CTR, OFB, CBC, CFB. Cada un ten as súas propias áreas de aplicación e características aceptables. Por desgraza, aínda non temos estandarizado modos de cifrado autenticados (como CCM, OCB, GCM e similares) - estamos obrigados a polo menos engadir MAC nós mesmos. elixo modo contador (CTR): non require recheo ao tamaño do bloque, pódese paralelizar, usa só a función de cifrado, pódese usar con seguridade para cifrar grandes cantidades de mensaxes (a diferenza de CBC, que ten colisións relativamente rápido).

    Do mesmo xeito que .mac(), .ctr() toma entradas similares: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, texto plano, iv). É necesario especificar un vector de inicialización que teña exactamente a metade da lonxitude do bloque de cifrado. Se a nosa clave de cifrado só se usa para cifrar unha mensaxe (aínda que de varios bloques), entón é seguro establecer un vector de inicialización cero. Para cifrar as mensaxes de apretón de mans, usamos unha chave separada cada vez.

    Verificar a sinatura gost3410.verify() é trivial: pasamos a curva elíptica dentro da que estamos a traballar (simplemente rexistrámola no noso protocolo GOSTIM), a chave pública do asinante (non esquezamos que esta debe ser unha tupla de dous). grandes números, e non unha cadea de bytes), 34.11/2012/XNUMX hash e a propia sinatura.

    A continuación, no iniciador preparamos e enviamos unha mensaxe de apretón de mans a handshake2, realizando as mesmas accións que fixemos durante a verificación, só de forma simétrica: asinar as nosas chaves en lugar de verificar, etc...

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Cando se establece a sesión, xéranse claves de transporte (unha clave separada para o cifrado, para a autenticación, para cada unha das partes) e o Grasshopper iníciase para descifrar e comprobar o MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    A rutina msg_sender agora cifra as mensaxes antes de envialas nunha conexión TCP. Cada mensaxe ten un nonce monótonamente crecente, que tamén é o vector de inicialización cando se cifra no modo contador. Cada mensaxe e bloque de mensaxes ten garantido un valor de contador diferente.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    As mensaxes entrantes son procesadas pola corrutina msg_receiver, que xestiona a autenticación e o descifrado:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    Conclusión

    GOSTIM está pensado para ser usado exclusivamente con fins educativos (xa que non está cuberto por probas, polo menos)! O código fonte do programa pódese descargar aquí (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM é completamente software libre, distribuído segundo os termos GPLv3 +.

    Sergey Matveev, cypherpunk, membro Fundación SPO, programador Python/Go, especialista xefe FSUE "STC "Atlas".

Fonte: www.habr.com

Engadir un comentario