GOSTIM: P2P F2F E2EE IM za jeden večer s kryptografiou GOST

Byť vývojárom PyGOST knižnice (GOST kryptografické primitívy v čistom Pythone), často dostávam otázky o tom, ako implementovať najjednoduchšie bezpečné posielanie správ na kolene. Mnoho ľudí považuje aplikovanú kryptografiu za celkom jednoduchú a volanie .encrypt() na blokovú šifru bude stačiť na jej bezpečné odoslanie cez komunikačný kanál. Iní veria, že aplikovaná kryptografia je osudom niekoľkých a je prijateľné, aby bohaté spoločnosti ako Telegram s olympijskými matematikmi nemožno realizovať zabezpečený protokol.

To všetko ma podnietilo k napísaniu tohto článku, aby som ukázal, že implementácia kryptografických protokolov a bezpečných IM nie je až taká náročná úloha. Neoplatí sa však vymýšľať si vlastnú autentifikáciu a protokoly na dohodu o kľúčoch.

GOSTIM: P2P F2F E2EE IM za jeden večer s kryptografiou GOST
Článok napíše peer-to-peer, priateľ-priateľovi, end-to-end šifrované instant messenger s SIGMA-I autentifikačný a kľúčový protokol (na základe ktorého sa implementuje IPsec IKE), používajúce výhradne kryptografické algoritmy GOST knižnicu PyGOST a knižnicu kódovania správ ASN.1 PyDERASN (o ktorom som už napísal predtým). Predpoklad: musí byť taký jednoduchý, aby sa dal napísať od nuly za jeden večer (alebo pracovný deň), inak to už nie je jednoduchý program. Asi má chyby, zbytočné komplikácie, nedostatky, plus je to môj prvý program využívajúci knižnicu asyncio.

IM dizajn

Najprv musíme pochopiť, ako bude naša IM vyzerať. Pre jednoduchosť nech je to sieť typu peer-to-peer, bez akéhokoľvek objavovania účastníkov. Osobne uvedieme, na ktorú adresu: port sa pripojiť na komunikáciu s partnerom.

Chápem, že v súčasnosti je predpoklad, že je k dispozícii priama komunikácia medzi dvoma ľubovoľnými počítačmi, významným obmedzením použiteľnosti IM v praxi. Ale čím viac vývojárov implementuje všetky druhy NAT-traversal bariel, tým dlhšie zostaneme na internete IPv4 s deprimujúcou pravdepodobnosťou komunikácie medzi ľubovoľnými počítačmi. Ako dlho dokážete tolerovať nedostatok IPv6 doma a v práci?

Budeme mať sieť priateľov: všetci možní partneri musia byť známi vopred. Po prvé, toto všetko výrazne zjednodušuje: predstavili sme sa, našli alebo nenašli meno/kľúč, odpojili sme sa alebo pokračovali v práci, pričom sme poznali partnera. Po druhé, vo všeobecnosti je bezpečný a eliminuje mnohé útoky.

Rozhranie IM bude blízke klasickým riešeniam nepodarené projekty, ktoré sa mi veľmi páčia pre ich minimalizmus a filozofiu Unix-way. Program IM vytvorí adresár s tromi zásuvkami domény Unix pre každého partnera:

  • in—správy odoslané účastníkovi rozhovoru sú v ňom zaznamenané;
  • von - správy prijaté od partnera sa z neho čítajú;
  • stav - čítaním z neho zistíme, či je hovorca aktuálne pripojený, adresu spojenia/port.

Okrem toho sa vytvorí conn socket zapísaním hostiteľského portu, do ktorého iniciujeme spojenie so vzdialeným účastníkom.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Tento prístup vám umožňuje vykonávať nezávislé implementácie prenosu okamžitých správ a používateľského rozhrania, pretože neexistuje žiadny priateľ, nemôžete potešiť každého. Použitím tmux a / alebo viackanálový, môžete získať rozhranie s viacerými oknami so zvýraznením syntaxe. A s pomocou rlwrap môžete získať vstupný riadok správ kompatibilný s GNU Readline.

V skutočnosti, bezstarostné projekty používajú súbory FIFO. Osobne som nedokázal pochopiť, ako pracovať so súbormi konkurencieschopne v asyncio bez ručne písaného pozadia z vyhradených vlákien (jazyk používam na takéto veci už dlho Go). Preto som sa rozhodol vystačiť si s Unix doménovými socketmi. Bohužiaľ to znemožňuje vykonať echo 2001:470:dead::babe 6666 > spoj. Tento problém som vyriešil pomocou socat: echo 2001:470:dead::babe 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alica/in.

Pôvodný nezabezpečený protokol

TCP sa používa ako preprava: zaručuje doručenie a jeho objednávku. UDP nezaručuje ani jedno (čo by bolo užitočné pri použití kryptografie), ale podporu SCTP Python nevychádza z krabice.

Bohužiaľ, v TCP neexistuje koncept správy, iba prúd bajtov. Preto je potrebné vymyslieť formát správ, aby si ich mohli medzi sebou v tomto vlákne zdieľať. Môžeme súhlasiť s používaním znaku pre posun riadkov. Na začiatok je to v poriadku, ale akonáhle začneme šifrovať naše správy, tento znak sa môže objaviť kdekoľvek v šifrovom texte. V sieťach sú preto populárne protokoly, ktoré najskôr posielajú dĺžku správy v bajtoch. Napríklad Python má hneď po vybalení xdrlib, ktorý vám umožňuje pracovať s podobným formátom XDR.

S čítaním TCP nebudeme pracovať správne a efektívne – zjednodušíme kód. Čítame dáta zo zásuvky v nekonečnej slučke, kým nedekódujeme kompletnú správu. Ako formát pre tento prístup možno použiť aj JSON s XML. Keď sa však pridá kryptografia, údaje budú musieť byť podpísané a overené – a to si bude vyžadovať bajtovú identickú reprezentáciu objektov, ktorú JSON/XML neposkytuje (výsledky výpisov sa môžu líšiť).

XDR je na túto úlohu vhodný, ja však volím ASN.1 s kódovaním DER a PyDERASN knižnicu, keďže budeme mať po ruke predmety na vysokej úrovni, s ktorými sa často pracuje príjemnejšie a pohodlnejšie. Na rozdiel od bez schémy bencode, MessagePack alebo CBOR, ASN.1 automaticky porovná údaje s pevne zakódovanou schémou.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

Prijatá správa bude Msg: buď textová MsgText (zatiaľ s jedným textovým poľom) alebo MsgHandshake handshake správa (ktorá obsahuje meno partnera). Teraz to vyzerá príliš komplikovane, ale toto je základ do budúcnosti.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┬──┘ ┘──s IdA) │ │───────── - │ │ Text správy () │ │──── MsgText() │ │ │

IM bez kryptografie

Ako som už povedal, knižnica asyncio bude použitá pre všetky operácie soketov. Poďme oznámiť, čo očakávame pri spustení:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Nastavte si svoje meno (--naše-meno Alice). Všetci očakávaní účastníci rozhovoru sú uvedení oddelení čiarkami (—ich mená bob, eve). Pre každého z účastníkov rozhovoru sa vytvorí adresár so zásuvkami Unix, ako aj korutín pre každý vstupný a výstupný stav:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

Správy prichádzajúce od používateľa zo vstupného soketu sa odosielajú do frontu IN_QUEUES:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

Správy prichádzajúce od účastníkov rozhovoru sa odosielajú do frontov OUT_QUEUES, z ktorých sa údaje zapisujú do výstupného soketu:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Pri čítaní zo stavového socketu program hľadá adresu partnera v slovníku PEER_ALIVE. Ak ešte nie je spojenie s účastníkom rozhovoru, napíše sa prázdny riadok.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Pri zápise adresy do conn soketu sa spustí funkcia „iniciátora“ spojenia:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Uvažujme o iniciátorovi. Najprv očividne otvorí spojenie so zadaným hostiteľom/portom a odošle handshake správu s jeho názvom:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Potom čaká na odpoveď od vzdialenej strany. Pokúsi sa dekódovať prichádzajúcu odpoveď pomocou schémy Msg ASN.1. Predpokladáme, že celá správa bude odoslaná v jednom TCP segmente a pri volaní .read() ju prijmeme atomicky. Skontrolujeme, či sme dostali správu o podaní ruky.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Skontrolujeme, či je nám známe prijaté meno partnera. Ak nie, potom prerušíme spojenie. Skontrolujeme, či sme s ním už nadviazali spojenie (účastník opäť vydal príkaz, aby sa k nám pripojil) a uzavrieme ho. Front IN_QUEUES obsahuje reťazce Pythonu s textom správy, ale má špeciálnu hodnotu None, ktorá signalizuje korutíne msg_sender, aby prestala pracovať, aby zabudla na svoj zapisovač spojený so starým pripojením TCP.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender prijíma odchádzajúce správy (zaradené do frontu zo zásuvky), serializuje ich do správy MsgText a odosiela ich cez TCP spojenie. Môže sa kedykoľvek zlomiť - jasne to zachytíme.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

Na konci sa iniciátor dostane do nekonečnej slučky čítania správ zo zásuvky. Skontroluje, či sú tieto správy textovými správami a umiestni ich do frontu OUT_QUEUES, z ktorého budú odoslané do výstupného konektora príslušného účastníka. Prečo nemôžete jednoducho urobiť .read() a dekódovať správu? Pretože je možné, že niekoľko správ od používateľa bude agregovaných vo vyrovnávacej pamäti operačného systému a odoslaných v jednom segmente TCP. Prvú môžeme dekódovať a časť nasledujúcej potom môže zostať vo vyrovnávacej pamäti. V prípade akejkoľvek abnormálnej situácie ukončíme TCP spojenie a zastavíme korutínu msg_sender (odoslaním None do frontu OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Vráťme sa k hlavnému kódu. Po vytvorení všetkých korutín pri spustení programu spustíme TCP server. Pre každé vytvorené spojenie vytvorí korutínnu odpoveď.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

respondér je podobný iniciátoru a odzrkadľuje všetky rovnaké akcie, ale nekonečná slučka čítania správ sa pre jednoduchosť spustí okamžite. V súčasnosti protokol handshake posiela jednu správu z každej strany, no v budúcnosti budú dve od iniciátora spojenia, po ktorých je možné okamžite posielať textové správy.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Zabezpečený protokol

Je čas zabezpečiť našu komunikáciu. Čo rozumieme pod pojmom bezpečnosť a čo chceme:

  • dôvernosť prenášaných správ;
  • autentickosť a integrita prenášaných správ – ich zmeny musia byť zisťované;
  • ochrana pred opakovanými útokmi – treba zistiť chýbajúce alebo opakované správy (a rozhodneme sa ukončiť spojenie);
  • identifikácia a autentifikácia účastníkov rozhovoru pomocou vopred zadaných verejných kľúčov – už skôr sme sa rozhodli, že vytvoríme sieť priateľov. Až po autentifikácii pochopíme, s kým komunikujeme;
  • dostupnosť dokonalé tajomstvo dopredu vlastnosti (PFS) – ohrozenie nášho dlhodobého podpisového kľúča by nemalo viesť k schopnosti čítať všetku predchádzajúcu korešpondenciu. Nahrávanie zachytenej prevádzky sa stáva zbytočným;
  • platnosť/platnosť správ (transport a handshake) len v rámci jednej TCP relácie. Vkladanie správne podpísaných/overených správ z inej relácie (dokonca aj s tým istým partnerom) by nemalo byť možné;
  • pasívny pozorovateľ by nemal vidieť ani používateľské identifikátory, prenesené dlhodobé verejné kľúče, ani z nich hash. Istá anonymita od pasívneho pozorovateľa.

Prekvapivo takmer každý chce mať toto minimum v akomkoľvek protokole handshake a len veľmi málo z vyššie uvedeného je nakoniec splnené pre „domáce“ protokoly. Teraz nevymyslíme nič nové. Určite by som odporučil používať Hlukový rámec pre budovanie protokolov, ale vyberme si niečo jednoduchšie.

Dva najpopulárnejšie protokoly sú:

  • TLS - veľmi zložitý protokol s dlhou históriou chýb, zásekov, zraniteľností, zlých myšlienok, zložitosti a nedostatkov (to však nemá veľa spoločného s TLS 1.3). Ale neberieme to do úvahy, pretože je to príliš komplikované.
  • IPsec с IKE — nemajú vážne kryptografické problémy, aj keď tiež nie sú jednoduché. Ak čítate o IKEv1 a IKEv2, tak ich zdroj je STS, ISO/IEC IS 9798-3 a protokoly SIGMA (SIGN-and-MAc) – dostatočne jednoduché na implementáciu za jeden večer.

Čo je dobré na SIGMA, ako najnovšom spojení vo vývoji STS/ISO protokolov? Spĺňa všetky naše požiadavky (vrátane „skrytia“ identifikátorov partnera) a nemá žiadne známe kryptografické problémy. Je minimalistický – odstránenie aspoň jedného prvku zo správy protokolu povedie k jej nezabezpečeniu.

Poďme od najjednoduchšieho domáceho protokolu k SIGMA. Najzákladnejšia operácia, ktorá nás zaujíma, je kľúčová dohoda: Funkcia, ktorá dáva obom účastníkom na výstupe rovnakú hodnotu, ktorú možno použiť ako symetrický kľúč. Bez toho, aby sme zachádzali do detailov: každá zo strán vygeneruje efemérny (použitý iba v rámci jednej relácie) pár kľúčov (verejný a súkromný kľúč), vymieňa si verejné kľúče, zavolá funkciu dohody, na vstup ktorej odovzdá svoj súkromný kľúč a verejný kľúč. kľúč partnera.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┬──┘ ┘── A┘ ┘── I┘ ┘─ I─ Ad │ ╔══════════ ══════════╗ │───────────────>│ ║DHA (PubA) ════════ ═══════════╝ │ IdB, PubB │ ╔═════════╕════╕═══════════ │<───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚═══════════════―‐═══╕‐ ───┐ ╔════ ═══╧════════════╗ │ ║Kľúč = DH(PrvA, PubB)║ <───┕──┕╕╕⤕ ═══════ ════╝ │ │ │ │

Ktokoľvek môže skočiť do stredu a nahradiť verejné kľúče svojimi vlastnými - v tomto protokole neexistuje žiadna autentifikácia účastníkov rozhovoru. Pridajme podpis s dlhovekými kľúčmi.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┬└,─└,└,─┘ └──┘ ┘─── A znak(SignPrvA, (PubA)) │ ╔═ │──────────── ────────── ──────────> ign = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════ ═══════ ═══════╕══╕═══╂═══╂, PubA , podpísať (SignPrvB, (PubB)) - ─────────── ───────────── ──│ ║SignPrvB, SignPubB = zaťaženie ( )║ │ │ ║PrvB, PubB = DHgen() ║ ════════ - ═════╗ │ │ ║overiť( podpis ═╝ │ │ │

Takýto podpis nebude fungovať, pretože nie je viazaný na konkrétnu reláciu. Takéto správy sú tiež „vhodné“ na stretnutia s inými účastníkmi. Celý kontext sa musí prihlásiť. To nás núti pridať aj ďalšiu správu od A.

Okrem toho je dôležité pridať svoj vlastný identifikátor pod podpis, pretože inak môžeme nahradiť IdXXX a znova podpísať správu kľúčom iného známeho partnera. Zabrániť odrazové útoky, je potrebné, aby prvky pod podpisom boli na jasne definovaných miestach podľa ich významu: ak sa podpíše A (PubA, PubB), musí sa podpísať B (PubB, PubA). To tiež hovorí o dôležitosti výberu štruktúry a formátu serializovaných údajov. Napríklad množiny v kódovaní ASN.1 DER sú zoradené: SET OF(PubA, PubB) bude identické so SET OF(PubB, PubA).

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └─┘ ┘─ I┘ ┘── I┘ ┘─ I┘ ┘─ I─ │ ╔══════════ ═════════════════╗ │───────────└└└———└—— ────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ╚║│ ┕║─│ ┕␚┕│ ═══════ ═══════════════╝ │IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) ││␕╕╕══␕╕╕══╕══ ═════ ════════════╗ │<─────────────────└———└—— ────────── ─────────│ ║SignPrvB, SignPubB = načítať()║ │ │ ║PrvB, PubB = DHgen() ║ ││ ││╕│ ═╕╕═══╕══║ ═══════ ══════════╝ │ znak (SignPrvA, (IdA, PubB, PubA)) │ ╔═════╕════╕════╕══════ ═══╗ │─ ────────────────────────────────└└—— ───>│ ║overiť (SignPubB, ...) ║ │ │ ║kľúč = dh (prva, PUBB) ║ │ │ │

Stále sme však „nedokázali“, že sme pre túto reláciu vygenerovali rovnaký zdieľaný kľúč. V zásade sa bez tohto kroku zaobídeme – hneď prvý dopravný spoj bude neplatný, no chceme, aby sme po podaní ruky mali istotu, že je naozaj všetko dohodnuté. V súčasnosti máme po ruke protokol ISO/IEC IS 9798-3.

Samotný vygenerovaný kľúč sme mohli podpísať. Je to nebezpečné, pretože je možné, že v použitom podpisovom algoritme môže dochádzať k únikom (síce bitov na podpis, ale stále dochádza k únikom). Je možné podpísať hash derivačného kľúča, ale únik dokonca aj hashu odvodeného kľúča môže byť cenný pri útoku hrubou silou na derivačnú funkciu. SIGMA používa funkciu MAC, ktorá overuje ID odosielateľa.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └─┘ ┘─ I┘ ┘── I┘ ┘─ I┘ ┘─ I─ │ ╔══════════ ═════════════════╗ │───────────└└└———└—— ────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ =║DHA│ ║DHA, PubA ═══════ ════════════════════╝ │IdB, PubB, znamenie (SignPrvB, (PubA, PubB)╔╔ₕ ═══ │<───────────────── ─────────────└──└───── ─────────────└──└─ — ────────── ─│ ║SignPrvB, SignPubB = načítať()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ═╕═══╕════╕══════ ═══════ ══╝ │ │ ╔════════════ ═════════, Publikácia A) │ znak (Ib(Pub)A)Prv ║Kľúč = DH( PrvA, PubB) ║ │───────────────────── ───└———└─— ────────── ─────>│ ║overiť (kľúč, IdB) ║ │ │ ║overiť (SignPubB, ...)║ │ │ ╚══╕════╕════╕══␕═╕══ ═════ ═╝ │ │

Ako optimalizáciu môžu niektorí chcieť znova použiť svoje efemérne kľúče (čo je, samozrejme, pre PFS nešťastné). Napríklad sme vygenerovali kľúčový pár, pokúsili sme sa pripojiť, ale TCP nebol dostupný alebo bol niekde uprostred protokolu prerušený. Je škoda plytvať entropiou a zdrojmi procesora na nový pár. Preto zavedieme takzvaný cookie – pseudonáhodnú hodnotu, ktorá ochráni pred prípadnými náhodnými útokmi opakovaného prehrávania pri opätovnom použití efemérnych verejných kľúčov. Vzhľadom na väzbu medzi cookie a efemérnym verejným kľúčom môže byť verejný kľúč opačnej strany z podpisu odstránený ako nepotrebný.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┬└,─└, └,─┘ └── A┘ ┘── A┘ ┘── CookieA │ ╔════════ ═══════════════════╗ │──────└└└─—└└— — ────────── ────────────────────────────────└└—— ─>│ ║SignPrvA, SignPubA = load( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════════════════════════════ ══╝ │IdB, PubB, CookieB , podpísať (SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔═══════════════════════════════ ╗ │< ────────────────────────────────└└—— ────────── ────────────────────│ ║SignPrvB, SignPubB = načítanie()║ │ │ ──────────────│ ║SignPrvB, SignPubB = zaťaženie()║ │ │ ┕B, ║ = Publikácia │ DH, ║ ╚══════ ═════════════════════╝ │ │ ╔═╕═══╕════╕═══╕══ ═══════╗ │ znak ( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Kľúč = DH(PrvA, PubB) ║ │───────────└———└—— ─ ────────────────────────────────└└—— ───────>│ ║ overiť (Kľúč, IdB) ║ │ │ ║overiť (SignPubB, ...)║ │ │ ╚══════════╕═══╕════╕ₕ │

Nakoniec chceme získať súkromie našich konverzačných partnerov od pasívneho pozorovateľa. Aby to bolo možné, SIGMA navrhuje najprv vymeniť efemérne kľúče a vyvinúť spoločný kľúč, na ktorom sa budú šifrovať autentifikačné a identifikačné správy. SIGMA popisuje dve možnosti:

  • SIGMA-I - chráni iniciátora pred aktívnymi útokmi, respondenta pred pasívnymi: iniciátor autentifikuje respondenta a ak sa niečo nezhoduje, nevydá svoju identifikáciu. Odporca vydá svoju identifikáciu, ak sa s ním začne aktívny protokol. Pasívny pozorovateľ sa nič nedozvie;
    SIGMA-R - chráni respondenta pred aktívnymi útokmi, iniciátora pred pasívnymi. Všetko je presne naopak, ale v tomto protokole sa už prenášajú štyri správy o podaní ruky.

    Vybrali sme SIGMA-I, pretože sa viac podobá tomu, čo očakávame od známych vecí typu klient-server: klienta rozpozná iba overený server a server už pozná každý. Navyše je jednoduchšie implementovať vďaka menšiemu počtu správ typu handshake. Všetko, čo pridávame do protokolu, je zašifrovať časť správy a preniesť identifikátor A do zašifrovanej časti poslednej správy:

    PubA, CookieA │ ╔══════════ ════════════════―└―――└――— ────────── ───── ────────── ─────────────————— ─────────── ───── ──────>│ ║SignPrvA , SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ═ ─┕╕│ ══┕╕┕ ═══════ ═════════ ════╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB)╕═␐)╕)═)╕) ═════════ ═══════════════ ═══════╗ │<───—└———└— — ────────── ───── ────────── ║SignP rvB, SignPubB = load()║ │ │ ═ PrvB, PubB = DHgen() ║ DHgen() ║ ═══════ – ══╗ │ Enc((IdA, sign( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Kľúč = DH(PrvA, PubB) ║ │──────────└─—— - ─────────── ──────>│ ║overiť (kľúč, IdB) ║ │ │ ║overiť (SignPubB, ...)║ │ │ ╚═╕═══╕════════════ ═════ ══╝ │ │
    
    • Na podpis sa používa GOST R 34.10-2012 algoritmus s 256-bitovými kľúčmi.
    • Na vygenerovanie verejného kľúča sa používa 34.10-2012 VKO.
    • CMAC sa používa ako MAC. Technicky ide o špeciálny režim fungovania blokovej šifry, opísaný v GOST R 34.13-2015. Ako šifrovacia funkcia pre tento režim − koník (34.12-2015).
    • Hash jeho verejného kľúča sa používa ako identifikátor partnera. Používa sa ako hash Stribog-256 (34.11. 2012. 256 XNUMX bitov).

    Po podaní ruky sa dohodneme na spoločnom kľúči. Môžeme ho použiť na autentifikované šifrovanie transportných správ. Táto časť je veľmi jednoduchá a je ťažké urobiť chybu: inkrementujeme počítadlo správ, zašifrujeme správu, overíme (MAC) počítadlo a šifrový text, odošleme. Pri prijatí správy skontrolujeme, či má počítadlo očakávanú hodnotu, overíme šifrový text počítadlom a dešifrujeme. Aký kľúč by som mal použiť na šifrovanie správ handshake, prenos správ a ako ich overiť? Používanie jedného kľúča na všetky tieto úlohy je nebezpečné a nerozumné. Je potrebné generovať kľúče pomocou špecializovaných funkcií KDF (funkcia odvodzovania kľúča). Opäť si neštiepime vlasy a vymyslime niečo: HKDF je už dlho známy, dobre preskúmaný a nemá žiadne známe problémy. Bohužiaľ, natívna knižnica Python túto funkciu nemá, preto používame hkdf plastový sáčok. HKDF interne používa HMAC, ktorý zase využíva hašovaciu funkciu. Príklad implementácie v Pythone na stránke Wikipedia trvá len niekoľko riadkov kódu. Rovnako ako v prípade 34.10 použijeme ako hashovaciu funkciu Stribog-2012. Výstup našej funkcie kľúčovej dohody sa bude nazývať kľúč relácie, z ktorého sa vygenerujú chýbajúce symetrické:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Štruktúry/schémy

    Pozrime sa, aké štruktúry ASN.1 máme teraz na prenos všetkých týchto údajov:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS je to, čo bude podpísané. HandshakeTBE - čo bude šifrované. Upozorňujem na pole ukm v MsgHandshake1. 34.10 VKO pre ešte väčšiu náhodnosť generovaných kľúčov obsahuje parameter UKM (materiál na kľúčovanie používateľa) - len dodatočná entropia.

    Pridanie kryptografie do kódu

    Uvažujme len zmeny vykonané v pôvodnom kóde, keďže rámec zostal rovnaký (v skutočnosti bola najprv napísaná konečná implementácia a potom z nej bola vyrezaná všetka kryptografia).

    Keďže autentifikácia a identifikácia účastníkov rozhovoru sa bude vykonávať pomocou verejných kľúčov, musia byť teraz niekde uložené na dlhú dobu. Pre jednoduchosť používame JSON takto:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    náš - náš pár kľúčov, hexadecimálny súkromný a verejný kľúč. ich — mená účastníkov rozhovoru a ich verejné kľúče. Zmeňme argumenty príkazového riadku a pridáme následné spracovanie údajov JSON:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    Súkromný kľúč algoritmu 34.10 je náhodné číslo. 256-bitová veľkosť pre 256-bitové eliptické krivky. PyGOST nepracuje s množinou bajtov, ale s veľké čísla, takže náš súkromný kľúč (urandom(32)) je potrebné previesť na číslo pomocou gost3410.prv_unmarshal(). Verejný kľúč je určený deterministicky zo súkromného kľúča pomocou gost3410.public_key(). Verejný kľúč 34.10 sú dve veľké čísla, ktoré je tiež potrebné previesť na bajtovú sekvenciu, aby sa uľahčilo ukladanie a prenos pomocou gost3410.pub_marshal().

    Po prečítaní súboru JSON je potrebné verejné kľúče skonvertovať späť pomocou gost3410.pub_unmarshal(). Keďže identifikátory účastníkov rozhovoru dostaneme vo forme hash z verejného kľúča, možno ich okamžite vopred vypočítať a umiestniť do slovníka na rýchle vyhľadávanie. Hash Stribog-256 je gost34112012256.GOST34112012256(), ktorý plne vyhovuje rozhraniu hashlib hašovacích funkcií.

    Ako sa zmenil korutín iniciátora? Všetko je podľa schémy handshake: vygenerujeme súbor cookie (128-bit je dostatok), dočasnú dvojicu kľúčov 34.10, ktorá sa použije na funkciu dohody kľúčov VKO.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • čakáme na odpoveď a dekódujeme prichádzajúcu správu Msg;
    • uistite sa, že dostanete handshake1;
    • dekódovať efemérny verejný kľúč opačnej strany a vypočítať kľúč relácie;
    • Generujeme symetrické kľúče potrebné na spracovanie TBE časti správy.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM je 64-bitové číslo (urandom(8)), ktoré tiež vyžaduje deserializáciu zo svojej bajtovej reprezentácie pomocou gost3410_vko.ukm_unmarshal(). Funkcia VKO k 34.10 2012-bit je gost256_vko.kek_3410() (KEK - šifrovací kľúč).

    Vygenerovaný kľúč relácie je už 256-bitová pseudonáhodná sekvencia bajtov. Preto môže byť okamžite použitý vo funkciách HKDF. Keďže GOST34112012256 vyhovuje rozhraniu hashlib, môže byť okamžite použitý v triede Hkdf. Soľ (prvý argument Hkdf) nešpecifikujeme, pretože vygenerovaný kľúč bude vzhľadom na dočasnosť zúčastnených párov kľúčov pre každú reláciu iný a už teraz obsahuje dostatok entropie. kdf.expand() v predvolenom nastavení už vytvára 256-bitové kľúče potrebné pre Grasshopper neskôr.

    Ďalej sa skontrolujú časti TBE a TBS prichádzajúcej správy:

    • MAC cez prichádzajúci šifrový text sa vypočíta a skontroluje;
    • šifrovaný text je dešifrovaný;
    • Štruktúra TBE je dekódovaná;
    • vyberie sa z neho identifikátor partnera a skontroluje sa, či je nám vôbec známy;
    • MAC cez tento identifikátor sa vypočíta a skontroluje;
    • overuje sa podpis nad štruktúrou TBS, ktorá obsahuje cookie oboch strán a verejný efemérny kľúč protistrany. Podpis je overený trvanlivým podpisovým kľúčom partnera.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Ako som písal vyššie, 34.13 popisuje rôzne prevádzkové režimy blokovej šifry zo dňa 34.12. Medzi nimi je režim na generovanie imitácií vložiek a výpočty MAC. V PyGOST je to gost2015.mac(). Tento režim vyžaduje odovzdanie funkcie šifrovania (prijatie a vrátenie jedného bloku dát), veľkosť šifrovacieho bloku a vlastne aj samotné dáta. Prečo nemôžete pevne zakódovať veľkosť šifrovacieho bloku? 3413 popisuje nielen 34.12-bitovú šifru Grasshopper, ale aj 2015-bit Magma - mierne upravený GOST 28147-89, vytvorený späť v KGB a stále má jeden z najvyšších bezpečnostných prahov.

    Kuznechik sa inicializuje volaním gost.3412.GOST3412Kuznechik(key) a vráti objekt s metódami .encrypt()/.decrypt() vhodnými na prechod do funkcií 34.13. MAC sa vypočíta takto: gost3413.mac(GOST3412Kuznechik(kľúč).šifrovať, KUZNECHIK_BLOCKSIZE, šifrový text). Na porovnanie vypočítanej a prijatej MAC nemôžete použiť zvyčajné porovnanie (==) bajtových reťazcov, pretože pri tejto operácii uniká čas porovnania, čo môže vo všeobecnom prípade viesť k fatálnym zraniteľnostiam ako napr. BEAST útoky na TLS. Python má na to špeciálnu funkciu hmac.compare_digest.

    Funkcia blokovej šifry dokáže zašifrovať iba jeden blok údajov. Pre väčší počet a aj to nie násobok dĺžky je potrebné použiť režim šifrovania. 34.13-2015 popisuje nasledovné: ECB, CTR, OFB, CBC, CFB. Každý z nich má svoje vlastné prijateľné oblasti použitia a vlastnosti. Žiaľ, ešte stále nemáme štandardizované overené režimy šifrovania (ako CCM, OCB, GCM a podobne) - sme nútení si MAC pridávať aspoň sami. vyberám si režim počítadla (CTR): nevyžaduje výplň do veľkosti bloku, dá sa paralelizovať, používa iba funkciu šifrovania, dá sa bezpečne použiť na šifrovanie veľkého množstva správ (na rozdiel od CBC, ktoré má kolízie pomerne rýchlo).

    Podobne ako .mac(), aj .ctr() má podobný vstup: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, čistý text, iv). Je potrebné špecifikovať inicializačný vektor, ktorý má presne polovicu dĺžky šifrovacieho bloku. Ak sa náš šifrovací kľúč používa len na zašifrovanie jednej správy (hoci z viacerých blokov), potom je bezpečné nastaviť nulový inicializačný vektor. Na šifrovanie správ typu handshake používame zakaždým samostatný kľúč.

    Overenie podpisu gost3410.verify() je triviálne: prejdeme eliptickú krivku, v rámci ktorej pracujeme (jednoducho ju zaznamenáme v našom protokole GOSTIM), verejný kľúč podpisovateľa (nezabudnite, že by to mala byť n-tica dvoch veľké čísla a nie bajtový reťazec), hash 34.11. 2012. XNUMX a samotný podpis.

    Ďalej v iniciátore pripravíme a odošleme správu handshake2, pričom vykonáme rovnaké akcie ako pri overovaní, iba symetricky: podpíšeme sa na naše kľúče namiesto kontroly atď...

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Po vytvorení relácie sa vygenerujú prenosové kľúče (samostatný kľúč na šifrovanie, overenie, pre každú zo strán) a inicializuje sa Grasshopper na dešifrovanie a kontrolu MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    Korutína msg_sender teraz šifruje správy pred ich odoslaním cez TCP spojenie. Každá správa má monotónne rastúci nonce, ktorý je tiež inicializačným vektorom pri zašifrovaní v režime čítača. Každá správa a blok správ má zaručenú inú hodnotu počítadla.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    Prichádzajúce správy spracováva korutín msg_receiver, ktorý sa stará o autentifikáciu a dešifrovanie:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    Záver

    GOSTIM je určený výhradne na vzdelávacie účely (keďže nie je pokrytý testami, aspoň)! Zdrojový kód programu je možné stiahnuť tu (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM je úplne slobodný softvér, distribuovaný za podmienok GPLv3 +.

    Sergej Matveev, cypherpunk, člen Nadácia SPO, vývojár Python/Go, hlavný špecialista FSUE "STC "Atlas".

Zdroj: hab.com

Pridať komentár