GOSTIM: P2P F2F E2EE IM per vieną vakarą su GOST kriptografija

Būdamas kūrėju PyGOST bibliotekos (GOST kriptografiniai primityvai gryname Python), dažnai sulaukiu klausimų, kaip įdiegti paprasčiausią saugų pranešimų siuntimą ant kelio. Daugelis žmonių mano, kad taikomoji kriptografija yra gana paprasta, ir pakaks iškviesti .encrypt() blokiniame šifre, kad jį būtų galima saugiai išsiųsti ryšio kanalu. Kiti mano, kad taikomoji kriptografija yra nedaugelio žmonių likimas, ir priimtina, kad tokios turtingos įmonės kaip „Telegram“ su olimpiados matematikais negali įgyvendinti saugus protokolas.

Visa tai paskatino mane parašyti šį straipsnį, kad parodyčiau, jog kriptografinių protokolų ir saugaus MP diegimas nėra tokia sudėtinga užduotis. Tačiau neverta sugalvoti savo autentifikavimo ir pagrindinių susitarimų protokolų.

GOSTIM: P2P F2F E2EE IM per vieną vakarą su GOST kriptografija
Straipsnis bus parašytas "peer-to-peer", draugas draugui, šifruojama nuo galo iki galo momentinis pasiuntinys su SIGMA-I autentifikavimo ir rakto sutarties protokolas (kurio pagrindu IPsec IKE), naudojant išskirtinai GOST kriptografinius algoritmus PyGOST biblioteka ir ASN.1 pranešimų kodavimo biblioteka PyDERASN (apie kurią aš jau rašė anksčiau). Būtina sąlyga: ji turi būti tokia paprasta, kad ją būtų galima parašyti nuo nulio per vieną vakarą (arba darbo dieną), kitaip tai nebėra paprasta programa. Tikriausiai joje yra klaidų, nereikalingų komplikacijų, trūkumų, be to, tai yra mano pirmoji programa, naudojanti asyncio biblioteką.

IM dizainas

Pirmiausia turime suprasti, kaip atrodys mūsų IM. Dėl paprastumo leiskite tai būti lygiaverčiu tinklu, neatrandant dalyvių. Asmeniškai nurodysime, kuriuo adresu: prievadu jungtis bendrauti su pašnekovu.

Suprantu, kad šiuo metu prielaida, kad galimas tiesioginis ryšys tarp dviejų savavališkų kompiuterių, yra reikšmingas IM taikymo praktikoje apribojimas. Tačiau kuo daugiau kūrėjų įdiegs visokius NAT perėjimo ramentus, tuo ilgiau liksime IPv4 internete su slegiančia tikimybe bendrauti tarp savavališkų kompiuterių. Kiek laiko galite toleruoti IPv6 trūkumą namuose ir darbe?

Turėsime draugų tinklą: visi galimi pašnekovai turi būti žinomi iš anksto. Pirma, tai labai viską supaprastina: prisistatėme, radome arba neradome vardo/rakto, atsijungėme arba tęsiame darbą, pažindami pašnekovą. Antra, apskritai jis yra saugus ir pašalina daugybę atakų.

IM sąsaja bus artima klasikiniams sprendimams neskanūs projektai, kurie man labai patinka dėl minimalizmo ir Unix būdo filosofijos. IM programa kiekvienam pašnekovui sukuria katalogą su trimis Unix domeno lizdais:

  • in—jame įrašomi pašnekovui siunčiami pranešimai;
  • out - iš jo skaitomi iš pašnekovo gauti pranešimai;
  • būsena – iš jos skaitydami sužinome, ar pašnekovas šiuo metu prisijungęs, ryšio adresą/prievadą.

Be to, sukuriamas conn lizdas, įrašant pagrindinio kompiuterio prievadą, į kurį inicijuojame ryšį su nuotoliniu pašnekovu.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Šis metodas leidžia savarankiškai įgyvendinti IM transportą ir vartotojo sąsają, nes nėra draugo, negali įtikti visiems. Naudojant tmux ir (arba) daugiauodegė, galite gauti kelių langų sąsają su sintaksės paryškinimu. Ir su pagalba rlwrap galite gauti su GNU Readline suderinamą pranešimų įvesties eilutę.

Tiesą sakant, neskanūs projektai naudoja FIFO failus. Asmeniškai aš negalėjau suprasti, kaip konkurencingai dirbti su failais asinchroniškai be ranka rašyto fono iš tam skirtų gijų (aš ilgą laiką naudoju kalbą tokiems dalykams Go). Todėl nusprendžiau tenkintis su Unix domeno lizdais. Deja, dėl to neįmanoma atlikti echo 2001:470:dead::babe 6666 > conn. Aš išsprendžiau šią problemą naudodamas socat: echo 2001:470:dead::babe 6666 | socat - UNIX-CONNECT:jungtis, socat READLINE UNIX-CONNECT:alice/in.

Originalus nesaugus protokolas

TCP naudojamas kaip transportas: garantuoja pristatymą ir jo užsakymą. UDP negarantuoja nei vieno, nei kito (kas būtų naudinga, kai naudojama kriptografija), o palaikymą SCTP Python neišeina iš dėžutės.

Deja, TCP nėra pranešimo sąvokos, tik baitų srautas. Todėl būtina sugalvoti pranešimų formatą, kad jais būtų galima dalytis tarpusavyje šioje temoje. Galime sutikti naudoti eilutės tiekimo simbolį. Pradedantiesiems tai tinka, bet kai tik pradedame šifruoti savo pranešimus, šis simbolis gali pasirodyti bet kurioje šifruoto teksto vietoje. Todėl tinkluose populiarūs yra tie protokolai, kurie pirmiausia siunčia pranešimo ilgį baitais. Pavyzdžiui, „Python“ turi xdrlib, leidžiantį dirbti su panašiu formatu XDR.

Su TCP skaitymu nedirbsime teisingai ir efektyviai – supaprastinsime kodą. Mes skaitome duomenis iš lizdo begaliniu ciklu, kol iššifruojame visą pranešimą. JSON su XML taip pat gali būti naudojamas kaip šio metodo formatas. Tačiau kai pridedama kriptografija, duomenys turės būti pasirašyti ir autentifikuoti – tam reikės baitais identiško objektų atvaizdavimo, ko JSON/XML nepateikia (iškeltų rezultatai gali skirtis).

XDR tinka šiai užduočiai, tačiau aš renkuosi ASN.1 su DER kodavimu ir PyDERASN biblioteką, nes po ranka turėsime aukšto lygio objektus, su kuriais dažnai maloniau ir patogiau dirbti. Skirtingai nei beschemos bencode, MessagePack arba CBOR., ASN.1 automatiškai patikrins duomenis pagal užkoduotą schemą.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

Gautas pranešimas bus Msg: arba tekstinis MsgText (kol kas su vienu teksto lauku) arba MsgHandshake rankos paspaudimo pranešimas (kuriame yra pašnekovo vardas). Dabar tai atrodo pernelyg sudėtinga, bet tai yra ateities pagrindas.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬───┬───s(H) d A) │ │──────── ─ ────────>│ │ │ │MsgHandshake (IdB) │ │<────────────────────────────── │ │ │ MsgText() │ │─── ─ MsgText() │ │ │

IM be kriptografijos

Kaip jau sakiau, asyncio biblioteka bus naudojama visoms lizdo operacijoms. Pranešame, ko tikimės pristatydami:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Nustatykite savo vardą (--mūsų vardas alisa). Visi laukiami pašnekovai išvardyti atskiriant kableliais (-jų vardai bob,eve). Kiekvienam pašnekovui sukuriamas katalogas su Unix lizdais, taip pat kiekvienos įėjimo, išėjimo, būsenos koruna:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

Pranešimai, gaunami iš vartotojo iš įvesties lizdo, siunčiami į IN_QUEUES eilę:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

Iš pašnekovų gaunami pranešimai siunčiami į OUT_QUEUES eiles, iš kurių duomenys įrašomi į išvesties lizdą:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Skaitydama iš būsenos lizdo, programa PEER_ALIVE žodyne ieško pašnekovo adreso. Jei dar nėra ryšio su pašnekovu, tada rašoma tuščia eilutė.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Rašant adresą į jungties lizdą, paleidžiama ryšio „iniciatoriaus“ funkcija:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Apsvarstykime iniciatorių. Pirmiausia jis akivaizdžiai atidaro ryšį su nurodytu pagrindiniu kompiuteriu / prievadu ir išsiunčia rankos paspaudimo pranešimą su savo pavadinimu:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Tada jis laukia atsakymo iš nuotolinės šalies. Bando iššifruoti gaunamą atsakymą naudodamas Msg ASN.1 schemą. Mes darome prielaidą, kad visas pranešimas bus išsiųstas viename TCP segmente ir mes jį gausime atomiškai, kai skambinsime .read(). Patikriname, ar gavome rankos paspaudimo pranešimą.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Patikriname, ar gautas pašnekovo vardas mums žinomas. Jei ne, nutraukiame ryšį. Patikriname, ar jau užmezgėme su juo ryšį (pašnekovas vėl davė komandą prisijungti prie mūsų) ir jį uždarome. Eilėje IN_QUEUES yra Python eilutės su pranešimo tekstu, tačiau turi specialią reikšmę None, kuri signalizuoja msg_sender korutinai nustoti veikti, kad ji pamirštų apie savo rašytoją, susietą su senu TCP ryšiu.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender priima siunčiamus pranešimus (sudėtus į eilę iš lizdo), suskirsto juos į MsgText pranešimą ir siunčia juos TCP ryšiu. Jis gali lūžti bet kurią akimirką – mes aiškiai tai perimame.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

Pabaigoje iniciatorius įveda begalinę pranešimų skaitymo kilpą iš lizdo. Patikrina, ar šios žinutės yra tekstinės žinutės, ir įdeda jas į OUT_QUEUES eilę, iš kurios bus siunčiamos į atitinkamo pašnekovo išvesties lizdą. Kodėl negalite tiesiog padaryti .read() ir iššifruoti pranešimo? Nes gali būti, kad kelios vartotojo žinutės bus sukauptos operacinės sistemos buferyje ir išsiųstos viename TCP segmente. Mes galime iššifruoti pirmąjį, o tada dalis sekančio gali likti buferyje. Esant bet kokiai neįprastai situacijai, mes uždarome TCP ryšį ir sustabdome msg_sender korutine (į OUT_QUEUES eilę nusiųsdami None).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Grįžkime prie pagrindinio kodo. Sukūrę visas korutinas tuo metu, kai programa paleidžiama, paleidžiame TCP serverį. Kiekvienam užmegztam ryšiui sukuriama atsakiklio koruna.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

atsakiklis yra panašus į iniciatorių ir atspindi visus tuos pačius veiksmus, tačiau begalinis pranešimų skaitymo ciklas paprastumo dėlei prasideda iš karto. Šiuo metu rankos paspaudimo protokolas siunčia po vieną žinutę iš abiejų pusių, tačiau ateityje bus dvi iš ryšio iniciatoriaus, po kurių iškart bus galima siųsti trumpąsias žinutes.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Saugus protokolas

Atėjo laikas apsaugoti mūsų ryšius. Ką reiškia saugumas ir ko norime:

  • perduodamų pranešimų konfidencialumas;
  • perduodamų pranešimų autentiškumas ir vientisumas – turi būti aptikti jų pokyčiai;
  • apsauga nuo pakartojimo atakų – turi būti nustatytas trūkstamų arba pasikartojančių pranešimų faktas (ir mes nusprendžiame nutraukti ryšį);
  • pašnekovų identifikavimas ir autentifikavimas naudojant iš anksto įvestus viešuosius raktus – jau anksčiau nusprendėme, kad kuriame draugas-draugui tinklą. Tik po autentifikavimo suprasime, su kuo bendraujame;
  • prieinamumas tobulas išankstinis slaptumas ypatybės (PFS) – pažeidžiant mūsų ilgaamžį pasirašymo raktą neturėtų atsirasti galimybė perskaityti visą ankstesnę korespondenciją. Perimto srauto įrašymas tampa nenaudingas;
  • pranešimų (transportavimo ir rankos paspaudimo) galiojimas / galiojimas tik per vieną TCP seansą. Teisingai pasirašytų/autentifikuotų pranešimų įterpimas iš kitos sesijos (net su tuo pačiu pašnekovu) neturėtų būti įmanomas;
  • pasyvus stebėtojas neturėtų matyti nei vartotojo identifikatorių, nei perduotų ilgalaikių viešųjų raktų, nei maišos iš jų. Tam tikras anonimiškumas iš pasyvaus stebėtojo.

Keista, bet beveik visi nori turėti šį minimumą bet kuriame rankos paspaudimo protokole, o „naminių“ protokolų atveju labai mažai iš aukščiau paminėtų dalykų. Dabar nieko naujo nesugalvosime. Tikrai rekomenduočiau naudoti Triukšmo karkasas pastatymo protokolams, bet renkamės ką nors paprastesnio.

Du populiariausi protokolai yra šie:

  • TLS - labai sudėtingas protokolas, turintis ilgą klaidų, strigčių, pažeidžiamumų, prasto mąstymo, sudėtingumo ir trūkumų istoriją (tačiau tai mažai ką bendro su TLS 1.3). Bet mes to nesvarstome, nes tai per daug sudėtinga.
  • IPsec с IKE — neturi rimtų kriptografinių problemų, nors jos taip pat nėra paprastos. Jei skaitote apie IKEv1 ir IKEv2, tada jų šaltinis yra STS, ISO/IEC IS 9798-3 ir SIGMA (SIGn-and-MAc) protokolai – pakankamai paprasti, kad juos būtų galima įdiegti per vieną vakarą.

Kuo naudinga SIGMA, kaip naujausia STS/ISO protokolų kūrimo grandis? Jis atitinka visus mūsų reikalavimus (įskaitant pašnekovo identifikatorių „slėpimą“) ir neturi žinomų kriptografinių problemų. Jis yra minimalistinis – pašalinus bent vieną elementą iš protokolo pranešimo, jis bus nesaugus.

Nuo paprasčiausio namuose auginamo protokolo pereikime prie SIGMA. Pati pagrindinė mus dominanti operacija yra pagrindinis susitarimas: funkcija, kuri išveda abiem dalyviams tą pačią reikšmę, kurią galima naudoti kaip simetrinį raktą. Nesileidžiant į smulkmenas: kiekviena iš šalių sukuria trumpalaikę (naudojamą tik per vieną seansą) raktų porą (viešuosius ir privačius raktus), keičiasi viešaisiais raktais, iškviečia susitarimo funkciją, kurios įėjimui perduoda savo privatųjį raktą ir viešąjį. pašnekovo raktas.

┌- │ ╔══════════ ══════════╗ │───────────────>│ ║║PrvA, Pu ════════ - ───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚═══════════════════════════════════════════════ ─ ──┐ ╔════ ═══╧════════════╗ │ ║Raktas = DH(PrvA, PubB)║ <───═════════════ ═══════ ════╝ │ │ │ │

Kiekvienas gali peršokti per vidurį ir pakeisti viešuosius raktus savo – pašnekovų autentifikavimo šiame protokole nėra. Pridėkime parašą su ilgaamžiais raktais.

┌─────┐ ženklas SignPrvA, (PubA)) │ ╔═ Pasižymėkite A = apkrova()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════════ ══════ ══════════════════════════════════════════│B, Pub, I❔════ (SignPrvB, (PubB)) │ ╔═══════════════ ══════ ────────── ────────────── ─│ ║SignPrvB, SignPubB = apkrova( )║ │ │ ║PrvB, ││║║│║┕│║ ═════════ ═══════════════ ═╝ ────┐╔ ═════╗ │ │ ║patikrinti( SignPubB, ...)║ │ <───┘ ║Key = DH(PrvA , PubB) ║ │ │ ╚════════════════════════════════════════════ ═╝ │ │ │

Toks parašas neveiks, nes jis nesusietas su konkrečia sesija. Tokie pranešimai taip pat „tinka“ seansams su kitais dalyviais. Visas kontekstas turi pasirašyti. Tai verčia mus taip pat pridėti dar vieną pranešimą nuo A.

Be to, labai svarbu po parašu pridėti savo identifikatorių, nes kitu atveju galime pakeisti IdXXX ir iš naujo pasirašyti pranešimą kito žinomo pašnekovo raktu. Apsaugoti refleksijos priepuoliai, būtina, kad elementai po parašu būtų aiškiai apibrėžtose vietose pagal savo reikšmę: jei A žymi (PubA, PubB), tai B turi pasirašyti (PubB, PubA). Tai taip pat byloja apie nuosekliųjų duomenų struktūros ir formato pasirinkimo svarbą. Pavyzdžiui, ASN.1 DER kodavimo rinkiniai yra rūšiuojami: SET OF(PubA, PubB) bus identiškas SET OF(PubB, PubA).

┌- │ ╔══════════ ═════════════════╗ ────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen()││ ═ ══════ ═══════════════╝ │IdB, PubB, ženklas(SignPrvB, (IdB, PubA, PubB)) │═══════════ ═════ ════════════╗ │<───────────────────────────── ───────── ─────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ ───│ ║ │ ───│═══ ═ ═══════ ══════════╝ │ ženklas(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════════════════════════════ ═══╗ │─ ───────────────────────────────- ──>│ ║patikrinti(SignPubB, ...)║ │ │ ║ Raktas = DH(PrvA, PubB) ║ │ │ ╚═══════════════════════════════════════════════════

Tačiau mes vis dar neįrodėme, kad šiai sesijai sukūrėme tą patį bendrinamą raktą. Iš principo galime apsieiti ir be šio žingsnio – nebegalios pats pirmas transporto susisiekimas, bet norime, kad baigus rankos paspaudimą būtume tikri, kad dėl visko tikrai sutarta. Šiuo metu turime ISO/IEC IS 9798-3 protokolą.

Galėtume pasirašyti patį sugeneruotą raktą. Tai pavojinga, nes gali būti, kad naudojamas parašo algoritmas gali nutekėti (nors ir bitai per parašą, bet vis tiek nutekėjimas). Galima pasirašyti išvedimo rakto maišą, tačiau net išvestinio rakto maišos nutekėjimas gali būti vertingas brutaliosios jėgos atakoje prieš išvedimo funkciją. SIGMA naudoja MAC funkciją, kuri patvirtina siuntėjo ID.

┌- │ ╔══════════ ═════════════════╗ ────────── ──────────────────>│ ║SignPrvA, SignPubA = apkrova()║ │ ││║│││││││││││Pr ═ ══════ ════════════════════╝ │IdB, PubB, ženklas(SignPrvB, (PubA═, PubB)╔┐(IdB)═══ ═══ ' ──────── ─ ─────────────────────│ ║SignPrvB, SignPubB = apkrova()║║│v,gen()║║││ │ ╚════ ═ ══════════════════════╝ │ │ ════════╗ │ ženklas (SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │───────────────────────────────────── ─ ─ ────────────────────────>│B ║patikrinti (raktas, IDB)││││n. ║ │ │ ╚ ═════════════════════╝ │ │

Siekiant optimizavimo, kai kurie gali norėti pakartotinai naudoti savo trumpalaikius raktus (o tai, žinoma, gaila PFS). Pavyzdžiui, sugeneravome raktų porą, bandėme prisijungti, bet TCP nepasiekiamas arba buvo nutrauktas kažkur protokolo viduryje. Gaila švaistyti švaistomus entropijos ir procesoriaus išteklius naujai porai. Todėl pristatysime taip vadinamą slapuką – pseudoatsitiktinę reikšmę, kuri apsaugos nuo galimų atsitiktinių pakartojimų atakų pakartotinai naudojant trumpalaikius viešuosius raktus. Dėl slapuko ir trumpalaikio viešojo rakto susiejimo priešingos šalies viešasis raktas gali būti pašalintas iš parašo kaip nereikalingas.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬───┬───┬──┘ ─│─┘ ───, I─── ieA │ ╔════════ ═══════════════════╗ │─────────────────- ────────── ───────────────────────────────- >│ ║SignPrvA, SignPubA = load( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚══════════════════════════════════════════════════ ═ ═╝ │IdB, PubB, CookieB , ženklas(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔══════════════════════════════════════════════ ╗ │< ───────────────────────────────- ───────── ───────────────────│ ║SignPrvB, SignPubB = apkrova()║ ┕│rB, (Pub) │ ╚══════ ═════════════════════╝ │ │ │ ══════╗ │ ženklas( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Raktas = DH(PrvA, PubB) ║ │───────────────────────────────────── ─ ─ ───────────────────────────────- ──────>│ ║ patvirtinti(raktas, IdB) ║ │ │ ║patikrinti(SignPubB, ...)║ │ │ ╚═════════════════════════════════════════════════ │

Galiausiai norime gauti savo pokalbio partnerių privatumą iš pasyvaus stebėtojo. Norėdami tai padaryti, SIGMA siūlo pirmiausia apsikeisti trumpalaikiais raktais ir sukurti bendrą raktą, kuriuo būtų galima užšifruoti autentifikavimo ir identifikavimo pranešimus. SIGMA aprašo dvi parinktis:

  • SIGMA-I - apsaugo iniciatorių nuo aktyvių atakų, atsakytoją nuo pasyviųjų: iniciatorius autentifikuoja atsakytoją ir jei kažkas nesutampa, tai neišduoda savo identifikacijos. Kaltinamasis išduoda savo tapatybę, jei su juo pradedamas aktyvus protokolas. Pasyvus stebėtojas nieko neišmoksta;
    SIGMA-R – apsaugo atsakytoją nuo aktyvių atakų, iniciatorių nuo pasyvių. Viskas yra visiškai priešingai, tačiau šiame protokole jau perduodami keturi rankos paspaudimo pranešimai.

    Mes pasirenkame SIGMA-I, nes jis yra panašesnis į tai, ko tikimės iš kliento-serverio pažįstamų dalykų: klientą atpažįsta tik autentifikuotas serveris, o serverį jau žino visi. Be to, jį lengviau įgyvendinti, nes mažiau pranešama rankos paspaudimu. Viskas, ką pridedame prie protokolo, yra užšifruoti dalį pranešimo ir perkelti identifikatorių A į užšifruotą paskutinio pranešimo dalį:

    ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬───┬───┬──┘ ───┘ ─────── │ ╔══════════ ═════════════════╗ ────────── ───────────────────────────────- ─────>│ ║SignPrvA , SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════════════════════════════════════════════════ ════╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔════════════════════════════ ═══════╗ │<─────────────────────────────── ───────── ║SignPrv B, SignPubB = load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚════════════════════════════════════════ ══════ ═╝ │ │ ╔ ════════ ═════════════════╗═══╗ │ A,P,Cob,okie,((Id) ), MAC (IdA) )) │ ║ Raktas = DH (PrvA, PubB) ║ │────────────────────────────────────────── ────── ──── ───────── ────────────────────- ─>│ ║patikrinti (raktas, IdB) ║ │ │ ║patvirtinti(Pasirašykite PubB, ...)║ │ │ ╚═════════════════════════════════════╂╂════
    
    • GOST R naudojamas parašui 34.10-2012 algoritmas su 256 bitų raktais.
    • Viešajam raktui generuoti naudojamas 34.10-2012-XNUMX VKO.
    • CMAC naudojamas kaip MAC. Techniškai tai yra specialus blokinio šifro veikimo režimas, aprašytas GOST R 34.13-2015. Kaip šio režimo šifravimo funkcija − Žiogas (34.12-2015).
    • Jo viešojo rakto maiša naudojama kaip pašnekovo identifikatorius. Naudojamas kaip maišas Stribog-256 (34.11-2012-256 XNUMX bitai).

    Po rankos paspaudimo susitarsime dėl bendro rakto. Galime jį naudoti autentifikuotam transportavimo pranešimų šifravimui. Ši dalis yra labai paprasta ir sunku suklysti: padidiname pranešimų skaitiklį, užšifruojame pranešimą, autentifikuojame (MAC) skaitiklį ir šifruotą tekstą, siunčiame. Gavę pranešimą patikriname, ar skaitiklis turi laukiamą reikšmę, skaitikliu autentifikuojame šifruotą tekstą ir jį iššifruojame. Kokį raktą turėčiau naudoti norint užšifruoti rankos paspaudimo pranešimus, perduoti pranešimus ir kaip juos autentifikuoti? Visoms šioms užduotims naudoti vieną raktą yra pavojinga ir neprotinga. Būtina generuoti raktus naudojant specializuotas funkcijas KDF (raktų išvedimo funkcija). Vėlgi, neskaldykime plaukų ir nieko nesugalvokime: HKDF jau seniai žinomas, gerai ištirtas ir neturi jokių žinomų problemų. Deja, gimtoji Python biblioteka šios funkcijos neturi, todėl naudojame hkdf plastikinis maišelis. HKDF naudoja viduje HMAC, kuri savo ruožtu naudoja maišos funkciją. „Python“ diegimo pavyzdys Vikipedijos puslapyje užima vos kelias kodo eilutes. Kaip ir 34.10-2012-256 atveju, kaip maišos funkciją naudosime Stribog-XNUMX. Mūsų rakto susitarimo funkcijos išvestis bus vadinama seanso raktu, iš kurio bus generuojami trūkstami simetriški:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Struktūros/schemos

    Pažiūrėkime, kokias ASN.1 struktūras dabar turime visiems šiems duomenims perduoti:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    Bus pasirašyta HandshakeTBS. HandshakeTBE – kas bus užšifruota. Atkreipiu jūsų dėmesį į ukm lauką MsgHandshake1. 34.10 VKO, siekiant dar didesnio generuojamų raktų atsitiktinio atskyrimo, apima UKM (vartotojo įrakinimo medžiagos) parametrą – tik papildomą entropiją.

    Kriptografijos pridėjimas prie kodo

    Apsvarstykime tik pradinio kodo pakeitimus, nes sistema išliko ta pati (tiesą sakant, pirmiausia buvo parašytas galutinis įgyvendinimas, o tada iš jo buvo iškirpta visa kriptografija).

    Kadangi pašnekovų autentifikavimas ir identifikavimas bus atliekami naudojant viešuosius raktus, dabar juos reikia kažkur saugoti ilgą laiką. Dėl paprastumo naudojame JSON taip:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    mūsų – mūsų raktų pora, šešioliktainiai privatūs ir viešieji raktai. jų — pašnekovų vardai ir jų viešieji raktai. Pakeiskime komandinės eilutės argumentus ir pridėkime JSON duomenų papildomą apdorojimą:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    34.10 algoritmo privatusis raktas yra atsitiktinis skaičius. 256 bitų dydis 256 bitų elipsinėms kreivėms. PyGOST veikia ne su baitų rinkiniu, o su dideli skaičiai, todėl mūsų privatus raktas (urandom(32)) turi būti konvertuojamas į skaičių naudojant gost3410.prv_unmarshal(). Viešasis raktas nustatomas deterministiškai iš privataus rakto naudojant gost3410.public_key(). Viešasis raktas 34.10 yra du dideli skaičiai, kuriuos taip pat reikia konvertuoti į baitų seką, kad būtų lengviau saugoti ir perduoti naudojant gost3410.pub_marshal().

    Perskaičius JSON failą, viešuosius raktus reikia konvertuoti atgal naudojant gost3410.pub_unmarshal(). Kadangi pašnekovų identifikatorius gausime maišos pavidalu iš viešojo rakto, juos galima iš karto iš anksto apskaičiuoti ir įdėti į žodyną greitai paieškai. Stribog-256 maiša yra gost34112012256.GOST34112012256(), kuri visiškai atitinka maišos funkcijų maišos sąsają.

    Kaip pasikeitė iniciatoriaus koruna? Viskas pagal rankos paspaudimo schemą: sugeneruojame slapuką (128 bitų pakanka), trumpalaikę raktų porą 34.10, kuri bus naudojama VKO rakto susitarimo funkcijai.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • laukiame atsakymo ir dekoduojame gaunamą žinutę;
    • įsitikinkite, kad gausite rankos paspaudimą1;
    • iššifruoti efemerinį priešingos šalies viešąjį raktą ir apskaičiuoti seanso raktą;
    • Sugeneruojame simetrinius raktus, reikalingus pranešimo TBE daliai apdoroti.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM yra 64 bitų skaičius (urandom (8)), kuris taip pat reikalauja deserializacijos iš jo baitų atvaizdavimo naudojant gost3410_vko.ukm_unmarshal(). 34.10-2012-256 3410 bitų VKO funkcija yra gost34102012256_vko.kek_XNUMX() (KEK - šifravimo raktas).

    Sugeneruotas seanso raktas jau yra 256 bitų pseudoatsitiktinė baitų seka. Todėl jis gali būti nedelsiant naudojamas HKDF funkcijose. Kadangi GOST34112012256 atitinka hashlib sąsają, ją galima iš karto naudoti Hkdf klasėje. Druskos nenurodome (pirmasis Hkdf argumentas), nes sugeneruotas raktas dėl dalyvaujančių raktų porų trumpalaikiškumo kiekvienoje sesijoje skirsis ir jame jau yra pakankamai entropijos. Pagal numatytuosius nustatymus kdf.expand() jau sukuria 256 bitų raktus, reikalingus Grasshopper vėliau.

    Tada patikrinamos gaunamo pranešimo TBE ir TBS dalys:

    • apskaičiuojamas ir patikrinamas gaunamo šifruoto teksto MAC;
    • šifruotas tekstas iššifruojamas;
    • TBE struktūra iššifruojama;
    • iš jo paimamas pašnekovo identifikatorius ir patikrinama, ar jis mums apskritai žinomas;
    • MAC per šį identifikatorių apskaičiuojamas ir patikrinamas;
    • patikrinamas parašas per TBS struktūrą, į kurį įtrauktas abiejų šalių slapukas ir priešingos šalies viešasis trumpalaikis raktas. Parašas patikrinamas ilgaamžiu pašnekovo parašo raktu.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Kaip rašiau aukščiau, 34.13-2015-XNUMX aprašo įvairius blokinio šifro veikimo režimai nuo 34.12-2015-3413. Tarp jų yra imitacinių įdėklų generavimo ir MAC skaičiavimų režimas. PyGOST tai gost34.12.mac(). Šis režimas reikalauja perduoti šifravimo funkciją (gauti ir grąžinti vieną duomenų bloką), šifravimo bloko dydį ir, tiesą sakant, pačius duomenis. Kodėl negalite užkoduoti šifravimo bloko dydžio? 2015-128-64 aprašomas ne tik XNUMX bitų Grasshopper šifras, bet ir XNUMX bitų Magma - šiek tiek pakeistas GOST 28147-89, sukurtas dar KGB ir vis dar turi vieną aukščiausių saugos slenksčių.

    Kuznechik inicijuojamas skambinant gost.3412.GOST3412Kuznechik(key) ir grąžinamas objektas su .encrypt()/.decrypt() metodais, tinkančiais pereiti prie 34.13 funkcijų. MAC apskaičiuojamas taip: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, šifruotas tekstas). Norėdami palyginti apskaičiuotą ir gautą MAC, negalite naudoti įprasto baitų eilučių palyginimo (==), nes ši operacija praleidžia palyginimo laiką, o tai paprastai gali sukelti mirtinus pažeidžiamumus, pvz. BEAST atakų prieš TLS. Python tam turi specialią funkciją hmac.compare_digest.

    Bloko šifravimo funkcija gali užšifruoti tik vieną duomenų bloką. Didesniam skaičiui ir net ne kartotiniam ilgiui būtina naudoti šifravimo režimą. 34.13-2015 aprašoma: ECB, CTR, OFB, CBC, CFB. Kiekvienas iš jų turi savo priimtinas taikymo sritis ir charakteristikas. Deja, mes vis dar neturime standartizuotų autentifikuoti šifravimo režimai (pvz., CCM, OCB, GCM ir panašiai) – esame priversti bent jau patys pridėti MAC. aš renkuosi skaitiklio režimas (CTR): nereikalauja užpildymo pagal bloko dydį, gali būti lygiagretinamas, naudojama tik šifravimo funkcija, gali būti saugiai naudojamas šifruoti daug pranešimų (skirtingai nei CBC, kuris gana greitai susiduria su susidūrimais).

    Kaip ir .mac(), .ctr() naudoja panašią įvestį: šifruotas tekstas = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, paprastas tekstas, iv). Būtina nurodyti inicijavimo vektorių, kuris yra lygiai pusė šifravimo bloko ilgio. Jei mūsų šifravimo raktas naudojamas tik vienam pranešimui užšifruoti (nors ir iš kelių blokų), tada saugu nustatyti nulinį iniciacijos vektorių. Norėdami užšifruoti rankos paspaudimo pranešimus, kiekvieną kartą naudojame atskirą raktą.

    Parašo tikrinimas gost3410.verify() yra trivialus: praeiname elipsinę kreivę, kurioje dirbame (tiesiog įrašome ją į savo GOSTIM protokolą), pasirašančiojo viešąjį raktą (nepamirškite, kad tai turėtų būti dviejų eilė dideli skaičiai, o ne baitų eilutė), 34.11-2012-XNUMX maišos ir pats parašas.

    Toliau iniciatoriuje paruošiame ir išsiunčiame rankos paspaudimo pranešimą handshake2, atlikdami tuos pačius veiksmus, kaip ir tikrinimo metu, tik simetriškai: pasirašome ant savo raktų, o ne tikriname ir t.t.

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Užmezgus seansą, generuojami transportavimo raktai (atskiras šifravimo, autentifikavimo raktas kiekvienai šaliai), o Grasshopper inicijuojamas iššifruoti ir patikrinti MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    Korutina msg_sender dabar užšifruoja pranešimus prieš siųsdama juos TCP ryšiu. Kiekvienas pranešimas turi monotoniškai didėjantį nonce, kuris taip pat yra inicijavimo vektorius, kai šifruojamas skaitiklio režimu. Garantuojama, kad kiekvienas pranešimas ir pranešimų blokas turės skirtingą skaitiklio reikšmę.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    Įeinantys pranešimai apdorojami msg_receiver korutine, kuri tvarko autentifikavimą ir iššifravimą:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    išvada

    GOSTIM skirtas naudoti tik švietimo tikslais (kadangi bent jau nėra testų)! Programos šaltinio kodą galima atsisiųsti čia (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM yra visiškai nemokama programinė įranga, platinamas pagal sąlygas GPLv3 +.

    Sergejus Matvejevas, cypherpunk, narys SPO fondas, Python/Go kūrėjas, vyriausiasis specialistas FSUE "STC "Atlas".

Šaltinis: www.habr.com

Добавить комментарий