GOSTIM: GOST 암호화를 사용한 어느 날 저녁 P2P F2F E2EE IM

개발자가 되다 PyGOST 라이브러리(순수 Python의 GOST 암호화 기본 요소)를 사용하면서 무릎에 가장 간단한 보안 메시징을 구현하는 방법에 대한 질문을 자주 받습니다. 많은 사람들은 적용된 암호화가 매우 간단하다고 생각하며, 블록 암호에서 .encrypt()를 호출하는 것만으로도 통신 채널을 통해 안전하게 전송할 수 있습니다. 다른 사람들은 응용 암호화가 소수의 운명이라고 믿으며, 올림피아드 수학자들이 있는 Telegram과 같은 부유한 회사는 허용됩니다. 구현할 수 없다 보안 프로토콜.

이 모든 것이 암호화 프로토콜과 보안 IM을 구현하는 것이 그렇게 어려운 작업이 아니라는 것을 보여주기 위해 이 기사를 작성하게 만들었습니다. 그러나 자체 인증 및 키 계약 프로토콜을 개발하는 것은 가치가 없습니다.

GOSTIM: GOST 암호화를 사용한 어느 날 저녁 P2P F2F E2EE IM
기사가 쓰겠다 피어 - 투 - 피어, 친구 대 친구, 엔드 투 엔드 암호화 인스턴트 메신저 시그마-I 인증 및 키 계약 프로토콜(이를 기반으로 구현됨) IPsec IKE), GOST 암호화 알고리즘 PyGOST 라이브러리 및 ASN.1 메시지 인코딩 라이브러리만 사용 피데라스N (나는 이미 이에 대해 전에 쓴). 전제 조건: 하루 저녁(또는 근무일)에 처음부터 작성할 수 있을 정도로 매우 단순해야 합니다. 그렇지 않으면 더 이상 간단한 프로그램이 아닙니다. 아마도 오류, 불필요한 복잡함, 단점이 있을 것입니다. 게다가 이것은 asyncio 라이브러리를 사용하는 첫 번째 프로그램입니다.

메신저 디자인

먼저 IM이 어떤 모습일지 이해해야 합니다. 단순화를 위해 참가자를 검색하지 않고 PXNUMXP 네트워크로 만듭니다. 대담자와 통신하기 위해 연결할 포트 주소를 개인적으로 표시합니다.

현재로서는 임의의 두 컴퓨터 간에 직접 통신이 가능하다는 가정이 IM의 실제 적용 가능성에 심각한 제한이 된다는 점을 이해합니다. 그러나 더 많은 개발자가 모든 종류의 NAT 통과 목발을 구현할수록 우리는 IPv4 인터넷에 더 오래 머물게 될 것이며 임의의 컴퓨터 간의 통신 가능성은 낮아질 것입니다. 집과 직장에서 IPv6의 부족을 언제까지 견딜 수 있습니까?

우리는 친구 대 친구 네트워크를 갖게 될 것입니다. 가능한 모든 대화 상대를 미리 알아야 합니다. 첫째, 이것은 모든 것을 크게 단순화합니다. 우리는 자신을 소개하고, 이름/키를 찾거나 찾지 못했고, 연결을 끊거나 계속 작업하고, 대담자를 알고 있습니다. 둘째, 일반적으로 안전하며 많은 공격을 제거합니다.

IM 인터페이스는 기존 솔루션에 가깝습니다. 형편없는 프로젝트, 저는 그들의 미니멀리즘과 Unix-way 철학을 정말 좋아합니다. IM 프로그램은 각 대화자에 대해 세 개의 Unix 도메인 소켓이 있는 디렉터리를 만듭니다.

  • in - 대담자에게 보낸 메시지가 기록됩니다.
  • out - 대담자로부터 받은 메시지를 읽습니다.
  • 상태 - 이를 읽어서 대화 상대가 현재 연결되어 있는지 여부, 연결 주소/포트를 알아냅니다.

또한 원격 대담자에 대한 연결을 시작하는 호스트 포트를 작성하여 conn 소켓이 생성됩니다.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

이 접근 방식을 사용하면 IM 전송 및 사용자 인터페이스를 독립적으로 구현할 수 있습니다. 친구가 없고 모든 사람을 만족시킬 수는 없기 때문입니다. 사용 tmux 및 / 또는 다꼬리, 구문 강조 기능이 있는 다중 창 인터페이스를 얻을 수 있습니다. 그리고 도움으로 rlwrap GNU Readline 호환 메시지 입력 라인을 얻을 수 있습니다.

실제로 형편없는 프로젝트는 FIFO 파일을 사용합니다. 개인적으로 전용 스레드에서 손으로 직접 작성한 배경 없이 asyncio에서 경쟁적으로 파일 작업을 수행하는 방법을 이해할 수 없었습니다(저는 오랫동안 그런 일을 위해 언어를 사용해 왔습니다) Go). 따라서 나는 Unix 도메인 소켓을 사용하기로 결정했습니다. 불행히도 이로 인해 echo 2001:470:dead::babe 6666 > conn을 수행할 수 없습니다. 나는 다음을 사용하여 이 문제를 해결했습니다. 소캣: 에코 2001:470:죽은::베이비 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.

원래의 안전하지 않은 프로토콜

TCP는 전송으로 사용됩니다. 이는 배달과 순서를 보장합니다. UDP는 둘 다 보장하지 않지만(암호화를 사용할 때 유용함) 다음을 지원합니다. SCTP 파이썬은 기본적으로 나오지 않습니다.

불행하게도 TCP에는 메시지라는 개념이 없고 단지 바이트 스트림만 있습니다. 따라서 이 스레드에서 메시지를 서로 공유할 수 있도록 메시지 형식을 고안하는 것이 필요합니다. 줄 바꿈 문자를 사용하는 데 동의할 수 있습니다. 처음에는 괜찮지만 일단 메시지 암호화를 시작하면 이 문자가 암호문의 어느 곳에나 나타날 수 있습니다. 따라서 네트워크에서 널리 사용되는 프로토콜은 먼저 메시지 길이를 바이트 단위로 보내는 프로토콜입니다. 예를 들어, 기본적으로 Python에는 비슷한 형식으로 작업할 수 있는 xdrlib가 있습니다. XDR.

우리는 TCP 읽기를 정확하고 효율적으로 수행하지 않을 것입니다. 코드를 단순화할 것입니다. 완전한 메시지를 디코딩할 때까지 무한 루프를 통해 소켓에서 데이터를 읽습니다. XML이 포함된 JSON도 이 접근 방식의 형식으로 사용할 수 있습니다. 그러나 암호화가 추가되면 데이터에 서명하고 인증해야 합니다. 이를 위해서는 JSON/XML이 제공하지 않는 바이트 단위의 동일한 객체 표현이 필요합니다(덤프 결과는 다를 수 있음).

XDR은 이 작업에 적합하지만 저는 DER 인코딩이 포함된 ASN.1을 선택하고 피데라스N 도서관은 작업하기가 더 즐겁고 편리한 높은 수준의 개체를 보유하게 될 것이기 때문입니다. 스키마 없는 것과 달리 벤코드, 메시지팩 또는 CBOR, ASN.1은 하드 코딩된 스키마와 비교하여 데이터를 자동으로 확인합니다.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

수신된 메시지는 Msg입니다. 텍스트 MsgText(현재는 하나의 텍스트 필드 포함) 또는 MsgHandshake 핸드셰이크 메시지(대화자의 이름 포함)입니다. 지금은 너무 복잡해 보이지만 이는 미래를 위한 기반입니다.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │MsgHandshake( IdA) │ │───────── ────────>│ │ │ │MsgHandshake(IdB) │ │<─────────────────│ │ │ │ MsgText() │ │──── MsgText() │ │ │

암호화 없는 IM

이미 말했듯이 asyncio 라이브러리는 모든 소켓 작업에 사용됩니다. 출시 시 기대되는 사항을 발표해 보겠습니다.

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

자신의 이름을 설정하세요(--our-name alice). 예상되는 모든 대담자는 쉼표로 구분되어 나열됩니다(—그들의 이름 bob,eve). 각 대담자에 대해 Unix 소켓이 있는 디렉터리와 각 in, out, 상태에 대한 코루틴이 생성됩니다.

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

in 소켓에서 사용자로부터 오는 메시지는 IN_QUEUES 대기열로 전송됩니다.

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

대담자로부터 오는 메시지는 OUT_QUEUES 대기열로 전송되며, 여기에서 데이터가 출력 소켓에 기록됩니다.

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

상태 소켓에서 읽을 때 프로그램은 PEER_ALIVE 사전에서 대담자의 주소를 찾습니다. 대담 자와 아직 연결이 없으면 빈 줄이 기록됩니다.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

conn 소켓에 주소를 쓸 때 연결 "초기자" 기능이 시작됩니다.

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

개시자를 고려해 봅시다. 먼저 지정된 호스트/포트에 대한 연결을 열고 해당 이름과 함께 핸드셰이크 메시지를 보냅니다.

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

그런 다음 원격 상대방의 응답을 기다립니다. Msg ASN.1 구성표를 사용하여 들어오는 응답의 디코딩을 시도합니다. 우리는 전체 메시지가 하나의 TCP 세그먼트로 전송되고 .read()를 호출할 때 원자적으로 수신한다고 가정합니다. Handshake 메시지를 받았는지 확인합니다.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

우리는 대담 자의 수신 이름이 우리에게 알려져 있는지 확인합니다. 그렇지 않다면 연결을 끊습니다. 우리는 이미 그와의 연결을 설정했는지 확인하고 (대담자가 다시 우리에게 연결하라는 명령을 내림) 닫습니다. IN_QUEUES 대기열은 메시지 텍스트가 포함된 Python 문자열을 보유하지만 msg_sender 코루틴에 작업을 중지하여 레거시 TCP 연결과 관련된 작성자를 잊어버리도록 신호를 보내는 특별한 값 None이 있습니다.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender는 나가는 메시지(소켓 내 대기열에 있음)를 수락하고 이를 MsgText 메시지로 직렬화한 다음 TCP 연결을 통해 보냅니다. 언제든지 깨질 수 있습니다. 우리는 이것을 명확하게 차단합니다.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

결국 개시자는 소켓에서 메시지를 읽는 무한 루프에 들어갑니다. 이러한 메시지가 텍스트 메시지인지 확인하고 OUT_QUEUES 대기열에 배치하면 해당 대화 상대의 출력 소켓으로 전송됩니다. 왜 .read()를 수행하고 메시지를 디코딩할 수 없나요? 사용자의 여러 메시지가 운영 체제 버퍼에 집계되어 하나의 TCP 세그먼트로 전송될 수 있기 때문입니다. 첫 번째 것을 디코딩하면 다음 부분의 일부가 버퍼에 남을 수 있습니다. 비정상적인 상황이 발생하는 경우 TCP 연결을 닫고 msg_sender 코루틴을 중지합니다(OUT_QUEUES 대기열에 None을 전송하여).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

메인 코드로 돌아가자. 프로그램 시작 시 모든 코루틴을 생성한 후 TCP 서버를 시작합니다. 설정된 각 연결에 대해 응답자 코루틴을 생성합니다.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

응답자는 개시자와 유사하며 동일한 작업을 모두 미러링하지만 단순화를 위해 메시지 읽기의 무한 루프가 즉시 시작됩니다. 현재 핸드셰이크 프로토콜은 각 측에서 하나의 메시지를 보내지만 앞으로는 연결 개시자로부터 두 개의 메시지가 전송될 예정이며 그 후에는 문자 메시지를 즉시 보낼 수 있습니다.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

보안 프로토콜

이제 우리의 통신을 보호해야 할 때입니다. 보안이란 무엇을 의미하며 우리가 원하는 것은 무엇입니까?

  • 전송된 메시지의 기밀성;
  • 전송된 메시지의 신뢰성 및 무결성 - 변경 사항을 감지해야 합니다.
  • 재생 공격으로부터 보호 - 메시지가 누락되거나 반복된다는 사실을 감지해야 합니다(그리고 연결을 종료하기로 결정합니다).
  • 미리 입력된 공개 키를 사용하여 대화 상대를 식별하고 인증합니다. 우리는 이미 친구 간 네트워크를 만들기로 결정했습니다. 인증 후에야 우리는 누구와 통신하고 있는지 이해할 수 있습니다.
  • 가용성 완벽한 전달 비밀 속성(PFS) - 수명이 긴 서명 키가 손상되더라도 이전의 모든 서신을 읽을 수 있는 능력이 없어져서는 안 됩니다. 가로채는 트래픽을 기록하면 쓸모가 없게 됩니다.
  • 하나의 TCP 세션 내에서만 메시지(전송 및 핸드셰이크)의 유효성/유효성. 다른 세션에서 올바르게 서명/인증된 메시지를 삽입하는 것은 (동일한 대화 상대라도) 불가능해야 합니다.
  • 수동적 관찰자는 사용자 식별자, 전송된 수명이 긴 공개 키 또는 해시를 볼 수 없습니다. 수동적인 관찰자의 특정 익명성.

놀랍게도 거의 모든 사람이 모든 핸드셰이크 프로토콜에서 이 최소값을 갖기를 원하며 "자체 개발" 프로토콜에서는 위의 항목 중 극히 일부만 충족됩니다. 이제 우리는 새로운 것을 발명하지 않을 것입니다. 나는 확실히 사용하는 것이 좋습니다 노이즈 프레임워크 프로토콜을 구축하기 위한 것이지만 더 간단한 것을 선택하겠습니다.

가장 널리 사용되는 두 가지 프로토콜은 다음과 같습니다.

  • TLS - 버그, 잼, 취약성, 잘못된 생각, 복잡성 및 단점의 오랜 역사를 지닌 매우 복잡한 프로토콜입니다(그러나 이는 TLS 1.3과는 거의 관련이 없습니다). 하지만 너무 복잡하기 때문에 고려하지 않습니다.
  • IPsec с IKE — 간단하지는 않지만 심각한 암호화 문제가 없습니다. IKEv1 및 IKEv2에 대해 읽으면 해당 소스는 다음과 같습니다. STS, ISO/IEC IS 9798-3 및 SIGMA(SIGn-and-MAc) 프로토콜 - 하루 저녁에 구현할 수 있을 만큼 간단합니다.

STS/ISO 프로토콜 개발의 최신 링크로서 SIGMA의 장점은 무엇입니까? 이는 우리의 모든 요구 사항(대담자 식별자 "숨기기" 포함)을 충족하며 알려진 암호화 문제가 없습니다. 이는 최소한입니다. 프로토콜 메시지에서 최소한 하나의 요소를 제거하면 보안이 불안정해집니다.

가장 간단한 자체 개발 프로토콜에서 SIGMA로 이동해 보겠습니다. 우리가 관심을 갖는 가장 기본적인 작업은 다음과 같습니다. 핵심 합의: 두 참여자 모두에게 동일한 값을 출력하는 기능으로, 대칭키로 활용 가능합니다. 자세히 설명하지 않고 각 당사자는 임시(한 세션 내에서만 사용됨) 키 쌍(공개 및 개인 키)을 생성하고, 공개 키를 교환하고, 계약 기능을 호출하여 입력에 개인 키와 공개 키를 전달합니다. 대담자의 열쇠.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔xxxxxx ==========╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚= ========╗ ============╝ │ IdB, PubB │ ╔===================╗ │<───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚===================╝ ─ ───┐ ╔==== ===╧============╗ │ ║Key = DH(PrvA, PubB)║ <───┘ ╚=======╤= ======= ====╝ │ │ │ │

누구든지 중간에 뛰어들어 공개 키를 자신의 것으로 교체할 수 있습니다. 이 프로토콜에는 대화 상대에 대한 인증이 없습니다. 수명이 긴 키로 서명을 추가해 보겠습니다.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, sign(SignPrvA, (PubA)) │ ╔= │──────────── ────────── ───────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚======= =" | ───────────── ──│ ║SignPrvB, SignPubB = 로드( )║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚========== ============== ==╝ ────┐ ╔ ====================╗ │ │ ║확인( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚==================╝ │ │ │

이러한 서명은 특정 세션에 연결되어 있지 않기 때문에 작동하지 않습니다. 이러한 메시지는 다른 참가자와의 세션에도 "적합"합니다. 전체 컨텍스트가 구독해야 합니다. 이로 인해 A의 또 다른 메시지도 추가해야 합니다.

또한 서명 아래에 자신의 식별자를 추가하는 것이 중요합니다. 그렇지 않으면 IdXXX를 대체하고 알려진 다른 대담자의 키로 메시지에 다시 서명할 수 있기 때문입니다. 방지하기 위해 반사 공격, 서명 아래의 요소는 해당 의미에 따라 명확하게 정의된 위치에 있어야 합니다. A가 (PubA, PubB)에 서명하면 B는 (PubB, PubA)에 서명해야 합니다. 이는 또한 직렬화된 데이터의 구조와 형식을 선택하는 것의 중요성을 말해줍니다. 예를 들어, ASN.1 DER 인코딩의 세트는 정렬됩니다. SET OF(PubA, PubB)는 SET OF(PubB, PubA)와 동일합니다.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔xxxxxx =================╗ │──────────────────── ────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚===== =" ===============╝ │IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) │ ╔=============== =============╗ │<──────────────────────── ────────── ─────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚================= ==========╝ │ 기호(SignPrvA, (IdA, PubB, PubA)) │ ╔================ ====╗ │─ ─────────────────────────────────────── ───>│ ║verify(SignPubB, ...) ║ │ │ ║key = dh (prva, PUBB) ║ │ │ │

그러나 우리는 이 세션에 대해 동일한 공유 키를 생성했다는 것을 아직 "증명"하지 못했습니다. 원칙적으로 이 단계 없이도 수행할 수 있습니다. 첫 번째 전송 연결은 유효하지 않지만 핸드셰이크가 완료되면 모든 것이 실제로 합의되었는지 확인하기를 원합니다. 현재 우리는 ISO/IEC IS 9798-3 프로토콜을 보유하고 있습니다.

생성된 키 자체에 서명할 수 있습니다. 이는 사용된 서명 알고리즘에 누출이 있을 수 있기 때문에 위험합니다(서명당 비트 수는 있지만 여전히 누출됨). 파생 키의 해시에 서명하는 것이 가능하지만 파생 키의 해시를 유출하는 것도 파생 기능에 대한 무차별 대입 공격에서 유용할 수 있습니다. SIGMA는 발신자 ID를 인증하는 MAC 기능을 사용합니다.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔xxxxxxxxxxxxxxxxxx =================╗ │──────────────────── ─────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚ ======= ===================╝ │IdB, PubB, sign(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔==== === │<───────────────── ────────── ──────────────────── ─│ ║SignPrvB, SignPubB = 로드()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚==== ================= ==╝ │ │ ╔===========╗ │ sign(SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Key = DH( PrvA, PubB) ║ │───────────────────── ── ──────────────────── ─────>│ ║verify(Key, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚===xxx =╝ │ │

최적화를 위해 일부는 임시 키를 재사용하기를 원할 수도 있습니다(물론 PFS에게는 불행한 일입니다). 예를 들어 키 쌍을 생성하고 연결을 시도했지만 TCP를 사용할 수 없거나 프로토콜 중간에서 중단되었습니다. 새로운 쌍에 낭비되는 엔트로피와 프로세서 리소스를 낭비하는 것은 부끄러운 일입니다. 따라서 임시 공개 키를 재사용할 때 발생할 수 있는 임의 재생 공격으로부터 보호하는 의사 난수 값인 소위 쿠키를 소개합니다. 쿠키와 임시 공개키의 결합으로 인해 상대방의 공개키가 불필요하게 서명에서 제거될 수 있습니다.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CookieA │ ╔xxxxxxxxxxxxxxx ="="" ─────────────────────────────────────── ─>│ ║SignPrvA, SignPubA = 로드( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚======================== ==╝ │IdB, PubB, CookieB , 부호(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔========================= ╗ │< ─────────────────────────────────────── ─────────── ────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚====== ====================╝ │ │ ╔=======Problem SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │─────────────────── ── ─────────────────────────────────────── ───────>│ ║ verify(키, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚===================╝ │ │ │

마지막으로, 우리는 수동적인 관찰자로부터 대화 상대의 프라이버시를 얻고 싶습니다. 이를 위해 SIGMA는 먼저 임시 키를 교환하고 인증 및 식별 메시지를 암호화할 공통 키를 개발할 것을 제안합니다. SIGMA는 두 가지 옵션을 설명합니다:

  • SIGMA-I - 활성 공격으로부터 개시자를 보호하고 수동적 공격으로부터 응답자를 보호합니다. 개시자는 응답자를 인증하고 일치하지 않는 항목이 있으면 식별 정보를 제공하지 않습니다. 피고인과 함께 활성 프로토콜이 시작되면 피고인은 자신의 신분증을 제공합니다. 수동적인 관찰자는 아무것도 배우지 못합니다.
    SIGMA-R - 능동적 공격으로부터 응답자를 보호하고, 수동적 공격으로부터 개시자를 보호합니다. 모든 것이 정반대이지만 이 프로토콜에서는 XNUMX개의 핸드셰이크 메시지가 이미 전송되었습니다.

    우리는 클라이언트-서버 친숙한 것에서 기대하는 것과 더 유사하기 때문에 SIGMA-I를 선택합니다. 클라이언트는 인증된 서버에 의해서만 인식되고 모든 사람이 이미 서버를 알고 있습니다. 또한 핸드셰이크 메시지가 적기 때문에 구현하기가 더 쉽습니다. 프로토콜에 추가하는 것은 메시지의 일부를 암호화하고 식별자 A를 마지막 메시지의 암호화된 부분으로 전송하는 것뿐입니다.

    PubA, CookieA │ ╔============================╗ │──────────────── ───── ────────── ────────────────────────────────── ───── ──────>│ ║SignPrvA , SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚============= ========= ====╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔====== =======================╗ │<──────────────────────── ───── ────────── ║SignP rvB, SignPubB = load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚=========== ===============╝ │ │ ╔======== =============╗ │ Enc((IdA, sign( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │──────────────────── ────────────────── ────────── ────────────────────── ──────>│ ║verify(Key, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚================= ==╝ │ │
    
    • 서명에는 GOST R이 사용됩니다. 34.10-2012 256비트 키를 사용하는 알고리즘입니다.
    • 공개 키를 생성하려면 34.10년 2012월 XNUMX일 VKO가 사용됩니다.
    • MAC으로는 CMAC가 사용된다. 기술적으로 이는 GOST R 34.13-2015에 설명된 블록 암호의 특수 작동 모드입니다. 이 모드의 암호화 기능으로 − 메뚜기 (34.12-2015).
    • 공개 키의 해시는 대담자의 식별자로 사용됩니다. 해시로 사용됨 Stribog-256 (34.11년 2012월 256일 XNUMX비트).

    악수 후에 우리는 공유 키에 동의하게 됩니다. 전송 메시지의 인증된 암호화에 이를 사용할 수 있습니다. 이 부분은 매우 간단하고 실수하기 어렵습니다. 메시지 카운터를 늘리고, 메시지를 암호화하고, 카운터와 암호문을 인증(MAC)하고, 보냅니다. 메시지를 받으면 카운터에 예상된 값이 있는지 확인하고 카운터로 암호문을 인증한 후 복호화합니다. 핸드셰이크 메시지, 전송 메시지를 암호화하고 이를 인증하는 방법에는 어떤 키를 사용해야 합니까? 이러한 모든 작업에 하나의 키를 사용하는 것은 위험하고 현명하지 않습니다. 특화된 기능을 사용하여 키를 생성해야 합니다. KDF (키 파생 함수). 다시 말하지만, 머리카락을 쪼개서 뭔가를 발명하지 맙시다. HKDF 오랫동안 알려져 왔으며 잘 연구되었으며 알려진 문제가 없습니다. 안타깝게도 기본 Python 라이브러리에는 이 기능이 없으므로 다음을 사용합니다. hkdf 비닐 봉투. HKDF는 내부적으로 HMAC, 이는 차례로 해시 함수를 사용합니다. Wikipedia 페이지에 있는 Python의 예제 구현에는 단 몇 줄의 코드가 필요합니다. 34.10년 2012월 256일의 경우와 마찬가지로 Stribog-XNUMX을 해시함수로 사용하겠습니다. 키 계약 기능의 출력을 세션 키라고 하며, 여기에서 누락된 대칭 키가 생성됩니다.

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    구조/구성표

    이 모든 데이터를 전송하기 위해 현재 가지고 있는 ASN.1 구조를 살펴보겠습니다.

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS가 서명되는 것입니다. HandshakeTBE - 암호화될 항목입니다. MsgHandshake1의 ukm 필드에 주목하겠습니다. 34.10 VKO는 생성된 키의 무작위화를 더욱 높이기 위해 UKM(사용자 키 자료) 매개변수를 포함합니다. 즉, 엔트로피만 추가됩니다.

    코드에 암호화 추가

    프레임워크는 동일하게 유지되었으므로 원래 코드에 적용된 변경 사항만 고려해 보겠습니다(사실 최종 구현이 먼저 작성된 다음 모든 암호화가 제거되었습니다).

    대담자의 인증 및 식별은 공개 키를 사용하여 수행되므로 이제 오랫동안 어딘가에 저장해야 합니다. 단순화를 위해 다음과 같이 JSON을 사용합니다.

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    our - 키 쌍, XNUMX진수 개인 키 및 공개 키입니다. 그들의 — 대담자의 이름과 공개 키. 명령줄 인수를 변경하고 JSON 데이터의 사후 처리를 추가해 보겠습니다.

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    34.10 알고리즘의 개인 키는 임의의 숫자입니다. 256비트 타원 곡선의 경우 256비트 크기입니다. PyGOST는 바이트 세트에서는 작동하지 않지만 큰 숫자, 따라서 개인 키(urandom(32))는 gost3410.prv_unmarshal()을 사용하여 숫자로 변환되어야 합니다. 공개 키는 gost3410.public_key()를 사용하여 개인 키에서 결정적으로 결정됩니다. 공개 키 34.10은 gost3410.pub_marshal()을 사용하여 저장 및 전송을 쉽게 하기 위해 바이트 시퀀스로 변환해야 하는 두 개의 큰 숫자입니다.

    JSON 파일을 읽은 후에는 gost3410.pub_unmarshal()을 사용하여 공개 키를 다시 변환해야 합니다. 공개 키에서 해시 형식으로 대담자의 식별자를 수신하므로 사전에 즉시 계산하여 빠른 검색을 위해 사전에 배치할 수 있습니다. Stribog-256 해시는 gost34112012256.GOST34112012256()이며, 이는 해시 함수의 hashlib 인터페이스를 완전히 만족합니다.

    개시자 코루틴은 어떻게 변경되었나요? 모든 것은 핸드셰이크 체계에 따릅니다. 우리는 VKO 키 계약 기능에 사용될 임시 키 쌍 128인 쿠키(34.10비트면 충분)를 생성합니다.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • 응답을 기다리고 들어오는 Msg 메시지를 디코딩합니다.
    • handshake1을 받았는지 확인하세요.
    • 상대방의 임시 공개 키를 디코딩하고 세션 키를 계산합니다.
    • 메시지의 TBE 부분을 처리하는 데 필요한 대칭 키를 생성합니다.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM은 64비트 숫자(urandom(8))이며 gost3410_vko.ukm_unmarshal()을 사용하여 바이트 표현에서 역직렬화도 필요합니다. 34.10년 2012월 256일 3410비트용 VKO 기능은 gost34102012256_vko.kek_XNUMX()(KEK - 암호화 키)입니다.

    생성된 세션 키는 이미 256비트 의사 무작위 바이트 시퀀스입니다. 따라서 HKDF 함수에서 즉시 사용할 수 있습니다. GOST34112012256은 hashlib 인터페이스를 만족하므로 Hkdf 클래스에서 바로 사용할 수 있습니다. 참여하는 키 쌍의 임시성으로 인해 생성된 키가 각 세션마다 다르고 이미 충분한 엔트로피를 포함하고 있기 때문에 솔트(Hkdf의 첫 번째 인수)를 지정하지 않습니다. kdf.expand()는 기본적으로 나중에 Grasshopper에 필요한 256비트 키를 이미 생성합니다.

    다음으로 수신 메시지의 TBE 및 TBS 부분을 확인합니다.

    • 들어오는 암호문에 대한 MAC가 계산되고 확인됩니다.
    • 암호문이 해독됩니다.
    • TBE 구조가 디코딩됩니다.
    • 대담 자의 식별자를 가져 와서 그가 우리에게 전혀 알려져 있는지 확인합니다.
    • 이 식별자에 대한 MAC가 계산되고 확인됩니다.
    • 양 당사자의 쿠키와 상대방의 공개 임시 키를 포함하는 TBS 구조에 대한 서명이 확인됩니다. 서명은 대담자의 수명이 긴 서명 키로 확인됩니다.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    위에서 쓴 것처럼 34.13년 2015월 XNUMX일에는 다양한 설명이 나와 있습니다. 블록 암호 작동 모드 34.12년 2015월 3413일부터. 그중에는 모방 삽입 및 MAC 계산을 생성하는 모드가 있습니다. PyGOST에서는 gost34.12.mac()입니다. 이 모드에서는 암호화 기능(하나의 데이터 블록 수신 및 반환), 암호화 블록의 크기 및 실제로 데이터 자체를 전달해야 합니다. 암호화 블록의 크기를 하드코딩할 수 없는 이유는 무엇입니까? 2015년 128월 64일에는 XNUMX비트 Grasshopper 암호뿐만 아니라 XNUMX비트 암호도 설명되어 있습니다. 연한 덩어리 - 약간 수정된 GOST 28147-89는 KGB에서 다시 생성되었으며 여전히 가장 높은 안전 임계값 중 하나를 가지고 있습니다.

    Kuznechik은 gost.3412.GOST3412Kuznechik(key)를 호출하여 초기화되고 34.13 함수에 전달하는 데 적합한 .encrypt()/.decrypt() 메서드가 있는 객체를 반환합니다. MAC는 다음과 같이 계산됩니다: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext). 계산된 MAC과 수신된 MAC를 비교하려면 바이트 문자열의 일반적인 비교(==)를 사용할 수 없습니다. 이 작업은 비교 시간을 누출하여 일반적으로 다음과 같은 치명적인 취약점을 초래할 수 있기 때문입니다. 비스트 TLS에 대한 공격. Python에는 이를 위한 특별한 함수인 hmac.compare_digest가 있습니다.

    블록 암호화 기능은 하나의 데이터 블록만 암호화할 수 있습니다. 숫자가 더 크거나 길이의 배수가 아니더라도 암호화 모드를 사용해야 합니다. 34.13-2015에는 ECB, CTR, OFB, CBC, CFB가 설명되어 있습니다. 각각에는 허용되는 적용 영역과 특성이 있습니다. 아쉽게도 아직 표준화가 되어 있지 않습니다. 인증된 암호화 모드 (예: CCM, OCB, GCM 등) - 최소한 MAC를 직접 추가해야 합니다. 나는 선택한다 카운터 모드 (CTR): 블록 크기에 대한 패딩이 필요하지 않고, 병렬화될 수 있으며, 암호화 기능만 사용하고, 많은 수의 메시지를 암호화하는 데 안전하게 사용할 수 있습니다(비교적 빠르게 충돌하는 CBC와는 달리).

    .mac()과 마찬가지로 .ctr()도 비슷한 입력을 받습니다: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). 암호화 블록 길이의 정확히 절반인 초기화 벡터를 지정해야 합니다. 암호화 키가 하나의 메시지(여러 블록의 메시지임에도 불구하고)를 암호화하는 데만 사용되는 경우 초기화 벡터를 XNUMX으로 설정하는 것이 안전합니다. 핸드셰이크 메시지를 암호화하기 위해 매번 별도의 키를 사용합니다.

    gost3410.verify() 서명을 확인하는 것은 간단합니다. 작업 중인 타원 곡선(GOSTIM 프로토콜에 간단히 기록함), 서명자의 공개 키(34.11개의 튜플이어야 함을 잊지 마세요)를 전달합니다. 큰 숫자, 바이트 문자열 아님), 2012년 XNUMX월 XNUMX일 해시 및 서명 자체.

    다음으로, 개시자에서 우리는 핸드셰이크 메시지를 준비하고 handshake2에 보내며, 확인 중에 했던 것과 동일한 작업을 대칭적으로만 수행합니다. 즉 확인하는 대신 키에 서명하는 등입니다.

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    세션이 설정되면 전송 키가 생성되고(각 당사자에 대해 암호화, 인증을 위한 별도의 키), Grasshopper가 초기화되어 MAC을 해독하고 확인합니다.

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    msg_sender 코루틴은 이제 메시지를 TCP 연결로 보내기 전에 암호화합니다. 각 메시지에는 단조롭게 증가하는 nonce가 있으며 이는 카운터 모드에서 암호화될 때 초기화 벡터이기도 합니다. 각 메시지와 메시지 블록은 서로 다른 카운터 값을 갖습니다.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    수신 메시지는 인증 및 암호 해독을 처리하는 msg_receiver 코루틴에 의해 처리됩니다.

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    결론

    GOSTIM은 교육 목적으로만 사용하도록 고안되었습니다(적어도 테스트에서는 다루지 않기 때문입니다)! 프로그램의 소스코드를 다운로드 받을 수 있습니다. 여기에 (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, 피데라스N, NCCP, GoVPN, GOSTIM은 완전히 무료 소프트웨어, 약관에 따라 배포됨 GPLv3 +.

    세르게이 마트 비프, 사이퍼펑크회원 SPO재단, Python/Go 개발자, 수석 전문가 FSUE "STC "아틀라스".

출처 : habr.com

코멘트를 추가