GOSTIM: P2P F2F E2EE IM w jeden wieczór z kryptografią GOST

Bycie programistą PyGOST bibliotek (prymitywy kryptograficzne GOST w czystym Pythonie), często otrzymuję pytania o to, jak zaimplementować najprostszą bezpieczną komunikację na kolanie. Wiele osób uważa, że ​​kryptografia stosowana jest dość prosta i wywołanie .encrypt() w szyfrze blokowym wystarczy, aby bezpiecznie przesłać go kanałem komunikacyjnym. Inni uważają, że kryptografia stosowana jest przeznaczeniem nielicznych i dopuszczalne jest, aby bogate firmy takie jak Telegram z matematycznymi olimpijczykami nie może wdrożyć bezpieczny protokół.

Wszystko to skłoniło mnie do napisania tego artykułu, aby pokazać, że wdrożenie protokołów kryptograficznych i bezpiecznych komunikatorów internetowych nie jest aż tak trudnym zadaniem. Nie warto jednak wymyślać własnych protokołów uwierzytelniania i uzgadniania kluczy.

GOSTIM: P2P F2F E2EE IM w jeden wieczór z kryptografią GOST
Artykuł napiszę peer-to-peer, przyjaciel do przyjaciela, szyfrowane od końca do końca komunikator internetowy z SIGMA-I protokół uwierzytelnienia i uzgodnienia klucza (na podstawie którego jest realizowany IPsec IKE), wykorzystując wyłącznie algorytmy kryptograficzne GOST, bibliotekę PyGOST i bibliotekę kodowania wiadomości ASN.1 PyDERASN (o czym już mówiłem napisał wcześniej). Warunek: musi być na tyle prosty, aby można go było napisać od podstaw w jeden wieczór (lub dzień roboczy), w przeciwnym razie nie jest to już prosty program. Pewnie zawiera błędy, niepotrzebne komplikacje, niedociągnięcia, a poza tym jest to mój pierwszy program wykorzystujący bibliotekę asyncio.

Projekt komunikatora

Po pierwsze, musimy zrozumieć, jak będzie wyglądał nasz komunikator. Dla uproszczenia niech będzie to sieć typu peer-to-peer, bez odkrywania uczestników. Osobiście wskażemy pod jaki adres: port pod który się połączyć aby komunikować się z rozmówcą.

Rozumiem, że obecnie założenie, że pomiędzy dwoma dowolnymi komputerami możliwa jest bezpośrednia komunikacja, stanowi znaczące ograniczenie możliwości zastosowania komunikatora internetowego w praktyce. Jednak im więcej programistów wdraża wszelkiego rodzaju kule umożliwiające przejście NAT, tym dłużej pozostaniemy w Internecie IPv4, z przygnębiającym prawdopodobieństwem komunikacji między dowolnymi komputerami. Jak długo możesz tolerować brak IPv6 w domu i w pracy?

Będziemy mieć sieć przyjaciół: wszyscy potencjalni rozmówcy muszą być znani z wyprzedzeniem. Po pierwsze, to znacznie upraszcza wszystko: przedstawiliśmy się, znaleźliśmy lub nie znaleźliśmy nazwy/klucza, rozłączyliśmy się lub kontynuujemy pracę, znając rozmówcę. Po drugie, ogólnie jest bezpieczny i eliminuje wiele ataków.

Interfejs komunikatora będzie zbliżony do klasycznych rozwiązań beznadziejne projekty, które bardzo lubię za ich minimalizm i filozofię uniksową. Program komunikatora tworzy dla każdego rozmówcy katalog z trzema gniazdami domen uniksowych:

  • in – zapisywane są w nim wiadomości wysyłane do rozmówcy;
  • out - odczytywane są z niego wiadomości otrzymane od rozmówcy;
  • stan - czytając z niego dowiadujemy się, czy rozmówca jest aktualnie podłączony, jaki jest adres/port połączenia.

Dodatkowo tworzone jest gniazdo conn, wpisując port hosta, na którym inicjujemy połączenie ze zdalnym rozmówcą.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

Takie podejście pozwala na niezależne implementacje transportu komunikatorów i interfejsu użytkownika, bo nie ma przyjaciela, nie da się zadowolić wszystkich. Za pomocą tmux i / lub wieloogonowy, możesz uzyskać interfejs wielu okien z podświetlaniem składni. I z pomocą rwrap możesz uzyskać linię wprowadzania wiadomości zgodną z GNU Readline.

W rzeczywistości projekty bez sensu używają plików FIFO. Osobiście nie rozumiałem jak można konkurencyjnie pracować z plikami w asyncio bez odręcznego tła z dedykowanych wątków (używam tego języka od dawna do takich rzeczy Go). Dlatego zdecydowałem się zadowolić gniazdami domeny Unix. Niestety uniemożliwia to wykonanie echo 2001:470:dead::babe 6666 > conn. Rozwiązałem ten problem za pomocą sok: echo 2001:470:martwy::kochanie 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alicja/in.

Oryginalny niebezpieczny protokół

Jako transport służy protokół TCP: gwarantuje dostawę i jej porządek. UDP nie gwarantuje żadnego (co byłoby przydatne w przypadku korzystania z kryptografii), ale wsparcie SCTP Python nie jest gotowy do użycia.

Niestety w protokole TCP nie ma pojęcia komunikatu, a jedynie strumień bajtów. Dlatego konieczne jest wymyślenie formatu wiadomości, aby można było je dzielić między sobą w tym wątku. Możemy zgodzić się na użycie znaku nowego wiersza. Na początek jest to w porządku, ale gdy zaczniemy szyfrować nasze wiadomości, znak ten może pojawić się w dowolnym miejscu tekstu zaszyfrowanego. Dlatego w sieciach popularne są protokoły, które najpierw wysyłają długość wiadomości w bajtach. Na przykład standardowo Python ma xdrlib, który pozwala na pracę z podobnym formatem XDR.

Z odczytem TCP nie będziemy pracować poprawnie i wydajnie - uprościmy kod. Dane z gniazda odczytujemy w nieskończonej pętli, aż do momentu, w którym zdekodujemy całą wiadomość. JSON z XML może być również używany jako format w tym podejściu. Jednak po dodaniu kryptografii dane będą musiały zostać podpisane i uwierzytelnione, a to będzie wymagało identycznej reprezentacji obiektów bajt po bajcie, czego nie zapewnia JSON/XML (wyniki zrzutów mogą się różnić).

Do tego zadania nadaje się XDR, jednak ja wybieram ASN.1 z kodowaniem DER i PyDERASN bibliotekę, gdyż będziemy mieli pod ręką obiekty wysokiego poziomu, z którymi często przyjemniej i wygodniej się pracuje. Inaczej niż bez schematu kod kreskowy, Pakiet wiadomości lub CBOR, ASN.1 automatycznie sprawdzi dane względem zakodowanego na stałe schematu.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

Otrzymana wiadomość będzie miała postać Msg: albo będzie to wiadomość tekstowa MsgText (na razie z jednym polem tekstowym) albo wiadomość MsgHandshake (zawierająca imię rozmówcy). Teraz wydaje się to zbyt skomplikowane, ale to podstawa na przyszłość.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │MsgHandshake( IdA) │ │───────── ────────>│ │ │ │MsgHandshake(IdB) │ │<─────────────────│ │ │ │ MsgText() │ │──── MsgText() │ │ │

Komunikatory internetowe bez kryptografii

Jak już mówiłem, do wszystkich operacji na gniazdach używana będzie biblioteka asyncio. Ogłośmy, czego oczekujemy podczas premiery:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

Ustaw własne imię (--nasze-imie alice). Wszyscy oczekiwani rozmówcy są wymienieni oddzieleni przecinkami (—ich imiona bob, ewa). Dla każdego z rozmówców tworzony jest katalog z gniazdami uniksowymi, a także współprogram dla każdego stanu in, out:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

Wiadomości przychodzące od użytkownika z gniazda in kierowane są do kolejki IN_QUEUES:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

Wiadomości przychodzące od rozmówców kierowane są do kolejek OUT_QUEUES, z których dane zapisywane są na gniazdo wyjściowe:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

Podczas odczytu z gniazda stanu program szuka adresu rozmówcy w słowniku PEER_ALIVE. Jeśli nie ma jeszcze połączenia z rozmówcą, zapisywana jest pusta linia.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

Podczas zapisywania adresu do gniazda połączeniowego uruchamiana jest funkcja „inicjatora połączenia”:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

Rozważmy inicjatora. Najpierw oczywiście otwiera połączenie z określonym hostem/portem i wysyła wiadomość uzgadniającą ze swoją nazwą:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

Następnie oczekuje na odpowiedź od strony zdalnej. Próbuje zdekodować przychodzącą odpowiedź przy użyciu schematu Msg ASN.1. Zakładamy, że cała wiadomość zostanie wysłana w jednym segmencie TCP i otrzymamy ją atomowo przy wywołaniu .read(). Sprawdzamy, czy otrzymaliśmy wiadomość o uścisku dłoni.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

Sprawdzamy, czy otrzymane imię rozmówcy jest nam znane. Jeżeli nie, to zrywamy połączenie. Sprawdzamy, czy nawiązaliśmy już z nim połączenie (rozmówca ponownie wydał polecenie połączenia się z nami) i zamykamy je. Kolejka IN_QUEUES przechowuje ciągi znaków Pythona z tekstem wiadomości, ale ma specjalną wartość None, która sygnalizuje, że współprogram msg_sender przestaje działać i zapomina o swoim programie piszącym powiązanym ze starszym połączeniem TCP.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender akceptuje wiadomości wychodzące (kolejkowane z gniazda wejściowego), serializuje je do wiadomości MsgText i wysyła przez połączenie TCP. W każdej chwili może się zepsuć – wyraźnie to przechwytujemy.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

Na koniec inicjator wchodzi w nieskończoną pętlę odczytywania komunikatów z gniazda. Sprawdza, czy są to wiadomości tekstowe i umieszcza je w kolejce OUT_QUEUES, z której zostaną przesłane do gniazda wyjściowego odpowiedniego rozmówcy. Dlaczego nie możesz po prostu wykonać .read() i odszyfrować wiadomość? Ponieważ istnieje możliwość, że kilka wiadomości od użytkownika zostanie zagregowanych w buforze systemu operacyjnego i przesłanych w jednym segmencie TCP. Pierwszą z nich możemy rozszyfrować, a następnie część kolejnej może pozostać w buforze. W przypadku jakiejkolwiek nietypowej sytuacji zamykamy połączenie TCP i zatrzymujemy koronę msg_sender (wysyłając None do kolejki OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

Wróćmy do głównego kodu. Po utworzeniu wszystkich współprogramów w chwili uruchomienia programu uruchamiamy serwer TCP. Dla każdego nawiązanego połączenia tworzy współprogram odpowiadający.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

obiekt odpowiadający jest podobny do inicjatora i odzwierciedla te same działania, ale dla uproszczenia nieskończona pętla odczytywania wiadomości rozpoczyna się natychmiast. Obecnie protokół Handshake wysyła po jednej wiadomości z każdej strony, ale w przyszłości od inicjatora połączenia będą dwie, po czym wiadomości tekstowe będą mogły być wysyłane od razu.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

Bezpieczny protokół

Czas zabezpieczyć naszą komunikację. Co rozumiemy przez bezpieczeństwo i czego chcemy:

  • poufność przesyłanych wiadomości;
  • autentyczność i integralność przesyłanych komunikatów – należy wykryć ich zmiany;
  • ochrona przed atakami typu „replay” – należy wykryć fakt braku lub powtórzenia wiadomości (i podejmujemy decyzję o zakończeniu połączenia);
  • identyfikacja i uwierzytelnianie rozmówców za pomocą wcześniej wprowadzonych kluczy publicznych - już wcześniej zdecydowaliśmy, że tworzymy sieć przyjaciel-znajomy. Dopiero po uwierzytelnieniu zrozumiemy, z kim się komunikujemy;
  • dostępność idealna tajemnica przekazu właściwości (PFS) - naruszenie naszego długotrwałego klucza podpisu nie powinno prowadzić do możliwości odczytania całej wcześniejszej korespondencji. Nagrywanie przechwyconego ruchu staje się bezużyteczne;
  • ważność/ważność komunikatów (transport i uzgadnianie) tylko w ramach jednej sesji TCP. Wstawianie poprawnie podpisanych/uwierzytelnionych wiadomości z innej sesji (nawet z tym samym rozmówcą) nie powinno być możliwe;
  • bierny obserwator nie powinien widzieć ani identyfikatorów użytkowników, przesłanych długowiecznych kluczy publicznych, ani skrótów z nich. Pewna anonimowość ze strony biernego obserwatora.

Co zaskakujące, prawie każdy chce mieć to minimum w dowolnym protokole uzgadniania, a bardzo niewiele z powyższych wymagań jest ostatecznie spełnionych w przypadku protokołów „domowych”. Teraz nie wymyślimy nic nowego. Zdecydowanie polecam używać Ramy hałasu do budowania protokołów, ale wybierzmy coś prostszego.

Dwa najpopularniejsze protokoły to:

  • TLS - bardzo złożony protokół z długą historią błędów, zacięć, luk, złego myślenia, złożoności i niedociągnięć (jednak ma to niewiele wspólnego z TLS 1.3). Ale nie bierzemy tego pod uwagę, ponieważ jest to zbyt skomplikowane.
  • IPsec с IKE — nie mają poważnych problemów kryptograficznych, chociaż nie są one też proste. Jeśli czytasz o IKEv1 i IKEv2, to ich źródłem jest STS, ISO/IEC IS 9798-3 i protokoły SIGMA (SIGn-and-MAc) - wystarczająco proste do wdrożenia w jeden wieczór.

Co jest dobrego w SIGMA, jako najnowszym ogniwie w rozwoju protokołów STS/ISO? Spełnia wszystkie nasze wymagania (w tym „ukrywanie” identyfikatorów rozmówców) i nie ma znanych problemów kryptograficznych. Jest minimalistyczny – usunięcie przynajmniej jednego elementu z komunikatu protokołu spowoduje jego niepewność.

Przejdźmy od najprostszego, domowego protokołu do protokołu SIGMA. Najbardziej podstawowa operacja, która nas interesuje, to kluczowa umowa: Funkcja, która zwraca obu uczestnikom tę samą wartość, której można użyć jako klucza symetrycznego. Bez wchodzenia w szczegóły: każda ze stron generuje efemeryczną (używaną tylko w ramach jednej sesji) parę kluczy (klucze publiczny i prywatny), wymienia klucze publiczne, wywołuje funkcję porozumienia, na wejście której przekazuje swój klucz prywatny i publiczny klucz rozmówcy.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ══════════╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚ ═ ════════ ═══════════╝ │ IdB, PubB │ ╔════════════════════╗ │<───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚════════════════════╝ ─ ───┐ ╔════ ═══╧════════════╗ │ ║Klucz = DH(PrvA, PubB)║ <───┘ ╚═══════╤ ═ ═══════ ════╝ │ │ │ │

Każdy może wskoczyć w środek i zastąpić klucze publiczne własnymi – w tym protokole nie ma uwierzytelniania rozmówców. Dodajmy podpis z długowiecznymi kluczami.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, znak(SignPrvA, (PubA)) │ ╔═ │──────────── ────────── ───────────>│ ║SignPrvA, SignP ubA = obciążenie()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════ ═══════ ═════════════╝ │IdB, PubB , znak(SignPrvB, (PubB)) │ ╔══════════════ ═══════ ══════╗ │<─────── ─────────── ───────────── ──│ ║SignPrvB, SignPubB = loading( )║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚══ ═════════ ══════════════ ══╝ ────┐ ╔ ════════════════ ═════╗ │ │ ║weryfikuj( SignPubB, ...)║ │ <───┘ ║Klucz = DH(Pr vA, PubB) ║ │ │ ╚═══════════════════ ══╝ │ │ │

Taki podpis nie będzie działać, ponieważ nie jest powiązany z konkretną sesją. Takie wiadomości „nadają się” także do sesji z innymi uczestnikami. Cały kontekst musi się subskrybować. Zmusza to nas do dodania jeszcze jednej wiadomości od A.

Dodatkowo niezwykle istotne jest dodanie pod podpisem własnego identyfikatora, gdyż w przeciwnym razie możemy zastąpić IdXXX i ponownie podpisać wiadomość kluczem innego znanego rozmówcy. Aby zapobiec ataki refleksyjnekonieczne jest, aby elementy pod podpisem znajdowały się w jasno określonych miejscach zgodnie z ich znaczeniem: jeśli znak A (PubA, PubB), to B musi się podpisać (PubB, PubA). Świadczy to również o znaczeniu wyboru struktury i formatu serializowanych danych. Na przykład zestawy w kodowaniu ASN.1 DER są sortowane: SET OF(PubA, PubB) będzie identyczny z SET OF(PubB, PubA).

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │───────────────────────── ────────── ─────────────>│ ║SignPrvA, SignPubA = Load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════ ═ ═══════ ═══════════════╝ │IdB, PubB, znak(SignPrvB, (IdB, PubA, PubB)) │ ╔══════════ ═════ ════════════╗ │<───────────────────────── ────────── ─────────│ ║SignPrvB, SignPubB = Load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═════════ ════════ ══════════╝ │ znak(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════ ════╗ │─ ──────────────────────────────────────────── ───>│ ║weryfikuj(SignPubB, ...) ║ │ │ ║klawisz = dh (prva, PUBB) ║ │ │ │

Jednak nadal nie „udowodniliśmy”, że wygenerowaliśmy ten sam klucz współdzielony dla tej sesji. W zasadzie możemy obejść się bez tego kroku – już pierwsze połączenie transportowe będzie nieważne, ale chcemy, żeby po zakończeniu uzgadniania mieliśmy pewność, że rzeczywiście wszystko jest uzgodnione. W tej chwili dysponujemy protokołem ISO/IEC IS 9798-3.

Moglibyśmy podpisać sam wygenerowany klucz. Jest to niebezpieczne, ponieważ możliwe jest, że w zastosowanym algorytmie podpisu mogą wystąpić nieszczelności (choć liczba bitów na podpis, ale nadal jest nieszczelna). Możliwe jest podpisanie skrótu klucza wyprowadzającego, ale wyciek nawet skrótu klucza pochodnego może być cenny w przypadku ataku brute-force na funkcję wyprowadzania. SIGMA wykorzystuje funkcję MAC, która uwierzytelnia identyfikator nadawcy.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │───────────────────────── ────────── ──────────────────>│ ║SignPrvA, SignPubA = Load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚ ═══════ ════════════════════╝ │IdB, PubB, znak(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔════ ═══ │<───────────────── ────────── ─────────────── ────────── ─│ ║SignPrvB, SignPubB = obciążenie()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ════════════ ═════════ ══╝ │ │ ╔════════════ ═════════╗ │ znak(SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Klawisz = DH( PrvA, PubB) ║ │───────────────────── ── ─────────── ────────── ─────>│ ║weryfikuj(Klucz, IdB) ║ │ │ ║weryfikuj(SignPubB, ...)║ │ │ ╚═══════════════ ═════ ═╝ │ │

W ramach optymalizacji niektórzy mogą chcieć ponownie wykorzystać swoje klucze efemeryczne (co jest oczywiście niefortunne w przypadku PFS). Na przykład wygenerowaliśmy parę kluczy, próbowaliśmy się połączyć, ale protokół TCP nie był dostępny lub został przerwany gdzieś w środku protokołu. Szkoda marnować zmarnowaną entropię i zasoby procesora na nową parę. Dlatego wprowadzimy tzw. plik cookie – wartość pseudolosową, która będzie chronić przed możliwymi atakami typu random replay podczas ponownego wykorzystania efemerycznych kluczy publicznych. Ze względu na powiązanie pliku cookie z efemerycznym kluczem publicznym, klucz publiczny strony przeciwnej może zostać usunięty z podpisu jako niepotrzebny.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CiasteczkoA │ ╔════════ ═══════════════════╗ │─────────────────────── ────────── ──────────────────────────────────────────── ─>│ ║SignPrvA, SignPubA = obciążenie( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚════════════════════════════ ══╝ │IdB, PubB, CookieB , znak(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔═══════════════════════════ ╗ │< ──────────────────────────────────────────── ────────── ────────────────────│ ║SignPrvB, SignPubB = Load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚══════ ═════════════════════╝ │ │ ╔══════════════ ═══════╗ │ znak( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │───────────────────────── ── ──────────────────────────────────────────── ───────>│ ║ zweryfikować(Klucz, IdB) ║ │ │ ║weryfikować(SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │

Wreszcie chcemy pozyskać prywatność naszych rozmówców od biernego obserwatora. W tym celu SIGMA proponuje w pierwszej kolejności wymianę kluczy efemerycznych i opracowanie wspólnego klucza, na którym można szyfrować wiadomości uwierzytelniające i identyfikujące. SIGMA opisuje dwie opcje:

  • SIGMA-I - chroni inicjatora przed atakami aktywnymi, odpowiadającego przed atakami pasywnymi: inicjator uwierzytelnia odpowiadającego i jeśli coś nie pasuje, nie podaje swojej identyfikacji. Pozwany wydaje swój dowód tożsamości w przypadku rozpoczęcia z nim aktywnego protokołu. Bierny obserwator niczego się nie uczy;
    SIGMA-R - chroni odpowiadającego przed atakami aktywnymi, inicjatora przed pasywnymi. Wszystko jest dokładnie odwrotnie, ale w tym protokole przesyłane są już cztery komunikaty uścisku dłoni.

    Wybieramy SIGMA-I, ponieważ jest bardziej podobny do tego, czego oczekujemy od rzeczy znanych klient-serwer: klient jest rozpoznawany tylko przez serwer uwierzytelniony i każdy już zna ten serwer. Ponadto jest łatwiejszy do wdrożenia ze względu na mniejszą liczbę komunikatów uzgadniania. Jedyne, co dodajemy do protokołu, to zaszyfrowanie części wiadomości i przeniesienie identyfikatora A do zaszyfrowanej części ostatniej wiadomości:

    PubA, CookieA │ ╔══════════ ═════════════════╗ │────── ────────── ───── ────────── ──────────────────────────── ─────────── ───── ──────>│ ║SignPrvA , SignPubA = Load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚══════ ════════ ═════════ ════╝ │ PubB, CookieB, Enc((IdB, znak(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔═════ ═══════════════ ═══════╗ │<─────────────── ────────── ───── ────────── ║SignP rvB, SignPubB = Load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚════ ═══════ ════════════════╝ │ │ ╔════════ ═══════════ ══╗ │ Enc((IdA, znak( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │──────────────────────── ─ ────────────────── ────────── ( SignPubB, ...)║ │ │ ╚════ ═══════ ══════════╝ │ │
    
    • Do podpisu używany jest GOST R 34.10-2012 algorytm z kluczami 256-bitowymi.
    • Do wygenerowania klucza publicznego używany jest VKO 34.10-2012.
    • CMAC jest używany jako MAC. Technicznie jest to specjalny tryb działania szyfru blokowego, opisany w GOST R 34.13-2015. Jako funkcja szyfrowania dla tego trybu − Konik polny (34.12-2015).
    • Jako identyfikator rozmówcy używany jest skrót jego klucza publicznego. Używany jako skrót Stribog-256 (34.11 2012 bitów).

    Po uścisku dłoni uzgodnimy wspólny klucz. Możemy go używać do uwierzytelnionego szyfrowania wiadomości transportowych. Ta część jest bardzo prosta i trudna do popełnienia błędu: zwiększamy licznik wiadomości, szyfrujemy wiadomość, uwierzytelniamy (MAC) licznik i tekst zaszyfrowany, wysyłamy. Po otrzymaniu wiadomości sprawdzamy, czy licznik ma oczekiwaną wartość, uwierzytelniamy zaszyfrowany tekst za pomocą licznika i deszyfrujemy go. Jakiego klucza powinienem używać do szyfrowania wiadomości uścisku dłoni, wiadomości transportowych i jak je uwierzytelniać? Używanie jednego klucza do wszystkich tych zadań jest niebezpieczne i nierozsądne. Konieczne jest generowanie kluczy za pomocą wyspecjalizowanych funkcji KDF (funkcja wyprowadzania klucza). Znowu nie dzielmy włosa na czworo i wymyślmy coś: HKDF jest od dawna znany, dobrze zbadany i nie powoduje żadnych znanych problemów. Niestety natywna biblioteka Pythona nie ma tej funkcji, więc używamy hkdf plastikowa torba. HKDF wykorzystuje wewnętrznie HMAC, który z kolei korzysta z funkcji skrótu. Przykładowa implementacja w Pythonie na stronie Wikipedii zajmuje tylko kilka linijek kodu. Podobnie jak w przypadku 34.10, jako funkcję skrótu użyjemy Stribog-2012. Wynik naszej funkcji uzgodnienia klucza będzie nazywany kluczem sesyjnym, z którego zostaną wygenerowane brakujące klucze symetryczne:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    Struktury/Schematy

    Przyjrzyjmy się, jakie mamy teraz struktury ASN.1 do przesyłania wszystkich tych danych:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    Uścisk dłoni TBS będzie tym, co zostanie podpisane. UzgadnianieTBE – co będzie szyfrowane. Zwracam uwagę na pole ukm w MsgHandshake1. 34.10 VKO, dla jeszcze większej randomizacji generowanych kluczy, zawiera parametr UKM (user keying material) - po prostu dodatkowa entropia.

    Dodawanie kryptografii do kodu

    Rozważmy tylko zmiany wprowadzone w oryginalnym kodzie, ponieważ framework pozostał ten sam (w rzeczywistości najpierw napisano ostateczną implementację, a następnie wycięto z niej całą kryptografię).

    Ponieważ uwierzytelnianie i identyfikacja rozmówców będzie odbywać się przy użyciu kluczy publicznych, trzeba je teraz gdzieś przechowywać przez długi czas. Dla uproszczenia używamy JSON w następujący sposób:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    our - nasza para kluczy, szesnastkowe klucze prywatne i publiczne. ich — imiona rozmówców i ich klucze publiczne. Zmieńmy argumenty wiersza poleceń i dodajmy przetwarzanie końcowe danych JSON:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    Klucz prywatny algorytmu 34.10 jest liczbą losową. Rozmiar 256-bitowy dla 256-bitowych krzywych eliptycznych. PyGOST nie działa z zestawem bajtów, ale z duże liczby, więc nasz klucz prywatny (urandom(32)) musi zostać przekonwertowany na liczbę za pomocą gost3410.prv_unmarshal(). Klucz publiczny jest określany deterministycznie na podstawie klucza prywatnego za pomocą gost3410.public_key(). Klucz publiczny 34.10 to dwie duże liczby, które również należy przekonwertować na sekwencję bajtów, aby ułatwić przechowywanie i przesyłanie za pomocą gost3410.pub_marshal().

    Po odczytaniu pliku JSON klucze publiczne należy odpowiednio przekonwertować za pomocą gost3410.pub_unmarshal(). Ponieważ identyfikatory rozmówców otrzymamy w postaci hasha z klucza publicznego, można je od razu z góry obliczyć i umieścić w słowniku w celu szybkiego wyszukiwania. Hash Stribog-256 to gost34112012256.GOST34112012256(), który w pełni obsługuje interfejs hashlib funkcji skrótu.

    Jak zmieniła się współprogram inicjujący? Wszystko przebiega zgodnie ze schematem uzgadniania: generujemy plik cookie (128 bitów w zupełności wystarczy), efemeryczną parę kluczy 34.10, która będzie używana w funkcji uzgadniania kluczy VKO.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • czekamy na odpowiedź i dekodujemy przychodzącą wiadomość Msg;
    • upewnij się, że uzyskałeś uścisk dłoni1;
    • zdekodować efemeryczny klucz publiczny drugiej strony i obliczyć klucz sesji;
    • Generujemy klucze symetryczne niezbędne do przetworzenia części TBE wiadomości.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM jest liczbą 64-bitową (urandom(8)), która również wymaga deserializacji na podstawie reprezentacji bajtowej za pomocą gost3410_vko.ukm_unmarshal(). Funkcja VKO na 34.10 2012-bitowa to gost256_vko.kek_3410() (KEK - klucz szyfrowania).

    Wygenerowany klucz sesji jest już 256-bitową pseudolosową sekwencją bajtów. Dlatego można go od razu zastosować w funkcjach HKDF. Ponieważ GOST34112012256 spełnia interfejs hashlib, można go natychmiast użyć w klasie Hkdf. Nie podajemy soli (pierwszy argument Hkdf), ponieważ wygenerowany klucz, ze względu na efemeryczność uczestniczących par kluczy, będzie inny dla każdej sesji i zawiera już wystarczającą entropię. kdf.expand() domyślnie generuje już 256-bitowe klucze wymagane później dla Grasshoppera.

    Następnie sprawdzane są części TBE i TBS przychodzącej wiadomości:

    • obliczany i sprawdzany jest adres MAC przychodzącego tekstu zaszyfrowanego;
    • zaszyfrowany tekst został odszyfrowany;
    • Struktura TBE jest dekodowana;
    • pobierany jest z niego identyfikator rozmówcy i sprawdzane jest, czy w ogóle jest on nam znany;
    • Obliczany i sprawdzany jest MAC nad tym identyfikatorem;
    • weryfikowany jest podpis nad strukturą TBS, który obejmuje plik cookie obu stron oraz publiczny klucz efemeryczny strony przeciwnej. Podpis jest weryfikowany za pomocą długowiecznego klucza podpisu rozmówcy.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    Jak pisałem powyżej, 34.13 opisuje różne tryby pracy szyfru blokowego od 34.12r. Wśród nich znajduje się tryb generowania wstawek imitacyjnych oraz obliczeń MAC. W PyGOST jest to gost2015.mac(). Tryb ten wymaga przekazania funkcji szyfrowania (odbioru i zwrotu jednego bloku danych), rozmiaru bloku szyfrowania i tak naprawdę samych danych. Dlaczego nie można zakodować na stałe rozmiaru bloku szyfrującego? 3413 opisuje nie tylko 34.12-bitowy szyfr Grasshopper, ale także 2015-bitowy Magma - nieco zmodyfikowany GOST 28147-89, stworzony jeszcze w KGB i nadal ma jeden z najwyższych progów bezpieczeństwa.

    Kuznechik jest inicjowany przez wywołanie gost.3412.GOST3412Kuznechik(key) i zwraca obiekt metodami .encrypt()/.decrypt() odpowiednimi do przekazywania do funkcji 34.13. MAC jest obliczany w następujący sposób: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, tekst zaszyfrowany). Aby porównać obliczony i otrzymany adres MAC, nie można użyć zwykłego porównania (==) ciągów bajtów, ponieważ ta operacja powoduje wyciek czasu porównania, co w ogólnym przypadku może prowadzić do krytycznych luk w zabezpieczeniach, takich jak BEAST ataki na TLS. W Pythonie dostępna jest do tego specjalna funkcja hmac.compare_digest.

    Funkcja szyfrowania blokowego może zaszyfrować tylko jeden blok danych. W przypadku większej liczby, a nawet nie będącej wielokrotnością długości, konieczne jest zastosowanie trybu szyfrowania. 34.13-2015 opisuje: EBC, CTR, OFB, CBC, CFB. Każdy z nich ma swoje własne akceptowalne obszary zastosowań i właściwości. Niestety nadal nie mamy standaryzacji uwierzytelnione tryby szyfrowania (takie jak CCM, OCB, GCM i tym podobne) - jesteśmy zmuszeni przynajmniej sami dodać MAC. wybieram tryb licznika (CTR): nie wymaga dopełniania rozmiaru bloku, można go zrównoleglać, wykorzystuje jedynie funkcję szyfrowania, można go bezpiecznie używać do szyfrowania dużej liczby wiadomości (w przeciwieństwie do CBC, które stosunkowo szybko ulega kolizjom).

    Podobnie jak .mac(), .ctr() pobiera podobne dane wejściowe: szyfrogram = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, zwykły tekst, iv). Wymagane jest określenie wektora inicjującego, który jest dokładnie równy połowie długości bloku szyfrującego. Jeśli nasz klucz szyfrujący służy tylko do zaszyfrowania jednej wiadomości (aczkolwiek z kilku bloków), to bezpiecznie jest ustawić zerowy wektor inicjujący. Do szyfrowania wiadomości uścisku dłoni używamy za każdym razem osobnego klucza.

    Weryfikacja podpisu gost3410.verify() jest banalna: przekazujemy krzywą eliptyczną, w obrębie której pracujemy (po prostu zapisujemy ją w naszym protokole GOSTIM), klucz publiczny osoby podpisującej (nie zapominajmy, że powinna to być krotka dwóch duże liczby, a nie ciąg bajtów), skrót 34.11 i sam podpis.

    Następnie w inicjatorze przygotowujemy i wysyłamy wiadomość uścisku dłoni do handshake2, wykonując te same czynności, co podczas weryfikacji, tylko symetrycznie: podpisując się na naszych kluczach zamiast sprawdzać itp.

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    Po ustanowieniu sesji generowane są klucze transportowe (oddzielny klucz do szyfrowania i uwierzytelniania dla każdej ze stron), a Grasshopper jest inicjowany w celu odszyfrowania i sprawdzenia adresu MAC:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    Współprogram msg_sender szyfruje teraz wiadomości przed wysłaniem ich przez połączenie TCP. Każda wiadomość ma monotonicznie rosnącą wartość jednorazową, która jest również wektorem inicjującym, gdy jest szyfrowana w trybie licznika. Każdy komunikat i blok komunikatów ma gwarantowaną inną wartość licznika.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    Wiadomości przychodzące są przetwarzane przez procedurę msg_receiver, która obsługuje uwierzytelnianie i deszyfrowanie:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    wniosek

    GOSTIM jest przeznaczony wyłącznie do celów edukacyjnych (przynajmniej dlatego, że nie jest objęty testami)! Można pobrać kod źródłowy programu tutaj (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, Przejdź do VPN, GOSTIM jest całkowicie darmowe oprogramowanie, rozpowszechniane zgodnie z warunkami GPLv3 +.

    Siergiej Matwiejew, cypherpunk, członek Fundacja SPO, programista Python/Go, główny specjalista FSUE „STC „Atlas”.

Źródło: www.habr.com

Dodaj komentarz