GOSTIM: P2P F2F E2EE IM در یک شب با رمزنگاری GOST

توسعه دهنده بودن PyGOST کتابخانه‌ها (اولیه‌های رمزنگاری GOST در پایتون خالص)، اغلب سؤالاتی در مورد نحوه پیاده‌سازی ساده‌ترین پیام‌رسانی امن روی زانو دریافت می‌کنم. بسیاری از مردم رمزنگاری کاربردی را بسیار ساده می‌دانند و فراخوانی .encrypt() روی یک رمز بلاک برای ارسال امن آن از طریق یک کانال ارتباطی کافی است. برخی دیگر معتقدند که رمزنگاری کاربردی سرنوشت معدودی است و قابل قبول است که شرکت‌های ثروتمندی مانند تلگرام با ریاضیدانان المپیاد نمی تواند اجرا کند پروتکل امن

همه اینها مرا بر آن داشت تا این مقاله را بنویسم تا نشان دهم پیاده سازی پروتکل های رمزنگاری و IM امن کار چندان دشواری نیست. با این حال، ارزش اختراع پروتکل های تأیید هویت و توافق کلیدی خود را ندارد.

GOSTIM: P2P F2F E2EE IM در یک شب با رمزنگاری GOST
مقاله خواهد نوشت همتا به همتا, دوست به دوست, رمزگذاری سرتاسر پیام رسان فوری با SIGMA-I پروتکل احراز هویت و توافق کلید (بر اساس آن اجرا می شود IPsec IKE) منحصراً با استفاده از الگوریتم های رمزنگاری GOST کتابخانه PyGOST و کتابخانه رمزگذاری پیام ASN.1 PyDERASN (که قبلاً در مورد آن قبلا نوشته بود). یک پیش نیاز: باید آنقدر ساده باشد که بتوان آن را از ابتدا در یک عصر (یا یک روز کاری) نوشت، در غیر این صورت دیگر یک برنامه ساده نیست. احتمالاً دارای خطاها، پیچیدگی های غیر ضروری، کاستی هایی است، به علاوه این اولین برنامه من است که از کتابخانه asyncio استفاده می کنم.

طراحی IM

ابتدا باید بفهمیم IM ما چگونه خواهد بود. برای سادگی، اجازه دهید یک شبکه همتا به همتا باشد، بدون هیچ گونه کشفی از شرکت کنندگان. ما شخصاً نشان خواهیم داد که به کدام آدرس: پورت برای برقراری ارتباط با طرف مقابل متصل شویم.

من درک می کنم که در حال حاضر، این فرض که ارتباط مستقیم بین دو رایانه دلخواه در دسترس است، محدودیت قابل توجهی در کاربرد IM در عمل است. اما هرچه توسعه‌دهندگان بیشتر انواع عصاهای NAT-traversal را پیاده‌سازی کنند، مدت بیشتری در اینترنت IPv4 باقی می‌مانیم، با احتمال ناامیدکننده ارتباط بین رایانه‌های دلخواه. تا کی می توانید کمبود IPv6 را در خانه و محل کار تحمل کنید؟

ما یک شبکه دوست به دوست خواهیم داشت: همه طرف های ممکن باید از قبل شناخته شوند. اولا، این همه چیز را بسیار ساده می کند: ما خودمان را معرفی کردیم، نام/کلید را پیدا کردیم یا پیدا نکردیم، با شناخت طرف مقابل، ارتباط را قطع کردیم یا به کار ادامه دادیم. دوم اینکه به طور کلی ایمن است و بسیاری از حملات را از بین می برد.

رابط IM نزدیک به راه حل های کلاسیک خواهد بود پروژه های بی مزهکه من واقعاً آن را به خاطر مینیمالیسم و ​​فلسفه یونیکس راهشان دوست دارم. برنامه IM یک دایرکتوری با سه سوکت دامنه یونیکس برای هر مخاطب ایجاد می کند:

  • در - پیام های ارسال شده به همکار در آن ثبت می شود.
  • خارج - پیام های دریافت شده از طرف گفتگو از آن خوانده می شود.
  • حالت - با خواندن از آن، متوجه می شویم که آیا مخاطب در حال حاضر متصل است، آدرس / پورت اتصال.

علاوه بر این، یک سوکت اتصال ایجاد می‌شود، با نوشتن پورت میزبان که در آن یک اتصال به مخاطب راه دور را آغاز می‌کنیم.

|-- alice
|   |-- in
|   |-- out
|   `-- state
|-- bob
|   |-- in
|   |-- out
|   `-- state
`- conn

این رویکرد به شما امکان می دهد تا پیاده سازی های مستقلی از حمل و نقل IM و رابط کاربری انجام دهید، زیرا هیچ دوستی وجود ندارد، شما نمی توانید همه را راضی نگه دارید. استفاده كردن tmux و یا چند دم، می توانید یک رابط چند پنجره ای با برجسته سازی نحو دریافت کنید. و با کمک rlwrap می توانید یک خط ورودی پیام سازگار با Readline گنو دریافت کنید.

در واقع، پروژه های بدون مکش از فایل های FIFO استفاده می کنند. شخصاً نمی‌توانستم بفهمم که چگونه با فایل‌ها به صورت رقابتی در asyncio کار کنم، بدون اینکه پس‌زمینه دست‌نویسی از رشته‌های اختصاصی وجود داشته باشد (من مدت طولانی است که از این زبان برای چنین مواردی استفاده می‌کنم. Go). بنابراین، تصمیم گرفتم به سوکت های دامنه یونیکس بسنده کنم. متأسفانه، این کار انجام echo 2001:470:dead::babe 6666 > conn را غیرممکن می کند. با استفاده از این مشکل حل کردم socat: echo 2001:470:dead::babe 6666 | socat - UNIX-CONNECT:conn، socat READLINE UNIX-CONNECT:alice/in.

پروتکل ناامن اصلی

TCP به عنوان حمل و نقل استفاده می شود: تحویل و سفارش آن را تضمین می کند. UDP هیچکدام را تضمین نمی کند (که در هنگام استفاده از رمزنگاری مفید خواهد بود)، بلکه پشتیبانی می کند SCTP پایتون از جعبه خارج نمی شود.

متأسفانه، در TCP هیچ مفهومی از پیام وجود ندارد، فقط یک جریان از بایت ها وجود دارد. بنابراین لازم است قالبی برای پیام ها ارائه شود تا در این تاپیک بین خودشان به اشتراک گذاشته شود. ما می توانیم با استفاده از کاراکتر فید خط موافقت کنیم. برای شروع خوب است، اما هنگامی که ما شروع به رمزگذاری پیام های خود می کنیم، این کاراکتر ممکن است در هر جایی از متن رمزگذاری شده ظاهر شود. بنابراین در شبکه ها، پروتکل های محبوب پروتکل هایی هستند که ابتدا طول پیام را بر حسب بایت ارسال می کنند. به عنوان مثال، خارج از جعبه پایتون دارای xdrlib است که به شما امکان می دهد با یک فرمت مشابه کار کنید XDR.

ما با خواندن TCP به درستی و کارآمد کار نخواهیم کرد - ما کد را ساده می کنیم. ما داده ها را از سوکت در یک حلقه بی پایان می خوانیم تا زمانی که پیام کامل را رمزگشایی کنیم. JSON با XML نیز می تواند به عنوان قالبی برای این رویکرد استفاده شود. اما وقتی رمزنگاری اضافه می‌شود، داده‌ها باید امضا و احراز هویت شوند - و این نیاز به نمایش یکسان بایت به بایت از اشیا دارد، که JSON/XML ارائه نمی‌کند (نتایج تخلیه ممکن است متفاوت باشد).

XDR برای این کار مناسب است، با این حال من ASN.1 را با رمزگذاری DER و PyDERASN کتابخانه، از آنجایی که ما اشیاء سطح بالایی را در دسترس خواهیم داشت که اغلب کار کردن با آنها دلپذیرتر و راحت تر است. بر خلاف طرحواره رمزگذاری, بسته پیام یا CBOR، ASN.1 به طور خودکار داده ها را در برابر یک طرحواره کدگذاری شده بررسی می کند.

# Msg ::= CHOICE {
#       text      MsgText,
#       handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
    schema = ((
        ("text", MsgText()),
        ("handshake", MsgHandshake(expl=tag_ctxc(0))),
    ))

# MsgText ::= SEQUENCE {
#       text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
    schema = ((
        ("text", UTF8String(bounds=(1, MaxTextLen))),
    ))

# MsgHandshake ::= SEQUENCE {
#       peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
    schema = ((
        ("peerName", UTF8String(bounds=(1, 256))),
    ))

پیام دریافتی به صورت Msg خواهد بود: یا یک MsgText متنی (در حال حاضر با یک فیلد متنی) یا یک پیام دست دادن MsgHandshake (که حاوی نام مخاطب است). اکنون بیش از حد پیچیده به نظر می رسد، اما این پایه ای برای آینده است.

     ┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───┬─ IdA) │ │───────── ────────>│ │ │ │MsgHandshake(IdB) │ │ MsgText() │ │──── MsgText() │ │ │

IM بدون رمزنگاری

همانطور که قبلاً گفتم، کتابخانه asyncio برای همه عملیات سوکت استفاده خواهد شد. بیایید آنچه را که در راه اندازی انتظار داریم اعلام کنیم:

parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
    "--our-name",
    required=True,
    help="Our peer name",
)
parser.add_argument(
    "--their-names",
    required=True,
    help="Their peer names, comma-separated",
)
parser.add_argument(
    "--bind",
    default="::1",
    help="Address to listen on",
)
parser.add_argument(
    "--port",
    type=int,
    default=6666,
    help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))

نام خود را تنظیم کنید (--ما-نام آلیس). همه مخاطبین مورد انتظار با کاما از هم جدا شده اند (-نام آنها bob,eve). برای هر یک از مخاطبین، یک دایرکتوری با سوکت های یونیکس و همچنین یک برنامه برای هر یک از حالت های ورودی، خروجی ایجاد می شود:

for peer_name in THEIR_NAMES:
    makedirs(peer_name, mode=0o700, exist_ok=True)
    out_queue = asyncio.Queue()
    OUT_QUEUES[peer_name] = out_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_out_processor, out_queue=out_queue),
        path.join(peer_name, "out"),
    ))
    in_queue = asyncio.Queue()
    IN_QUEUES[peer_name] = in_queue
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_in_processor, in_queue=in_queue),
        path.join(peer_name, "in"),
    ))
    asyncio.ensure_future(asyncio.start_unix_server(
        partial(unixsock_state_processor, peer_name=peer_name),
        path.join(peer_name, "state"),
    ))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))

پیام‌هایی که از سوی کاربر از سوکت ورودی می‌آیند به صف IN_QUEUES ارسال می‌شوند:

async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
    while True:
        text = await reader.read(MaxTextLen)
        if text == b"":
            break
        await in_queue.put(text.decode("utf-8"))

پیام‌هایی که از طرف صحبت‌کنندگان می‌آیند به صف‌های OUT_QUEUES ارسال می‌شوند که داده‌ها از آن‌ها به سوکت خروجی نوشته می‌شوند:

async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
    while True:
        text = await out_queue.get()
        writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
        await writer.drain()

هنگام خواندن از سوکت حالت، برنامه آدرس مخاطب را در فرهنگ لغت PEER_ALIVE جستجو می کند. اگر هنوز ارتباطی با مخاطب وجود نداشته باشد، یک خط خالی نوشته می شود.

async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
    peer_writer = PEER_ALIVES.get(peer_name)
    writer.write(
        b"" if peer_writer is None else (" ".join([
            str(i) for i in peer_writer.get_extra_info("peername")[:2]
        ]).encode("utf-8") + b"n")
    )
    await writer.drain()
    writer.close()

هنگام نوشتن یک آدرس در یک سوکت اتصال، عملکرد "Initiator" اتصال راه اندازی می شود:

async def unixsock_conn_processor(reader, writer) -> None:
    data = await reader.read(256)
    writer.close()
    host, port = data.decode("utf-8").split(" ")
    await initiator(host=host, port=int(port))

بیایید آغازگر را در نظر بگیریم. ابتدا یک اتصال به میزبان/پورت مشخص شده را باز می کند و یک پیام دست دادن با نام آن ارسال می کند:

 130 async def initiator(host, port):
 131     _id = repr((host, port))
 132     logging.info("%s: dialing", _id)
 133     reader, writer = await asyncio.open_connection(host, port)
 134     # Handshake message {{{
 135     writer.write(Msg(("handshake", MsgHandshake((
 136         ("peerName", OUR_NAME),
 137     )))).encode())
 138     # }}}
 139     await writer.drain()

سپس منتظر پاسخ طرف راه دور می ماند. سعی می کند پاسخ دریافتی را با استفاده از طرح Msg ASN.1 رمزگشایی کند. ما فرض می کنیم که کل پیام در یک بخش TCP ارسال می شود و هنگام فراخوانی .read() آن را به صورت اتمی دریافت می کنیم. بررسی می کنیم که پیام دست دادن را دریافت کرده ایم.

 141     # Wait for Handshake message {{{
 142     data = await reader.read(256)
 143     if data == b"":
 144         logging.warning("%s: no answer, disconnecting", _id)
 145         writer.close()
 146         return
 147     try:
 148         msg, _ = Msg().decode(data)
 149     except ASN1Error:
 150         logging.warning("%s: undecodable answer, disconnecting", _id)
 151         writer.close()
 152         return
 153     logging.info("%s: got %s message", _id, msg.choice)
 154     if msg.choice != "handshake":
 155         logging.warning("%s: unexpected message, disconnecting", _id)
 156         writer.close()
 157         return
 158     # }}}

ما بررسی می کنیم که نام دریافتی مخاطب برای ما شناخته شده باشد. اگر نه، پس ما اتصال را قطع می کنیم. ما بررسی می کنیم که آیا قبلاً با او ارتباط برقرار کرده ایم (مصاحبه دوباره دستور اتصال به ما را داد) و آن را می بندیم. صف IN_QUEUES رشته‌های پایتون را با متن پیام نگه می‌دارد، اما مقدار ویژه‌ای None دارد که به coroutine msg_sender سیگنال می‌دهد که کار را متوقف کند تا نویسنده‌اش مرتبط با اتصال TCP قدیمی را فراموش کند.

 159     msg_handshake = msg.value
 160     peer_name = str(msg_handshake["peerName"])
 161     if peer_name not in THEIR_NAMES:
 162         logging.warning("unknown peer name: %s", peer_name)
 163         writer.close()
 164         return
 165     logging.info("%s: session established: %s", _id, peer_name)
 166     # Run text message sender, initialize transport decoder {{{
 167     peer_alive = PEER_ALIVES.pop(peer_name, None)
 168     if peer_alive is not None:
 169         peer_alive.close()
 170         await IN_QUEUES[peer_name].put(None)
 171     PEER_ALIVES[peer_name] = writer
 172     asyncio.ensure_future(msg_sender(peer_name, writer))
 173     # }}}

msg_sender پیام‌های خروجی را می‌پذیرد (از یک سوکت در صف قرار می‌گیرد)، آنها را به صورت یک پیام MsgText سریال می‌کند و از طریق یک اتصال TCP ارسال می‌کند. هر لحظه ممکن است بشکند - ما به وضوح این را رهگیری می کنیم.

async def msg_sender(peer_name: str, writer) -> None:
    in_queue = IN_QUEUES[peer_name]
    while True:
        text = await in_queue.get()
        if text is None:
            break
        writer.write(Msg(("text", MsgText((
            ("text", UTF8String(text)),
        )))).encode())
        try:
            await writer.drain()
        except ConnectionResetError:
            del PEER_ALIVES[peer_name]
            return
        logging.info("%s: sent %d characters message", peer_name, len(text))

در پایان، آغازگر یک حلقه بی نهایت از خواندن پیام ها را از سوکت وارد می کند. بررسی می‌کند که آیا این پیام‌ها پیام‌های متنی هستند یا خیر و آنها را در صف OUT_QUEUES قرار می‌دهد که از آنجا به سوکت خروجی مخاطب مربوطه ارسال می‌شود. چرا نمی توانید فقط .read() را انجام دهید و پیام را رمزگشایی کنید؟ زیرا ممکن است چندین پیام از کاربر در بافر سیستم عامل جمع شده و در یک بخش TCP ارسال شود. ما می‌توانیم اولین مورد را رمزگشایی کنیم و سپس بخشی از مورد بعدی ممکن است در بافر باقی بماند. در صورت بروز هر گونه وضعیت غیرعادی، اتصال TCP را می بندیم و کوروتین msg_sender را متوقف می کنیم (با ارسال None به صف OUT_QUEUES).

 174     buf = b""
 175     # Wait for test messages {{{
 176     while True:
 177         data = await reader.read(MaxMsgLen)
 178         if data == b"":
 179             break
 180         buf += data
 181         if len(buf) > MaxMsgLen:
 182             logging.warning("%s: max buffer size exceeded", _id)
 183             break
 184         try:
 185             msg, tail = Msg().decode(buf)
 186         except ASN1Error:
 187             continue
 188         buf = tail
 189         if msg.choice != "text":
 190             logging.warning("%s: unexpected %s message", _id, msg.choice)
 191             break
 192         try:
 193             await msg_receiver(msg.value, peer_name)
 194         except ValueError as err:
 195             logging.warning("%s: %s", err)
 196             break
 197     # }}}
 198     logging.info("%s: disconnecting: %s", _id, peer_name)
 199     IN_QUEUES[peer_name].put(None)
 200     writer.close()

  66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
  67     text = str(msg_text["text"])
  68     logging.info("%s: received %d characters message", peer_name, len(text))
  69     await OUT_QUEUES[peer_name].put(text)

به کد اصلی برگردیم. پس از ایجاد تمام کوروتین ها در زمان شروع برنامه، سرور TCP را راه اندازی می کنیم. برای هر اتصال ایجاد شده، یک کوروتین پاسخ دهنده ایجاد می کند.

logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()

پاسخ دهنده شبیه آغازگر است و همه اقدامات مشابه را منعکس می کند، اما حلقه بی نهایت خواندن پیام ها برای سادگی، بلافاصله شروع می شود. در حال حاضر، پروتکل دست دادن یک پیام از هر طرف ارسال می کند، اما در آینده دو پیام از آغازگر اتصال وجود خواهد داشت، پس از آن پیام های متنی می توانند بلافاصله ارسال شوند.

  72 async def responder(reader, writer):
  73     _id = writer.get_extra_info("peername")
  74     logging.info("%s: connected", _id)
  75     buf = b""
  76     msg_expected = "handshake"
  77     peer_name = None
  78     while True:
  79         # Read until we get Msg message {{{
  80         data = await reader.read(MaxMsgLen)
  81         if data == b"":
  82             logging.info("%s: closed connection", _id)
  83             break
  84         buf += data
  85         if len(buf) > MaxMsgLen:
  86             logging.warning("%s: max buffer size exceeded", _id)
  87             break
  88         try:
  89             msg, tail = Msg().decode(buf)
  90         except ASN1Error:
  91             continue
  92         buf = tail
  93         # }}}
  94         if msg.choice != msg_expected:
  95             logging.warning("%s: unexpected %s message", _id, msg.choice)
  96             break
  97         if msg_expected == "text":
  98             try:
  99                 await msg_receiver(msg.value, peer_name)
 100             except ValueError as err:
 101                 logging.warning("%s: %s", err)
 102                 break
 103         # Process Handshake message {{{
 104         elif msg_expected == "handshake":
 105             logging.info("%s: got %s message", _id, msg_expected)
 106             msg_handshake = msg.value
 107             peer_name = str(msg_handshake["peerName"])
 108             if peer_name not in THEIR_NAMES:
 109                 logging.warning("unknown peer name: %s", peer_name)
 110                 break
 111             writer.write(Msg(("handshake", MsgHandshake((
 112                 ("peerName", OUR_NAME),
 113             )))).encode())
 114             await writer.drain()
 115             logging.info("%s: session established: %s", _id, peer_name)
 116             peer_alive = PEER_ALIVES.pop(peer_name, None)
 117             if peer_alive is not None:
 118                 peer_alive.close()
 119                 await IN_QUEUES[peer_name].put(None)
 120             PEER_ALIVES[peer_name] = writer
 121             asyncio.ensure_future(msg_sender(peer_name, writer))
 122             msg_expected = "text"
 123         # }}}
 124     logging.info("%s: disconnecting", _id)
 125     if msg_expected == "text":
 126         IN_QUEUES[peer_name].put(None)
 127     writer.close()

پروتکل امن

وقت آن است که ارتباطات خود را ایمن کنیم. منظور ما از امنیت چیست و چه می خواهیم:

  • محرمانه بودن پیام های ارسالی؛
  • صحت و یکپارچگی پیام های ارسال شده - تغییرات آنها باید شناسایی شود.
  • محافظت در برابر حملات بازپخش - واقعیت پیام های مفقود یا مکرر باید شناسایی شود (و ما تصمیم می گیریم اتصال را خاتمه دهیم).
  • شناسایی و احراز هویت طرفین با استفاده از کلیدهای عمومی از قبل وارد شده - ما قبلاً تصمیم گرفتیم که یک شبکه دوست به دوست ایجاد کنیم. تنها پس از احراز هویت، متوجه می شویم که با چه کسی در ارتباط هستیم.
  • در دسترس بودن پنهان کاری کامل خواص (PFS) - به خطر انداختن کلید امضای طولانی مدت ما نباید منجر به توانایی خواندن تمام مکاتبات قبلی شود. ضبط ترافیک رهگیری شده بی فایده می شود.
  • اعتبار/اعتبار پیام ها (حمل و نقل و دست دادن) فقط در یک جلسه TCP. درج پیام‌های امضا شده/تأیید شده از یک جلسه دیگر (حتی با همان مخاطب) نباید امکان پذیر باشد.
  • یک ناظر غیرفعال نباید شناسه های کاربر، کلیدهای عمومی با عمر طولانی ارسال شده یا هش های آنها را ببیند. ناشناس بودن مشخص از ناظر منفعل.

با کمال تعجب، تقریباً همه مایلند که این حداقل را در هر پروتکل دست دادنی داشته باشند، و تعداد بسیار کمی از موارد فوق در نهایت برای پروتکل‌های «بومی» برآورده می‌شود. حالا ما چیز جدیدی اختراع نمی کنیم. من قطعا استفاده را توصیه می کنم چارچوب نویز برای ساخت پروتکل‌ها، اما بیایید چیز ساده‌تری را انتخاب کنیم.

دو پروتکل محبوب عبارتند از:

  • TLS - یک پروتکل بسیار پیچیده با سابقه طولانی باگ ها، موانع، آسیب پذیری ها، تفکر ضعیف، پیچیدگی و کاستی ها (با این حال، این ارتباط چندانی با TLS 1.3 ندارد). اما ما آن را در نظر نمی گیریم زیرا بیش از حد پیچیده است.
  • IPsec с IKE - مشکلات رمزنگاری جدی ندارند، اگرچه آنها نیز ساده نیستند. اگر در مورد IKEv1 و IKEv2 خوانده اید، منبع آنها است STSپروتکل های ISO/IEC IS 9798-3 و SIGMA (SIGn-and-MAc) - به اندازه کافی ساده برای پیاده سازی در یک شب.

چه چیزی در مورد SIGMA به عنوان آخرین پیوند در توسعه پروتکل های STS/ISO خوب است؟ همه الزامات ما (از جمله "پنهان کردن" شناسه های همکار) را برآورده می کند و هیچ مشکل رمزنگاری شناخته شده ای ندارد. حداقلی است - حذف حداقل یک عنصر از پیام پروتکل منجر به ناامنی آن می شود.

بیایید از ساده ترین پروتکل خانگی به SIGMA برویم. اساسی ترین عملیات مورد علاقه ما این است توافقنامه کلیدی: تابعی که برای هر دو شرکت کننده مقدار یکسانی را به خروجی می دهد که می تواند به عنوان یک کلید متقارن استفاده شود. بدون پرداختن به جزئیات: هر یک از طرفین یک جفت کلید زودگذر (که فقط در یک جلسه استفاده می‌شود) (کلیدهای عمومی و خصوصی) تولید می‌کنند، کلیدهای عمومی را مبادله می‌کنند، تابع توافق را فراخوانی می‌کنند، که کلید خصوصی خود و عمومی را به ورودی آن ارسال می‌کنند. کلید مخاطب

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───│A└─┘┘┬└─┘┘ │ ╔══════════ - ═ ════════ ═══════════╝ │ IdB, PubB │ ╔═════════════════════ │<───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚══════════════════ ───┐ ╔════ ═══╧════════════╗ │ ║Key = DH(PrvA، PubB)║ <───┘═════════ ═══════ ════╝ │ │ │ │

هر کسی می تواند به وسط بپرد و کلیدهای عمومی را با کلیدهای خود جایگزین کند - در این پروتکل احراز هویت مخاطبین وجود ندارد. بیایید یک امضا با کلیدهای طولانی مدت اضافه کنیم.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───┬└─┘──┬└─┘ └┬ sign(SignPrvA, (PubA)) │ ╔═ │──────────── ────────── ────- PubA = load()║ │ │ ║PrvA، PubA = DHgen() ║ │ │ ╚═══════ ═══════ ══════════════ , sign(SignPrvB, (PubB)) │ ╔══════════════ ═══════ ════ ─────────── - ══════════ ══════════════ ══╝ ────┐ ╔ ════════ ═════╗ │ │ ║ تایید( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚════════════════════ ══╝ │ │ │

چنین امضایی کار نخواهد کرد، زیرا به یک جلسه خاص مرتبط نیست. چنین پیام هایی برای جلسات با سایر شرکت کنندگان نیز "مناسب" هستند. کل زمینه باید مشترک شود. این ما را مجبور می کند که پیام دیگری از A اضافه کنیم.

علاوه بر این، افزودن شناسه خود در زیر امضا بسیار مهم است، زیرا در غیر این صورت می‌توانیم IdXXX را جایگزین کنیم و پیام را با کلید یک همکار شناخته شده دیگر دوباره امضا کنیم. برای جلوگیری از حملات بازتابی، لازم است که عناصر زیر امضا در مکان های کاملاً مشخص با توجه به معنای آنها قرار گیرند: اگر A علامت می دهد (PubA, PubB) پس B باید امضا کند (PubB, PubA). این همچنین نشان دهنده اهمیت انتخاب ساختار و قالب داده های سریالی است. به عنوان مثال، مجموعه‌ها در رمزگذاری ASN.1 DER مرتب شده‌اند: SET OF(PubA، PubB) با SET OF(PubB، PubA) یکسان خواهد بود.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───│A└─┘┘┬└─┘┘ │ ╔══════════ ═════════════════╗ │──────── ────────── ─────────────>│ ║SignPrvA، SignPubA = load()║ │ │ ║PrvA، PubA = DHgen( ═ ═══════ ═══════════════╝ │IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) │ ═════════ ═════ ════════════╗ │<────────────- ────────── ─────────│ ║SignPrvB، SignPubB = load()║ │ │ ║PrvB، PubB = DHgen() ║ │ ┐══════ ═ ════════ علامت ══════════╝ │ علامت(SignPrvA، (IdA، PubB، PubA)) ════╗ │─ ────────────────── ───>│ ║تأیید (SignPubB, ...) ║ │ │ ║ کلید = dh (prva، PUBB) ║ │ │ │

با این حال، ما هنوز "اثبات" نکرده ایم که کلید مشترک مشابهی را برای این جلسه ایجاد کرده ایم. در اصل، ما می توانیم بدون این مرحله انجام دهیم - اولین اتصال حمل و نقل نامعتبر خواهد بود، اما ما می خواهیم که وقتی دست دادن کامل شد، مطمئن باشیم که همه چیز واقعاً توافق شده است. در حال حاضر پروتکل ISO/IEC IS 9798-3 را در اختیار داریم.

ما می توانیم خود کلید تولید شده را امضا کنیم. این خطرناک است، زیرا ممکن است در الگوریتم امضای مورد استفاده نشتی وجود داشته باشد (حتی بیت در هر امضا، اما همچنان نشت کند). امضای هش کلید مشتق ممکن است، اما نشت حتی هش کلید مشتق شده می‌تواند در حمله brute-force به تابع مشتق‌سازی ارزشمند باشد. SIGMA از یک تابع MAC استفاده می کند که شناسه فرستنده را احراز هویت می کند.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───│A└─┘┘┬└─┘┘ │ ╔══════════ ═════════════════╗ │──────── ────────── ──────────────────>│ ║SignPrvA، SignPubA = بار()║ │ │ ── ╚ ═══════ ‎ ═══ │<───────────────── ──── ────────── ─│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ═════════ ═════════ ══╝ │ │ ╔════════════ ═════════╗ ═══╗ │ ║ کلید = DH( PvA, PubB) ║ │────────────────────── ────────── ─────>│ ║تأیید (کلید، IdB) ║ │ │ ║ تأیید (SignPubB، ...) ═════ ═╝ │ │

به عنوان یک بهینه سازی، برخی ممکن است بخواهند از کلیدهای زودگذر خود مجددا استفاده کنند (که البته برای PFS مایه تاسف است). به عنوان مثال، ما یک جفت کلید ایجاد کردیم، سعی کردیم متصل شویم، اما TCP در دسترس نبود یا جایی در وسط پروتکل قطع شد. حیف است که آنتروپی و منابع پردازشگر هدر رفته را برای یک جفت جدید هدر دهیم. بنابراین، ما به اصطلاح کوکی را معرفی می کنیم - یک مقدار شبه تصادفی که هنگام استفاده مجدد از کلیدهای عمومی زودگذر، از حملات احتمالی تکرار تصادفی محافظت می کند. به دلیل اتصال بین کوکی و کلید عمومی زودگذر، کلید عمومی طرف مقابل را می توان به عنوان غیر ضروری از امضا حذف کرد.

┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───┬└─┘┘ └┬─ CookieA │ ╔════════ ═══════════════════╗ │─────── ────────── ────────────────── ─>│ ║SignPrvA، SignPubA = load( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚══════════════════════════ ══╝ │IdB، PubB، CookieB , sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔══════════════════════════ ═ ╗ │< ────────────────── ────────── - │ ╚══════ ═════════════════════╝ │ │ ╔════════ ═══════╗ │ علامت( SignPrvA، (CookieB، CookieA، PubA))، MAC(IdA) │ ║Key = DH(PrvA، PubB) ║ │───────────────── ─ ── ────────────────── ───────>│ ║ تأیید (کلید، شناسه) ║ │ │ ║ تأیید (SignPubB، ...)║ │ │ ╚═══════════════ │

در نهایت، ما می خواهیم حریم خصوصی شرکای مکالمه خود را از یک ناظر منفعل به دست آوریم. برای انجام این کار، SIGMA پیشنهاد می‌کند که ابتدا کلیدهای زودگذر را مبادله کرده و یک کلید مشترک ایجاد کند که روی آن پیام‌های احراز هویت و شناسایی رمزگذاری شود. SIGMA دو گزینه را توضیح می دهد:

  • SIGMA-I - آغازگر را از حملات فعال محافظت می کند، پاسخ دهنده را از حملات غیرفعال محافظت می کند: آغازگر پاسخ دهنده را احراز هویت می کند و اگر چیزی مطابقت نداشته باشد، شناسایی خود را اعلام نمی کند. اگر یک پروتکل فعال با او شروع شود، متهم هویت خود را اعلام می کند. ناظر منفعل چیزی یاد نمی گیرد.
    SIGMA-R - از پاسخ دهنده در برابر حملات فعال، آغازگر از حملات غیرفعال محافظت می کند. همه چیز دقیقا برعکس است، اما در این پروتکل چهار پیام دست دادن قبلا منتقل شده است.

    ما SIGMA-I را انتخاب می کنیم زیرا بیشتر شبیه آنچه ما از چیزهای آشنای مشتری-سرور انتظار داریم است: مشتری فقط توسط سرور تأیید شده شناسایی می شود و همه از قبل سرور را می شناسند. به علاوه به دلیل پیام های دست دادن کمتر، پیاده سازی آن آسان تر است. تنها چیزی که به پروتکل اضافه می کنیم این است که بخشی از پیام را رمزگذاری کنیم و شناسه A را به قسمت رمزگذاری شده آخرین پیام منتقل کنیم:

    PubA، CookieA │ ╔══════════ ════════════════════ ─────────── ───── ────────── ────- ─────────── ───── ──────>│ ║SignPrvA، SignPubA = load()║ │ │ ║PrvA، PubA = DHgen() ═════ ═════════ ═════════ ════╝ │ pubb ، کوکی ، Enc ((IDB ، Sign (signprvb ، (Cookiea ، Cookieb ، Pubb)) ، Mac (IDB))) │ ╔═════ ╔═════ ╔═════ ╔═════ ╔═════ ╔═════ ╔═════ ═══════════════ ═══════╗ │<─── ────────── ───── ────────── ║SignP rvB، SignPubB = load()║ │ │ ║ PrvB، PubB = DHgen( ════════ ════════════════╝ │ │ ╔══════════════ ══╗ │ Enc((IdA, sign( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │───────────────────── ── ────────────────── . ─────────── ──────>│ ║تأیید (کلید، IdB) ║ │ │ ║ تأیید ═════ ══╝ │ │
    
    • GOST R برای امضا استفاده می شود 34.10-2012 الگوریتم با کلیدهای 256 بیتی.
    • برای تولید کلید عمومی، از 34.10/2012/XNUMX VKO استفاده می شود.
    • CMAC به عنوان MAC استفاده می شود. از نظر فنی، این یک حالت خاص از عملکرد یک رمز بلوکی است که در GOST R 34.13-2015 توضیح داده شده است. به عنوان یک تابع رمزگذاری برای این حالت - ملخ (34.12 2015).
    • هش کلید عمومی او به عنوان شناسه مخاطب استفاده می شود. به عنوان هش استفاده می شود Stribog-256 (34.11/2012/256 XNUMX بیت).

    پس از دست دادن، در مورد یک کلید مشترک به توافق خواهیم رسید. ما می توانیم از آن برای رمزگذاری تایید شده پیام های حمل و نقل استفاده کنیم. این بخش بسیار ساده است و اشتباه کردن آن دشوار است: شمارنده پیام را افزایش می دهیم، پیام را رمزگذاری می کنیم، شمارنده را تأیید می کنیم (MAC) و متن رمز شده، ارسال می کنیم. هنگام دریافت پیام، بررسی می کنیم که شمارنده مقدار مورد انتظار را داشته باشد، متن رمز را با شمارنده احراز هویت می کنیم و آن را رمزگشایی می کنیم. از چه کلیدی برای رمزگذاری پیام های دست دادن، انتقال پیام ها و نحوه احراز هویت آنها استفاده کنم؟ استفاده از یک کلید برای همه این وظایف خطرناک و غیرعاقلانه است. تولید کلیدها با استفاده از توابع تخصصی ضروری است KDF (تابع مشتق کلید). باز هم، بیایید موها را شکافته و چیزی اختراع نکنیم: HKDF مدتهاست که شناخته شده است، به خوبی تحقیق شده است و هیچ مشکل شناخته شده ای ندارد. متأسفانه کتابخانه بومی پایتون این تابع را ندارد، بنابراین ما از آن استفاده می کنیم hkdf کیسه پلاستیکی. HKDF داخلی استفاده می کند HMAC، که به نوبه خود از یک تابع هش استفاده می کند. اجرای نمونه در پایتون در صفحه ویکی‌پدیا فقط چند خط کد نیاز دارد. همانطور که در مورد 34.10/2012/256، ما از Stribog-XNUMX به عنوان تابع هش استفاده خواهیم کرد. خروجی تابع توافق کلید ما، کلید جلسه نامیده می شود، که از آن متقارن های گمشده تولید می شوند:

    kdf = Hkdf(None, key_session, hash=GOST34112012256)
    kdf.expand(b"handshake1-mac-identity")
    kdf.expand(b"handshake1-enc")
    kdf.expand(b"handshake1-mac")
    kdf.expand(b"handshake2-mac-identity")
    kdf.expand(b"handshake2-enc")
    kdf.expand(b"handshake2-mac")
    kdf.expand(b"transport-initiator-enc")
    kdf.expand(b"transport-initiator-mac")
    kdf.expand(b"transport-responder-enc")
    kdf.expand(b"transport-responder-mac")
    

    ساختارها/طرح ها

    بیایید ببینیم که اکنون چه ساختارهای ASN.1 برای انتقال همه این داده ها داریم:

    class Msg(Choice):
        schema = ((
            ("text", MsgText()),
            ("handshake0", MsgHandshake0(expl=tag_ctxc(0))),
            ("handshake1", MsgHandshake1(expl=tag_ctxc(1))),
            ("handshake2", MsgHandshake2(expl=tag_ctxc(2))),
        ))
    
    class MsgText(Sequence):
        schema = ((
            ("payload", MsgTextPayload()),
            ("payloadMac", MAC()),
        ))
    
    class MsgTextPayload(Sequence):
        schema = ((
            ("nonce", Integer(bounds=(0, float("+inf")))),
            ("ciphertext", OctetString(bounds=(1, MaxTextLen))),
        ))
    
    class MsgHandshake0(Sequence):
        schema = ((
            ("cookieInitiator", Cookie()),
            ("pubKeyInitiator", PubKey()),
        ))
    
    class MsgHandshake1(Sequence):
        schema = ((
            ("cookieResponder", Cookie()),
            ("pubKeyResponder", PubKey()),
            ("ukm", OctetString(bounds=(8, 8))),
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class MsgHandshake2(Sequence):
        schema = ((
            ("ciphertext", OctetString()),
            ("ciphertextMac", MAC()),
        ))
    
    class HandshakeTBE(Sequence):
        schema = ((
            ("identity", OctetString(bounds=(32, 32))),
            ("signature", OctetString(bounds=(64, 64))),
            ("identityMac", MAC()),
        ))
    
    class HandshakeTBS(Sequence):
        schema = ((
            ("cookieTheir", Cookie()),
            ("cookieOur", Cookie()),
            ("pubKeyOur", PubKey()),
        ))
    
    class Cookie(OctetString): bounds = (16, 16)
    class PubKey(OctetString): bounds = (64, 64)
    class MAC(OctetString): bounds = (16, 16)
    

    HandshakeTBS چیزی است که امضا خواهد شد. HandshakeTBE - چه چیزی رمزگذاری خواهد شد. توجه شما را به فیلد ukm در MsgHandshake1 جلب می کنم. 34.10 VKO، برای تصادفی سازی حتی بیشتر کلیدهای تولید شده، شامل پارامتر UKM (مواد کلیدسازی کاربر) است - فقط آنتروپی اضافی.

    اضافه کردن رمزنگاری به کد

    بیایید فقط تغییرات ایجاد شده در کد اصلی را در نظر بگیریم، زیرا چارچوب یکسان باقی مانده است (در واقع، ابتدا پیاده سازی نهایی نوشته شد و سپس تمام رمزنگاری از آن بریده شد).

    از آنجایی که احراز هویت و شناسایی مخاطبین با استفاده از کلیدهای عمومی انجام می شود، اکنون باید برای مدت طولانی در جایی ذخیره شوند. برای سادگی، ما از JSON به صورت زیر استفاده می کنیم:

    {
        "our": {
            "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98",
            "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1"
        },
        "their": {
            "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce",
            "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a"
        }
    }
    

    ما - جفت کلید ما، کلیدهای خصوصی و عمومی هگزادسیمال. آنها - نام مخاطبین و کلیدهای عمومی آنها. بیایید آرگومان های خط فرمان را تغییر دهیم و پردازش پس از داده های JSON را اضافه کنیم:

    from pygost import gost3410
    from pygost.gost34112012256 import GOST34112012256
    
    CURVE = gost3410.GOST3410Curve(
        *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"]
    )
    
    parser = argparse.ArgumentParser(description="GOSTIM")
    parser.add_argument(
        "--keys-gen",
        action="store_true",
        help="Generate JSON with our new keypair",
    )
    parser.add_argument(
        "--keys",
        default="keys.json",
        required=False,
        help="JSON with our and their keys",
    )
    parser.add_argument(
        "--bind",
        default="::1",
        help="Address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=6666,
        help="Port to listen on",
    )
    args = parser.parse_args()
    
    if args.keys_gen:
        prv_raw = urandom(32)
        pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw))
        pub_raw = gost3410.pub_marshal(pub)
        print(json.dumps({
            "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)},
            "their": {},
        }))
        exit(0)
    
    # Parse and unmarshal our and their keys {{{
    with open(args.keys, "rb") as fd:
        _keys = json.loads(fd.read().decode("utf-8"))
    KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"]))
    _pub = hexdec(_keys["our"]["pub"])
    KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub)
    KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest())
    for peer_name, pub_raw in _keys["their"].items():
        _pub = hexdec(pub_raw)
        KEYS[GOST34112012256(_pub).digest()] = {
            "name": peer_name,
            "pub": gost3410.pub_unmarshal(_pub),
        }
    # }}}
    

    کلید خصوصی الگوریتم 34.10 یک عدد تصادفی است. اندازه 256 بیت برای منحنی های بیضوی 256 بیتی. PyGOST با مجموعه ای از بایت ها کار نمی کند، بلکه با اعداد بزرگ، بنابراین کلید خصوصی ما (urandom(32)) باید با استفاده از gost3410.prv_unmarshal () به عدد تبدیل شود. کلید عمومی به طور قطعی از کلید خصوصی با استفاده از gost3410.public_key () تعیین می شود. کلید عمومی 34.10 دو عدد بزرگ است که همچنین برای سهولت در ذخیره سازی و انتقال با استفاده از gost3410.pub_marshal باید به یک دنباله بایت تبدیل شوند.

    پس از خواندن فایل JSON، کلیدهای عمومی بر این اساس باید با استفاده از gost3410.pub_unmarshal () دوباره تبدیل شوند. از آنجایی که ما شناسه های مخاطبین را به صورت هش از کلید عمومی دریافت می کنیم، می توان آنها را بلافاصله از قبل محاسبه کرد و برای جستجوی سریع در فرهنگ لغت قرار داد. هش Stribog-256 gost34112012256.GOST34112012256 است که به طور کامل رابط هشلب توابع هش را برآورده می کند.

    روال آغازگر چگونه تغییر کرده است؟ همه چیز مطابق با طرح دست دادن است: ما یک کوکی (128 بیتی زیاد است)، یک جفت کلید زودگذر 34.10 تولید می کنیم که برای تابع توافق کلید VKO استفاده می شود.

     395 async def initiator(host, port):
     396     _id = repr((host, port))
     397     logging.info("%s: dialing", _id)
     398     reader, writer = await asyncio.open_connection(host, port)
     399     # Generate our ephemeral public key and cookie, send Handshake 0 message {{{
     400     cookie_our = Cookie(urandom(16))
     401     prv = gost3410.prv_unmarshal(urandom(32))
     402     pub_our = gost3410.public_key(CURVE, prv)
     403     pub_our_raw = PubKey(gost3410.pub_marshal(pub_our))
     404     writer.write(Msg(("handshake0", MsgHandshake0((
     405         ("cookieInitiator", cookie_our),
     406         ("pubKeyInitiator", pub_our_raw),
     407     )))).encode())
     408     # }}}
     409     await writer.drain()
    

    • ما منتظر پاسخ هستیم و پیام دریافتی را رمزگشایی می کنیم.
    • مطمئن شوید که دست دادن1;
    • رمزگشایی کلید عمومی زودگذر طرف مقابل و محاسبه کلید جلسه.
    • ما کلیدهای متقارن لازم برای پردازش قسمت TBE پیام را تولید می کنیم.

     423     logging.info("%s: got %s message", _id, msg.choice)
     424     if msg.choice != "handshake1":
     425         logging.warning("%s: unexpected message, disconnecting", _id)
     426         writer.close()
     427         return
     428     # }}}
     429     msg_handshake1 = msg.value
     430     # Validate Handshake message {{{
     431     cookie_their = msg_handshake1["cookieResponder"]
     432     pub_their_raw = msg_handshake1["pubKeyResponder"]
     433     pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw))
     434     ukm_raw = bytes(msg_handshake1["ukm"])
     435     ukm = ukm_unmarshal(ukm_raw)
     436     key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001)
     437     kdf = Hkdf(None, key_session, hash=GOST34112012256)
     438     key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity")
     439     key_handshake1_enc = kdf.expand(b"handshake1-enc")
     440     key_handshake1_mac = kdf.expand(b"handshake1-mac")
    

    UKM یک عدد 64 بیتی است (urandom(8))، که همچنین نیاز به deserialization از نمایش بایت آن با استفاده از gost3410_vko.ukm_unmarshal() دارد. تابع VKO برای 34.10/2012/256 3410 بیتی gost34102012256_vko.kek_XNUMX() (KEK - کلید رمزگذاری) است.

    کلید جلسه تولید شده در حال حاضر یک توالی بایت شبه تصادفی 256 بیتی است. بنابراین، می توان آن را بلافاصله در توابع HKDF استفاده کرد. از آنجایی که GOST34112012256 رابط هشلب را برآورده می کند، می توان آن را بلافاصله در کلاس Hkdf استفاده کرد. ما salt (اول آرگومان Hkdf) را مشخص نمی‌کنیم، زیرا کلید تولید شده، به دلیل زودگذر بودن جفت‌های کلیدی شرکت‌کننده، برای هر جلسه متفاوت خواهد بود و از قبل دارای آنتروپی کافی است. ()kdf.expand به طور پیش فرض کلیدهای 256 بیتی مورد نیاز برای Grasshopper را در آینده تولید می کند.

    سپس قسمت های TBE و TBS پیام دریافتی بررسی می شوند:

    • MAC روی متن رمز ورودی محاسبه و بررسی می شود.
    • متن رمز رمزگشایی شده است.
    • ساختار TBE رمزگشایی شده است.
    • شناسه مخاطب از آن گرفته می شود و بررسی می شود که آیا او اصلاً برای ما شناخته شده است یا خیر.
    • MAC روی این شناسه محاسبه و بررسی می شود.
    • امضای ساختار TBS تأیید می شود که شامل کوکی هر دو طرف و کلید موقت عمومی طرف مقابل است. امضا با کلید امضای طولانی مدت طرف مقابل تأیید می شود.

     441     try:
     442         peer_name = validate_tbe(
     443             msg_handshake1,
     444             key_handshake1_mac_identity,
     445             key_handshake1_enc,
     446             key_handshake1_mac,
     447             cookie_our,
     448             cookie_their,
     449             pub_their_raw,
     450         )
     451     except ValueError as err:
     452         logging.warning("%s: %s, disconnecting", _id, err)
     453         writer.close()
     454         return
     455     # }}}
    
     128 def validate_tbe(
     129         msg_handshake: Union[MsgHandshake1, MsgHandshake2],
     130         key_mac_identity: bytes,
     131         key_enc: bytes,
     132         key_mac: bytes,
     133         cookie_their: Cookie,
     134         cookie_our: Cookie,
     135         pub_key_our: PubKey,
     136 ) -> str:
     137     ciphertext = bytes(msg_handshake["ciphertext"])
     138     mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext)
     139     if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])):
     140         raise ValueError("invalid MAC")
     141     plaintext = ctr(
     142         GOST3412Kuznechik(key_enc).encrypt,
     143         KUZNECHIK_BLOCKSIZE,
     144         ciphertext,
     145         8 * b"x00",
     146     )
     147     try:
     148         tbe, _ = HandshakeTBE().decode(plaintext)
     149     except ASN1Error:
     150         raise ValueError("can not decode TBE")
     151     key_sign_pub_hash = bytes(tbe["identity"])
     152     peer = KEYS.get(key_sign_pub_hash)
     153     if peer is None:
     154         raise ValueError("unknown identity")
     155     mac_tag = mac(
     156         GOST3412Kuznechik(key_mac_identity).encrypt,
     157         KUZNECHIK_BLOCKSIZE,
     158         key_sign_pub_hash,
     159     )
     160     if not compare_digest(mac_tag, bytes(tbe["identityMac"])):
     161         raise ValueError("invalid identity MAC")
     162     tbs = HandshakeTBS((
     163         ("cookieTheir", cookie_their),
     164         ("cookieOur", cookie_our),
     165         ("pubKeyOur", pub_key_our),
     166     ))
     167     if not gost3410.verify(
     168         CURVE,
     169         peer["pub"],
     170         GOST34112012256(tbs.encode()).digest(),
     171         bytes(tbe["signature"]),
     172     ):
     173         raise ValueError("invalid signature")
     174     return peer["name"]
    

    همانطور که در بالا نوشتم، 34.13/2015/XNUMX موارد مختلفی را توصیف می کند حالت های عملیات رمز را مسدود کنید از 34.12/2015/3413. در میان آنها حالتی برای تولید درج های تقلیدی و محاسبات MAC وجود دارد. در PyGOST این () gost34.12.mac است. این حالت مستلزم عبور تابع رمزگذاری (دریافت و برگرداندن یک بلوک داده)، اندازه بلوک رمزگذاری و در واقع خود داده است. چرا نمی توانید اندازه بلوک رمزگذاری را کد سخت کنید؟ 2015/128/64 نه تنها رمز XNUMX بیتی Grasshopper، بلکه XNUMX بیتی را نیز توصیف می کند. ماگما - یک GOST 28147-89 کمی تغییر یافته که در KGB ایجاد شده است و هنوز یکی از بالاترین آستانه های ایمنی را دارد.

    Kuznechik با فراخوانی gost.3412.GOST3412Kuznechik (کلید) مقداردهی اولیه می شود و یک شی با متدهای .encrypt()/.decrypt() مناسب برای ارسال به توابع 34.13 برمی گرداند. MAC به صورت زیر محاسبه می شود: gost3413.mac(GOST3412Kuznechik(key).رمزگذاری، KUZNECHIK_BLOCKSIZE، متن رمزی). برای مقایسه MAC محاسبه‌شده و دریافتی، نمی‌توانید از مقایسه معمول (==) رشته‌های بایت استفاده کنید، زیرا این عملیات زمان مقایسه را نشت می‌کند، که در حالت کلی، می‌تواند منجر به آسیب‌پذیری‌های کشنده مانند شود. BEAST حملات به TLS پایتون یک تابع ویژه به نام hmac.compare_digest برای این کار دارد.

    تابع رمز بلوک فقط می تواند یک بلوک از داده ها را رمزگذاری کند. برای تعداد بیشتر و حتی نه مضربی از طول، استفاده از حالت رمزگذاری ضروری است. 34.13-2015 موارد زیر را شرح می دهد: ECB، CTR، OFB، CBC، CFB. هر کدام حوزه های کاربردی و ویژگی های قابل قبول خود را دارند. متاسفانه هنوز استاندارد نشده ایم حالت های رمزگذاری تایید شده (مانند CCM، OCB، GCM و مانند آن) - ما مجبوریم حداقل خودمان MAC را اضافه کنیم. من انتخاب می کنم حالت شمارنده (CTR): به اندازه بلوک نیازی ندارد، می تواند موازی شود، فقط از عملکرد رمزگذاری استفاده می کند، می تواند به طور ایمن برای رمزگذاری تعداد زیادی پیام استفاده شود (برخلاف CBC که برخورد نسبتاً سریع دارد).

    مانند .mac()، .ctr() ورودی مشابهی می گیرد: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).رمزگذاری، KUZNECHIK_BLOCKSIZE، متن ساده، iv). لازم است یک بردار اولیه که دقیقاً نصف طول بلوک رمزگذاری باشد را مشخص کنید. اگر کلید رمزگذاری ما فقط برای رمزگذاری یک پیام (البته از چندین بلوک) استفاده می‌شود، آنگاه می‌توان یک بردار اولیه صفر را تنظیم کرد. برای رمزگذاری پیام های دست دادن، هر بار از یک کلید جداگانه استفاده می کنیم.

    تأیید امضای gost3410.verify () بی اهمیت است: ما از منحنی بیضی که در آن کار می کنیم عبور می کنیم (به سادگی آن را در پروتکل GOSTIM خود ثبت می کنیم)، کلید عمومی امضاکننده (فراموش نکنید که این باید دو تایی باشد. اعداد بزرگ، و نه یک رشته بایت)، هش 34.11/2012/XNUMX و خود امضا.

    در مرحله بعد، در آغازگر ما یک پیام handshake را آماده می کنیم و به handshake2 می فرستیم، همان اقداماتی را که در حین تأیید انجام دادیم، فقط به صورت متقارن انجام می دهیم: امضا کردن روی کلیدهای خود به جای بررسی و غیره...

     456     # Prepare and send Handshake 2 message {{{
     457     tbs = HandshakeTBS((
     458         ("cookieTheir", cookie_their),
     459         ("cookieOur", cookie_our),
     460         ("pubKeyOur", pub_our_raw),
     461     ))
     462     signature = gost3410.sign(
     463         CURVE,
     464         KEY_OUR_SIGN_PRV,
     465         GOST34112012256(tbs.encode()).digest(),
     466     )
     467     key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity")
     468     mac_tag = mac(
     469         GOST3412Kuznechik(key_handshake2_mac_identity).encrypt,
     470         KUZNECHIK_BLOCKSIZE,
     471         bytes(KEY_OUR_SIGN_PUB_HASH),
     472     )
     473     tbe = HandshakeTBE((
     474         ("identity", KEY_OUR_SIGN_PUB_HASH),
     475         ("signature", OctetString(signature)),
     476         ("identityMac", MAC(mac_tag)),
     477     ))
     478     tbe_raw = tbe.encode()
     479     key_handshake2_enc = kdf.expand(b"handshake2-enc")
     480     key_handshake2_mac = kdf.expand(b"handshake2-mac")
     481     ciphertext = ctr(
     482         GOST3412Kuznechik(key_handshake2_enc).encrypt,
     483         KUZNECHIK_BLOCKSIZE,
     484         tbe_raw,
     485         8 * b"x00",
     486     )
     487     mac_tag = mac(
     488         GOST3412Kuznechik(key_handshake2_mac).encrypt,
     489         KUZNECHIK_BLOCKSIZE,
     490         ciphertext,
     491     )
     492     writer.write(Msg(("handshake2", MsgHandshake2((
     493         ("ciphertext", OctetString(ciphertext)),
     494         ("ciphertextMac", MAC(mac_tag)),
     495     )))).encode())
     496     # }}}
     497     await writer.drain()
     498     logging.info("%s: session established: %s", _id, peer_name)
     

    هنگامی که جلسه برقرار شد، کلیدهای انتقال تولید می شوند (یک کلید جداگانه برای رمزگذاری، برای احراز هویت، برای هر یک از طرفین)، و Grasshopper برای رمزگشایی و بررسی MAC مقداردهی اولیه می شود:

     499     # Run text message sender, initialize transport decoder {{{
     500     key_initiator_enc = kdf.expand(b"transport-initiator-enc")
     501     key_initiator_mac = kdf.expand(b"transport-initiator-mac")
     502     key_responder_enc = kdf.expand(b"transport-responder-enc")
     503     key_responder_mac = kdf.expand(b"transport-responder-mac")
     ...
     509     asyncio.ensure_future(msg_sender(
     510         peer_name,
     511         key_initiator_enc,
     512         key_initiator_mac,
     513         writer,
     514     ))
     515     encrypter = GOST3412Kuznechik(key_responder_enc).encrypt
     516     macer = GOST3412Kuznechik(key_responder_mac).encrypt
     517     # }}}
     519     nonce_expected = 0
    
     520     # Wait for test messages {{{
     521     while True:
     522         data = await reader.read(MaxMsgLen)
     ...
     530             msg, tail = Msg().decode(buf)
     ...
     537         try:
     538             await msg_receiver(
     539                 msg.value,
     540                 nonce_expected,
     541                 macer,
     542                 encrypter,
     543                 peer_name,
     544             )
     545         except ValueError as err:
     546             logging.warning("%s: %s", err)
     547             break
     548         nonce_expected += 1
     549     # }}}
    

    msg_sender coroutine اکنون پیام‌ها را قبل از ارسال در اتصال TCP رمزگذاری می‌کند. هر پیام دارای یک نونس در حال افزایش یکنواخت است که هنگام رمزگذاری در حالت شمارنده، بردار اولیه است. هر بلوک پیام و پیام دارای یک مقدار متمایز متفاوت است.

    async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None:
        nonce = 0
        encrypter = GOST3412Kuznechik(key_enc).encrypt
        macer = GOST3412Kuznechik(key_mac).encrypt
        in_queue = IN_QUEUES[peer_name]
        while True:
            text = await in_queue.get()
            if text is None:
                break
            ciphertext = ctr(
                encrypter,
                KUZNECHIK_BLOCKSIZE,
                text.encode("utf-8"),
                long2bytes(nonce, 8),
            )
            payload = MsgTextPayload((
                ("nonce", Integer(nonce)),
                ("ciphertext", OctetString(ciphertext)),
            ))
            mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
            writer.write(Msg(("text", MsgText((
                ("payload", payload),
                ("payloadMac", MAC(mac_tag)),
            )))).encode())
            nonce += 1
    

    پیام‌های دریافتی توسط برنامه msg_receiver پردازش می‌شوند که احراز هویت و رمزگشایی را مدیریت می‌کند:

    async def msg_receiver(
            msg_text: MsgText,
            nonce_expected: int,
            macer,
            encrypter,
            peer_name: str,
    ) -> None:
        payload = msg_text["payload"]
        if int(payload["nonce"]) != nonce_expected:
            raise ValueError("unexpected nonce value")
        mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode())
        if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])):
            raise ValueError("invalid MAC")
        plaintext = ctr(
            encrypter,
            KUZNECHIK_BLOCKSIZE,
            bytes(payload["ciphertext"]),
            long2bytes(nonce_expected, 8),
        )
        text = plaintext.decode("utf-8")
        await OUT_QUEUES[peer_name].put(text)
    

    نتیجه

    GOSTIM در نظر گرفته شده است که به طور انحصاری برای اهداف آموزشی مورد استفاده قرار گیرد (زیرا حداقل تحت پوشش آزمایشی نیست)! کد منبع برنامه قابل دانلود است اینجا (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа GoGOST, PyDERASN, NCCP, GoVPN، GOSTIM کاملاً است نرم افزار رایگانتحت شرایط توزیع شده است GPLv3 +.

    سرگئی ماتویف, سایفرپانک، عضو بنیاد SPO, Python/Go-developer، متخصص ارشد شرکت واحد ایالتی فدرال "STC "اطلس".

منبع: www.habr.com

اضافه کردن نظر