GOSTIM: P2P F2F E2EE IM v enem večeru s kriptografijo GOST

Biti razvijalec PyGOST knjižnice (kriptografske primitive GOST v čistem Pythonu), pogosto prejmem vprašanja o tem, kako implementirati najpreprostejše varno sporočanje na koleno. Mnogi ljudje menijo, da je uporabna kriptografija precej preprosta in da bo klic .encrypt() na blokovni šifri dovolj za varno pošiljanje prek komunikacijskega kanala. Drugi verjamejo, da je uporabna kriptografija usoda redkih in da je sprejemljivo, da bogata podjetja, kot je Telegram z olimpijadami-matematiki ne more izvajati varen protokol.

Vse to me je spodbudilo, da sem napisal ta članek, da pokažem, da implementacija kriptografskih protokolov in varnega IM ni tako težka naloga. Vendar pa ni vredno izumljati lastnih protokolov za preverjanje pristnosti in dogovora o ključih.

GOSTIM: P2P F2F E2EE IM v enem večeru s kriptografijo GOST
Članek bo napisal peer-to-peer, prijatelj prijatelju, šifrirano od konca do konca instant messenger z SIGMA-I avtentikacijo in protokol dogovora o ključu (na podlagi katerega se izvaja IPsec IKE), z uporabo izključno GOST kriptografskih algoritmov knjižnice PyGOST in knjižnice za kodiranje sporočil ASN.1 PyDERASN (o čemer sem že napisal prej). Pogoj: biti mora tako preprost, da ga je mogoče napisati iz nič v enem večeru (ali delovnem dnevu), sicer ni več enostaven program. Verjetno ima napake, nepotrebne zaplete, pomanjkljivosti, poleg tega je to moj prvi program, ki uporablja knjižnico asyncio.

Oblikovanje IM

Najprej moramo razumeti, kako bo izgledal naš IM. Zaradi poenostavitve naj bo to omrežje enakovrednih, brez kakršnega koli odkrivanja udeležencev. Osebno bomo navedli, na kateri naslov: vrata se povezati za komunikacijo s sogovornikom.

Razumem, da je v tem trenutku predpostavka, da je na voljo neposredna komunikacija med dvema poljubnima računalnikoma, pomembna omejitev za uporabnost IM v praksi. Toda bolj kot bodo razvijalci izvajali vse vrste bergel za prečkanje NAT, dlje bomo ostali na internetu IPv4, z depresivno verjetnostjo komunikacije med poljubnimi računalniki. Kako dolgo lahko tolerirate pomanjkanje IPv6 doma in v službi?

Imeli bomo mrežo prijatelja do prijatelja: vsi možni sogovorniki morajo biti znani vnaprej. Prvič, to močno poenostavi vse: predstavili smo se, našli ali ne našli imena/ključa, prekinili povezavo ali nadaljevali z delom, poznali sogovornika. Drugič, na splošno je varen in odpravlja številne napade.

IM vmesnik bo blizu klasičnim rešitvam brezvezni projekti, ki so mi zelo všeč zaradi njihovega minimalizma in Unixove filozofije. Program IM za vsakega sogovornika ustvari imenik s tremi domenskimi vtičnicami Unix:

  • v—v njej se zabeležijo sporočila, poslana sogovorniku;
  • ven - iz njega se berejo sporočila, prejeta od sogovornika;
  • stanje - z branjem iz njega izvemo, ali je sogovornik trenutno povezan, naslov/vrata povezave.

Poleg tega se ustvari conn socket, tako da zapišemo gostiteljska vrata, v katera sprožimo povezavo z oddaljenim sogovornikom.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Ta pristop vam omogoča neodvisno implementacijo IM transporta in uporabniškega vmesnika, ker ni prijatelja, ne morete ugoditi vsem. Uporaba tmux in / ali multitail, lahko dobite večokenski vmesnik s označevanjem sintakse. In s pomočjo rlwrap lahko dobite vrstico za vnos sporočila, združljivo z GNU Readline.

Pravzaprav nesramni projekti uporabljajo datoteke FIFO. Osebno nisem mogel razumeti, kako delati z datotekami konkurenčno v asyncio brez ročno napisanega ozadja iz namenskih niti (za take stvari že dolgo uporabljam jezik Go). Zato sem se odločil zadovoljiti z domenskimi vtičnicami Unix. Na žalost to onemogoča izvajanje echo 2001:470:dead::babe 6666 > conn. To težavo sem rešil z uporabo socat: echo 2001:470:mrtev::babe 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.

Izvirni nevaren protokol

TCP se uporablja kot transport: zagotavlja dostavo in naročilo. UDP ne zagotavlja ne enega ne drugega (kar bi bilo uporabno pri uporabi kriptografije), temveč podporo SCTP Python ni na voljo takoj.

Na žalost v TCP ni koncepta sporočila, le tok bajtov. Zato je treba pripraviti obliko sporočil, da jih je mogoče deliti med seboj v tej temi. Lahko se dogovorimo za uporabo znaka za premik vrstice. Za začetek je v redu, a ko začnemo šifrirati svoja sporočila, se lahko ta znak pojavi kjer koli v šifriranem besedilu. V omrežjih so zato priljubljeni protokoli, ki najprej pošljejo dolžino sporočila v bajtih. Python ima na primer xdrlib, ki vam omogoča delo s podobnim formatom XDR.

Z branjem TCP ne bomo delovali pravilno in učinkovito - kodo bomo poenostavili. Podatke iz vtičnice beremo v neskončni zanki, dokler ne dekodiramo celotnega sporočila. JSON z XML se lahko uporablja tudi kot format za ta pristop. Toda ko je dodana kriptografija, bodo morali biti podatki podpisani in overjeni - to pa bo zahtevalo bajt za bajtom identično predstavitev objektov, ki je JSON/XML ne zagotavlja (rezultati izpisov se lahko razlikujejo).

XDR je primeren za to nalogo, vendar sem izbral ASN.1 s kodiranjem DER in PyDERASN knjižnico, saj bomo imeli pri roki visokokakovostne predmete, s katerimi je pogosto bolj prijetno in priročno delati. Za razliko od brez sheme bencode, MessagePack ali CBOR, bo ASN.1 samodejno preveril podatke glede na trdo kodirano shemo.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

Prejeto sporočilo bo Msg: besedilo MsgText (zaenkrat z enim tekstovnim poljem) ali sporočilo rokovanja MsgHandshake (ki vsebuje ime sogovornika). Zdaj je videti preveč zapleteno, vendar je to temelj za prihodnost.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │MsgHandshake( IdA) │ │────────── ────────>│ │ │ │MsgHandshake(IdB) │ │<──────────────────│ │ │ │ MsgText() │ │───── MsgText() │ │ │

IM brez kriptografije

Kot sem že rekel, bo knjižnica asyncio uporabljena za vse operacije vtičnic. Naj objavimo, kaj pričakujemo ob lansiranju:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Nastavite svoje ime (--naše-ime alice). Vsi pričakovani sogovorniki so navedeni ločeno z vejicami (—njihova-imena bob,eve). Za vsakega od sogovornikov je ustvarjen imenik z vtičnicami Unix in korrutina za vsako in, out, stanje:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

Sporočila, ki prihajajo od uporabnika iz vtičnice, so poslana v čakalno vrsto IN_QUEUES:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

Sporočila, ki prihajajo od sogovornikov, se pošljejo v OUT_QUEUES čakalne vrste, iz katerih se podatki zapišejo v izhodno vtičnico:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Pri branju iz državne vtičnice program išče naslov sogovornika v slovarju PEER_ALIVE. Če povezave s sogovornikom še ni, se napiše prazna vrstica.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Pri pisanju naslova v priključno vtičnico se zažene funkcija »iniciator« povezave:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Razmislimo o pobudniku. Najprej očitno odpre povezavo do podanega gostitelja/vrata in pošlje rokovalno sporočilo s svojim imenom:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Nato počaka na odgovor oddaljene stranke. Poskuša dekodirati dohodni odgovor s shemo Msg ASN.1. Predvidevamo, da bo celotno sporočilo poslano v enem segmentu TCP in ga bomo prejeli atomsko ob klicu .read(). Preverimo, ali smo prejeli sporočilo rokovanja.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Preverimo, ali nam je prejeto ime sogovornika znano. Če ne, potem prekinemo povezavo. Preverimo, ali smo z njim že vzpostavili povezavo (sogovornik je ponovno dal ukaz za povezavo z nami) in jo prekinemo. Čakalna vrsta IN_QUEUES vsebuje nize Python z besedilom sporočila, vendar ima posebno vrednost None, ki signalizira koprogramu msg_sender, naj preneha delovati, tako da pozabi na zapisovalca, povezanega s podedovano povezavo TCP.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender sprejme odhodna sporočila (v čakalni vrsti iz vtičnice), jih serializira v sporočilo MsgText in jih pošlje prek povezave TCP. Vsak trenutek se lahko zlomi - to jasno prestrežemo.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

Na koncu iniciator vstopi v neskončno zanko branja sporočil iz vtičnice. Preveri, ali so ta sporočila besedilna sporočila, in jih postavi v čakalno vrsto OUT_QUEUES, iz katere bodo poslana v izhodno vtičnico ustreznega sogovornika. Zakaj preprosto ne naredite .read() in dekodirate sporočila? Ker je možno, da bo več sporočil od uporabnika združenih v medpomnilniku operacijskega sistema in poslanih v enem segmentu TCP. Prvega lahko dekodiramo, nato pa lahko del naslednjega ostane v medpomnilniku. V primeru kakršnih koli neobičajnih situacij zapremo povezavo TCP in ustavimo soprogram msg_sender (s pošiljanjem None v čakalno vrsto OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Vrnimo se k glavni kodi. Po izdelavi vseh soprogramov ob zagonu programa zaženemo strežnik TCP. Za vsako vzpostavljeno povezavo ustvari odzivni koprogram.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

responder je podoben iniciatorju in zrcali vsa ista dejanja, vendar se zaradi poenostavitve neskončna zanka branja sporočil začne takoj. Trenutno protokol rokovanja pošilja po eno sporočilo z vsake strani, v prihodnosti pa bosta dve od iniciatorja povezave, nato pa bo besedilna sporočila mogoče poslati takoj.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Varen protokol

Čas je, da zavarujemo naše komunikacije. Kaj mislimo z varnostjo in kaj želimo:

  • zaupnost prenesenih sporočil;
  • pristnost in celovitost poslanih sporočil - njihove spremembe morajo biti zaznane;
  • zaščita pred napadi ponavljanja - zaznati je treba dejstvo manjkajočih ali ponovljenih sporočil (in se odločimo za prekinitev povezave);
  • identifikacija in avtentikacija sogovornikov z vnaprej vnesenimi javnimi ključi - že prej smo se odločili, da delamo omrežje prijatelj-prijatelj. Šele po avtentikaciji bomo razumeli, s kom komuniciramo;
  • razpoložljivost popolna tajnost naprej lastnosti (PFS) - ogrožanje našega dolgotrajnega ključa za podpisovanje ne bi smelo povzročiti zmožnosti branja vse prejšnje korespondence. Snemanje prestreženega prometa postane neuporabno;
  • veljavnost/veljavnost sporočil (transport in rokovanje) le znotraj ene TCP seje. Vstavljanje pravilno podpisanih/preverjenih sporočil iz druge seje (tudi z istim sogovornikom) ne bi smelo biti mogoče;
  • pasivni opazovalec ne bi smel videti identifikatorjev uporabnikov, prenesenih dolgoživih javnih ključev ali njihovih zgoščenih vrednosti. Določena anonimnost pasivnega opazovalca.

Presenetljivo je, da skoraj vsi želijo imeti ta minimum v katerem koli protokolu rokovanja in zelo malo od zgoraj navedenega je na koncu izpolnjeno za "domače" protokole. Zdaj ne bomo izumili ničesar novega. Vsekakor priporočam uporabo Noise framework za gradnjo protokolov, vendar izberimo nekaj preprostejšega.

Dva najbolj priljubljena protokola sta:

  • TLS - zelo kompleksen protokol z dolgo zgodovino hroščev, zastojov, ranljivosti, slabega razmišljanja, kompleksnosti in pomanjkljivosti (vendar to nima veliko skupnega s TLS 1.3). Vendar tega ne upoštevamo, ker je preveč zapleteno.
  • IPsec с IKE — nimajo resnih kriptografskih težav, čeprav tudi niso preproste. Če berete o IKEv1 in IKEv2, potem je njun vir STS, protokoli ISO/IEC IS 9798-3 in SIGMA (SIGn-and-MAc) – dovolj preprosti za izvedbo v enem večeru.

Kaj je dobrega pri SIGMI, kot zadnjem členu v razvoju STS/ISO protokolov? Izpolnjuje vse naše zahteve (vključno s »skrivanjem« identifikatorjev sogovornika) in nima znanih kriptografskih težav. Je minimalističen - odstranitev vsaj enega elementa iz sporočila protokola bo povzročila njegovo nevarnost.

Pojdimo od najpreprostejšega domačega protokola k SIGMI. Najosnovnejša operacija, ki nas zanima, je ključni dogovor: funkcija, ki obema udeležencema izpiše isto vrednost, ki se lahko uporabi kot simetrični ključ. Ne da bi se spuščali v podrobnosti: vsaka od strani ustvari efemerni (uporablja se samo znotraj ene seje) par ključev (javni in zasebni ključ), izmenja javne ključe, pokliče funkcijo sporazuma, na vhod katere posreduje svoj zasebni ključ in javni ključ. ključ sogovornika.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ══════════╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚═ ════════ ═══════════╝ │ IdB, PubB │ ╔════════════════════╗ │<─────────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚════════════════════╝ ─ ───┐ ╔═ ═══ ═══╧════════════╗ │ ║Ključ = DH(PrvA, PubB)║ <───┘ ╚═══════╤═ ═ ══════ ════╝ │ │ │ │

Vsak lahko skoči v sredino in javne ključe zamenja s svojimi – v tem protokolu ni avtentikacije sogovornikov. Dodajmo podpis z dolgoživimi ključi.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, znak(SignPrvA, (PubA)) │ ╔═ │───────────── ─────────── ───────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════ ═══════ ═════════════╝ │IdB, PubBB , znak(SignPrvB, (PubB)) │ ╔══════════════ ═══════ ══════╗ │<─────────────────────────── ───────────── ──│ ║SignPrvB, SignPubB = load( )║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══════════ ══════════════ ══╝ ────┐ ╔ ══════════════════ ═══╗ │ │ ║preveri( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚═════════════════════ ╝ │ │ │

Takšen podpis ne bo deloval, ker ni vezan na določeno sejo. Takšna sporočila so "primerna" tudi za seje z drugimi udeleženci. Celoten kontekst se mora naročiti. To nas prisili, da dodamo še eno sporočilo od A.

Poleg tega je ključnega pomena, da pod podpis dodate svoj identifikator, saj lahko sicer zamenjamo IdXXX in sporočilo prepodpišemo s ključem drugega znanega sogovornika. Preprečiti refleksni napadi, je nujno, da so elementi pod podpisom na jasno določenih mestih glede na njihov pomen: če se podpiše A (PubA, PubB), se mora podpisati B (PubB, PubA). To govori tudi o pomembnosti izbire strukture in formata serializiranih podatkov. Na primer, nizi v kodiranju ASN.1 DER so razvrščeni: SET OF(PubA, PubB) bo identičen SET OF(PubB, PubA).

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │────────────────────── ──────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═════ ═══════ ═══════════════╝ │IdB, PubB, znak(SignPrvB, (IdB, PubA, PubB)) │ ╔══════════ ═════ ════════════╗ │<─────────────────────────── ─────────── ─────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═════════ ══════ ══ ══════════╝ │ znak(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════ ════╗ │─ ───────────────────────────────────────── ───>│ ║preveri(SignPubB, ...) ║ │ │ ║ključ = dh (prva, PUBB) ║ │ │ │

Vendar še vedno nismo »dokazali«, da smo za to sejo ustvarili isti skupni ključ. Načeloma lahko brez tega koraka – že prva prometna povezava bo neveljavna, želimo pa, da bi bili ob končanem rokovanju prepričani, da je res vse dogovorjeno. Trenutno imamo na voljo protokol ISO/IEC IS 9798-3.

Sam ustvarjeni ključ lahko podpišemo. To je nevarno, saj je možno, da pride do puščanja v uporabljenem algoritmu podpisa (čeprav bitov na podpis, vendar še vedno pušča). Zgoščeno vrednost izpeljanega ključa je mogoče podpisati, vendar je lahko uhajanje celo zgoščene vrednosti izpeljanega ključa dragoceno pri napadu s surovo silo na funkcijo izpeljave. SIGMA uporablja funkcijo MAC, ki avtentikira ID pošiljatelja.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │────────────────────── ──────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚ ═══════ ════════════════════╝ │IdB, PubB, znak(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔════ ═══ │<────────────────── ─────────── ────────────────────── ─│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ══════════════ ═══════ ══╝ │ │ ╔════════════ ═════════╗ │ znak(SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Ključ = DH( PrvA, PubB) ║ │────────────────────── ── ────────────────────── ─────>│ ║preveri(Key, IdB) ║ │ │ ║preveri(SignPubB, ...)║ │ │ ╚══════════════════ ══ ═╝ │ │

Za optimizacijo bodo nekateri morda želeli znova uporabiti svoje efemerne ključe (kar je seveda obžalovanja vredno za PFS). Na primer, ustvarili smo par ključev, se poskušali povezati, vendar TCP ni bil na voljo ali pa je bil prekinjen nekje na sredini protokola. Škoda je zapraviti izgubljeno entropijo in procesorske vire za nov par. Zato bomo uvedli tako imenovani piškotek - psevdonaključno vrednost, ki bo ščitila pred morebitnimi napadi naključnega ponavljanja pri ponovni uporabi efemernih javnih ključev. Zaradi vezave med piškotkom in efemernim javnim ključem se lahko javni ključ nasprotne strani odstrani iz podpisa kot nepotreben.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CookieA │ ╔════════ ═══════════════════╗ │──────────────────── ──────────── ───────────────────────────────────────── ─>│ ║SignPrvA, SignPubA = load( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═════════════════════════ ══╝ │IdB, PubB, CookieB , znak(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔══════════════════════════ ═ ╗ │< ────────────────────────────────────────── ─────────── ─────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚══════ ═════════════════════╝ │ │ ╔══════════════ ═══════╗ │ znak( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │────────────────────── ── ───────────────────────────────────────── ───────>│ ║ preveri (Key, IdB) ║ │ │ ║ preveri (SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │

Nazadnje želimo pridobiti zasebnost naših sogovornikov od pasivnega opazovalca. Da bi to naredili, SIGMA predlaga, da najprej izmenjamo kratkotrajne ključe in razvijemo skupni ključ za šifriranje sporočil za avtentikacijo in identifikacijo. SIGMA opisuje dve možnosti:

  • SIGMA-I - ščiti pobudnika pred aktivnimi napadi, odzivnika pred pasivnimi: pobudnik avtentikira odzivnika in če nekaj ne ustreza, potem ne izda svoje identifikacije. Obdolženec izda svojo identifikacijo, če se z njim začne aktivni protokol. Pasivni opazovalec se ne nauči ničesar;
    SIGMA-R - ščiti odzivnika pred aktivnimi napadi, pobudnika pred pasivnimi. Vse je ravno obratno, vendar se v tem protokolu prenašajo že štiri sporočila rokovanja.

    Izbrali smo SIGMA-I, ker je bolj podoben tistemu, kar pričakujemo od znanih stvari odjemalec-strežnik: odjemalca prepozna samo overjen strežnik, strežnik pa vsi že poznajo. Poleg tega je lažje implementirati zaradi manj sporočil rokovanja. Vse, kar dodamo protokolu, je šifriranje dela sporočila in prenos identifikatorja A v šifrirani del zadnjega sporočila:

    Puba, piškote │ ╔══════════ ═════════════════╗ │───────────────zgoti ───── ─────────── ────────────────────────────────────── ─dih ═════════ ════╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔════ ═ ═══════════════ ═══════╗ │<────────────────────────── ───── ────────── ║SignP rvB, SignPubB = load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚═══════════ ════════════════╝ │ │ ╔════════ ═══════════ ══╗ │ Enc((IdA, znak( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │─────────────────────── ────────────────── ────────── ( SignPubB, ...)║ │ │ ╚═══════════ ══════════╝ │ │
    
    • Za podpis se uporablja GOST R 34.10-2012 algoritem z 256-bitnimi ključi.
    • Za generiranje javnega ključa se uporablja VKO 34.10.
    • CMAC se uporablja kot MAC. Tehnično je to poseben način delovanja blokovne šifre, opisan v GOST R 34.13-2015. Kot funkcija šifriranja za ta način − Kobilica (34.12-2015).
    • Zgoščena vrednost njegovega javnega ključa se uporablja kot identifikator sogovornika. Uporablja se kot hash Stribog-256 (34.11 2012 bitov).

    Po rokovanju se dogovorimo za skupni ključ. Uporabljamo ga lahko za overjeno šifriranje transportnih sporočil. Ta del je zelo preprost in težko se je zmotiti: povečamo števec sporočil, šifriramo sporočilo, overimo (MAC) števec in šifrirano besedilo, pošljemo. Ob prejemu sporočila preverimo, ali ima števec pričakovano vrednost, šifrirano besedilo overimo s števcem in ga dešifriramo. Kateri ključ naj uporabim za šifriranje sporočil rokovanja, transportnih sporočil in kako jih avtentikirati? Uporaba enega ključa za vsa ta opravila je nevarna in nespametna. Potrebno je ustvariti ključe z uporabo specializiranih funkcij KDF (ključna funkcija izpeljave). Še enkrat, ne cepimo in si nekaj izmislimo: HKDF že dolgo poznan, dobro raziskan in nima znanih težav. Na žalost izvorna knjižnica Python nima te funkcije, zato uporabljamo hkdf plastična vrečka. HKDF interno uporablja HMAC, ki pa uporablja zgoščevalno funkcijo. Primer implementacije v Pythonu na strani Wikipedije zahteva le nekaj vrstic kode. Kot v primeru 34.10. 2012. 256 bomo kot zgoščevalno funkcijo uporabili Stribog-XNUMX. Izhod naše funkcije dogovora o ključu se bo imenoval sejni ključ, iz katerega bodo ustvarjeni manjkajoči simetrični:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Strukture/sheme

    Poglejmo, kakšne strukture ASN.1 imamo zdaj za prenos vseh teh podatkov:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS je tisto, kar bo podpisano. HandshakeTBE - kaj bo šifrirano. Opozarjam vas na polje ukm v MsgHandshake1. 34.10 VKO za še večjo randomizacijo generiranih ključev vključuje parameter UKM (user keying material) - samo dodatna entropija.

    Dodajanje kriptografije kodi

    Upoštevajmo samo spremembe prvotne kode, saj je ogrodje ostalo enako (pravzaprav je bila najprej napisana končna izvedba, nato pa je bila iz nje izrezana vsa kriptografija).

    Ker bo avtentikacija in identifikacija sogovornikov potekala z javnimi ključi, jih je treba zdaj nekje shraniti za dalj časa. Zaradi enostavnosti uporabljamo JSON tako:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    naš - naš par ključev, heksadecimalni zasebni in javni ključ. njihovi — imena sogovornikov in njihovi javni ključi. Spremenimo argumente ukazne vrstice in dodamo naknadno obdelavo podatkov JSON:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    Zasebni ključ algoritma 34.10 je naključno število. 256-bitna velikost za 256-bitne eliptične krivulje. PyGOST ne deluje z nizom bajtov, ampak z velike številke, zato je treba naš zasebni ključ (urandom(32)) pretvoriti v številko z gost3410.prv_unmarshal(). Javni ključ se določi deterministično iz zasebnega ključa z gost3410.public_key(). Javni ključ 34.10 sta dve veliki številki, ki ju je prav tako treba pretvoriti v zaporedje bajtov za lažje shranjevanje in prenos z uporabo gost3410.pub_marshal().

    Po branju datoteke JSON je treba javne ključe ustrezno pretvoriti nazaj z uporabo gost3410.pub_unmarshal(). Ker bomo identifikatorje sogovornikov prejeli v obliki zgoščene vrednosti iz javnega ključa, jih lahko takoj vnaprej izračunamo in postavimo v slovar za hitro iskanje. Zgoščevanje Stribog-256 je gost34112012256.GOST34112012256(), ki v celoti ustreza vmesniku hashlib funkcij zgoščevanja.

    Kako se je spremenila korutina iniciatorja? Vse je po shemi rokovanja: ustvarimo piškotek (128-bit je dovolj), kratkotrajni par ključev 34.10, ki bo uporabljen za funkcijo dogovora o ključu VKO.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • čakamo na odgovor in dekodiramo dohodno sporočilo Msg;
    • poskrbite, da boste prejeli rokovanje1;
    • dekodirati efemerni javni ključ nasprotne strani in izračunati ključ seje;
    • Generiramo simetrične ključe, potrebne za obdelavo TBE dela sporočila.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM je 64-bitno število (urandom(8)), ki prav tako zahteva deserializacijo iz svoje bajtne predstavitve z gost3410_vko.ukm_unmarshal(). Funkcija VKO za 34.10-bitni 2012/256/3410 je gost34102012256_vko.kek_XNUMX() (KEK - šifrirni ključ).

    Ustvarjeni ključ seje je že 256-bitno psevdonaključno zaporedje bajtov. Zato ga je mogoče takoj uporabiti v funkcijah HKDF. Ker GOST34112012256 ustreza vmesniku hashlib, ga je mogoče takoj uporabiti v razredu Hkdf. Ne podajamo soli (prvega argumenta Hkdf), saj bo generirani ključ, zaradi minljivosti sodelujočih parov ključev, drugačen za vsako sejo in že vsebuje dovolj entropije. kdf.expand() privzeto že izdela 256-bitne ključe, ki so kasneje potrebni za Grasshopper.

    Nato se preverita dela TBE in TBS dohodnega sporočila:

    • MAC nad dohodnim šifriranim besedilom se izračuna in preveri;
    • šifrirano besedilo je dešifrirano;
    • Struktura TBE je dekodirana;
    • iz njega vzamemo identifikator sogovornika in preverimo, ali nam je sploh znan;
    • MAC nad tem identifikatorjem se izračuna in preveri;
    • verificira se podpis nad strukturo TBS, ki vključuje piškotek obeh strani in javni efemerni ključ nasprotne strani. Podpis se overi z dolgoživim podpisnim ključem sogovornika.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Kot sem napisal zgoraj, 34.13 opisuje različne načini delovanja blokovne šifre od 34.12. Med njimi je način za ustvarjanje imitacijskih vložkov in izračune MAC. V PyGOST je to gost2015.mac(). Ta način zahteva posredovanje šifrirne funkcije (prejemanje in vrnitev enega bloka podatkov), velikost šifrirnega bloka in pravzaprav same podatke. Zakaj ne morete kodirati velikosti šifrirnega bloka? 3413/34.12/2015 opisuje ne samo 128-bitno šifro Grasshopper, ampak tudi 64-bitno Magma - rahlo spremenjen GOST 28147-89, ustvarjen nazaj v KGB in ima še vedno enega najvišjih varnostnih pragov.

    Kuznechik se inicializira s klicem gost.3412.GOST3412Kuznechik(key) in vrne objekt z metodami .encrypt()/.decrypt(), ki so primerne za prehod na funkcije 34.13. MAC se izračuna na naslednji način: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, šifrirano besedilo). Za primerjavo izračunanega in prejetega MAC ne morete uporabiti običajne primerjave (==) bajtnih nizov, saj ta operacija izgubi primerjalni čas, kar lahko v splošnem primeru vodi do usodnih ranljivosti, kot je ZVER napade na TLS. Python ima za to posebno funkcijo, hmac.compare_digest.

    Funkcija šifriranja blokov lahko šifrira samo en blok podatkov. Za večje število in tudi ne večkratnik dolžine je treba uporabiti način šifriranja. 34.13-2015 opisuje naslednje: ECB, CTR, OFB, CBC, CFB. Vsak ima svoja sprejemljiva področja uporabe in značilnosti. Na žalost še vedno nimamo standardiziranih overjeni načini šifriranja (kot npr. CCM, OCB, GCM in podobno) – prisiljeni smo vsaj dodati MAC sami. izberem način števca (CTR): ne zahteva oblazinjenja do velikosti bloka, lahko je vzporeden, uporablja samo šifrirno funkcijo, lahko se varno uporablja za šifriranje velikega števila sporočil (za razliko od CBC, ki ima razmeroma hitre kolizije).

    Tako kot .mac() tudi .ctr() sprejema podobne vnose: šifrirano besedilo = gost3413.ctr(GOST3412Kuznechik(ključ).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). Določiti je treba inicializacijski vektor, ki je točno polovica dolžine šifrirnega bloka. Če se naš šifrirni ključ uporablja samo za šifriranje enega sporočila (čeprav iz več blokov), potem je varno nastaviti ničelni inicializacijski vektor. Za šifriranje sporočil rokovanja uporabimo vsakič ločen ključ.

    Preverjanje podpisa gost3410.verify() je trivialno: posredujemo eliptično krivuljo, znotraj katere delamo (preprosto jo zabeležimo v našem protokolu GOSTIM), javni ključ podpisnika (ne pozabite, da mora biti to dvojka velika števila in ne bajtni niz), 34.11. 2012. XNUMX hash in sam podpis.

    Nato v iniciatorju pripravimo in pošljemo sporočilo rokovanja na handshake2, pri čemer izvedemo enaka dejanja kot med preverjanjem, le simetrično: podpisovanje naših ključev namesto preverjanja itd.

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Ko je seja vzpostavljena, se generirajo transportni ključi (ločen ključ za šifriranje, za avtentikacijo, za vsako stran) in Grasshopper se inicializira za dešifriranje in preverjanje MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    Korrutina msg_sender zdaj šifrira sporočila, preden jih pošlje prek povezave TCP. Vsako sporočilo ima monotono naraščajočo številko nonce, ki je tudi inicializacijski vektor, ko je šifrirana v načinu števca. Vsako sporočilo in blok sporočila imata zagotovljeno drugačno vrednost števca.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    Dohodna sporočila obdela rutina msg_receiver, ki skrbi za preverjanje pristnosti in dešifriranje:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    Zaključek

    GOSTIM je namenjen izključno uporabi v izobraževalne namene (saj ni zajet v testih)! Izvorno kodo programa lahko prenesete tukaj (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM je popolnoma brezplačno programsko opremo, razdeljen pod pogoji GPLv3 +.

    Sergej Matvejev, cypherpunk, član Fundacija SPO, Python/Go razvijalec, glavni specialist FSUE "STC "Atlas".

Vir: www.habr.com

Dodaj komentar