GOSTIM: P2P F2F E2EE IM dalam satu petang dengan kriptografi GOST

Menjadi pemaju PyGOST perpustakaan (primitif kriptografi GOST dalam Python tulen), saya sering menerima soalan tentang cara melaksanakan pemesejan selamat yang paling mudah pada lutut. Ramai orang menganggap kriptografi gunaan agak mudah, dan memanggil .encrypt() pada sifir blok sudah cukup untuk menghantarnya dengan selamat melalui saluran komunikasi. Yang lain percaya bahawa kriptografi gunaan adalah nasib segelintir orang, dan boleh diterima bahawa syarikat kaya seperti Telegram dengan ahli matematik olimpiade tidak boleh melaksanakan protokol selamat.

Semua ini mendorong saya untuk menulis artikel ini untuk menunjukkan bahawa melaksanakan protokol kriptografi dan IM selamat bukanlah tugas yang sukar. Walau bagaimanapun, ia tidak berbaloi untuk mencipta pengesahan anda sendiri dan protokol perjanjian utama.

GOSTIM: P2P F2F E2EE IM dalam satu petang dengan kriptografi GOST
Artikel akan menulis peer-to-peer, kawan ke kawan, disulitkan hujung ke hujung utusan segera dengan SIGMA-I pengesahan dan protokol perjanjian utama (berdasarkan ia dilaksanakan IPsec IKE), menggunakan algoritma kriptografi GOST secara eksklusif perpustakaan PyGOST dan perpustakaan pengekodan mesej ASN.1 PyDERASN (tentang saya sudah menulis sebelum ini). Prasyarat: ia mestilah sangat mudah sehingga ia boleh ditulis dari awal dalam satu petang (atau hari kerja), jika tidak, ia bukan lagi program yang mudah. Ia mungkin mempunyai ralat, komplikasi yang tidak perlu, kekurangan, dan ini adalah program pertama saya menggunakan perpustakaan asyncio.

Reka bentuk IM

Pertama, kita perlu memahami bagaimana rupa IM kita. Untuk kesederhanaan, biarkan ia menjadi rangkaian peer-to-peer, tanpa sebarang penemuan peserta. Kami secara peribadi akan menunjukkan alamat mana: port untuk disambungkan untuk berkomunikasi dengan lawan bicara.

Saya faham bahawa, pada masa ini, andaian bahawa komunikasi langsung tersedia antara dua komputer sewenang-wenangnya adalah had ketara ke atas kebolehgunaan IM dalam amalan. Tetapi semakin banyak pembangun melaksanakan semua jenis tongkat traversal NAT, semakin lama kita akan kekal di Internet IPv4, dengan kebarangkalian komunikasi yang menyedihkan antara komputer sewenang-wenangnya. Berapa lama anda boleh bertolak ansur dengan kekurangan IPv6 di rumah dan di tempat kerja?

Kami akan mempunyai rangkaian rakan ke rakan: semua rakan bicara yang mungkin mesti diketahui lebih awal. Pertama, ini sangat memudahkan segala-galanya: kami memperkenalkan diri kami, menjumpai atau tidak menjumpai nama/kunci, terputus sambungan atau terus bekerja, mengetahui lawan bicara. Kedua, secara umum, ia selamat dan menghapuskan banyak serangan.

Antara muka IM akan hampir dengan penyelesaian klasik projek yang tidak berguna, yang saya sangat suka kerana minimalisme dan falsafah Unix-way mereka. Program IM mencipta direktori dengan tiga soket domain Unix untuk setiap lawan bicara:

  • dalamβ€”mesej yang dihantar kepada lawan bicara direkodkan di dalamnya;
  • keluar - mesej yang diterima daripada lawan bicara dibaca daripadanya;
  • negeri - dengan membaca daripadanya, kita mengetahui sama ada lawan bicara sedang disambungkan, alamat/port sambungan.

Di samping itu, soket sambung dicipta, dengan menulis port hos di mana kami memulakan sambungan kepada lawan bicara jauh.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Pendekatan ini membolehkan anda membuat pelaksanaan bebas pengangkutan IM dan antara muka pengguna, kerana tiada rakan, anda tidak boleh menggembirakan semua orang. menggunakan tmux dan / atau berbilang ekor, anda boleh mendapatkan antara muka berbilang tetingkap dengan penyerlahan sintaks. Dan dengan bantuan rlwrap anda boleh mendapatkan baris input mesej serasi GNU Readline.

Malah, projek tidak berguna menggunakan fail FIFO. Secara peribadi, saya tidak dapat memahami cara bekerja dengan fail secara kompetitif dalam asyncio tanpa latar belakang tulisan tangan daripada benang khusus (saya telah menggunakan bahasa untuk perkara sedemikian untuk masa yang lama Go). Oleh itu, saya memutuskan untuk membuat kaitan dengan soket domain Unix. Malangnya, ini menjadikannya mustahil untuk melakukan echo 2001:470:dead::babe 6666 > conn. Saya menyelesaikan masalah ini menggunakan socat: echo 2001:470:dead::babe 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.

Protokol tidak selamat asal

TCP digunakan sebagai pengangkutan: ia menjamin penghantaran dan pesanannya. UDP tidak menjamin kedua-duanya (yang akan berguna apabila kriptografi digunakan), tetapi menyokong SCTP Python tidak keluar dari kotak.

Malangnya, dalam TCP tidak ada konsep mesej, hanya aliran bait. Oleh itu, adalah perlu untuk menghasilkan format untuk mesej supaya mereka boleh dikongsi sesama mereka dalam urutan ini. Kami boleh bersetuju untuk menggunakan aksara suapan baris. Tidak mengapa untuk permulaan, tetapi sebaik sahaja kami mula menyulitkan mesej kami, watak ini mungkin muncul di mana-mana dalam teks sifir. Dalam rangkaian, oleh itu, protokol popular adalah yang mula-mula menghantar panjang mesej dalam bait. Sebagai contoh, di luar kotak Python mempunyai xdrlib, yang membolehkan anda bekerja dengan format yang serupa XDR.

Kami tidak akan berfungsi dengan betul dan cekap dengan bacaan TCP - kami akan memudahkan kod. Kami membaca data dari soket dalam gelung yang tidak berkesudahan sehingga kami menyahkod mesej lengkap. JSON dengan XML juga boleh digunakan sebagai format untuk pendekatan ini. Tetapi apabila kriptografi ditambah, data perlu ditandatangani dan disahkan - dan ini memerlukan perwakilan yang sama bait demi bait bagi objek, yang tidak disediakan oleh JSON/XML (hasil pembuangan mungkin berbeza-beza).

XDR sesuai untuk tugasan ini, namun saya memilih ASN.1 dengan pengekodan DER dan PyDERASN perpustakaan, kerana kita akan mempunyai objek peringkat tinggi di tangan yang selalunya lebih menyenangkan dan mudah untuk digunakan. Tidak seperti tanpa skema bencode, Pek Mesej atau CBOR, ASN.1 akan menyemak data secara automatik terhadap skema berkod keras.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

Mesej yang diterima ialah Msg: sama ada MsgText teks (dengan satu medan teks buat masa ini) atau mesej jabat tangan MsgHandshake (yang mengandungi nama lawan bicara). Sekarang ia kelihatan terlalu rumit, tetapi ini adalah asas untuk masa depan.

     β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”‚PeerAβ”‚ β”‚PeerBβ”‚ β””β”€β”€β”¬β”€β”€β”˜ ┬──dA ) β”‚ │───────── ────────>β”‚ β”‚ β”‚ β”‚MsgHandshake(IdB) β”‚ β”‚<─────────│ ────── β”‚ β”‚ MsgText() β”‚ │──── MsgText() β”‚ β”‚ β”‚

IM tanpa kriptografi

Seperti yang telah saya katakan, perpustakaan asyncio akan digunakan untuk semua operasi soket. Mari umumkan perkara yang kami jangkakan semasa pelancaran:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Tetapkan nama anda sendiri (--nama-kami alice). Semua lawan bicara yang dijangkakan disenaraikan dipisahkan dengan koma (β€”nama mereka bob,eve). Untuk setiap lawan bicara, direktori dengan soket Unix dibuat, serta coroutine untuk setiap masuk, keluar, nyatakan:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

Mesej yang datang daripada pengguna daripada soket masuk dihantar ke baris gilir IN_QUEUES:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

Mesej yang datang daripada lawan bicara dihantar ke baris gilir OUT_QUEUES, dari mana data ditulis ke soket keluar:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Apabila membaca dari soket keadaan, program mencari alamat lawan bicara dalam kamus PEER_ALIVE. Jika belum ada sambungan dengan lawan bicara, maka baris kosong ditulis.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Apabila menulis alamat ke soket sambungan, fungsi "pemula" sambungan dilancarkan:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Mari kita pertimbangkan pemula. Mula-mula ia jelas membuka sambungan ke hos/port yang ditentukan dan menghantar mesej jabat tangan dengan namanya:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Kemudian, ia menunggu jawapan daripada pihak jauh. Cuba menyahkod respons masuk menggunakan skema Msg ASN.1. Kami menganggap bahawa keseluruhan mesej akan dihantar dalam satu segmen TCP dan kami akan menerimanya secara atom apabila memanggil .read(). Kami menyemak sama ada kami menerima mesej berjabat tangan.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Kami menyemak sama ada nama yang diterima daripada lawan bicara diketahui oleh kami. Jika tidak, maka kita memutuskan sambungan. Kami menyemak sama ada kami telah menjalinkan hubungan dengannya (pembicara sekali lagi memberi arahan untuk menyambung kepada kami) dan menutupnya. Baris gilir IN_QUEUES memegang rentetan Python dengan teks mesej, tetapi mempunyai nilai istimewa None yang memberi isyarat kepada coroutine msg_sender untuk berhenti berfungsi supaya ia melupakan penulisnya yang dikaitkan dengan sambungan TCP warisan.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender menerima mesej keluar (dibaris gilir dari soket dalam), mensirikannya ke dalam mesej MsgText dan menghantarnya melalui sambungan TCP. Ia boleh pecah pada bila-bila masa - kami memintas dengan jelas.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

Pada penghujungnya, pemula memasuki gelung tak terhingga untuk membaca mesej daripada soket. Menyemak sama ada mesej ini ialah mesej teks dan meletakkannya dalam baris gilir OUT_QUEUES, dari mana ia akan dihantar ke soket keluar rakan bicara yang sepadan. Mengapa anda tidak boleh melakukan .read() dan menyahkod mesej? Kerana ada kemungkinan beberapa mesej daripada pengguna akan diagregatkan dalam penimbal sistem pengendalian dan dihantar dalam satu segmen TCP. Kita boleh menyahkod yang pertama, dan kemudian sebahagian daripada yang berikutnya mungkin kekal dalam penimbal. Sekiranya berlaku sebarang situasi yang tidak normal, kami menutup sambungan TCP dan menghentikan coroutine msg_sender (dengan menghantar Tiada ke baris gilir OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Mari kembali ke kod utama. Selepas mencipta semua coroutine pada masa program bermula, kami memulakan pelayan TCP. Untuk setiap sambungan yang ditubuhkan, ia mencipta coroutine responder.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

responder adalah serupa dengan pemula dan mencerminkan semua tindakan yang sama, tetapi gelung tak terhingga untuk membaca mesej bermula serta-merta, untuk kesederhanaan. Pada masa ini, protokol jabat tangan menghantar satu mesej dari setiap sisi, tetapi pada masa hadapan akan ada dua daripada pemula sambungan, selepas itu mesej teks boleh dihantar serta-merta.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Protokol selamat

Sudah tiba masanya untuk menjamin komunikasi kita. Apakah yang kami maksudkan dengan keselamatan dan apa yang kami mahu:

  • kerahsiaan mesej yang dihantar;
  • keaslian dan integriti mesej yang dihantar - perubahannya mesti dikesan;
  • perlindungan terhadap serangan ulang - fakta mesej yang hilang atau berulang mesti dikesan (dan kami memutuskan untuk menamatkan sambungan);
  • pengenalpastian dan pengesahan lawan bicara menggunakan kunci awam yang telah dimasukkan sebelumnya - kami telah memutuskan sebelum ini bahawa kami membuat rangkaian rakan ke rakan. Hanya selepas pengesahan kami akan memahami dengan siapa kami berkomunikasi;
  • ketersediaan kerahsiaan hadapan yang sempurna hartanah (PFS) - menjejaskan kunci tandatangan jangka panjang kami tidak seharusnya membawa kepada keupayaan untuk membaca semua surat-menyurat sebelumnya. Merakam trafik yang dipintas menjadi tidak berguna;
  • kesahihan/kesahan mesej (pengangkutan dan jabat tangan) hanya dalam satu sesi TCP. Memasukkan mesej yang ditandatangani/disahkan dengan betul daripada sesi lain (walaupun dengan lawan bicara yang sama) tidak boleh dilakukan;
  • pemerhati pasif tidak seharusnya melihat sama ada pengecam pengguna, menghantar kunci awam tahan lama atau cincang daripadanya. Tanpa nama tertentu daripada pemerhati pasif.

Anehnya, hampir semua orang mahu mempunyai minimum ini dalam mana-mana protokol jabat tangan, dan sangat sedikit perkara di atas akhirnya dipenuhi untuk protokol "tanah sendiri". Sekarang kami tidak akan mencipta sesuatu yang baharu. Saya pasti akan mengesyorkan menggunakan Rangka kerja bunyi untuk membina protokol, tetapi mari kita pilih sesuatu yang lebih mudah.

Dua protokol yang paling popular ialah:

  • TLS - protokol yang sangat kompleks dengan sejarah panjang pepijat, jambs, kelemahan, pemikiran yang buruk, kerumitan dan kekurangan (namun, ini tidak ada kaitan dengan TLS 1.3). Tetapi kami tidak menganggapnya kerana ia terlalu rumit.
  • IPsec с IKE β€” tidak mempunyai masalah kriptografi yang serius, walaupun ia juga tidak mudah. Jika anda membaca tentang IKEv1 dan IKEv2, maka sumber mereka adalah STS, protokol ISO/IEC IS 9798-3 dan SIGMA (SIGn-and-MAc) - cukup mudah untuk dilaksanakan dalam satu petang.

Apakah kebaikan SIGMA, sebagai pautan terkini dalam pembangunan protokol STS/ISO? Ia memenuhi semua keperluan kami (termasuk "menyembunyikan" pengecam interlocutor) dan tidak mempunyai masalah kriptografi yang diketahui. Ia minimalis - mengalih keluar sekurang-kurangnya satu elemen daripada mesej protokol akan membawa kepada ketidakamanannya.

Mari kita beralih daripada protokol yang paling mudah ditanam di rumah kepada SIGMA. Operasi paling asas yang kami minati ialah perjanjian kunci: Fungsi yang mengeluarkan kedua-dua peserta nilai yang sama, yang boleh digunakan sebagai kunci simetri. Tanpa pergi ke butiran: setiap pihak menjana pasangan kunci sementara (hanya digunakan dalam satu sesi) (kunci awam dan peribadi), menukar kunci awam, memanggil fungsi perjanjian, kepada input yang mereka luluskan kunci peribadi mereka dan orang awam kunci lawan bicara.

β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”‚PeerAβ”‚ β”‚PeerBβ”‚ β””β”€β”€β”¬β”€β”€β”˜ ─── │┬── β”‚ ╔══════════ ══════════╗ │───────────────>β”‚ ║───────────────>β”‚ ║───────────────>β”‚ β•‘β•‘PrvA ─║║PrvA, Pub ═ ════════ ═══════════╝ β”‚ IdB, PubB β”‚ ╔═══════════␐═══␐␐══ β”‚<───────── ──────│ β•‘PrvB, PubB = DHgen()β•‘ β”‚ β”‚ β•šβ•β•β•β•β•β•β•β•β•β•β•β•ββ•β•β•ββ•β•β•β•ββ•β• ───┐ ╔════ ═══╧════════════╗ β”‚ β•‘Key = DH(PrvA, PubB)β•‘ <β”€β”€β”€β”ββ•šβ••β”ββ•šβ••β”ββ•šβ•• ═══════ ════╝ β”‚ β”‚ β”‚ β”‚

Sesiapa sahaja boleh melompat di tengah dan menggantikan kunci awam dengan kunci mereka sendiri - tiada pengesahan rakan bicara dalam protokol ini. Mari tambahkan tandatangan dengan kunci tahan lama.

β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”‚PeerAβ”‚ β”‚PeerBβ”‚ β””β”€β”€β”¬β”€β”€β”˜ │── │┬── SignPrvA, (PubA)) β”‚ ╔═ │──────────── ────────── ──────────P ubA = beban()β•‘ β”‚ β”‚ β•‘PrvA, PubA = DHgen() β•‘ β”‚ β”‚ β•šβ•β•β•β•β•β•β• ═══════ ══════════ ═══════ ═══════␐═B, PubA , tanda(SignPrvB, (PubB)) β”‚ ╔══════════════ ═══════ ══════════ ═══════ ══════‗ β”‚ ─────────── ───────────── ──│ β•‘SignPrvB, SignPubB = load( )β•‘ β”‚ β”‚ β•‘PrvB, PubB = DH ═════════ ══════════════ ══╝ ────┐ β•” ══════␐════␐═╕ ═════╗ β”‚ β”‚ β•‘sahkan( SignPubB, ...)β•‘ β”‚ <β”€β”€β”€β”˜ β•‘Key = DH(Pr vA, PubB) β•‘ β”‚ β”‚ β•šβ•β•β•β•β•β•β•β•β•βββ•β•β•βββ•β•β•β•ββ•β•β•β•ββ•β•β•β•ββ•β• ═╝ β”‚ β”‚ β”‚

Tandatangan sedemikian tidak akan berfungsi, kerana ia tidak terikat pada sesi tertentu. Mesej sedemikian juga "sesuai" untuk sesi dengan peserta lain. Seluruh konteks mesti melanggan. Ini memaksa kami untuk menambah satu lagi mesej daripada A.

Di samping itu, adalah penting untuk menambah pengecam anda sendiri di bawah tandatangan, kerana jika tidak, kami boleh menggantikan IdXXX dan menandatangani semula mesej dengan kunci rakan bicara lain yang dikenali. Untuk mengelakkan serangan refleksi, adalah perlu bahawa elemen di bawah tandatangan berada di tempat yang ditakrifkan dengan jelas mengikut maksudnya: jika tanda A (PubA, PubB), maka B mesti menandatangani (PubB, PubA). Ini juga bercakap tentang kepentingan memilih struktur dan format data bersiri. Contohnya, set dalam pengekodan ASN.1 DER diisih: SET OF(PubA, PubB) akan sama dengan SET OF(PubB, PubA).

β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”‚PeerAβ”‚ β”‚PeerBβ”‚ β””β”€β”€β”¬β”€β”€β”˜ ─── │┬── β”‚ ╔══════════ ═════════════════╗ │───────────——──—───— ────────── ─────────────>β”‚ β•‘SignPrvA, SignPubA = load()β•‘ β”‚ β”‚ β•‘PrvA, PubA = β‘š ␐ β•• ═ ═══════ ═══════════════╝ β”‚IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) β”‚ ␔␐╕╕ ═════ ════════════╗ β”‚<────────────────—────—─ ────────── ─────────│ β•‘SignPrvB, SignPubB = muat()β•‘ β”‚ β”‚ β•‘PrvB, PubB = DHgen() β•‘ β”‚ β•β•β•šβ•β•β•šβ•β•β•š ═ ════════ ══════════╝ β”‚ tanda(SignPrvA, (IdA, PubB, PubA)) β”‚ ╔═══════␐═══␐═══␐══ ════╗ │─ ───────────────────────────────—─── ─ ───>β”‚ β•‘sahkan(SignPubB, ...) β•‘ β”‚ β”‚ β•‘key = dh (prva, PUBB) β•‘ β”‚ β”‚ β”‚

Walau bagaimanapun, kami masih belum "membuktikan" bahawa kami telah menjana kunci kongsi yang sama untuk sesi ini. Pada dasarnya, kita boleh lakukan tanpa langkah ini - sambungan pengangkutan pertama akan menjadi tidak sah, tetapi kami mahu apabila jabat tangan selesai, kami akan memastikan bahawa semuanya benar-benar dipersetujui. Pada masa ini kami mempunyai protokol ISO/IEC IS 9798-3.

Kami boleh menandatangani kunci yang dijana itu sendiri. Ini berbahaya, kerana mungkin terdapat kebocoran dalam algoritma tandatangan yang digunakan (walaupun bit-per-tandatangan, tetapi masih bocor). Anda boleh menandatangani cincang kunci terbitan, tetapi cincangan kunci terbitan yang bocor sekalipun boleh menjadi berharga dalam serangan kekerasan terhadap fungsi terbitan. SIGMA menggunakan fungsi MAC yang mengesahkan ID pengirim.

β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”‚PeerAβ”‚ β”‚PeerBβ”‚ β””β”€β”€β”¬β”€β”€β”˜ ─── │┬── β”‚ ╔══════════ ═════════════════╗ │───────────——──—───— ────────── ──────────────────>β”‚ β•‘SignPrvA, SignPubA = load()β•‘ β”‚ β”‚ β•‘ β•‘ β”‚ β”‚ β•‘PrvA, SignPubA = load()β•‘ β”‚ β”‚ β•‘PrvA β•š ═══════ ════════════════════╝ β”‚IdB, PubB, tanda(SignPrvB, (PubA, PubB) β•”(IdB) β•”(IdB) β•”(IdB) β•”(IdB) β•” ═══ β”‚<───────────────── ──────────  ── ─—─ ────────── ─│ β•‘SignPrvB, SignPubB = muat()β•‘ β”‚ β”‚ β•‘PrvB, PubB = DHgen() β•‘ β”‚ β”‚ β•šβ•β•β•β• ══␐════␐═╕═␐␐╕══␐═╕ ═════════ ══╝ β”‚ β”‚ ╔════════════ ═════════════════ ═════════╗ ═══╗ β”‚ tanda MAC(A)B, Pu(A)B, Pu(A)B, Pu(A)B β•‘Kunci = DH( PrvA, PubB) β•‘ │───────────────────── ───────────── ────────── ─────>β”‚ β•‘sahkan(Kunci, IdB) β•‘ β”‚ β”‚ β•‘sahkan(SignPubB, ...)β•‘ β”‚ β”‚ β•šβ•β•β•β•β•βββ•β••β•β•ββ•β••β•βββ•β••β•βββ•β•• ════ ═╝ β”‚ β”‚

Sebagai pengoptimuman, sesetengah orang mungkin mahu menggunakan semula kunci sementara mereka (yang, sudah tentu, malang untuk PFS). Sebagai contoh, kami menjana pasangan kunci, cuba menyambung, tetapi TCP tidak tersedia atau terganggu di suatu tempat di tengah-tengah protokol. Sungguh memalukan untuk membazirkan sumber entropi dan pemproses pada pasangan baharu. Oleh itu, kami akan memperkenalkan apa yang dipanggil kuki - nilai rawak pseudo yang akan melindungi daripada kemungkinan serangan ulang tayang rawak apabila menggunakan semula kunci awam yang tidak lama. Disebabkan pengikatan antara kuki dan kunci awam yang tidak lama, kunci awam pihak bertentangan boleh dialih keluar daripada tandatangan sebagai tidak perlu.

β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”‚PeerAβ”‚ β”‚PeerBβ”‚ β””β”€β”€β”¬β”€β”€β”˜ ‬─ │──, Puteri Adik A β”‚ ╔════════ ═══════════════════╗ │─────────——──——──— ────────── ───────────────────────────────—─── ─ ─>β”‚ β•‘SignPrvA, SignPubA = muat( )β•‘ β”‚ β”‚ β•‘PrvA, PubA = DHgen() β•‘ β”‚ β”‚ β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•ββ•β•β•βββ•β• ══╝ β”‚IdB, PubB, CookieB , tanda(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) β”‚ ╔═════════════════␐════␐═══␐══ β•— β”‚< ───────────────────────────────—─── ─ ────────── ────────────────────│ β•‘SignPrvB, SignPubB = load()()β•‘ │────────│ β•‘SignPrvB, SignPubB = load()()β•‘ β”‚ β”‚ = ,β”‚ β”‚ β”‚ β”‚ β•šβ•β•β•β•β•β• ═════════════════════╝ β”‚ β”‚ ╔════␐════␐═══␐══ ═══════╗ β”‚ tanda( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) β”‚ β•‘Key = DH(PrvA, PubB) β•‘ │──────────────── ─── ── ── ───────────────────────────────—─── ─ ───────>β”‚ β•‘ sahkan(Kunci, IdB) β•‘ β”‚ β”‚ β•‘sahkan(SignPubB, ...)β•‘ β”‚ β”‚ β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•ββ•β•β•ββ•β•β•ββ•β• β”‚

Akhir sekali, kami ingin mendapatkan privasi rakan perbualan kami daripada pemerhati pasif. Untuk melakukan ini, SIGMA bercadang untuk menukar kunci sementara dan membangunkan kunci biasa untuk menyulitkan pengesahan dan pengecaman mesej. SIGMA menerangkan dua pilihan:

  • SIGMA-I - melindungi pemula daripada serangan aktif, responden daripada serangan pasif: pemula mengesahkan responden dan jika sesuatu tidak sepadan, maka ia tidak memberikan pengenalannya. Defendan memberikan pengenalannya jika protokol aktif dimulakan dengannya. Pemerhati pasif tidak belajar apa-apa;
    SIGMA-R - melindungi responden daripada serangan aktif, pemula daripada serangan pasif. Semuanya betul-betul bertentangan, tetapi dalam protokol ini empat mesej jabat tangan sudah dihantar.

    Kami memilih SIGMA-I kerana ia lebih serupa dengan apa yang kami harapkan daripada perkara biasa pelayan klien: klien hanya dikenali oleh pelayan yang disahkan, dan semua orang sudah mengetahui pelayan. Selain itu, ia lebih mudah untuk dilaksanakan kerana lebih sedikit mesej jabat tangan. Apa yang kami tambahkan pada protokol adalah untuk menyulitkan sebahagian daripada mesej dan memindahkan pengecam A ke bahagian yang disulitkan pada mesej terakhir:

    PubA, CookieA β”‚ ╔══════════ ═════════════════ ═══════════════‗ ══‗ ══‐ ────────── ───── ────────── ────────────────────—─ ─────────── ───── ──────>β”‚ β•‘SignPrvA , SignPubA = load()β•‘ β”‚ β”‚ β•‘PrvA, PubA = DHgen() β•‘ ═════ ═════════ ═════════ ════╝ β”‚ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) β••) ═════════ . ────────── ───── ────────── β•‘SignP rvB, SignPubB = load()β•‘ β”‚ β”‚ β•‘ PrvB, PubB β•‘ β•š β•š β•š β•š ════════ ════════════════╝ β”‚ β”‚ ╔═════════␐═══␐══ ══╗ β”‚ Enc((IdA, tanda( Signprva, (cookieb, cookiea, puba)), mac (ida))) β”‚ β•‘key = dh (prva, pubb) β•‘ │────────— ────────────────── ────────── ──—─ ──—─ ─────────── ──────>β”‚ β•‘sahkan(Kunci, IdB) β•‘ β”‚ β”‚ β•‘sahkan( SignPubB, ...)β•‘ β”‚ β”‚ β•šβ•β•β•β•β•ββ•β••β•ββ•β••β•β•ββ•β•• ════ ══╝ β”‚ β”‚
    
    • GOST R digunakan untuk tandatangan 34.10-2012 algoritma dengan kekunci 256-bit.
    • Untuk menjana kunci awam, 34.10/2012/XNUMX VKO digunakan.
    • CMAC digunakan sebagai MAC. Secara teknikal, ini adalah mod operasi khas sifir blok, yang diterangkan dalam GOST R 34.13-2015. Sebagai fungsi penyulitan untuk mod ini βˆ’ Belalang (34.12-2015).
    • Cincang kunci awamnya digunakan sebagai pengecam lawan bicara. Digunakan sebagai hash Stribog-256 (34.11/2012/256 XNUMX bit).

    Selepas berjabat tangan, kami akan bersetuju dengan kunci yang dikongsi. Kami boleh menggunakannya untuk penyulitan yang disahkan bagi mesej pengangkutan. Bahagian ini sangat mudah dan sukar untuk membuat kesilapan: kami menambah pembilang mesej, menyulitkan mesej, mengesahkan (MAC) pembilang dan teks sifir, menghantar. Apabila menerima mesej, kami menyemak sama ada kaunter mempunyai nilai yang dijangkakan, mengesahkan teks sifir dengan kaunter dan menyahsulitnya. Apakah kunci yang harus saya gunakan untuk menyulitkan mesej jabat tangan, mengangkut mesej dan cara untuk mengesahkannya? Menggunakan satu kunci untuk semua tugas ini adalah berbahaya dan tidak bijak. Ia adalah perlu untuk menjana kunci menggunakan fungsi khusus KDF (fungsi terbitan kunci). Sekali lagi, janganlah kita membelah rambut dan mencipta sesuatu: HKDF telah lama diketahui, diteliti dengan baik dan tidak mempunyai masalah yang diketahui. Malangnya, perpustakaan Python asli tidak mempunyai fungsi ini, jadi kami gunakan hkdf beg plastik. HKDF digunakan secara dalaman HMAC, yang seterusnya menggunakan fungsi cincang. Contoh pelaksanaan dalam Python pada halaman Wikipedia hanya memerlukan beberapa baris kod. Seperti dalam kes 34.10/2012/256, kami akan menggunakan Stribog-XNUMX sebagai fungsi cincang. Output fungsi perjanjian utama kami akan dipanggil kunci sesi, yang daripadanya yang simetri yang hilang akan dijana:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Struktur/Skim

    Mari lihat struktur ASN.1 yang kita ada sekarang untuk menghantar semua data ini:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS adalah perkara yang akan ditandatangani. Jabat TanganTBE - perkara yang akan disulitkan. Saya menarik perhatian anda kepada bidang ukm dalam MsgHandshake1. 34.10 VKO, untuk rawak yang lebih besar bagi kunci yang dijana, termasuk parameter UKM (bahan kunci pengguna) - hanya entropi tambahan.

    Menambah Kriptografi pada Kod

    Mari kita pertimbangkan hanya perubahan yang dibuat pada kod asal, kerana rangka kerja tetap sama (sebenarnya, pelaksanaan terakhir ditulis terlebih dahulu, dan kemudian semua kriptografi dipotong daripadanya).

    Memandangkan pengesahan dan pengenalpastian rakan bicara akan dijalankan menggunakan kunci awam, mereka kini perlu disimpan di suatu tempat untuk masa yang lama. Untuk kesederhanaan, kami menggunakan JSON seperti ini:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    kami - pasangan kunci kami, kunci persendirian dan awam perenambelasan. mereka β€” nama rakan bicara dan kunci awam mereka. Mari kita ubah argumen baris arahan dan tambahkan pasca pemprosesan data JSON:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    Kunci persendirian algoritma 34.10 ialah nombor rawak. Saiz 256-bit untuk lengkung elips 256-bit. PyGOST tidak berfungsi dengan set bait, tetapi dengan bilangan yang besar, jadi kunci peribadi kami (urandom(32)) perlu ditukar kepada nombor menggunakan gost3410.prv_unmarshal(). Kunci awam ditentukan secara deterministik daripada kunci persendirian menggunakan gost3410.public_key(). Kunci awam 34.10 ialah dua nombor besar yang juga perlu ditukar kepada jujukan bait untuk memudahkan penyimpanan dan penghantaran menggunakan gost3410.pub_marshal().

    Selepas membaca fail JSON, kunci awam sewajarnya perlu ditukar kembali menggunakan gost3410.pub_unmarshal(). Memandangkan kami akan menerima pengecam interlocutor dalam bentuk cincangan daripada kunci awam, mereka boleh dikira dengan segera dan diletakkan dalam kamus untuk carian pantas. Cincang Stribog-256 ialah gost34112012256.GOST34112012256(), yang memenuhi sepenuhnya antara muka hashlib bagi fungsi cincang.

    Bagaimanakah coroutine pemula telah berubah? Segala-galanya adalah mengikut skema jabat tangan: kami menjana kuki (128-bit sudah banyak), pasangan kekunci fana 34.10, yang akan digunakan untuk fungsi perjanjian kunci VKO.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • kami menunggu maklum balas dan menyahkod mesej Mesej masuk;
    • pastikan anda mendapat jabat tangan1;
    • menyahkod kunci awam sementara pihak bertentangan dan mengira kunci sesi;
    • Kami menjana kunci simetri yang diperlukan untuk memproses bahagian TBE mesej.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM ialah nombor 64-bit (urandom(8)), yang juga memerlukan penyahserikatan daripada perwakilan baitnya menggunakan gost3410_vko.ukm_unmarshal(). Fungsi VKO untuk 34.10/2012/256 3410-bit ialah gost34102012256_vko.kek_XNUMX() (KEK - kunci penyulitan).

    Kunci sesi yang dijana sudah pun menjadi jujukan bait pseudo-rawak 256-bit. Oleh itu, ia boleh digunakan dengan segera dalam fungsi HKDF. Memandangkan GOST34112012256 memenuhi antara muka hashlib, ia boleh digunakan serta-merta dalam kelas Hkdf. Kami tidak menentukan garam (hujah pertama Hkdf), kerana kunci yang dijana, disebabkan oleh kefanaan pasangan kunci yang mengambil bahagian, akan berbeza untuk setiap sesi dan sudah mengandungi entropi yang mencukupi. kdf.expand() secara lalai sudah menghasilkan kunci 256-bit yang diperlukan untuk Grasshopper nanti.

    Seterusnya, bahagian TBE dan TBS bagi mesej masuk disemak:

    • MAC ke atas teks sifir masuk dikira dan disemak;
    • teks sifir dinyahsulit;
    • Struktur TBE dinyahkod;
    • pengecam lawan bicara diambil daripadanya dan ia diperiksa sama ada dia diketahui oleh kami sama sekali;
    • MAC atas pengecam ini dikira dan disemak;
    • tandatangan di atas struktur TBS disahkan, yang termasuk kuki kedua-dua pihak dan kunci sementara awam pihak bertentangan. Tandatangan itu disahkan oleh kunci tandatangan tahan lama lawan bicara.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Seperti yang saya tulis di atas, 34.13/2015/XNUMX menerangkan pelbagai mod operasi sifir blok mulai 34.12/2015/3413. Antaranya terdapat mod untuk menjana sisipan tiruan dan pengiraan MAC. Dalam PyGOST ini ialah gost34.12.mac(). Mod ini memerlukan lulus fungsi penyulitan (menerima dan memulangkan satu blok data), saiz blok penyulitan dan, sebenarnya, data itu sendiri. Mengapa anda tidak boleh mengekod keras saiz blok penyulitan? 2015/128/64 menerangkan bukan sahaja sifir Grasshopper XNUMX-bit, tetapi juga XNUMX-bit Magma - GOST 28147-89 yang diubah suai sedikit, dicipta semula dalam KGB dan masih mempunyai salah satu ambang keselamatan tertinggi.

    Kuznechik dimulakan dengan memanggil gost.3412.GOST3412Kuznechik(kunci) dan mengembalikan objek dengan kaedah .encrypt()/.decrypt() yang sesuai untuk menghantar kepada 34.13 fungsi. MAC dikira seperti berikut: gost3413.mac(GOST3412Kuznechik(kunci).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext). Untuk membandingkan MAC yang dikira dan diterima, anda tidak boleh menggunakan perbandingan biasa (==) rentetan bait, kerana operasi ini membocorkan masa perbandingan, yang, dalam kes umum, boleh membawa kepada kelemahan yang membawa maut seperti BEAST serangan terhadap TLS. Python mempunyai fungsi khas, hmac.compare_digest, untuk ini.

    Fungsi sifir blok hanya boleh menyulitkan satu blok data. Untuk nombor yang lebih besar, malah bukan gandaan panjang, adalah perlu untuk menggunakan mod penyulitan. 34.13-2015 menerangkan perkara berikut: ECB, CTR, OFB, CBC, CFB. Masing-masing mempunyai bidang aplikasi dan ciri yang boleh diterima sendiri. Malangnya, kami masih tidak mempunyai piawaian mod penyulitan yang disahkan (seperti CCM, OCB, GCM dan seumpamanya) - kami terpaksa sekurang-kurangnya menambah MAC sendiri. saya pilih mod kaunter (CTR): ia tidak memerlukan padding kepada saiz blok, boleh diselarikan, hanya menggunakan fungsi penyulitan, boleh digunakan dengan selamat untuk menyulitkan sejumlah besar mesej (tidak seperti CBC, yang mempunyai perlanggaran dengan agak cepat).

    Seperti .mac(), .ctr() mengambil input yang serupa: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). Ia diperlukan untuk menentukan vektor permulaan yang betul-betul separuh panjang blok penyulitan. Jika kunci penyulitan kami hanya digunakan untuk menyulitkan satu mesej (walaupun dari beberapa blok), maka adalah selamat untuk menetapkan vektor permulaan sifar. Untuk menyulitkan mesej jabat tangan, kami menggunakan kunci berasingan setiap kali.

    Mengesahkan tandatangan gost3410.verify() adalah remeh: kami melepasi lengkung eliptik di mana kami sedang bekerja (kami hanya merekodkannya dalam protokol GOSTIM kami), kunci awam penandatangan (jangan lupa bahawa ini harus menjadi dua tuple nombor besar, dan bukan rentetan bait), cincangan 34.11/2012/XNUMX dan tandatangan itu sendiri.

    Seterusnya, dalam pemula kami menyediakan dan menghantar mesej jabat tangan ke jabat tangan2, melakukan tindakan yang sama seperti yang kami lakukan semasa pengesahan, hanya secara simetri: menandatangani kunci kami dan bukannya menyemak, dsb...

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Apabila sesi diwujudkan, kunci pengangkutan dijana (kunci berasingan untuk penyulitan, untuk pengesahan, untuk setiap pihak), dan Grasshopper dimulakan untuk menyahsulit dan menyemak MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    Coroutine msg_sender kini menyulitkan mesej sebelum menghantarnya pada sambungan TCP. Setiap mesej mempunyai nonce yang meningkat secara monoton, yang juga merupakan vektor permulaan apabila disulitkan dalam mod kaunter. Setiap mesej dan blok mesej dijamin mempunyai nilai kaunter yang berbeza.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    Mesej masuk diproses oleh coroutine msg_receiver, yang mengendalikan pengesahan dan penyahsulitan:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    Kesimpulan

    GOSTIM bertujuan untuk digunakan secara eksklusif untuk tujuan pendidikan (kerana ia tidak dilindungi oleh ujian, sekurang-kurangnya)! Kod sumber program boleh dimuat turun di sini (Π‘Ρ‚Ρ€ΠΈΠ±ΠΎΠ³-256 Ρ…ΡΡˆ: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как ΠΈ всС ΠΌΠΎΠΈ ΠΏΡ€ΠΎΠ΅ΠΊΡ‚Ρ‹, Ρ‚ΠΈΠΏΠ° GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM sepenuhnya perisian percuma, diedarkan di bawah terma GPLv3 +.

    Sergey Matveev, cypherpunk, ahli Yayasan SPO, pembangun Python/Go, ketua pakar FSUE "STC "Atlas".

Sumber: www.habr.com

Tambah komen