GOSTIM: GOST kriptografisi ile bir akşam P2P F2F E2EE IM

Geliştirici olmak PyGOST kütüphaneleri (saf Python'daki GOST kriptografik ilkelleri) kullandığımda, sık sık en basit güvenli mesajlaşmanın nasıl uygulanacağına dair sorular alıyorum. Birçok kişi uygulamalı kriptografinin oldukça basit olduğunu düşünür ve blok şifre üzerinden .encrypt() işlevinin çağrılması, şifrenin bir iletişim kanalı üzerinden güvenli bir şekilde gönderilmesi için yeterli olacaktır. Diğerleri uygulamalı kriptografinin az sayıda kişinin kaderi olduğuna inanıyor ve Telegram gibi zengin şirketlerin olimpiyat matematikçilerine sahip olması kabul edilebilir. uygulayamıyorum güvenli protokol.

Bütün bunlar, kriptografik protokolleri ve güvenli IM'yi uygulamanın o kadar da zor bir iş olmadığını göstermek için beni bu makaleyi yazmaya sevk etti. Ancak kendi kimlik doğrulama ve anahtar anlaşma protokollerinizi icat etmeye değmez.

GOSTIM: GOST kriptografisi ile bir akşam P2P F2F E2EE IM
Makale yazılacak peer-to-peer, arkadaştan arkadaşa, uçtan uca şifrelenmiş ile anlık mesajlaşma SIGMA-I kimlik doğrulama ve anahtar anlaşma protokolü (bu esasa göre uygulanır) IPsec IKE), yalnızca GOST şifreleme algoritmaları PyGOST kütüphanesi ve ASN.1 mesaj kodlama kütüphanesi kullanılarak PyDERASN (bunun hakkında zaten daha önce yazdı). Önkoşul: Bir akşam (veya iş günü) içinde sıfırdan yazılabilecek kadar basit olmalıdır, aksi halde artık basit bir program değildir. Muhtemelen hataları, gereksiz komplikasyonları, eksiklikleri var, ayrıca bu benim asyncio kütüphanesini kullanan ilk programım.

Sohbet tasarımı

Öncelikle IM'mizin nasıl görüneceğini anlamamız gerekiyor. Basit olması açısından, katılımcıların herhangi bir şekilde keşfedilmediği eşler arası bir ağ olmasına izin verin. Muhatapla iletişim kurmak için hangi adrese bağlanılacağını kişisel olarak belirteceğiz: bağlantı noktası.

Şu anda, iki rastgele bilgisayar arasında doğrudan iletişimin mevcut olduğu varsayımının IM'nin pratikte uygulanabilirliği üzerinde önemli bir sınırlama olduğunu anlıyorum. Ancak geliştiriciler her türlü NAT geçiş desteğini ne kadar çok uygularsa, IPv4 İnternet'te o kadar uzun süre kalacağız ve keyfi bilgisayarlar arasında iletişim kurma olasılığı da azalıyor. Evde ve işyerinizde IPv6 eksikliğine ne kadar dayanabilirsiniz?

Arkadaştan arkadaşa bir ağımız olacak: olası tüm muhatapların önceden bilinmesi gerekiyor. Birincisi, bu her şeyi büyük ölçüde basitleştirir: kendimizi tanıttık, adı/anahtarı bulduk veya bulamadık, bağlantımız kesildi veya muhatabı bilerek çalışmaya devam ettik. İkincisi, genel olarak güvenlidir ve birçok saldırıyı ortadan kaldırır.

IM arayüzü klasik çözümlere yakın olacak berbat projelerMinimalizmleri ve Unix tarzı felsefeleri nedeniyle gerçekten hoşuma gidiyor. IM programı, her muhatap için üç Unix etki alanı soketine sahip bir dizin oluşturur:

  • içinde - muhataplara gönderilen mesajlar buna kaydedilir;
  • dışarı - muhataptan alınan mesajlar ondan okunur;
  • durum - ondan okuyarak muhatabın şu anda bağlı olup olmadığını, bağlantı adresini/bağlantı noktasını öğreniriz.

Ayrıca uzak muhatapla bağlantı başlattığımız ana bilgisayar portunu yazarak bir bağlantı soketi oluşturulur.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Bu yaklaşım, IM aktarımı ve kullanıcı arayüzünün bağımsız uygulamalarını yapmanıza olanak tanır, çünkü arkadaş yoktur, herkesi memnun edemezsiniz. Kullanma tmux ve / veya çok kuyruklusözdizimi vurgulamalı çoklu pencere arayüzüne sahip olabilirsiniz. Ve yardımıyla sarma GNU Readline uyumlu bir mesaj giriş satırı alabilirsiniz.

Aslında berbat projeler FIFO dosyalarını kullanır. Kişisel olarak, özel başlıklardan elle yazılmış bir arka plan olmadan, eşzamansız olarak dosyalarla rekabetçi bir şekilde nasıl çalışılacağını anlayamadım (dili bu tür şeyler için uzun zamandır kullanıyorum) Go). Bu nedenle Unix alan soketleriyle yetinmeye karar verdim. Ne yazık ki bu, echo 2001:470:dead::babe 6666 > conn yapılmasını imkansız hale getiriyor. Bu sorunu kullanarak çözdüm sokat: echo 2001:470:ölü::bebek 6666 | socat - UNIX-CONNECT:bağlantı, socat READLINE UNIX-CONNECT:alice/in.

Orijinal güvenli olmayan protokol

TCP aktarım olarak kullanılır: teslimatı ve sırasını garanti eder. UDP ikisini de garanti etmez (ki bu, kriptografi kullanıldığında faydalı olabilir), ancak desteği garanti eder SCTP Python kutudan çıkmıyor.

Ne yazık ki TCP'de mesaj kavramı yoktur, yalnızca bayt akışı vardır. Bu nedenle mesajların bu başlıkta kendi aralarında paylaşılabilmesi için bir formatın getirilmesi gerekiyor. Satır besleme karakterini kullanmayı kabul edebiliriz. Yeni başlayanlar için sorun değil ama mesajlarımızı şifrelemeye başladığımızda bu karakter şifreli metnin herhangi bir yerinde görünebilir. Bu nedenle ağlarda popüler protokoller, mesajın uzunluğunu bayt cinsinden ilk gönderen protokollerdir. Örneğin, Python'da benzer bir formatla çalışmanıza olanak tanıyan xdrlib bulunur. XDR.

TCP okumayla doğru ve verimli çalışmayacağız - kodu basitleştireceğiz. Mesajın tamamını çözene kadar soketteki verileri sonsuz bir döngüde okuruz. XML'li JSON da bu yaklaşımın formatı olarak kullanılabilir. Ancak kriptografi eklendiğinde, verilerin imzalanması ve doğrulanması gerekir; bu da nesnelerin bayt bayt özdeş temsilini gerektirir; JSON/XML bunu sağlamaz (döküm sonuçları farklılık gösterebilir).

XDR bu göreve uygundur ancak ben DER kodlamalı ASN.1'i seçiyorum ve PyDERASN kütüphane, çünkü elimizde çalışmanın daha keyifli ve rahat olduğu yüksek seviyeli nesneler olacak. Şemasızın aksine kod, Mesaj Paketi veya CBORASN.1, verileri sabit kodlanmış bir şemaya göre otomatik olarak kontrol edecektir.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

Alınan mesaj Msg olacaktır: ya bir MsgText metni (şimdilik tek metin alanıyla birlikte) ya da bir MsgHandshake el sıkışma mesajı (muhatabın adını içerir). Şimdi aşırı karmaşık görünüyor, ancak bu gelecek için bir temel.

     ┌─────┐ ┌─────┐ │EşA│ │EşB│ └──┬──┘ └──┬──┘ │MsgEl Sıkışma(Kimlik A) │ │──────── ─ ────────>│ │ │ │MsgEl Sıkışma(IdB) │ │<─────────────────│ │ │ │ MsgText() │ │─── ─ MsgText() │ │ │

Kriptografi olmadan anlık mesajlaşma

Daha önce de söylediğim gibi tüm soket işlemleri için asyncio kütüphanesi kullanılacaktır. Lansmanda ne beklediğimizi açıklayalım:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Kendi adınızı belirleyin (--adımız alice). Beklenen tüm muhataplar virgülle ayrılmış olarak listelenir (—isimleri bob,eve). Muhatapların her biri için Unix soketlerine sahip bir dizin ve ayrıca her giriş, çıkış durumu için bir eşyordam oluşturulur:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

Kullanıcıdan in soketinden gelen mesajlar IN_QUEUES kuyruğuna gönderilir:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

Muhataplardan gelen mesajlar, verilerin çıkış soketine yazıldığı OUT_QUEUES kuyruklarına gönderilir:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Durum soketinden okurken program muhatabın adresini PEER_ALIVE sözlüğünde arar. Henüz muhatapla bağlantı yoksa boş bir satır yazılır.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Bir bağlantı soketine adres yazarken, bağlantı "başlatıcı" işlevi başlatılır:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Başlatıcıyı ele alalım. İlk önce açıkça belirtilen ana makineye/bağlantı noktasına bir bağlantı açar ve adını içeren bir el sıkışma mesajı gönderir:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Daha sonra uzak taraftan bir yanıt bekler. Msg ASN.1 şemasını kullanarak gelen yanıtın kodunu çözmeye çalışır. Mesajın tamamının tek bir TCP segmentinde gönderileceğini ve .read() çağrıldığında onu atomik olarak alacağımızı varsayıyoruz. El sıkışma mesajını alıp almadığımızı kontrol ediyoruz.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Muhatabın alınan adının bizim tarafımızdan bilindiğini kontrol ediyoruz. Değilse bağlantıyı keseriz. Onunla daha önce bir bağlantı kurup kurmadığımızı kontrol ediyoruz (muhatap yine bize bağlanma komutunu verdi) ve kapatıyoruz. IN_QUEUES kuyruğu, mesajın metniyle birlikte Python dizelerini tutar, ancak eski TCP bağlantısıyla ilişkili yazıcısını unutması için msg_sender eşyordamının çalışmayı durdurmasını işaret eden özel bir Yok değeri vardır.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender giden mesajları kabul eder (bir giriş soketinden sıraya alınır), bunları bir MsgText mesajı halinde serileştirir ve bir TCP bağlantısı üzerinden gönderir. Her an kırılabilir - bunu açıkça engelliyoruz.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

Sonunda, başlatıcı soketten gelen mesajların okunması için sonsuz bir döngüye girer. Bu mesajların kısa mesaj olup olmadığını kontrol eder ve bunları ilgili muhatabın çıkış soketine gönderilecekleri OUT_QUEUES kuyruğuna yerleştirir. Neden .read() işlevini yapıp mesajın kodunu çözemiyorsunuz? Çünkü kullanıcıdan gelen birkaç mesajın işletim sistemi arabelleğinde toplanıp tek bir TCP segmentinde gönderilmesi mümkündür. İlkinin kodunu çözebiliriz ve ardından sonrakinin bir kısmı arabellekte kalabilir. Herhangi bir anormal durumda TCP bağlantısını kapatırız ve msg_sender koroutini durdururuz (OUT_QUEUES kuyruğuna Yok göndererek).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Ana koda dönelim. Program başladığında tüm eşyordamları oluşturduktan sonra TCP sunucusunu başlatıyoruz. Kurulan her bağlantı için bir yanıtlayıcı eşyordamı oluşturur.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

yanıtlayıcı, başlatıcıya benzer ve aynı eylemlerin tümünü yansıtır, ancak basitlik açısından, mesajların sonsuz okuma döngüsü hemen başlar. Şu anda, el sıkışma protokolü her iki taraftan bir mesaj gönderiyor, ancak gelecekte bağlantı başlatıcıdan iki mesaj gönderilecek ve ardından kısa mesajlar hemen gönderilebilecek.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Güvenli protokol

İletişimimizi güvence altına almanın zamanı geldi. Güvenlikten neyi kastediyoruz ve ne istiyoruz:

  • iletilen mesajların gizliliği;
  • iletilen mesajların özgünlüğü ve bütünlüğü - değişiklikler tespit edilmelidir;
  • tekrar saldırılarına karşı koruma - eksik veya tekrarlanan mesajların tespit edilmesi gerekir (ve bağlantıyı kesmeye karar veririz);
  • muhatapların önceden girilmiş genel anahtarları kullanarak tanımlanması ve doğrulanması - daha önce arkadaşlardan arkadaşa bir ağ kurmaya karar vermiştik. Kiminle iletişim kurduğumuzu ancak kimlik doğrulamadan sonra anlayacağız;
  • durumu Mükemmel ileri gizlilik özellikler (PFS) - uzun ömürlü imzalama anahtarımızdan ödün verilmesi, önceki tüm yazışmaları okuma becerisine yol açmamalıdır. Ele geçirilen trafiğin kaydedilmesi işe yaramaz hale gelir;
  • mesajların geçerliliği/geçerliliği (aktarım ve el sıkışma) yalnızca bir TCP oturumunda. Başka bir oturumdan (aynı muhatap olsa bile) doğru şekilde imzalanmış/kimliği doğrulanmış mesajların eklenmesi mümkün olmamalıdır;
  • Pasif bir gözlemci, kullanıcı tanımlayıcılarını, iletilen uzun ömürlü genel anahtarları veya bunlardan gelen karmaları görmemelidir. Pasif bir gözlemciden gelen belli bir anonimlik.

Şaşırtıcı bir şekilde, hemen hemen herkes herhangi bir el sıkışma protokolünde bu minimum değere sahip olmak ister ve sonuçta yukarıdakilerin çok azı "kendi kendine geliştirilen" protokoller için karşılanır. Artık yeni bir şey icat etmeyeceğiz. Kesinlikle kullanmanızı tavsiye ederim Gürültü çerçevesi protokol oluşturmak için ama daha basit bir şey seçelim.

En popüler iki protokol şunlardır:

  • TLS - hatalar, pervazlar, güvenlik açıkları, zayıf düşünce, karmaşıklık ve eksikliklerle dolu uzun bir geçmişi olan çok karmaşık bir protokol (ancak bunun TLS 1.3 ile pek ilgisi yoktur). Ancak çok karmaşık olduğu için bunu dikkate almıyoruz.
  • IPsec с IKE — basit olmasalar da ciddi kriptografik sorunları yoktur. IKEv1 ve IKEv2 hakkında okursanız, onların kaynağı STS, ISO/IEC IS 9798-3 ve SIGMA (SIGN-and-MAc) protokolleri - bir akşamda uygulanabilecek kadar basit.

STS/ISO protokollerinin geliştirilmesinde en son bağlantı olan SIGMA'nın iyi yanı nedir? Tüm gereksinimlerimizi karşılar (muhatap tanımlayıcılarının "gizlenmesi" dahil) ve bilinen hiçbir şifreleme sorunu yoktur. Minimalisttir; protokol mesajından en az bir öğenin çıkarılması güvensizliğe yol açacaktır.

Evde yetiştirilen en basit protokolden SIGMA'ya geçelim. İlgilendiğimiz en temel işlem anahtar anlaşma: Her iki katılımcıya da aynı değeri veren ve simetrik anahtar olarak kullanılabilen bir işlev. Ayrıntılara girmeden: tarafların her biri geçici (yalnızca bir oturumda kullanılan) bir anahtar çifti (genel ve özel anahtarlar) oluşturur, genel anahtarları değiştirir, girişe kendi özel anahtarlarını ve genel anahtarlarını ilettikleri anlaşma işlevini çağırır. muhatabın anahtarı.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ══════════╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚═ ════════ ═══════════╝ │ IdB, PubB │ ╔════════════════════╗ │< ───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚════════════════════╝ ── ──┐ ╔════ ═══╧════════════╗ │ ║Anahtar = DH(PrvA, PubB)║ <───┘ ╚═══════╤═ ═══════ ════╝ │ │ │ │

Herkes ortaya atlayabilir ve genel anahtarları kendi anahtarlarıyla değiştirebilir - bu protokolde muhatapların kimlik doğrulaması yoktur. Uzun ömürlü anahtarlara sahip bir imza ekleyelim.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, imzala( SignPr vA, (PubA)) │ ╔═ │──────────── ────────── ───────────>│ ║SignPrvA, SignPubA = yük()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════════ ══════ ═════════════╝ │IdB, PubB, işaret(İşaret) PrvB, (YayınB)) │ ╔═══════════════ ══════ ══════╗ │<──────── ────────── ────────────── ─│ ║SignPrvB, SignPubB = yük( )║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══ ════════ ═══════════════ ═╝ ────┐ ╔ ════════════════ ═════╗ │ │ ║doğrula( SignPubB, ...)║ │ <───┘ ║Anahtar = DH(PrvA , PubB) ║ │ │ ╚════════════════════ ═╝ │ │ │

Böyle bir imza belirli bir oturuma bağlı olmadığı için işe yaramayacaktır. Bu tür mesajlar aynı zamanda diğer katılımcılarla yapılan oturumlar için de “uygundur”. İçeriğin tamamı abone olmalıdır. Bu bizi A'dan başka bir mesaj daha eklemeye zorluyor.

Ek olarak, imzanın altına kendi tanımlayıcınızı eklemeniz de önemlidir, çünkü aksi takdirde IdXXX'i değiştirebilir ve bilinen başka bir muhatabın anahtarıyla mesajı yeniden imzalayabiliriz. Önlemek yansıma saldırılarıimzanın altındaki unsurların anlamlarına göre açıkça tanımlanmış yerlerde olması gerekir: A imzalıyorsa (PubA, PubB), B'nin de imzalaması gerekir (PubB, PubA). Bu aynı zamanda serileştirilmiş verilerin yapısını ve formatını seçmenin önemine de değinmektedir. Örneğin, ASN.1 DER kodlamasındaki kümeler sıralanır: SET OF(PubA, PubB), SET OF(PubB, PubA) ile aynı olacaktır.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │───────────────────── ────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚══════ ══════ ═══════════════╝ │IdB, PubB,sign(SignPrvB, (IdB, PubA, PubB)) │ ╔══════════ ═ ════ ════════════╗ │<────────────────────────── ───────── ─────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚══════════ ═══════ ══════════╝ │sign(SignPrvA, (IdA, PubB, PubA)) │ ╔══════════════════ ═ ══╗ │─ ───────────────────────────────────────── ──>│ ║doğrula(SignPubB, ...)║ │ │ ║Anahtar = DH(PrvA, PubB) ║ │ │ ╚═════════════════════╝ │ │

Ancak bu oturum için aynı paylaşılan anahtarı oluşturduğumuzu hâlâ "kanıtlayamadık". Prensip olarak bu adımı atlayabiliriz - ilk aktarım bağlantısı geçersiz olacaktır, ancak el sıkışma tamamlandığında her şeyin gerçekten üzerinde anlaşıldığından emin olmak istiyoruz. Şu anda elimizde ISO/IEC IS 9798-3 protokolü bulunmaktadır.

Oluşturulan anahtarın kendisini imzalayabiliriz. Bu tehlikelidir, çünkü kullanılan imza algoritmasında sızıntılar olması mümkündür (imza başına bit olsa da yine de sızıntı). Türetme anahtarının karma değerini imzalamak mümkündür, ancak türetilmiş anahtarın karma değerini sızdırmak bile türetme işlevine yapılan kaba kuvvet saldırısında değerli olabilir. SIGMA, gönderenin kimliğini doğrulayan bir MAC işlevi kullanır.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │───────────────────── ────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═ ══════ ════════════════════╝ │IdB, PubB,sign(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔════ ═ ══ ════════════════════╗ │<───────────────── ─ ──────── ─ ─────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ═ ══════════════════════╝ │ │ ╔═════════════ ════════╗ │ işareti (SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Anahtar = DH(PrvA, PubB) ║ │────────────────────── ─ ─ ─────────────────────────>│ ║doğrula(Anahtar, IdB) ║ │ │ ║doğrula(SignPubB, ...) ║ │ │ ╚ ═════════════════════╝ │ │

Bir optimizasyon olarak, bazıları geçici anahtarlarını yeniden kullanmak isteyebilir (ki bu elbette PFS için talihsiz bir durumdur). Örneğin, bir anahtar çifti oluşturduk, bağlanmaya çalıştık ama TCP kullanılamıyordu veya protokolün ortasında bir yerde kesintiye uğradı. Boşa harcanan entropiyi ve işlemci kaynaklarını yeni bir çifte harcamak utanç verici. Bu nedenle, geçici genel anahtarları yeniden kullanırken olası rastgele yeniden oynatma saldırılarına karşı koruma sağlayacak sözde rastgele bir değer olan sözde çerezi tanıtacağız. Çerez ile geçici genel anahtar arasındaki bağlantı nedeniyle karşı tarafın ortak anahtarı gereksiz olarak imzadan çıkarılabilir.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CookieA │ ╔════════ ═══════════════════╗ │─────────────────── ────────── ───────────────────────────────────────── >│ ║SignPrvA, SignPubA = yük( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚══════════════════════════ ═╝ │IdB, PubB, CookieB , işaret(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔═══════════════════════════╗ │< ───────────────────────────────────────── ───────── ────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚══════ ═════════════════════╝ │ │ ╔═══════════════ ══════╗ │ işareti( SignPRVA, (Cookieb, Cookiea, Puba), Mac (Ida) │ ║key = dh (prva, pubb) ║ │ │YAYAYIN ───────────────────────────────────────── ──────>│ ║ doğrula(Anahtar, IdB) ║ │ │ ║doğrula(SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │

Son olarak, sohbet ortaklarımızın mahremiyetini pasif bir gözlemciden almak istiyoruz. Bunu yapmak için SIGMA, öncelikle geçici anahtarların değişimini ve kimlik doğrulama ve tanımlama mesajlarının şifreleneceği ortak bir anahtar geliştirmeyi öneriyor. SIGMA iki seçeneği açıklamaktadır:

  • SIGMA-I - başlatıcıyı aktif saldırılardan, yanıtlayıcıyı ise pasif saldırılardan korur: başlatıcı, yanıtlayıcının kimliğini doğrular ve bir şey eşleşmezse kimliğini vermez. Sanık, kendisiyle aktif bir protokol başlatılması halinde kimliğini veriyor. Pasif gözlemci hiçbir şey öğrenmez;
    SIGMA-R - müdahaleciyi aktif saldırılardan, başlatıcıyı ise pasif saldırılardan korur. Her şey tam tersidir ancak bu protokolde zaten dört el sıkışma mesajı iletilmektedir.

    İstemci-sunucudan beklediğimiz tanıdık şeylere daha çok benzediği için SIGMA-I'yi seçiyoruz: istemci yalnızca kimliği doğrulanmış sunucu tarafından tanınır ve herkes sunucuyu zaten bilir. Ayrıca daha az el sıkışma mesajı nedeniyle uygulanması daha kolaydır. Protokole eklediğimiz tek şey, mesajın bir kısmını şifrelemek ve A tanımlayıcısını son mesajın şifrelenmiş kısmına aktarmaktır:

    ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ PubA, CookieA │ ╔══════════ ═════════════════╗ │───────────────────── ────────── ───────────────────────────────────────── ─────>│ ║SignPrvA , SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════════════════════ ════╝ │ PubB, CookieB, Enc((IdB, işaret(SignPrvB, (CookieA, CookieB, PubB))), MAC(IdB))) │ ╔════════════════════ ═══════╗ │<─────────────────────────────── ───────── ║SignPrv B, SignPubB = load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚═════════════════════ ═════ ═╝ │ │ ╔ ════════ ═════════════╗ │ Enc((IdA, işaret(SignPrvA, (CookieB, CookieA, PubA))), MAC(Id A) )) │ ║Anahtar = DH(PrvA, PubB) ║ │────────────────────────────── ────── ──── ───────── ─────────────────────────── ─>│ ║doğrula(Anahtar, IdB) ║ │ │ ║doğrula(PubB İmzala, ...)║ │ │ ╚═════════════════════╝ │ │
    
    • GOST R imza için kullanılır 34.10-2012 256 bit anahtarlı algoritma.
    • Genel anahtarı oluşturmak için 34.10 VKO kullanılır.
    • CMAC, MAC olarak kullanılır. Teknik olarak bu, GOST R 34.13-2015'te açıklanan blok şifrenin özel bir çalışma modudur. Bu mod için bir şifreleme işlevi olarak – çekirge (34.12-2015).
    • Genel anahtarının karması muhatabın tanımlayıcısı olarak kullanılır. Hash olarak kullanılır Stribog-256 (34.11 2012 bit).

    El sıkışmanın ardından ortak bir anahtar üzerinde anlaşacağız. Aktarım mesajlarının kimlik doğrulamalı şifrelemesi için kullanabiliriz. Bu kısım çok basit ve hata yapılması zor: mesaj sayacını artırıyoruz, mesajı şifreliyoruz, sayacın ve şifreli metnin kimliğini doğruluyor (MAC), gönderiyoruz. Mesaj alırken sayacın beklenen değere sahip olup olmadığını kontrol eder, şifreli metni sayaçla doğrular ve şifresini çözeriz. El sıkışma mesajlarını ve taşıma mesajlarını şifrelemek için hangi anahtarı kullanmalıyım ve bunların kimliğini nasıl doğrulayabilirim? Tüm bu görevler için tek bir anahtar kullanmak tehlikeli ve akıllıca değildir. Özel işlevleri kullanarak anahtarlar oluşturmak gereklidir KDF (anahtar türetme işlevi). Tekrar söylüyorum, kılı kırk yararak bir şeyler icat etmeyelim: HKDF uzun zamandır bilinmektedir, iyi araştırılmıştır ve bilinen hiçbir sorunu yoktur. Ne yazık ki yerel Python kütüphanesinde bu fonksiyon yoktur, bu yüzden kullanıyoruz HKDF naylon poşet. HKDF dahili olarak kullanır HMAC, bu da bir karma işlevi kullanır. Wikipedia sayfasındaki Python'daki örnek bir uygulama yalnızca birkaç satır kod gerektirir. 34.10'de olduğu gibi hash fonksiyonu olarak Stribog-2012'yı kullanacağız. Anahtar anlaşma fonksiyonumuzun çıktısına, eksik simetrik olanların oluşturulacağı oturum anahtarı adı verilecektir:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Yapılar/Şemalar

    Tüm bu verileri iletmek için şu anda sahip olduğumuz ASN.1 yapılarına bakalım:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS imzalanacak olandır. El SıkışmaTBE - ne şifrelenecek. MsgHandshake1'deki ukm alanına dikkatinizi çekerim. 34.10 VKO, oluşturulan anahtarların daha da rastgele hale getirilmesi için UKM (kullanıcı anahtarlama malzemesi) parametresini içerir - yalnızca ek entropi.

    Koda Kriptografi Ekleme

    Çerçeve aynı kaldığı için yalnızca orijinal kodda yapılan değişiklikleri ele alalım (aslında, önce son uygulama yazıldı ve ardından tüm şifreleme bundan çıkarıldı).

    Muhatapların kimlik doğrulaması ve tanımlanması genel anahtarlar kullanılarak gerçekleştirileceğinden, artık bunların uzun süre bir yerde saklanması gerekiyor. Basit olması açısından JSON'u şu şekilde kullanıyoruz:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    bizim - anahtar çiftimiz, onaltılık özel ve genel anahtarlarımız. onların - muhatapların adları ve genel anahtarları. Komut satırı argümanlarını değiştirelim ve JSON verilerinin sonradan işlenmesini ekleyelim:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    34.10 algoritmasının özel anahtarı rastgele bir sayıdır. 256 bit eliptik eğriler için 256 bit boyut. PyGOST bir bayt kümesiyle çalışmaz, ancak büyük sayılaryani özel anahtarımızın (urandom(32)) gost3410.prv_unmarshal() kullanılarak bir sayıya dönüştürülmesi gerekiyor. Genel anahtar, gost3410.public_key() kullanılarak özel anahtardan deterministik olarak belirlenir. Genel anahtar 34.10, depolama ve iletim kolaylığı için gost3410.pub_marshal() kullanılarak bir bayt dizisine dönüştürülmesi gereken iki büyük sayıdır.

    JSON dosyasını okuduktan sonra genel anahtarların gost3410.pub_unmarshal() kullanılarak geri dönüştürülmesi gerekir. Muhatapların tanımlayıcılarını genel anahtardan karma şeklinde alacağımız için, bunlar hemen önceden hesaplanabilir ve hızlı arama için bir sözlüğe yerleştirilebilir. Stribog-256 hash'i, hash fonksiyonlarının hashlib arayüzünü tamamen karşılayan gost34112012256.GOST34112012256()'dır.

    Başlatıcı koroutini nasıl değişti? Her şey el sıkışma şemasına göre: VKO anahtar anlaşması işlevi için kullanılacak bir çerez (128 bit yeterli) ve geçici bir anahtar çifti 34.10 oluşturuyoruz.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • bir yanıt bekliyoruz ve gelen Msj mesajının kodunu çözüyoruz;
    • el sıkışma1 aldığınızdan emin olun;
    • karşı tarafın geçici genel anahtarının kodunu çözün ve oturum anahtarını hesaplayın;
    • Mesajın TBE kısmını işlemek için gerekli simetrik anahtarları üretiyoruz.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM, 64 bitlik bir sayıdır (urandom(8)), bu da gost3410_vko.ukm_unmarshal() kullanılarak bayt temsilinden seri durumdan çıkarma gerektirir. 34.10 2012-bit için VKO işlevi gost256_vko.kek_3410()'dır (KEK - şifreleme anahtarı).

    Oluşturulan oturum anahtarı zaten 256 bitlik sözde rastgele bayt dizisidir. Bu nedenle HKDF fonksiyonlarında hemen kullanılabilir. GOST34112012256 hashlib arayüzünü karşıladığından Hkdf sınıfında hemen kullanılabilir. Katılan anahtar çiftlerinin geçiciliği nedeniyle oluşturulan anahtar her oturum için farklı olacağından ve zaten yeterli entropi içereceğinden, tuzu (Hkdf'nin ilk argümanı) belirtmiyoruz. kdf.expand() varsayılan olarak daha sonra Grasshopper için gereken 256 bitlik anahtarları zaten üretmektedir.

    Daha sonra gelen mesajın TBE ve TBS kısımları kontrol edilir:

    • Gelen şifreli metnin MAC'i hesaplanır ve kontrol edilir;
    • şifreli metnin şifresi çözülür;
    • TBE yapısının kodu çözüldü;
    • muhatabın kimliği ondan alınır ve onu tanıyıp tanımadığımız kontrol edilir;
    • Bu tanımlayıcı üzerinden MAC hesaplanır ve kontrol edilir;
    • Her iki tarafın çerezini ve karşı tarafın genel geçici anahtarını içeren TBS yapısı üzerindeki imza doğrulanır. İmza, muhatabın uzun ömürlü imza anahtarıyla doğrulanır.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Yukarıda yazdığım gibi 34.13 tarihinde çeşitli açıklamalar yapılıyor blok şifre çalışma modları 34.12 tarihinden itibaren. Bunların arasında taklit ekler ve MAC hesaplamaları oluşturmak için bir mod vardır. PyGOST'ta bu gost2015.mac()'tır. Bu mod, şifreleme fonksiyonunun (bir veri bloğunun alınması ve geri gönderilmesi), şifreleme bloğunun boyutunun ve aslında verinin kendisinin geçmesini gerektirir. Neden şifreleme bloğunun boyutunu sabit kodlayamıyorsunuz? 3413 yalnızca 34.12-bit Grasshopper şifresini değil, aynı zamanda 2015-bit şifresini de açıklıyor Magma - KGB'de yeniden oluşturulmuş ve hala en yüksek güvenlik eşiklerinden birine sahip olan, hafifçe değiştirilmiş bir GOST 28147-89.

    Kuznechik, gost.3412.GOST3412Kuznechik(key) çağrılarak başlatılır ve 34.13 işlevlerine geçmeye uygun .encrypt()/.decrypt() yöntemlerine sahip bir nesne döndürür. MAC şu şekilde hesaplanır: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext). Hesaplanan ve alınan MAC'yi karşılaştırmak için bayt dizelerinin olağan karşılaştırmasını (==) kullanamazsınız, çünkü bu işlem karşılaştırma süresini sızdırır ve bu genel durumda aşağıdaki gibi ölümcül güvenlik açıklarına yol açabilir: BEAST TLS'ye saldırılar. Python'un bunun için hmac.compare_digest adında özel bir işlevi vardır.

    Blok şifreleme işlevi yalnızca bir veri bloğunu şifreleyebilir. Daha büyük bir sayı için ve hatta uzunluğun katları için şifreleme modunun kullanılması gerekir. 34.13-2015 aşağıdakileri açıklamaktadır: ECB, CTR, OFB, CBC, CFB. Her birinin kendi kabul edilebilir uygulama alanları ve özellikleri vardır. Ne yazık ki hala standartlaştıramadık kimliği doğrulanmış şifreleme modları (CCM, OCB, GCM ve benzeri gibi) - en azından MAC'ı kendimiz eklemek zorundayız. seçerim sayaç modu (TO): blok boyutuna göre doldurma gerektirmez, paralelleştirilebilir, yalnızca şifreleme işlevini kullanır, çok sayıda mesajı şifrelemek için güvenli bir şekilde kullanılabilir (nispeten hızlı çarpışmalara sahip olan CBC'den farklı olarak).

    .mac() gibi, .ctr() da benzer girdiler alır: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). Şifreleme bloğunun uzunluğunun tam olarak yarısı kadar bir başlatma vektörünün belirtilmesi gerekmektedir. Şifreleme anahtarımız yalnızca bir mesajı (birkaç bloktan da olsa) şifrelemek için kullanılıyorsa, sıfır başlatma vektörünü ayarlamak güvenlidir. El sıkışma mesajlarını şifrelemek için her seferinde ayrı bir anahtar kullanırız.

    gost3410.verify() imzasını doğrulamak önemsizdir: içinde çalıştığımız eliptik eğriyi geçiririz (bunu GOSTIM protokolümüze kaydederiz), imzalayanın genel anahtarını (bunun ikiden oluşan bir demet olması gerektiğini unutmayın) büyük sayılar ve bir bayt dizisi değil), 34.11 karma değeri ve imzanın kendisi.

    Daha sonra, başlatıcıda, doğrulama sırasında yaptığımız eylemlerin aynısını yalnızca simetrik olarak gerçekleştirerek, el sıkışma2'ye bir el sıkışma mesajı hazırlayıp göndeririz: kontrol etmek yerine anahtarlarımızı imzalamak vb.

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Oturum oluşturulduğunda, taşıma anahtarları oluşturulur (tarafların her biri için şifreleme ve kimlik doğrulama için ayrı bir anahtar) ve Grasshopper, MAC'in şifresini çözüp kontrol etmek için başlatılır:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    msg_sender eşyordamı artık mesajları TCP bağlantısıyla göndermeden önce şifreliyor. Her mesajın monoton olarak artan bir tekrarı vardır; bu aynı zamanda sayaç modunda şifrelendiğinde başlatma vektörüdür. Her mesaj ve mesaj bloğunun farklı bir sayaç değerine sahip olması garanti edilir.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    Gelen mesajlar, kimlik doğrulama ve şifre çözme işlemlerini gerçekleştiren msg_receiver eşyordamı tarafından işlenir:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    Sonuç

    GOSTIM yalnızca eğitim amaçlı kullanılmak üzere tasarlanmıştır (en azından testlerin kapsamına girmediği için)! Programın kaynak kodu indirilebilir burada (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN, GOSTIM tamamen ücretsiz yazılımşartlara göre dağıtılır GPLv3 +.

    Sergey Matveev, Şifrepunküye DPT Vakfı, Python/Go geliştiricisi, baş uzman FSUE "STC" Atlas".

Kaynak: habr.com

Yorum ekle