GOSTIM: P2P F2F E2EE IM in una sera cù criptografia GOST

Esse un sviluppatore PyGOST biblioteche (GOST primitive criptografiche in pura Python), spessu riceve dumande nantu à cumu implementà a messageria sicura più simplice nantu à u ghjinochju. Parechje persone cunzidenu chì a criptografia applicata hè abbastanza simplice, è chjamà .encrypt () nantu à un cifru di bloccu serà abbastanza per mandà in modu sicuru per un canale di cumunicazione. Altri crèdenu chì a criptografia applicata hè u destinu di uni pochi, è hè accettatu chì e cumpagnie ricche cum'è Telegram cù l'olimpiadi-matematici ùn pò micca implementà protocolu sicuru.

Tuttu chistu m'hà incitatu à scrive stu articulu per dimustrà chì l'implementazione di protokolli criptografici è IM sicura ùn hè micca cusì difficiule. Tuttavia, ùn vale a pena inventà a vostra propria autentificazione è protokolli di accordu chjave.

GOSTIM: P2P F2F E2EE IM in una sera cù criptografia GOST
L'articulu scriverà merci-di-avance, amicu à amicu, criptatu end-to-end messenger instantani cun SIGMA-I autentificazione è protocolu d'accordu chjave (in basa di quale u IPsec IKE), utilizendu esclusivamente l'algoritmi criptografici GOST a biblioteca PyGOST è a biblioteca di codificazione di missaghju ASN.1 PyDERASN (circa chì aghju digià hà scrittu prima). Un prerequisite: deve esse cusì simplice chì pò esse scrittu da zero in una sera (o ghjornu di travagliu), altri ùn hè più un prugramma simplice. Hè prubabilmente hà errori, cumplicazioni innecessarii, difetti, più questu hè u mo primu prugramma chì usa a libreria asyncio.

Disegnu IM

Prima, avemu bisognu di capisce ciò chì u nostru IM sarà. Per simplicità, lasciate esse una reta peer-to-peer, senza scuperta di i participanti. Indicà personalmente quale indirizzu: portu per cunnette per cumunicà cù l'interlocutore.

Aghju capitu chì, à questu tempu, l'assunzione chì a cumunicazione diretta hè dispunibule trà dui computer arbitrarii hè una limitazione significativa di l'applicabilità di IM in pratica. Ma u più sviluppatori implementanu ogni tipu di crutches NAT-traversal, più longu resteremu nantu à l'Internet IPv4, cù una probabilità deprimente di cumunicazione trà computer arbitrarii. Quantu tempu pudete tollerà a mancanza di IPv6 in casa è à u travagliu?

Averemu una reta d'amicu à amicu : tutti i pussibuli interlocutori devenu esse cunnisciuti in anticipu. Prima, questu simplificà assai tuttu: avemu intruduciutu, truvamu o ùn truvamu micca u nome / chjave, disconnected o cuntinuà à travaglià, sapendu l'interlocutore. Siconda, in generale, hè sicuru è elimina assai attacchi.

L'interfaccia IM sarà vicinu à suluzioni classic prughjetti senza schiavi, chì mi piace assai per u so minimalismu è a filosofia Unix-way. U prugramma IM crea un repertoriu cù trè sockets di domini Unix per ogni interlocutore:

  • in-messaggi mandati à l'interlocutore sò arregistrati in questu;
  • fora - i missaghji ricevuti da l'interlocutore sò leghje da ellu;
  • statu - per leghje da ellu, avemu da sapè s'ellu l'interlocutore hè attualmente cunnessu, l'indirizzu / portu di cunnessione.

Inoltre, un socket cunn hè creatu, scrivendu u portu di l'ospite in quale avemu principiatu una cunnessione à l'interlocutore remoto.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Stu approcciu permette di fà implementazioni indipendenti di u trasportu IM è l'interfaccia d'utilizatore, perchè ùn ci hè micca amicu, ùn pudete micca piace à tutti. Utilizendu tmux è / o multicoda, pudete uttene una interfaccia multi-finestra cù evidenziazione di sintassi. È cù l'aiutu rlwrap pudete uttene una linea di input di missaghju GNU Readline cumpatibile.

In fatti, i prughjetti suckless utilizanu schedari FIFO. In modu persunale, ùn aghju micca pussutu capisce cumu travaglià cù i fugliali cumpetitivi in ​​l'asyncio senza un sfondate scritta a manu da i fili dedicati (aghju utilizatu a lingua per tali cose per un bellu pezzu. Go). Dunque, aghju decisu di fà fà cù i sockets di duminiu Unix. Sfurtunatamente, questu rende impussibile di fà eco 2001:470:dead::babe 6666 > conn. Aghju risoltu stu prublema usendu socat: echo 2001:470:mortu ::babe 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.

U protocolu insecuru originale

TCP hè adupratu cum'è trasportu: guarantisci a consegna è u so ordine. UDP guarantisci nè (chì seria utile quandu a criptografia hè aduprata), ma supportu SCTP Python ùn esce micca da a scatula.

Sfortunatamente, in TCP ùn ci hè micca cuncettu di un missaghju, solu un flussu di bytes. Per quessa, hè necessariu di vene cun un furmatu per i missaghji in modu chì ponu esse spartuti trà elli in stu filu. Pudemu accunsentì à aduprà u caratteru di alimentazione di linea. Va bè per principianti, ma una volta cuminciamu à criptà i nostri messagi, stu caratteru pò apparisce in ogni locu in u ciphertext. In e rete, dunque, i protokolli populari sò quelli chì mandanu prima a lunghezza di u messagiu in byte. Per esempiu, fora di a casella Python hà xdrlib, chì vi permette di travaglià cù un furmatu simili XDR.

Ùn avemu micca travagliatu bè è efficace cù a lettura TCP - simplificà u codice. Leghjemu i dati da u socket in un ciclu infinitu finu à decodificà u missaghju cumpletu. JSON cù XML pò ancu esse usatu cum'è formatu per questu approcciu. Ma quandu a criptografia hè aghjuntu, i dati anu da esse firmati è autentificati - è questu richiederà una rapprisintazioni identica di l'uggetti byte-per-byte, chì JSON / XML ùn furnisce micca (i risultati di dumps pò varià).

XDR hè adattatu per questu compitu, ma aghju sceltu ASN.1 cù codificazione DER è PyDERASN biblioteca, postu chì averemu uggetti d'altu livellu in manu cù quale hè spessu più piacevule è cunvene per travaglià. A cuntrariu di schemaless bencode, MessagePack o CBOR, ASN.1 verificarà automaticamente i dati contr'à un schema hard-coded.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

U missaghju ricevutu sarà Msg: o un testu MsgText (cù un campu di testu per avà) o un missaghju MsgHandshake handshake (chì cuntene u nome di l'interlocutore). Avà pare troppu cumplicatu, ma questu hè un fundamentu per u futuru.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐ │PeerA│ │PeerB│ └──┬──┘ └──┘ └────────── │ │──────── ─ ────────>│ │ │ │MsgHandshake(IdB) │ │<──────────────────────── │ │ MsgText() │ │─── ─ MsgText() │ │ │

IM senza criptografia

Cum'è aghju dettu, a libreria asyncio serà aduprata per tutte l'operazioni di socket. Annunziate ciò chì aspittemu à u lanciu:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Pone u vostru nome (--nome-nome alice). Tutti l'interlocutori previsti sò listati separati da virgule (-i so nomi bob,eve). Per ognunu di l'interlocutori, un annuariu cù sockets Unix hè creatu, è ancu una coroutine per ogni in, out, state:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

I missaghji chì venenu da l'utilizatore da u socket sò mandati à a fila IN_QUEUES:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

I missaghji chì venenu da l'interlocutori sò mandati à file OUT_QUEUES, da quali dati sò scritti à u socket out:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Quandu leghje da un socket statale, u prugramma cerca l'indirizzu di l'interlocutore in u dizziunariu PEER_ALIVE. Se ùn ci hè micca una cunnessione cù l'interlocutore, allora una linea viota hè scritta.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Quandu scrivite un indirizzu à un socket di cunnessione, a funzione "iniziatore" di cunnessione hè lanciata:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Pensemu à l'iniziatore. Prima ovviamente apre una cunnessione à l'ospitu / portu specificatu è manda un missaghju di handshake cù u so nome:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Allora, aspetta una risposta da u partitu remoto. Prova à decodificà a risposta entrata cù u schema Msg ASN.1. Assumimu chì u messagiu tutale serà mandatu in un segmentu TCP è l'avemu ricivutu atomicamente quandu chjamate .read (). Avemu verificatu chì avemu ricevutu u missaghju handshake.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Cuntrollamu chì u nome ricevutu di l'interlocutore hè cunnisciutu da noi. Se no, allora rompemu a cunnessione. Cuntrollamu s'ellu avemu digià stabilitu una cunnessione cun ellu (l'interlocutore hà datu novu u cumandamentu per cunnette cù noi) è chjude. A fila IN_QUEUES cuntene stringhe Python cù u testu di u missaghju, ma hà un valore spiciale di None chì signala a corotine msg_sender per cessà di travaglià in modu chì si scurdanu di u so scrittore assuciatu cù a cunnessione TCP legata.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender accetta i missaghji in uscita (queued from an in socket), li serializza in un missaghju MsgText è li manda nantu à una cunnessione TCP. Pò rompe in ogni mumentu - interceptemu chjaramente questu.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

À a fine, l'iniziatore entra in un ciclu infinitu di leghje missaghji da u socket. Cuntrolla s'ellu sti missaghji sò missaghji testu è li mette in a fila OUT_QUEUES, da quale seranu mandati à u socket fora di l'interlocutore currispundente. Perchè ùn pudete micca solu fà .read () è decode u missaghju ? Perchè hè pussibule chì parechji missaghji da l'utilizatori seranu aggregati in u buffer di u sistema operatore è mandati in un segmentu TCP. Pudemu decodificà u primu, è dopu una parte di u prossimu pò esse in u buffer. In casu di ogni situazione anormali, chjudemu a cunnessione TCP è ferma a coroutine msg_sender (mandendu None à a fila OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Riturnemu à u codice principale. Dopu avè creatu tutte e coroutines à u mumentu chì u prugramma principia, avemu principiatu u servitore TCP. Per ogni cunnessione stabilita, crea una coroutine di risposta.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

risponditore hè simile à l'iniziatore è specchi tutti i stessi azzioni, ma u ciclu infinitu di i missaghji di lettura principia subitu, per simplicità. Attualmente, u protocolu di handshake manda un missaghju da ogni parte, ma in u futuru ci saranu dui da l'iniziatore di cunnessione, dopu chì i missaghji di testu ponu esse mandati immediatamente.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Protokollu sicuru

Hè ora di assicurà e nostre cumunicazioni. Chì significhemu per sicurità è ciò chì vulemu:

  • cunfidenziale di i missaghji trasmessi;
  • l'autenticità è l'integrità di i missaghji trasmessi - i so cambiamenti devenu esse rilevati;
  • prutezzione contra l'attacchi di replay - u fattu di missing o ripetuti messagi deve esse rilevatu (è decide di finisce a cunnessione);
  • identificazione è autentificazione di l'interlocutori chì utilizanu chjavi publichi pre-entrati - avemu digià decisu prima chì faciamu una reta d'amicu à amicu. Solu dopu à l'autentificazione avemu da capiscenu cù quale avemu cumunicatu;
  • dispunibilità perfettu sicretu avanti proprietà (PFS) - cumprumissu a nostra chjave di signatura di longa durata ùn deve micca purtà à a capacità di leghje tutte e currispundenza precedente. A registrazione di u trafficu interceptatu diventa inutile;
  • validità / validità di i missaghji (trasportu è handshake) solu in una sessione TCP. Inserisce missaghji firmati / autentificati currettamente da una altra sessione (ancu cù u stessu interlocutore) ùn deve esse pussibule;
  • un observatore passiu ùn deve vede nè identificatori di l'utilizatori, trasmissi chjavi publichi di longa durata, o hash da elli. Un certu anonimatu da un observatore passiu.

Sorprendentemente, quasi tutti volenu avè stu minimu in ogni protokollu di mani, è assai pocu di ciò chì sopra hè ultimamente scontru per i protokolli "casa". Avà ùn avemu micca inventà nunda di novu. Avissi sicuru d'utilizà Quadru di rumore per custruì i protokolli, ma scegliemu qualcosa di più simplice.

I dui protokolli più populari sò:

  • TLS - un protokollu assai cumplessu cù una longa storia di bugs, jambs, vulnerabilities, poviru pensamentu, cumplessità è difetti (in ogni modu, questu hà pocu à fà cù TLS 1.3). Ma ùn avemu micca cunsideratu perchè hè troppu cumplicatu.
  • IPsec с Ike - ùn anu micca prublemi di criptografia seria, ancu s'ellu ùn sò micca simplici. Se leghjite nantu à IKEv1 è IKEv2, allora a so fonte hè STS, ISO / IEC IS 9798-3 è protokolli SIGMA (SIGn-and-MAc) - abbastanza simplice per implementà in una sera.

Cosa hè bonu di SIGMA, cum'è l'ultimu ligame in u sviluppu di protokolli STS / ISO? Soddisfa tutti i nostri bisogni (inclusi l'identificatori di l'interlocutori "ocultanti") è ùn hà micca prublemi criptografichi cunnisciuti. Hè minimalista - sguassate almenu un elementu da u missaghju di u protocolu porta à a so insicurezza.

Andemu da u protocolu più simplice in casa à SIGMA. L'operazione più basica chì ci interessa hè accordu chjave: Una funzione chì pruduce i dui participanti u listessu valore, chì pò esse usatu cum'è una chjave simmetrica. Senza entre in dettagli: ognuna di i partiti genera una coppia di chjave effimera (usata solu in una sessione) (chjavi publichi è privati), scambià e chjave pubbliche, chjamate a funzione di accordu, à l'input di quale passanu a so chjave privata è u publicu. chjave di l'interlocutore.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └─────────────Id ╔══════════ ══════════╗ │───────────────>│ ║PrvA, Pub────────────>│ ║PrvA, ════════ ═══════════╝ │ IdB, PubB │ ╔═══════════════════════════════ ───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚════════════════════════════ ─ ──┐ ╔════ ═══╧════════════╗ │ ║Key = DH(PrvA, PubB)║ <───┘═══════════ ═══════ ════╝ │ │ │ │

Qualchissia pò saltà in u mezu è rimpiazzà i chjavi publichi cù u so propiu - ùn ci hè micca autentificazione di interlocutori in stu protokollu. Aghjunghjemu una firma cù chjavi longu.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └───┐ │PeerA│ │PeerB│ └──┬──┘ └──┘ └────── Sign, P─ rvA, (PubA)) │ ╔═ │──────────── ────────── ─────────────────── ───────────────────────────── = carica()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════════ ══════ ═════════════════B══B, sign (SignPrvB, (PubB)) │ ╔═══════════════ ══════ ═══════════╀╔╔╔──╔╔╔ ────────── ────────────── ─│ ║SignPrvB, SignPubB = carica( )║ │ │ ║PrvB, PubB = ║│┐══│ DH=gen ════════ ═══════════════ ═╝ ────┐ ╔ ═════════════════════ ═════╗ │ │ ║verify( SignPubB, ...)║ │ <───┘ ║Key = DH(PrvA , PubB) ║ │ │ ╚══════════════════════════ ═╝ │ │ │

Una tale firma ùn funziona micca, postu chì ùn hè micca ligata à una sessione specifica. Tali missaghji sò ancu "adatti" per sessione cù altri participanti. Tuttu u cuntestu deve abbonate. Questu ci obbliga à aghjunghje ancu un altru missaghju da A.

Inoltre, hè criticu per aghjunghje u vostru propiu identificatore sottu à a firma, postu chì altrimente pudemu rimpiazzà IdXXX è re-signà u missaghju cù a chjave di un altru interlocutore cunnisciutu. Per prevene attacchi di riflessione, hè necessariu chì l'elementi sottu à a firma sò in lochi chjaramente definiti secondu u so significatu: se A signu (PubA, PubB), allura B deve firmà (PubB, PubA). Questu parla ancu di l'impurtanza di sceglie a struttura è u formatu di dati seriali. Per esempiu, i setti in a codificazione ASN.1 DER sò ordinati: SET OF (PubA, PubB) serà identicu à SET OF (PubB, PubA).

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └─────────────Id ╔══════════ ═════════════════╗ │─────────────────────────── ────────── ─────────────>│ ║SignPrvA, SignPubA = carica ()║ │ │ ║PrvA, PubA = DHgen() ═│════════ ═ ══════ ═══════════════╝ │IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) │ ╔════════ ═════ ════════════╗ │<───────────────────────────── ───────── ─────────│ ║SignPrvB, SignPubB = carica()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ║ │ │ ════════════ ═ ═══════ ══════════╝ │ sign(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════════ ═══╗ │─ ──────────────────────────────────────────── ──>│ ║verificà (SignPubB, ...)║ │ │ ║Key = DH(PrvA, PubB) ║ │ │ ╚═══════════════════════════

Tuttavia, ùn avemu ancu "pruvatu" chì avemu generatu a stessa chjave sparta per sta sessione. In principiu, pudemu fà senza stu passu - a prima cunnessione di trasportu serà invalida, ma vulemu chì quandu a stretta di mani hè finita, sariamu sicuru chì tuttu hè veramente accunsentutu. À u mumentu avemu u protocolu ISO / IEC IS 9798-3 in manu.

Pudemu firmà a chjave generata stessu. Questu hè periculosu, postu chì hè pussibule chì ci pò esse fughe in l'algoritmu di signatura utilizatu (anche se bits-per-signature, ma still leaks). Hè pussibule di firmà un hash di a chjave di derivazione, ma leaking even the hash of the derived key can be valuable in a brute-force attack on the derivation function. SIGMA usa una funzione MAC chì autentifica l'ID di mittente.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └─────────────Id ╔══════════ ═════════════════╗ │─────────────────────────── ────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║ │ │ ─────>│ ║SignPrvA, SignPubA = load()║ │ │ ║ │ │ ║ │ │ ║PrvA│ ║PrvA ═ ══════ ════════════════════╝ │IdB, PubB, sign(SignPrvB, (PubA, PubB)), MAC═══════ ═══ ════════════════════╗ │<──────────────────── ──────── ─ ─────────────────────│ ║SignPrvB, SignPubB = carica()║ ────────│ ║SignPrvB, SignPubB = load()║ ────────│ ║SignPrvB, SignPubB = load()║ ───────────│ │ ╚════ ═ ══════════════════════╝ │ │ ╔════════════════ ════════╗ │ sign (SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │───────────────────────────── ─ ─ ─────────────────────────>│ ║verify(Key, IdB)(Chiave, IdB)(Pubblica,Significa,B) ║ │ │ ╚ ═════════════════════╝ │ │

Cum'è una ottimisazione, certi pò vulete reutilizà e so chjave effimera (chì hè, sicuru, disgrazia per PFS). Per esempiu, avemu generatu un paru chjave, pruvatu à cunnette, ma TCP ùn era micca dispunibule o hè stata interrotta in un locu in mezu à u protocolu. Hè una vergogna di perdi l'entropia è e risorse di processore in una nova coppia. Per quessa, introduceremu a chjamata cookie - un valore pseudo-aleatoriu chì prutege da pussibuli attacchi di replay aleatoriu quandu si reutilizanu chjavi publichi effimeri. A causa di u ligame trà a cookie è a chjave publica effimera, a chjave publica di u partitu oppostu pò esse eliminata da a firma cum'è innecessaria.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──────────Id. │ ╔════════ ═══════════════════╗ │─────────────────────── ────────── ──────────────────────────────────────────── >│ ║SignPrvA, SignPubA = carica ( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════════════════════════════════ ═ ═╝ │IdB, PubB, CookieB , sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔═══════════════════════════════════ │< ──────────────────────────────────────────── ───────── ────────────────────│ ║SignPrvB, SignPubB = carica ()║ │ ─────────│ ║SignPrvB, SignPubB = carica()║ │ ──│ │ DH│ │ │ DH │ ╚══════ ═════════════════════╝ │ │ ╔══════════════════ ══════╗ │ sign( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │─────────────────────────────── ─ ──────────────────────────────────────────── ──────>│ ║ verifica (Chiave, IdB) ║ │ │ ║verify (SignPubB, ...)║ │ │ ╚═══════════════════════════ │

Infine, vulemu ottene a privacy di i nostri cumpagni di cunversazione da un observatore passiu. Per fà questu, SIGMA prupone di scambià prima chjave effimeri è sviluppà una chjave cumuna nantu à quale criptà i missaghji di autentificazione è identificazione. SIGMA descrive duie opzioni:

  • SIGMA-I - prutege l'iniziatore da attacchi attivi, u rispunsèvule da i passivi: l'iniziatore autentifica u risponditore è se qualcosa ùn currisponde micca, ùn dà micca a so identificazione. L'accusatu dà a so identificazione se un protokollu attivu hè iniziatu cun ellu. L'osservatore passiu ùn ampara nunda;
    SIGMA-R - prutege u risponditore da attacchi attivi, l'iniziatore da quelli passivi. Tuttu hè esattamente u cuntrariu, ma in questu protokollu sò digià trasmessi quattru missaggi di handshake.

    Scegliemu SIGMA-I chì hè più simili à ciò chì aspittemu da e cose familiari di u cliente-servitore: u cliente hè ricunnisciutu solu da u servitore autentificatu, è tutti cunnosci digià u servitore. In più, hè più faciule da implementà per via di menu messaggi di handshake. Tuttu ciò chì aghjustemu à u protokollu hè di criptà una parte di u messagiu è trasfiriri l'identificatore A à a parte criptata di l'ultimu messagiu:

    ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┘ └─────────── Biscotti ╔══════════ ═════════════════╗ │─────────────────────────── ────────── ──────────────────────────────────────────── ─────>│ ║SignPrvA , SignPubA = carica ()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════════════════════════════ ════╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔═════════════════════════ ═══════╗ │<───────────────────────────────── ───────── ║SignPrv B, SignPubB = carica ()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚══════════════════════════ ══════ ═╝ │ │ ╔ ════════ ═════════════╗ │ Enc sign(A(Ign), Cookie, MAC IdA) )) │ ║Chiave = DH(PrvA, PubB) ║ │────────────────────────────────── ─────── ──── ───────── ─────────────────────────────── ─>│ ║verificà (Chiave, IdB) ║ │ │ ║verify(Sign PubB, ...)║ │ │ ╚═════════════════════════════
    
    • GOST R hè utilizatu per a firma 34.10-2012 algoritmu cù chjavi di 256-bit.
    • Per generà a chjave publica, 34.10/2012/XNUMX VKO hè utilizatu.
    • CMAC hè adupratu cum'è MAC. Tecnicamente, questu hè un modu speciale di funziunamentu di un cifru di bloccu, descrittu in GOST R 34.13-2015. Cum'è una funzione di criptografia per questu modu - Saltoneru (34.12-2015).
    • U hash di a so chjave publica hè utilizatu com'è identificatore di l'interlocutore. Adupratu cum'è hash Stribog-256 (34.11/2012/256 XNUMX bits).

    Dopu à a stretta di manu, accunseremu una chjave spartuta. Pudemu aduprà per a criptografia autenticata di i missaghji di trasportu. Sta parte hè assai sèmplice è difficiuli di fà un sbagliu: aumentemu u contatore di missaghju, cripta u missaghju, autentificate (MAC) u contatore è ciphertext, mandate. Quandu riceve un missaghju, cuntrollemu chì u cuntatore hà u valore previstu, autentificate u testu cifratu cù u contatore, è u decifratu. Chì chjave deve aduprà per criptà i missaghji di handshake, i missaghji di trasportu, è cumu autentificà? Utilizà una chjave per tutte queste attività hè periculosa è imprudente. Hè necessariu di generà chjave utilizendu funzioni specializate KDF (funzione di derivazione chjave). Di novu, ùn spartemu micca i capelli è inventemu qualcosa: HKDF hè statu longu cunnisciutu, ben investigatu è ùn hà micca prublemi cunnisciuti. Sfurtunatamente, a biblioteca nativa di Python ùn hà micca sta funzione, cusì avemu aduprà hkdf saccu di plastica. HKDF usa internamente HMAC, chì à u turnu usa una funzione hash. Un esempiu di implementazione in Python nantu à a pagina di Wikipedia piglia solu uni pochi di linee di codice. Cum'è in u casu di 34.10/2012/256, avemu aduprà Stribog-XNUMX cum'è a funzione hash. L'output di a nostra funzione d'accordu chjave serà chjamatu a chjave di sessione, da quale seranu generati i simmetrici mancanti:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Strutture/Schemi

    Fighjemu ciò chì strutture ASN.1 avemu avà per trasmette tutte queste dati:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS hè ciò chì serà firmatu. HandshakeTBE - ciò chì serà criptatu. Aghju attiratu a vostra attenzione à u campu ukm in MsgHandshake1. 34.10 VKO, per una randomizazione ancu più grande di e chjavi generati, include u paràmetru UKM (materiale di chjave d'utilizatore) - solu entropia supplementu.

    Aghjunghje a criptografia à u codice

    Cunsideremu solu i cambiamenti fatti à u codice uriginale, postu chì u quadru era u listessu (in fattu, l'implementazione finali hè stata scritta prima, è dopu tutta a criptografia hè stata tagliata).

    Siccomu l'autentificazione è l'identificazione di l'interlocutori seranu realizati cù e chjave pubbliche, avà deve esse guardatu in qualchì locu per un bellu pezzu. Per simplicità, usemu JSON cusì:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    u nostru - a nostra coppia di chjave, chjavi privati ​​​​è publiche esadecimali. i so - nomi di interlocutori è e so chjave publiche. Cambiemu l'argumenti di a linea di cummanda è aghjunghje post-processazione di dati JSON:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    A chjave privata di l'algoritmu 34.10 hè un numeru aleatoriu. Dimensioni di 256 bit per curve ellittiche a 256 bit. PyGOST ùn travaglia micca cù un set di bytes, ma cù gran numaru, cusì a nostra chjave privata (urandom (32)) deve esse cunvertita in un numeru usendu gost3410.prv_unmarshal (). A chjave publica hè determinata deterministicamente da a chjave privata cù gost3410.public_key (). A chjave publica 34.10 hè dui numeri grandi chì anu da esse cunvertiti ancu in una sequenza di byte per facilità di almacenamiento è trasmissione cù gost3410.pub_marshal().

    Dopu avè lettu u schedariu JSON, i chjavi publichi anu da esse cunvertiti torna cù gost3410.pub_unmarshal(). Siccomu riceveremu l'identificatori di l'interlocutori in a forma di un hash da a chjave publica, ponu esse immediatamente calculati in anticipu è posti in un dizziunariu per a ricerca rapida. Stribog-256 hash hè gost34112012256.GOST34112012256 (), chì cumplettamente satisface l'interfaccia hashlib di funzioni hash.

    Cumu hè cambiatu a coroutine di l'iniziatore? Tuttu hè cum'è per u schema di handshake: generà una cookie (128-bit hè abbastanza), una coppia di chjave effimera 34.10, chì serà utilizata per a funzione di l'accordu chjave VKO.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • aspittemu una risposta è decode u missaghju Msg entrata;
    • assicuratevi di ottene handshake1;
    • decodificà a chjave publica effimera di u partitu oppostu è calculate a chjave di sessione;
    • Generemu chjavi simmetrici necessarii per processà a parte TBE di u messagiu.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM hè un numeru di 64 bit (urandom(8)), chì richiede ancu a deserializazione da a so rapprisentazione di byte cù gost3410_vko.ukm_unmarshal(). A funzione VKO per 34.10/2012/256 3410-bit hè gost34102012256_vko.kek_XNUMX() (KEK - chjave di criptografia).

    A chjave di sessione generata hè digià una sequenza di byte pseudo-aleatoriu di 256 bit. Per quessa, si pò ièssiri subitu usatu in funzioni HKDF. Siccomu GOST34112012256 satisface l'interfaccia hashlib, pò esse immediatamente utilizatu in a classa Hkdf. Ùn avemu micca specificà u sali (u primu argumentu di Hkdf), postu chì a chjave generata, per via di l'efimeralità di i coppie di chjave participanti, serà diversu per ogni sessione è cuntene digià abbastanza entropia. kdf.expand() per difettu produce digià e chjave di 256-bit necessarie per Grasshopper più tardi.

    In seguitu, e parte TBE è TBS di u messagiu entrante sò verificate:

    • u MAC nantu à u ciphertext entrante hè calculatu è verificatu;
    • u ciphertext hè decifratu;
    • struttura TBE hè decodificata;
    • l'identificatore di l'interlocutore hè pigliatu da ellu è hè verificatu s'ellu hè cunnisciutu da noi in tuttu;
    • MAC sopra stu identificatore hè calculatu è verificatu;
    • a firma nantu à a struttura TBS hè verificata, chì include u cookie di i dui partiti è a chjave publica effimera di u partitu oppostu. A firma hè verificata da a chjave di firma longa di l'interlocutore.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Cumu aghju scrittu sopra, 34.13/2015/XNUMX descrive diverse bloccà i modi operativi di cifru da u 34.12/2015/3413. Trà elli ci hè un modu per generà inserti d'imitazione è calculi MAC. In PyGOST questu hè gost34.12.mac (). Stu modu hè bisognu di passà a funzione di criptografia (riceve è rinvià un bloccu di dati), a dimensione di u bloccu di criptografia è, in fattu, i dati stessi. Perchè ùn pudete micca codificà a dimensione di u bloccu di criptografia? 2015/128/64 descrive micca solu u cifru Grasshopper XNUMX-bit, ma ancu u XNUMX-bit. Magma - un GOST ligeramente mudificatu 28147-89, creatu torna in u KGB è hà sempre unu di i più alti limiti di sicurezza.

    Kuznechik hè inizializatu chjamendu gost.3412.GOST3412Kuznechik(key) è torna un ughjettu cù metudi .encrypt()/.decrypt() adattati per passà à e funzioni 34.13. MAC hè calculatu cusì: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext). Per paragunà u MAC calculatu è ricivutu, ùn pudete micca aduprà a paraguna abituale (==) di stringhe di byte, postu chì sta operazione perde u tempu di paragone, chì, in u casu generale, pò purtà à vulnerabili fatali cum'è BÈT attacchi à TLS. Python hà una funzione speciale, hmac.compare_digest, per questu.

    A funzione di criptu di bloccu pò criptà solu un bloccu di dati. Per un numaru più grande, è ancu micca un multiplu di a durata, hè necessariu di utilizà u modu di criptografia. 34.13-2015 descrive i seguenti: ECB, CTR, OFB, CBC, CFB. Ognunu hà i so propri spazii accettabili d'applicazione è caratteristiche. Sfortunatamente, ùn avemu micca standardizatu modi di criptografia autenticati (cum'è CCM, OCB, GCM è simili) - simu furzati à almenu aghjunghje MAC noi stessi. I sceglie modu contatore (CTR): ùn hè micca bisognu di padding à a dimensione di u bloccu, pò esse parallelizatu, usa solu a funzione di criptografia, pò esse usata in modu sicuru per criptà un gran numaru di messagi (a cuntrariu di CBC, chì hà collisioni relativamente rapidamente).

    Cum'è .mac (), .ctr () piglia input simili: ciphertext = gost3413.ctr (GOST3412Kuznechik (key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). Hè necessariu di specificà un vettore d'inizializazione chì hè esattamente a mità di a lunghezza di u bloccu di criptografia. Se a nostra chjave di criptografia hè aduprata solu per criptà un missaghju (ancu da parechji blocchi), allora hè sicuru per stabilisce un vettore d'inizializazione zero. Per criptà i missaghji di handshake, usemu una chjave separata ogni volta.

    A verificazione di a firma gost3410.verify () hè triviale: passemu a curva ellittica in u quale avemu travagliatu (simplicemente l'arregistremu in u nostru protokollu GOSTIM), a chjave publica di u firmante (ùn vi scurdate chì questu deve esse una tupla di dui). grandi numeri, è micca una stringa di byte), 34.11/2012/XNUMX hash è a firma stessa.

    In seguitu, in l'iniziatore preparamu è mandemu un missaghju di handshake à handshake2, eseguendu e stesse azzioni chì avemu fattu durante a verificazione, solu simmetricamente: firmà nantu à e nostre chjave invece di verificà, etc...

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Quandu a sessione hè stabilita, i chjavi di trasportu sò generati (una chjave separata per a criptografia, per l'autentificazione, per ognuna di i partiti), è u Grasshopper hè inizializatu per decriptà è verificate u MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    A coroutine msg_sender ora cripta i missaghji prima di mandà à una cunnessione TCP. Ogni missaghju hà un nonce monotonicamente crescente, chì hè ancu u vettore d'inizializazione quandu hè criptatu in u modu contatore. Ogni missaghju è bloccu di messagiu hè garantitu per avè un valore di cuntrariu sfarente.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    I missaghji entranti sò trattati da a corotina msg_receiver, chì gestisce l'autentificazione è a decifrazione:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    cunchiusioni

    GOSTIM hè destinatu à esse usatu solu per scopi educativi (perchè ùn hè micca cupartu da teste, almenu)! U codice fonte di u prugramma pò esse telecaricatu ccà (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM hè cumpletamente software gratuitu, distribuitu sottu i termini GPLv3 +.

    Sergey Matveev, cypherpunk, membru Fundazione SPO, sviluppatore Python/Go, specialista capu FSUE "STC "Atlas".

Source: www.habr.com

Cumprate un hosting affidabile per i siti cù prutezzione DDoS, servitori VPS VDS 🔥 Cumprate un hosting di siti web affidabile cù prutezzione DDoS, servitori VPS VDS | ProHoster