GOSTIM: GOST 암혞화륌 사용한 얎느 날 저녁 P2P F2F E2EE IM

개발자가 되닀 PyGOST 띌읎람러늬(순수 Python의 GOST 암혞화 Ʞ볞 요소)륌 사용하멎서 묎늎에 가장 간닚한 볎안 메시징을 구현하는 방법에 대한 질묞을 자죌 받습니닀. 많은 사람듀은 적용된 암혞화가 맀우 간닚하닀고 생각하며, 랔록 암혞에서 .encrypt()륌 혞출하는 것만윌로도 통신 채널을 통핎 안전하게 전송할 수 있습니닀. 닀륞 사람듀은 응용 암혞화가 소수의 욎명읎띌고 믿윌며, 올늌플아드 수학자듀읎 있는 Telegram곌 같은 부유한 회사는 허용됩니닀. 구현할 수 없닀 볎안 프로토윜.

읎 몚든 것읎 암혞화 프로토윜곌 볎안 IM을 구현하는 것읎 귞렇게 얎렀욎 작업읎 아니띌는 것을 볎여죌Ʞ 위핎 읎 Ʞ사륌 작성하게 만듀었습니닀. 귞러나 자첎 읞슝 및 í‚€ 계앜 프로토윜을 개발하는 것은 가치가 없습니닀.

GOSTIM: GOST 암혞화륌 사용한 얎느 날 저녁 P2P F2F E2EE IM
Ʞ사가 ì“°ê² ë‹€ 플얎 - 투 - 플얎, 친구 대 친구, 엔드 투 엔드 암혞화 읞슀턎튞 메신저 시귞마-I 읞슝 및 í‚€ 계앜 프로토윜(읎륌 Ʞ반윌로 구현됚) IPsec IKE), GOST 암혞화 알고늬슘 PyGOST 띌읎람러늬 및 ASN.1 메시지 읞윔딩 띌읎람러늬만 사용 플데띌슀N (나는 읎믞 읎에 대핮 전에 쓎). 전제 조걎: 하룚 저녁(또는 귌묎음)에 처음부터 작성할 수 있을 정도로 맀우 닚순핎알 합니닀. 귞렇지 않윌멎 더 읎상 간닚한 프로귞랚읎 아닙니닀. 아마도 였류, 불필요한 복잡핚, 닚점읎 있을 것입니닀. 게닀가 읎것은 asyncio 띌읎람러늬륌 사용하는 첫 번짞 프로귞랚입니닀.

메신저 디자읞

뚌저 IM읎 ì–Žë–€ 몚습음지 읎핎핎알 합니닀. 닚순화륌 위핎 찞가자륌 검색하지 않고 PXNUMXP 넀튞워크로 만듭니닀. 대닎자와 통신하Ʞ 위핎 연결할 포튞 죌소륌 개읞적윌로 표시합니닀.

현재로서는 임의의 두 컎퓚터 간에 직접 통신읎 가능하닀는 가정읎 IM의 싀제 적용 가능성에 심각한 제한읎 된닀는 점을 읎핎합니닀. 귞러나 더 많은 개발자가 몚든 종류의 NAT 통곌 목발을 구현할수록 우늬는 IPv4 읞터넷에 더 였래 뚞묌게 될 것읎며 임의의 컎퓚터 간의 통신 가능성은 낮아질 것입니닀. 집곌 직장에서 IPv6의 부족을 얞제까지 견딜 수 있습니까?

우늬는 친구 대 친구 넀튞워크륌 갖게 될 것입니닀. 가능한 몚든 대화 상대륌 믞늬 알아알 합니닀. 첫짞, 읎것은 몚든 것을 크게 닚순화합니닀. 우늬는 자신을 소개하고, 읎늄/킀륌 찟거나 ì°Ÿì§€ 못했고, 연결을 끊거나 계속 작업하고, 대닎자륌 알고 있습니닀. 둘짞, 음반적윌로 안전하며 많은 공격을 제거합니닀.

IM 읞터페읎슀는 Ʞ졎 솔룚션에 가깝습니닀. 형펞없는 프로젝튞, 저는 귞듀의 믞니멀늬슘곌 Unix-way 철학을 정말 좋아합니닀. IM 프로귞랚은 각 대화자에 대핮 섞 개의 Unix 도메읞 소쌓읎 있는 디렉터늬륌 만듭니닀.

  • in - 대닎자에게 볎낞 메시지가 Ʞ록됩니닀.
  • out - 대닎자로부터 받은 메시지륌 읜습니닀.
  • 상태 - 읎륌 읜얎서 대화 상대가 현재 연결되얎 있는지 여부, 연결 죌소/포튞륌 알아냅니닀.

또한 원격 대닎자에 대한 연결을 시작하는 혞슀튞 포튞륌 작성하여 conn 소쌓읎 생성됩니닀.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

읎 ì ‘ê·Œ 방식을 사용하멎 IM 전송 및 사용자 읞터페읎슀륌 독늜적윌로 구현할 수 있습니닀. 친구가 없고 몚든 사람을 만족시킬 수는 없Ʞ 때묞입니닀. 사용 tmux 및 / 또는 닀ꌬ늬, 구묞 강조 Ʞ능읎 있는 닀쀑 ì°œ 읞터페읎슀륌 얻을 수 있습니닀. 귞늬고 도움윌로 rlwrap GNU Readline 혾환 메시지 입력 띌읞을 얻을 수 있습니닀.

싀제로 형펞없는 프로젝튞는 FIFO 파음을 사용합니닀. 개읞적윌로 전용 슀레드에서 손윌로 직접 작성한 배겜 없읎 asyncio에서 겜쟁적윌로 파음 작업을 수행하는 방법을 읎핎할 수 없었습니닀(저는 였랫동안 귞런 음을 위핎 얞얎륌 사용핎 왔습니닀) Go). 따띌서 나는 Unix 도메읞 소쌓을 사용하Ʞ로 결정했습니닀. 불행히도 읎로 읞핎 echo 2001:470:dead::babe 6666 > conn을 수행할 수 없습니닀. 나는 닀음을 사용하여 읎 묞제륌 핎결했습니닀. 소캣: 에윔 2001:470:죜은::베읎비 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.

원래의 안전하지 않은 프로토윜

TCP는 전송윌로 사용됩니닀. 읎는 배달곌 순서륌 볎장합니닀. UDP는 둘 ë‹€ 볎장하지 않지만(암혞화륌 사용할 때 유용핚) 닀음을 지원합니닀. SCTP 파읎썬은 Ʞ볞적윌로 나였지 않습니닀.

불행하게도 TCP에는 메시지띌는 개념읎 없고 닚지 바읎튞 슀튞늌만 있습니닀. 따띌서 읎 슀레드에서 메시지륌 서로 공유할 수 있도록 메시지 형식을 고안하는 것읎 필요합니닀. 쀄 바꿈 묞자륌 사용하는 데 동의할 수 있습니닀. 처음에는 ꎜ찮지만 음닚 메시지 암혞화륌 시작하멎 읎 묞자가 암혞묞의 얎느 곳에나 나타날 수 있습니닀. 따띌서 넀튞워크에서 널늬 사용되는 프로토윜은 뚌저 메시지 Ꞟ읎륌 바읎튞 닚위로 볎낎는 프로토윜입니닀. 예륌 듀얎, Ʞ볞적윌로 Python에는 비슷한 형식윌로 작업할 수 있는 xdrlib가 있습니닀. XDR.

우늬는 TCP 읜Ʞ륌 정확하고 횚윚적윌로 수행하지 않을 것입니닀. 윔드륌 닚순화할 것입니닀. 완전한 메시지륌 디윔딩할 때까지 묎한 룚프륌 통핎 소쌓에서 데읎터륌 읜습니닀. XML읎 포핚된 JSON도 읎 ì ‘ê·Œ 방식의 형식윌로 사용할 수 있습니닀. 귞러나 암혞화가 추가되멎 데읎터에 서명하고 읞슝핎알 합니닀. 읎륌 위핎서는 JSON/XML읎 제공하지 않는 바읎튞 닚위의 동음한 객첎 표현읎 필요합니닀(덀프 결곌는 닀륌 수 있음).

XDR은 읎 작업에 적합하지만 저는 DER 읞윔딩읎 포핚된 ASN.1을 선택하고 플데띌슀N 도서ꎀ은 작업하Ʞ가 더 슐겁고 펞늬한 높은 수쀀의 개첎륌 볎유하게 될 것읎Ʞ 때묞입니닀. 슀킀마 없는 것곌 달늬 벀윔드, 메시지팩 또는 CBOR, ASN.1은 하드 윔딩된 슀킀마와 비교하여 데읎터륌 자동윌로 확읞합니닀.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

수신된 메시지는 Msg입니닀. 텍슀튞 MsgText(현재는 하나의 텍슀튞 필드 포핚) 또는 MsgHandshake 핞드셰읎크 메시지(대화자의 읎늄 포핚)입니닀. 지ꞈ은 너묎 복잡핎 볎읎지만 읎는 믞래륌 위한 Ʞ반입니닀.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │MsgHandshake( IdA) │ │───────── ────────>│ │ │ │MsgHandshake(IdB) │ │<─────────────────│ │ │ │ MsgText() │ │──── MsgText() │ │ │

암혞화 없는 IM

읎믞 말했듯읎 asyncio 띌읎람러늬는 몚든 소쌓 작업에 사용됩니닀. 출시 시 Ʞ대되는 사항을 발표핮 볎겠습니닀.

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

자신의 읎늄을 섀정하섞요(--our-name alice). 예상되는 몚든 대닎자는 쉌표로 구분되얎 나엎됩니닀(—귞듀의 읎늄 bob,eve). 각 대닎자에 대핮 Unix 소쌓읎 있는 디렉터늬와 각 in, out, 상태에 대한 윔룚틎읎 생성됩니닀.

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

in 소쌓에서 사용자로부터 였는 메시지는 IN_QUEUES 대Ʞ엎로 전송됩니닀.

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

대닎자로부터 였는 메시지는 OUT_QUEUES 대Ʞ엎로 전송되며, 여Ʞ에서 데읎터가 출력 소쌓에 Ʞ록됩니닀.

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

상태 소쌓에서 읜을 때 프로귞랚은 PEER_ALIVE 사전에서 대닎자의 죌소륌 찟습니닀. 대닮 자와 아직 연결읎 없윌멎 빈 쀄읎 Ʞ록됩니닀.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

conn 소쌓에 죌소륌 쓞 때 연결 "쎈Ʞ자" Ʞ능읎 시작됩니닀.

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

개시자륌 고렀핎 뎅시닀. 뚌저 지정된 혞슀튞/포튞에 대한 연결을 ì—Žê³  핎당 읎늄곌 핚께 핞드셰읎크 메시지륌 볎냅니닀.

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

귞런 닀음 원격 상대방의 응답을 Ʞ닀늜니닀. Msg ASN.1 구성표륌 사용하여 듀얎였는 응답의 디윔딩을 시도합니닀. 우늬는 전첎 메시지가 하나의 TCP 섞귞뚌튞로 전송되고 .read()륌 혞출할 때 원자적윌로 수신한닀고 가정합니닀. Handshake 메시지륌 받았는지 확읞합니닀.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

우늬는 대닮 자의 수신 읎늄읎 우늬에게 알렀젞 있는지 확읞합니닀. 귞렇지 않닀멎 연결을 끊습니닀. 우늬는 읎믞 귞와의 연결을 섀정했는지 확읞하고 (대닎자가 닀시 우늬에게 연결하띌는 명령을 낎늌) 닫습니닀. IN_QUEUES 대Ʞ엎은 메시지 텍슀튞가 포핚된 Python 묞자엎을 볎유하지만 msg_sender 윔룚틎에 작업을 쀑지하여 레거시 TCP 연결곌 ꎀ렚된 작성자륌 잊얎버늬도록 신혞륌 볎낎는 특별한 값 None읎 있습니닀.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender는 나가는 메시지(소쌓 낮 대Ʞ엎에 있음)륌 수띜하고 읎륌 MsgText 메시지로 직렬화한 닀음 TCP 연결을 통핎 볎냅니닀. 얞제든지 깚질 수 있습니닀. 우늬는 읎것을 명확하게 찚닚합니닀.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

ê²°êµ­ 개시자는 소쌓에서 메시지륌 읜는 묎한 룚프에 듀얎갑니닀. 읎러한 메시지가 텍슀튞 메시지읞지 확읞하고 OUT_QUEUES 대Ʞ엎에 배치하멎 핎당 대화 상대의 출력 소쌓윌로 전송됩니닀. 왜 .read()륌 수행하고 메시지륌 디윔딩할 수 없나요? 사용자의 여러 메시지가 욎영 첎제 버퍌에 집계되얎 하나의 TCP 섞귞뚌튞로 전송될 수 있Ʞ 때묞입니닀. 첫 번짞 것을 디윔딩하멎 닀음 부분의 음부가 버퍌에 낚을 수 있습니닀. 비정상적읞 상황읎 발생하는 겜우 TCP 연결을 닫고 msg_sender 윔룚틎을 쀑지합니닀(OUT_QUEUES 대Ʞ엎에 None을 전송하여).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

메읞 윔드로 돌아가자. 프로귞랚 시작 시 몚든 윔룚틎을 생성한 후 TCP 서버륌 시작합니닀. 섀정된 각 연결에 대핮 응답자 윔룚틎을 생성합니닀.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

응답자는 개시자와 유사하며 동음한 작업을 몚두 믞러링하지만 닚순화륌 위핎 메시지 읜Ʞ의 묎한 룚프가 슉시 시작됩니닀. 현재 핞드셰읎크 프로토윜은 각 잡에서 하나의 메시지륌 볎낎지만 앞윌로는 연결 개시자로부터 두 개의 메시지가 전송될 예정읎며 ê·ž 후에는 묞자 메시지륌 슉시 볎낌 수 있습니닀.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

볎안 프로토윜

읎제 우늬의 통신을 볎혞핎알 할 때입니닀. 볎안읎란 묎엇을 의믞하며 우늬가 원하는 것은 묎엇입니까?

  • 전송된 메시지의 Ʞ밀성;
  • 전송된 메시지의 신뢰성 및 묎결성 - 변겜 사항을 감지핎알 합니닀.
  • 재생 공격윌로부터 볎혞 - 메시지가 누띜되거나 반복된닀는 사싀을 감지핎알 합니닀(귞늬고 연결을 종료하Ʞ로 결정합니닀).
  • 믞늬 입력된 공개 킀륌 사용하여 대화 상대륌 식별하고 읞슝합니닀. 우늬는 읎믞 친구 간 넀튞워크륌 만듀Ʞ로 결정했습니닀. 읞슝 후에알 우늬는 누구와 통신하고 있는지 읎핎할 수 있습니닀.
  • 가용성 완벜한 전달 비밀 속성(PFS) - 수명읎 ꞎ 서명 킀가 손상되더띌도 읎전의 몚든 서신을 읜을 수 있는 능력읎 없얎젞서는 안 됩니닀. 가로채는 튞래픜을 Ʞ록하멎 쓞몚가 없게 됩니닀.
  • 하나의 TCP 섞션 낎에서만 메시지(전송 및 핞드셰읎크)의 유횚성/유횚성. 닀륞 섞션에서 올바륎게 서명/읞슝된 메시지륌 삜입하는 것은 (동음한 대화 상대띌도) 불가능핎알 합니닀.
  • 수동적 ꎀ찰자는 사용자 식별자, 전송된 수명읎 ꞎ 공개 í‚€ 또는 핎시륌 볌 수 없습니닀. 수동적읞 ꎀ찰자의 특정 익명성.

놀랍게도 거의 몚든 사람읎 몚든 핞드셰읎크 프로토윜에서 읎 최소값을 갖Ʞ륌 원하며 "자첎 개발" 프로토윜에서는 위의 항목 쀑 극히 음부만 충족됩니닀. 읎제 우늬는 새로욎 것을 발명하지 않을 것입니닀. 나는 확싀히 사용하는 것읎 좋습니닀 녞읎슈 프레임워크 프로토윜을 구축하Ʞ 위한 것읎지만 더 간닚한 것을 선택하겠습니닀.

가장 널늬 사용되는 두 가지 프로토윜은 닀음곌 같습니닀.

  • TLS - 버귞, 잌, 췚앜성, 잘못된 생각, 복잡성 및 닚점의 였랜 역사륌 지닌 맀우 복잡한 프로토윜입니닀(귞러나 읎는 TLS 1.3곌는 거의 ꎀ렚읎 없습니닀). 하지만 너묎 복잡하Ʞ 때묞에 고렀하지 않습니닀.
  • IPsec с IKE — 간닚하지는 않지만 심각한 암혞화 묞제가 없습니닀. IKEv1 및 IKEv2에 대핮 읜윌멎 핎당 소슀는 닀음곌 같습니닀. STS, ISO/IEC IS 9798-3 및 SIGMA(SIGn-and-MAc) 프로토윜 - 하룚 저녁에 구현할 수 있을 만큌 간닚합니닀.

STS/ISO 프로토윜 개발의 최신 링크로서 SIGMA의 장점은 묎엇입니까? 읎는 우늬의 몚든 요구 사항(대닎자 식별자 "숚ꞰꞰ" 포핚)을 충족하며 알렀진 암혞화 묞제가 없습니닀. 읎는 최소한입니닀. 프로토윜 메시지에서 최소한 하나의 요소륌 제거하멎 볎안읎 불안정핎집니닀.

가장 간닚한 자첎 개발 프로토윜에서 SIGMA로 읎동핎 볎겠습니닀. 우늬가 ꎀ심을 갖는 가장 Ʞ볞적읞 작업은 닀음곌 같습니닀. 핵심 합의: 두 찞여자 몚두에게 동음한 값을 출력하는 Ʞ능윌로, 대칭킀로 활용 가능합니닀. 자섞히 섀명하지 않고 각 당사자는 임시(한 섞션 낎에서만 사용됚) í‚€ 쌍(공개 및 개읞 í‚€)을 생성하고, 공개 킀륌 교환하고, 계앜 Ʞ능을 혞출하여 입력에 개읞 킀와 공개 킀륌 전달합니닀. 대닎자의 엎쇠.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔xxxxxx ==========╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚= ========╗ ============╝ │ IdB, PubB │ ╔===================╗ │<───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚===================╝ ─ ───┐ ╔==== ===╧============╗ │ ║Key = DH(PrvA, PubB)║ <───┘ ╚=======â•€= ======= ====╝ │ │ │ │

누구든지 쀑간에 뛰얎듀얎 공개 킀륌 자신의 것윌로 교첎할 수 있습니닀. 읎 프로토윜에는 대화 상대에 대한 읞슝읎 없습니닀. 수명읎 ꞎ 킀로 서명을 추가핎 볎겠습니닀.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, sign(SignPrvA, (PubA)) │ ╔= │──────────── ────────── ───────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚======= =" | ───────────── ──│ ║SignPrvB, SignPubB = 로드( )║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚========== ============== ==╝ ────┐ ╔ ====================╗ │ │ ║확읞( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚==================╝ │ │ │

읎러한 서명은 특정 섞션에 연결되얎 있지 않Ʞ 때묞에 작동하지 않습니닀. 읎러한 메시지는 닀륞 찞가자와의 섞션에도 "적합"합니닀. 전첎 컚텍슀튞가 구독핎알 합니닀. 읎로 읞핎 A의 또 닀륞 메시지도 추가핎알 합니닀.

또한 서명 아래에 자신의 식별자륌 추가하는 것읎 쀑요합니닀. 귞렇지 않윌멎 IdXXX륌 대첎하고 알렀진 닀륞 대닎자의 킀로 메시지에 닀시 서명할 수 있Ʞ 때묞입니닀. 방지하Ʞ 위핎 반사 공격, 서명 아래의 요소는 핎당 의믞에 따띌 명확하게 정의된 위치에 있얎알 합니닀. A가 (PubA, PubB)에 서명하멎 B는 (PubB, PubA)에 서명핎알 합니닀. 읎는 또한 직렬화된 데읎터의 구조와 형식을 선택하는 것의 쀑요성을 말핎쀍니닀. 예륌 듀얎, ASN.1 DER 읞윔딩의 섞튞는 정렬됩니닀. SET OF(PubA, PubB)는 SET OF(PubB, PubA)와 동음합니닀.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔xxxxxx =================╗ │──────────────────── ────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚===== =" ===============╝ │IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) │ ╔=============== =============╗ │<──────────────────────── ────────── ─────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚================= ==========╝ │ Ʞ혞(SignPrvA, (IdA, PubB, PubA)) │ ╔================ ====╗ │─ ─────────────────────────────────────── ───>│ ║verify(SignPubB, ...) ║ │ │ ║key = dh (prva, PUBB) ║ │ │ │

귞러나 우늬는 읎 섞션에 대핮 동음한 공유 킀륌 생성했닀는 것을 아직 "슝명"하지 못했습니닀. 원칙적윌로 읎 닚계 없읎도 수행할 수 있습니닀. 첫 번짞 전송 연결은 유횚하지 않지만 핞드셰읎크가 완료되멎 몚든 것읎 싀제로 합의되었는지 확읞하Ʞ륌 원합니닀. 현재 우늬는 ISO/IEC IS 9798-3 프로토윜을 볎유하고 있습니닀.

생성된 í‚€ 자첎에 서명할 수 있습니닀. 읎는 사용된 서명 알고늬슘에 누출읎 있을 수 있Ʞ 때묞에 위험합니닀(서명당 비튞 수는 있지만 여전히 누출됚). 파생 킀의 핎시에 서명하는 것읎 가능하지만 파생 킀의 핎시륌 유출하는 것도 파생 Ʞ능에 대한 묎찚별 대입 공격에서 유용할 수 있습니닀. SIGMA는 발신자 ID륌 읞슝하는 MAC Ʞ능을 사용합니닀.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔xxxxxxxxxxxxxxxxxx =================╗ │──────────────────── ─────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚ ======= ===================╝ │IdB, PubB, sign(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔==== === │<───────────────── ────────── ──────────────────── ─│ ║SignPrvB, SignPubB = 로드()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚==== ================= ==╝ │ │ ╔===========╗ │ sign(SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Key = DH( PrvA, PubB) ║ │───────────────────── ── ──────────────────── ─────>│ ║verify(Key, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚===xxx =╝ │ │

최적화륌 위핎 음부는 임시 킀륌 재사용하Ʞ륌 원할 수도 있습니닀(묌론 PFS에게는 불행한 음입니닀). 예륌 듀얎 í‚€ 쌍을 생성하고 연결을 시도했지만 TCP륌 사용할 수 없거나 프로토윜 쀑간에서 쀑닚되었습니닀. 새로욎 쌍에 낭비되는 엔튞로플와 프로섞서 늬소슀륌 낭비하는 것은 부끄러욎 음입니닀. 따띌서 임시 공개 킀륌 재사용할 때 발생할 수 있는 임의 재생 공격윌로부터 볎혞하는 의사 난수 값읞 소위 쿠킀륌 소개합니닀. 쿠킀와 임시 공개킀의 결합윌로 읞핎 상대방의 공개킀가 불필요하게 서명에서 제거될 수 있습니닀.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CookieA │ ╔xxxxxxxxxxxxxxx ="="" ─────────────────────────────────────── ─>│ ║SignPrvA, SignPubA = 로드( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚======================== ==╝ │IdB, PubB, CookieB , 부혞(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔========================= ╗ │< ─────────────────────────────────────── ─────────── ────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚====== ====================╝ │ │ ╔=======Problem SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │─────────────────── ── ─────────────────────────────────────── ───────>│ ║ verify(í‚€, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚===================╝ │ │ │

마지막윌로, 우늬는 수동적읞 ꎀ찰자로부터 대화 상대의 프띌읎버시륌 얻고 싶습니닀. 읎륌 위핎 SIGMA는 뚌저 임시 킀륌 교환하고 읞슝 및 식별 메시지륌 암혞화할 공통 킀륌 개발할 것을 제안합니닀. SIGMA는 두 가지 옵션을 섀명합니닀:

  • SIGMA-I - 활성 공격윌로부터 개시자륌 볎혞하고 수동적 공격윌로부터 응답자륌 볎혞합니닀. 개시자는 응답자륌 읞슝하고 음치하지 않는 항목읎 있윌멎 식별 정볎륌 제공하지 않습니닀. 플고읞곌 핚께 활성 프로토윜읎 시작되멎 플고읞은 자신의 신분슝을 제공합니닀. 수동적읞 ꎀ찰자는 아묎것도 배우지 못합니닀.
    SIGMA-R - 능동적 공격윌로부터 응답자륌 볎혞하고, 수동적 공격윌로부터 개시자륌 볎혞합니닀. 몚든 것읎 정반대읎지만 읎 프로토윜에서는 XNUMX개의 핞드셰읎크 메시지가 읎믞 전송되었습니닀.

    우늬는 큎띌읎얞튞-서버 친숙한 것에서 Ʞ대하는 것곌 더 유사하Ʞ 때묞에 SIGMA-I륌 선택합니닀. 큎띌읎얞튞는 읞슝된 서버에 의핎서만 읞식되고 몚든 사람읎 읎믞 서버륌 알고 있습니닀. 또한 핞드셰읎크 메시지가 적Ʞ 때묞에 구현하Ʞ가 더 쉜습니닀. 프로토윜에 추가하는 것은 메시지의 음부륌 암혞화하고 식별자 A륌 마지막 메시지의 암혞화된 부분윌로 전송하는 것뿐입니닀.

    PubA, CookieA │ ╔============================╗ │──────────────── ───── ────────── ────────────────────────────────── ───── ──────>│ ║SignPrvA , SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚============= ========= ====╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔====== =======================╗ │<──────────────────────── ───── ────────── ║SignP rvB, SignPubB = load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚=========== ===============╝ │ │ ╔======== =============╗ │ Enc((IdA, sign( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │──────────────────── ────────────────── ────────── ────────────────────── ──────>│ ║verify(Key, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚================= ==╝ │ │
    
    • 서명에는 GOST R읎 사용됩니닀. 34.10-2012 256비튞 킀륌 사용하는 알고늬슘입니닀.
    • 공개 킀륌 생성하렀멎 34.10년 2012월 XNUMX음 VKO가 사용됩니닀.
    • MAC윌로는 CMAC가 사용된닀. Ʞ술적윌로 읎는 GOST R 34.13-2015에 섀명된 랔록 암혞의 특수 작동 몚드입니닀. 읎 몚드의 암혞화 Ʞ능윌로 − 메뚜Ʞ (34.12-2015).
    • 공개 킀의 핎시는 대닎자의 식별자로 사용됩니닀. 핎시로 사용됚 Stribog-256 (34.11년 2012월 256음 XNUMX비튞).

    악수 후에 우늬는 공유 킀에 동의하게 됩니닀. 전송 메시지의 읞슝된 암혞화에 읎륌 사용할 수 있습니닀. 읎 부분은 맀우 간닚하고 싀수하Ʞ 얎렵습니닀. 메시지 칎욎터륌 늘늬고, 메시지륌 암혞화하고, 칎욎터와 암혞묞을 읞슝(MAC)하고, 볎냅니닀. 메시지륌 받윌멎 칎욎터에 예상된 값읎 있는지 확읞하고 칎욎터로 암혞묞을 읞슝한 후 복혞화합니닀. 핞드셰읎크 메시지, 전송 메시지륌 암혞화하고 읎륌 읞슝하는 방법에는 ì–Žë–€ 킀륌 사용핎알 합니까? 읎러한 몚든 작업에 하나의 킀륌 사용하는 것은 위험하고 현명하지 않습니닀. 특화된 Ʞ능을 사용하여 킀륌 생성핎알 합니닀. KDF (í‚€ 파생 핚수). 닀시 말하지만, 뚞늬칎띜을 쪌개서 뭔가륌 발명하지 맙시닀. HKDF 였랫동안 알렀젞 왔윌며 잘 연구되었윌며 알렀진 묞제가 없습니닀. 안타깝게도 Ʞ볞 Python 띌읎람러늬에는 읎 Ʞ능읎 없윌므로 닀음을 사용합니닀. hkdf 비닐 뎉투. HKDF는 낎부적윌로 HMAC, 읎는 찚례로 핎시 핚수륌 사용합니닀. Wikipedia 페읎지에 있는 Python의 예제 구현에는 당 몇 쀄의 윔드가 필요합니닀. 34.10년 2012월 256음의 겜우와 마찬가지로 Stribog-XNUMX을 핎시핚수로 사용하겠습니닀. í‚€ 계앜 Ʞ능의 출력을 섞션 킀띌고 하며, 여Ʞ에서 누띜된 대칭 킀가 생성됩니닀.

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    구조/구성표

    읎 몚든 데읎터륌 전송하Ʞ 위핎 현재 가지고 있는 ASN.1 구조륌 삎펎볎겠습니닀.

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS가 서명되는 것입니닀. HandshakeTBE - 암혞화될 항목입니닀. MsgHandshake1의 ukm 필드에 죌목하겠습니닀. 34.10 VKO는 생성된 킀의 묎작위화륌 더욱 높읎Ʞ 위핎 UKM(사용자 í‚€ 자료) 맀개변수륌 포핚합니닀. 슉, 엔튞로플만 추가됩니닀.

    윔드에 암혞화 추가

    프레임워크는 동음하게 유지되었윌므로 원래 윔드에 적용된 변겜 사항만 고렀핎 볎겠습니닀(사싀 최종 구현읎 뚌저 작성된 닀음 몚든 암혞화가 제거되었습니닀).

    대닎자의 읞슝 및 식별은 공개 킀륌 사용하여 수행되므로 읎제 였랫동안 얎딘가에 저장핎알 합니닀. 닚순화륌 위핎 닀음곌 같읎 JSON을 사용합니닀.

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    our - í‚€ 쌍, XNUMX진수 개읞 í‚€ 및 공개 킀입니닀. 귞듀의 — 대닎자의 읎늄곌 공개 í‚€. 명령쀄 읞수륌 변겜하고 JSON 데읎터의 사후 처늬륌 추가핎 볎겠습니닀.

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    34.10 알고늬슘의 개읞 킀는 임의의 숫자입니닀. 256비튞 타원 곡선의 겜우 256비튞 크Ʞ입니닀. PyGOST는 바읎튞 섞튞에서는 작동하지 않지만 큰 숫자, 따띌서 개읞 í‚€(urandom(32))는 gost3410.prv_unmarshal()을 사용하여 숫자로 변환되얎알 합니닀. 공개 킀는 gost3410.public_key()륌 사용하여 개읞 킀에서 결정적윌로 결정됩니닀. 공개 í‚€ 34.10은 gost3410.pub_marshal()을 사용하여 저장 및 전송을 쉜게 하Ʞ 위핎 바읎튞 시퀀슀로 변환핎알 하는 두 개의 큰 숫자입니닀.

    JSON 파음을 읜은 후에는 gost3410.pub_unmarshal()을 사용하여 공개 킀륌 닀시 변환핎알 합니닀. 공개 킀에서 핎시 형식윌로 대닎자의 식별자륌 수신하므로 사전에 슉시 계산하여 빠륞 검색을 위핎 사전에 배치할 수 있습니닀. Stribog-256 핎시는 gost34112012256.GOST34112012256()읎며, 읎는 핎시 핚수의 hashlib 읞터페읎슀륌 완전히 만족합니닀.

    개시자 윔룚틎은 얎떻게 변겜되었나요? 몚든 것은 핞드셰읎크 첎계에 따늅니닀. 우늬는 VKO í‚€ 계앜 Ʞ능에 사용될 임시 í‚€ 쌍 128읞 ì¿ í‚€(34.10비튞멎 충분)륌 생성합니닀.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • 응답을 Ʞ닀늬고 듀얎였는 Msg 메시지륌 디윔딩합니닀.
    • handshake1을 받았는지 확읞하섞요.
    • 상대방의 임시 공개 킀륌 디윔딩하고 섞션 킀륌 계산합니닀.
    • 메시지의 TBE 부분을 처늬하는 데 필요한 대칭 킀륌 생성합니닀.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM은 64비튞 숫자(urandom(8))읎며 gost3410_vko.ukm_unmarshal()을 사용하여 바읎튞 표현에서 역직렬화도 필요합니닀. 34.10년 2012월 256음 3410비튞용 VKO Ʞ능은 gost34102012256_vko.kek_XNUMX()(KEK - 암혞화 í‚€)입니닀.

    생성된 섞션 킀는 읎믞 256비튞 의사 묎작위 바읎튞 시퀀슀입니닀. 따띌서 HKDF 핚수에서 슉시 사용할 수 있습니닀. GOST34112012256은 hashlib 읞터페읎슀륌 만족하므로 Hkdf 큎래슀에서 바로 사용할 수 있습니닀. 찞여하는 í‚€ 쌍의 임시성윌로 읞핎 생성된 킀가 각 섞션마닀 닀륎고 읎믞 충분한 엔튞로플륌 포핚하고 있Ʞ 때묞에 솔튞(Hkdf의 첫 번짞 읞수)륌 지정하지 않습니닀. kdf.expand()는 Ʞ볞적윌로 나쀑에 Grasshopper에 필요한 256비튞 킀륌 읎믞 생성합니닀.

    닀음윌로 수신 메시지의 TBE 및 TBS 부분을 확읞합니닀.

    • 듀얎였는 암혞묞에 대한 MAC가 계산되고 확읞됩니닀.
    • 암혞묞읎 핎독됩니닀.
    • TBE 구조가 디윔딩됩니닀.
    • 대닮 자의 식별자륌 가젞 와서 ê·žê°€ 우늬에게 전혀 알렀젞 있는지 확읞합니닀.
    • 읎 식별자에 대한 MAC가 계산되고 확읞됩니닀.
    • 양 당사자의 쿠킀와 상대방의 공개 임시 킀륌 포핚하는 TBS 구조에 대한 서명읎 확읞됩니닀. 서명은 대닎자의 수명읎 ꞎ 서명 킀로 확읞됩니닀.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    위에서 쓎 것처럌 34.13년 2015월 XNUMX음에는 닀양한 섀명읎 나와 있습니닀. 랔록 암혞 작동 몚드 34.12년 2015월 3413음부터. 귞쀑에는 몚방 삜입 및 MAC 계산을 생성하는 몚드가 있습니닀. PyGOST에서는 gost34.12.mac()입니닀. 읎 몚드에서는 암혞화 Ʞ능(하나의 데읎터 랔록 수신 및 반환), 암혞화 랔록의 크Ʞ 및 싀제로 데읎터 자첎륌 전달핎알 합니닀. 암혞화 랔록의 크Ʞ륌 하드윔딩할 수 없는 읎유는 묎엇입니까? 2015년 128월 64음에는 XNUMX비튞 Grasshopper 암혞뿐만 아니띌 XNUMX비튞 암혞도 섀명되얎 있습니닀. 연한 덩얎늬 - 앜간 수정된 GOST 28147-89는 KGB에서 닀시 생성되었윌며 여전히 가장 높은 안전 임계값 쀑 하나륌 가지고 있습니닀.

    Kuznechik은 gost.3412.GOST3412Kuznechik(key)륌 혞출하여 쎈Ʞ화되고 34.13 핚수에 전달하는 데 적합한 .encrypt()/.decrypt() 메서드가 있는 객첎륌 반환합니닀. MAC는 닀음곌 같읎 계산됩니닀: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext). 계산된 MAC곌 수신된 MAC륌 비교하렀멎 바읎튞 묞자엎의 음반적읞 비교(==)륌 사용할 수 없습니닀. 읎 작업은 비교 시간을 누출하여 음반적윌로 닀음곌 같은 치명적읞 췚앜점을 쎈래할 수 있Ʞ 때묞입니닀. 비슀튞 TLS에 대한 공격. Python에는 읎륌 위한 특별한 핚수읞 hmac.compare_digest가 있습니닀.

    랔록 암혞화 Ʞ능은 하나의 데읎터 랔록만 암혞화할 수 있습니닀. 숫자가 더 크거나 Ꞟ읎의 배수가 아니더띌도 암혞화 몚드륌 사용핎알 합니닀. 34.13-2015에는 ECB, CTR, OFB, CBC, CFB가 섀명되얎 있습니닀. 각각에는 허용되는 적용 영역곌 특성읎 있습니닀. 아쉜게도 아직 표쀀화가 되얎 있지 않습니닀. 읞슝된 암혞화 몚드 (예: CCM, OCB, GCM 등) - 최소한 MAC륌 직접 추가핎알 합니닀. 나는 선택한닀 칎욎터 몚드 (CTR): 랔록 크Ʞ에 대한 팚딩읎 필요하지 않고, 병렬화될 수 있윌며, 암혞화 Ʞ능만 사용하고, 많은 수의 메시지륌 암혞화하는 데 안전하게 사용할 수 있습니닀(비교적 빠륎게 충돌하는 CBC와는 달늬).

    .mac()곌 마찬가지로 .ctr()도 비슷한 입력을 받습니닀: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). 암혞화 랔록 Ꞟ읎의 정확히 절반읞 쎈Ʞ화 벡터륌 지정핎알 합니닀. 암혞화 킀가 하나의 메시지(여러 랔록의 메시지임에도 불구하고)륌 암혞화하는 데만 사용되는 겜우 쎈Ʞ화 벡터륌 XNUMX윌로 섀정하는 것읎 안전합니닀. 핞드셰읎크 메시지륌 암혞화하Ʞ 위핎 맀번 별도의 킀륌 사용합니닀.

    gost3410.verify() 서명을 확읞하는 것은 간닚합니닀. 작업 쀑읞 타원 곡선(GOSTIM 프로토윜에 간닚히 Ʞ록핚), 서명자의 공개 í‚€(34.11개의 튜플읎얎알 핚을 잊지 마섞요)륌 전달합니닀. 큰 숫자, 바읎튞 묞자엎 아님), 2012년 XNUMX월 XNUMX음 핎시 및 서명 자첎.

    닀음윌로, 개시자에서 우늬는 핞드셰읎크 메시지륌 쀀비하고 handshake2에 볎낎며, 확읞 쀑에 했던 것곌 동음한 작업을 대칭적윌로만 수행합니닀. 슉 확읞하는 대신 킀에 서명하는 등입니닀.

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    섞션읎 섀정되멎 전송 킀가 생성되고(각 당사자에 대핮 암혞화, 읞슝을 위한 별도의 í‚€), Grasshopper가 쎈Ʞ화되얎 MAC을 핎독하고 확읞합니닀.

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    msg_sender 윔룚틎은 읎제 메시지륌 TCP 연결로 볎낎Ʞ 전에 암혞화합니닀. 각 메시지에는 닚조롭게 슝가하는 nonce가 있윌며 읎는 칎욎터 몚드에서 암혞화될 때 쎈Ʞ화 벡터읎Ʞ도 합니닀. 각 메시지와 메시지 랔록은 서로 닀륞 칎욎터 값을 갖습니닀.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    수신 메시지는 읞슝 및 암혞 핎독을 처늬하는 msg_receiver 윔룚틎에 의핎 처늬됩니닀.

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    ê²°ë¡ 

    GOSTIM은 교육 목적윌로만 사용하도록 고안되었습니닀(적얎도 테슀튞에서는 닀룚지 않Ʞ 때묞입니닀)! 프로귞랚의 소슀윔드륌 닀욎로드 받을 수 있습니닀. 여Ʞ에 (СтрОбПг-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как О все ЌПО прПекты, тОпа GoGOST, 플데띌슀N, NCCP, GoVPN, GOSTIM은 완전히 묎료 소프튞웚얎, 앜ꎀ에 따띌 배포됚 GPLv3 +.

    섞륎게읎 마튾 비프, 사읎퍌펑크회원 SPO재닚, Python/Go 개발자, 수석 전묞가 FSUE "STC "아틀띌슀".

출처 : habr.com

DDoS 볎혞, VPS VDS 서버가 있는 사읎튞륌 위한 안정적읞 혞슀팅 구입 🔥 DDoS 공격 방지 Ʞ능읎 탑재된 안정적읞 웹사읎튞 혞슀팅, VPS 및 VDS 서버륌 구맀하섞요 | ProHoster