توسعه دهنده بودن کتابخانهها (اولیههای رمزنگاری GOST در پایتون خالص)، اغلب سؤالاتی در مورد نحوه پیادهسازی سادهترین پیامرسانی امن روی زانو دریافت میکنم. بسیاری از مردم رمزنگاری کاربردی را بسیار ساده میدانند و فراخوانی .encrypt() روی یک رمز بلاک برای ارسال امن آن از طریق یک کانال ارتباطی کافی است. برخی دیگر معتقدند که رمزنگاری کاربردی سرنوشت معدودی است و قابل قبول است که شرکتهای ثروتمندی مانند تلگرام با ریاضیدانان المپیاد پروتکل امن
همه اینها مرا بر آن داشت تا این مقاله را بنویسم تا نشان دهم پیاده سازی پروتکل های رمزنگاری و IM امن کار چندان دشواری نیست. با این حال، ارزش اختراع پروتکل های تأیید هویت و توافق کلیدی خود را ندارد.

مقاله خواهد نوشت , , پیام رسان فوری با پروتکل احراز هویت و توافق کلید (بر اساس آن اجرا می شود ) منحصراً با استفاده از الگوریتم های رمزنگاری GOST کتابخانه PyGOST و کتابخانه رمزگذاری پیام ASN.1 (که قبلاً در مورد آن ). یک پیش نیاز: باید آنقدر ساده باشد که بتوان آن را از ابتدا در یک عصر (یا یک روز کاری) نوشت، در غیر این صورت دیگر یک برنامه ساده نیست. احتمالاً دارای خطاها، پیچیدگی های غیر ضروری، کاستی هایی است، به علاوه این اولین برنامه من است که از کتابخانه asyncio استفاده می کنم.
طراحی IM
ابتدا باید بفهمیم IM ما چگونه خواهد بود. برای سادگی، اجازه دهید یک شبکه همتا به همتا باشد، بدون هیچ گونه کشفی از شرکت کنندگان. ما شخصاً نشان خواهیم داد که به کدام آدرس: پورت برای برقراری ارتباط با طرف مقابل متصل شویم.
من درک می کنم که در حال حاضر، این فرض که ارتباط مستقیم بین دو رایانه دلخواه در دسترس است، محدودیت قابل توجهی در کاربرد IM در عمل است. اما هرچه توسعهدهندگان بیشتر انواع عصاهای NAT-traversal را پیادهسازی کنند، مدت بیشتری در اینترنت IPv4 باقی میمانیم، با احتمال ناامیدکننده ارتباط بین رایانههای دلخواه. تا کی می توانید کمبود IPv6 را در خانه و محل کار تحمل کنید؟
ما یک شبکه دوست به دوست خواهیم داشت: همه طرف های ممکن باید از قبل شناخته شوند. اولا، این همه چیز را بسیار ساده می کند: ما خودمان را معرفی کردیم، نام/کلید را پیدا کردیم یا پیدا نکردیم، با شناخت طرف مقابل، ارتباط را قطع کردیم یا به کار ادامه دادیم. دوم اینکه به طور کلی ایمن است و بسیاری از حملات را از بین می برد.
رابط IM نزدیک به راه حل های کلاسیک خواهد بود که من واقعاً آن را به خاطر مینیمالیسم و فلسفه یونیکس راهشان دوست دارم. برنامه IM یک دایرکتوری با سه سوکت دامنه یونیکس برای هر مخاطب ایجاد می کند:
- در - پیام های ارسال شده به همکار در آن ثبت می شود.
- خارج - پیام های دریافت شده از طرف گفتگو از آن خوانده می شود.
- حالت - با خواندن از آن، متوجه می شویم که آیا مخاطب در حال حاضر متصل است، آدرس / پورت اتصال.
علاوه بر این، یک سوکت اتصال ایجاد میشود، با نوشتن پورت میزبان که در آن یک اتصال به مخاطب راه دور را آغاز میکنیم.
|-- alice
| |-- in
| |-- out
| `-- state
|-- bob
| |-- in
| |-- out
| `-- state
`- conn
این رویکرد به شما امکان می دهد تا پیاده سازی های مستقلی از حمل و نقل IM و رابط کاربری انجام دهید، زیرا هیچ دوستی وجود ندارد، شما نمی توانید همه را راضی نگه دارید. استفاده كردن و یا ، می توانید یک رابط چند پنجره ای با برجسته سازی نحو دریافت کنید. و با کمک می توانید یک خط ورودی پیام سازگار با Readline گنو دریافت کنید.
در واقع، پروژه های بدون مکش از فایل های FIFO استفاده می کنند. شخصاً نمیتوانستم بفهمم که چگونه با فایلها به صورت رقابتی در asyncio کار کنم، بدون اینکه پسزمینه دستنویسی از رشتههای اختصاصی وجود داشته باشد (من مدت طولانی است که از این زبان برای چنین مواردی استفاده میکنم. ). بنابراین، تصمیم گرفتم به سوکت های دامنه یونیکس بسنده کنم. متأسفانه، این کار انجام echo 2001:470:dead::babe 6666 > conn را غیرممکن می کند. با استفاده از این مشکل حل کردم : echo 2001:470:dead::babe 6666 | socat - UNIX-CONNECT:conn، socat READLINE UNIX-CONNECT:alice/in.
پروتکل ناامن اصلی
TCP به عنوان حمل و نقل استفاده می شود: تحویل و سفارش آن را تضمین می کند. UDP هیچکدام را تضمین نمی کند (که در هنگام استفاده از رمزنگاری مفید خواهد بود)، بلکه پشتیبانی می کند پایتون از جعبه خارج نمی شود.
متأسفانه، در TCP هیچ مفهومی از پیام وجود ندارد، فقط یک جریان از بایت ها وجود دارد. بنابراین لازم است قالبی برای پیام ها ارائه شود تا در این تاپیک بین خودشان به اشتراک گذاشته شود. ما می توانیم با استفاده از کاراکتر فید خط موافقت کنیم. برای شروع خوب است، اما هنگامی که ما شروع به رمزگذاری پیام های خود می کنیم، این کاراکتر ممکن است در هر جایی از متن رمزگذاری شده ظاهر شود. بنابراین در شبکه ها، پروتکل های محبوب پروتکل هایی هستند که ابتدا طول پیام را بر حسب بایت ارسال می کنند. به عنوان مثال، خارج از جعبه پایتون دارای xdrlib است که به شما امکان می دهد با یک فرمت مشابه کار کنید .
ما با خواندن TCP به درستی و کارآمد کار نخواهیم کرد - ما کد را ساده می کنیم. ما داده ها را از سوکت در یک حلقه بی پایان می خوانیم تا زمانی که پیام کامل را رمزگشایی کنیم. JSON با XML نیز می تواند به عنوان قالبی برای این رویکرد استفاده شود. اما وقتی رمزنگاری اضافه میشود، دادهها باید امضا و احراز هویت شوند - و این نیاز به نمایش یکسان بایت به بایت از اشیا دارد، که JSON/XML ارائه نمیکند (نتایج تخلیه ممکن است متفاوت باشد).
XDR برای این کار مناسب است، با این حال من ASN.1 را با رمزگذاری DER و کتابخانه، از آنجایی که ما اشیاء سطح بالایی را در دسترس خواهیم داشت که اغلب کار کردن با آنها دلپذیرتر و راحت تر است. بر خلاف طرحواره , یا ، ASN.1 به طور خودکار داده ها را در برابر یک طرحواره کدگذاری شده بررسی می کند.
# Msg ::= CHOICE {
# text MsgText,
# handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
schema = ((
("text", MsgText()),
("handshake", MsgHandshake(expl=tag_ctxc(0))),
))
# MsgText ::= SEQUENCE {
# text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
schema = ((
("text", UTF8String(bounds=(1, MaxTextLen))),
))
# MsgHandshake ::= SEQUENCE {
# peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
schema = ((
("peerName", UTF8String(bounds=(1, 256))),
))
پیام دریافتی به صورت Msg خواهد بود: یا یک MsgText متنی (در حال حاضر با یک فیلد متنی) یا یک پیام دست دادن MsgHandshake (که حاوی نام مخاطب است). اکنون بیش از حد پیچیده به نظر می رسد، اما این پایه ای برای آینده است.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───┬─ IdA) │ │───────── ────────>│ │ │ │MsgHandshake(IdB) │ │ MsgText() │ │──── MsgText() │ │ │
IM بدون رمزنگاری
همانطور که قبلاً گفتم، کتابخانه asyncio برای همه عملیات سوکت استفاده خواهد شد. بیایید آنچه را که در راه اندازی انتظار داریم اعلام کنیم:
parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
"--our-name",
required=True,
help="Our peer name",
)
parser.add_argument(
"--their-names",
required=True,
help="Their peer names, comma-separated",
)
parser.add_argument(
"--bind",
default="::1",
help="Address to listen on",
)
parser.add_argument(
"--port",
type=int,
default=6666,
help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))
نام خود را تنظیم کنید (--ما-نام آلیس). همه مخاطبین مورد انتظار با کاما از هم جدا شده اند (-نام آنها bob,eve). برای هر یک از مخاطبین، یک دایرکتوری با سوکت های یونیکس و همچنین یک برنامه برای هر یک از حالت های ورودی، خروجی ایجاد می شود:
for peer_name in THEIR_NAMES:
makedirs(peer_name, mode=0o700, exist_ok=True)
out_queue = asyncio.Queue()
OUT_QUEUES[peer_name] = out_queue
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_out_processor, out_queue=out_queue),
path.join(peer_name, "out"),
))
in_queue = asyncio.Queue()
IN_QUEUES[peer_name] = in_queue
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_in_processor, in_queue=in_queue),
path.join(peer_name, "in"),
))
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_state_processor, peer_name=peer_name),
path.join(peer_name, "state"),
))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))
پیامهایی که از سوی کاربر از سوکت ورودی میآیند به صف IN_QUEUES ارسال میشوند:
async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
while True:
text = await reader.read(MaxTextLen)
if text == b"":
break
await in_queue.put(text.decode("utf-8"))
پیامهایی که از طرف صحبتکنندگان میآیند به صفهای OUT_QUEUES ارسال میشوند که دادهها از آنها به سوکت خروجی نوشته میشوند:
async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
while True:
text = await out_queue.get()
writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
await writer.drain()
هنگام خواندن از سوکت حالت، برنامه آدرس مخاطب را در فرهنگ لغت PEER_ALIVE جستجو می کند. اگر هنوز ارتباطی با مخاطب وجود نداشته باشد، یک خط خالی نوشته می شود.
async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
peer_writer = PEER_ALIVES.get(peer_name)
writer.write(
b"" if peer_writer is None else (" ".join([
str(i) for i in peer_writer.get_extra_info("peername")[:2]
]).encode("utf-8") + b"n")
)
await writer.drain()
writer.close()
هنگام نوشتن یک آدرس در یک سوکت اتصال، عملکرد "Initiator" اتصال راه اندازی می شود:
async def unixsock_conn_processor(reader, writer) -> None:
data = await reader.read(256)
writer.close()
host, port = data.decode("utf-8").split(" ")
await initiator(host=host, port=int(port))
بیایید آغازگر را در نظر بگیریم. ابتدا یک اتصال به میزبان/پورت مشخص شده را باز می کند و یک پیام دست دادن با نام آن ارسال می کند:
130 async def initiator(host, port):
131 _id = repr((host, port))
132 logging.info("%s: dialing", _id)
133 reader, writer = await asyncio.open_connection(host, port)
134 # Handshake message {{{
135 writer.write(Msg(("handshake", MsgHandshake((
136 ("peerName", OUR_NAME),
137 )))).encode())
138 # }}}
139 await writer.drain()
سپس منتظر پاسخ طرف راه دور می ماند. سعی می کند پاسخ دریافتی را با استفاده از طرح Msg ASN.1 رمزگشایی کند. ما فرض می کنیم که کل پیام در یک بخش TCP ارسال می شود و هنگام فراخوانی .read() آن را به صورت اتمی دریافت می کنیم. بررسی می کنیم که پیام دست دادن را دریافت کرده ایم.
141 # Wait for Handshake message {{{
142 data = await reader.read(256)
143 if data == b"":
144 logging.warning("%s: no answer, disconnecting", _id)
145 writer.close()
146 return
147 try:
148 msg, _ = Msg().decode(data)
149 except ASN1Error:
150 logging.warning("%s: undecodable answer, disconnecting", _id)
151 writer.close()
152 return
153 logging.info("%s: got %s message", _id, msg.choice)
154 if msg.choice != "handshake":
155 logging.warning("%s: unexpected message, disconnecting", _id)
156 writer.close()
157 return
158 # }}}
ما بررسی می کنیم که نام دریافتی مخاطب برای ما شناخته شده باشد. اگر نه، پس ما اتصال را قطع می کنیم. ما بررسی می کنیم که آیا قبلاً با او ارتباط برقرار کرده ایم (مصاحبه دوباره دستور اتصال به ما را داد) و آن را می بندیم. صف IN_QUEUES رشتههای پایتون را با متن پیام نگه میدارد، اما مقدار ویژهای None دارد که به coroutine msg_sender سیگنال میدهد که کار را متوقف کند تا نویسندهاش مرتبط با اتصال TCP قدیمی را فراموش کند.
159 msg_handshake = msg.value
160 peer_name = str(msg_handshake["peerName"])
161 if peer_name not in THEIR_NAMES:
162 logging.warning("unknown peer name: %s", peer_name)
163 writer.close()
164 return
165 logging.info("%s: session established: %s", _id, peer_name)
166 # Run text message sender, initialize transport decoder {{{
167 peer_alive = PEER_ALIVES.pop(peer_name, None)
168 if peer_alive is not None:
169 peer_alive.close()
170 await IN_QUEUES[peer_name].put(None)
171 PEER_ALIVES[peer_name] = writer
172 asyncio.ensure_future(msg_sender(peer_name, writer))
173 # }}}
msg_sender پیامهای خروجی را میپذیرد (از یک سوکت در صف قرار میگیرد)، آنها را به صورت یک پیام MsgText سریال میکند و از طریق یک اتصال TCP ارسال میکند. هر لحظه ممکن است بشکند - ما به وضوح این را رهگیری می کنیم.
async def msg_sender(peer_name: str, writer) -> None:
in_queue = IN_QUEUES[peer_name]
while True:
text = await in_queue.get()
if text is None:
break
writer.write(Msg(("text", MsgText((
("text", UTF8String(text)),
)))).encode())
try:
await writer.drain()
except ConnectionResetError:
del PEER_ALIVES[peer_name]
return
logging.info("%s: sent %d characters message", peer_name, len(text))
در پایان، آغازگر یک حلقه بی نهایت از خواندن پیام ها را از سوکت وارد می کند. بررسی میکند که آیا این پیامها پیامهای متنی هستند یا خیر و آنها را در صف OUT_QUEUES قرار میدهد که از آنجا به سوکت خروجی مخاطب مربوطه ارسال میشود. چرا نمی توانید فقط .read() را انجام دهید و پیام را رمزگشایی کنید؟ زیرا ممکن است چندین پیام از کاربر در بافر سیستم عامل جمع شده و در یک بخش TCP ارسال شود. ما میتوانیم اولین مورد را رمزگشایی کنیم و سپس بخشی از مورد بعدی ممکن است در بافر باقی بماند. در صورت بروز هر گونه وضعیت غیرعادی، اتصال TCP را می بندیم و کوروتین msg_sender را متوقف می کنیم (با ارسال None به صف OUT_QUEUES).
174 buf = b""
175 # Wait for test messages {{{
176 while True:
177 data = await reader.read(MaxMsgLen)
178 if data == b"":
179 break
180 buf += data
181 if len(buf) > MaxMsgLen:
182 logging.warning("%s: max buffer size exceeded", _id)
183 break
184 try:
185 msg, tail = Msg().decode(buf)
186 except ASN1Error:
187 continue
188 buf = tail
189 if msg.choice != "text":
190 logging.warning("%s: unexpected %s message", _id, msg.choice)
191 break
192 try:
193 await msg_receiver(msg.value, peer_name)
194 except ValueError as err:
195 logging.warning("%s: %s", err)
196 break
197 # }}}
198 logging.info("%s: disconnecting: %s", _id, peer_name)
199 IN_QUEUES[peer_name].put(None)
200 writer.close()
66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
67 text = str(msg_text["text"])
68 logging.info("%s: received %d characters message", peer_name, len(text))
69 await OUT_QUEUES[peer_name].put(text)
به کد اصلی برگردیم. پس از ایجاد تمام کوروتین ها در زمان شروع برنامه، سرور TCP را راه اندازی می کنیم. برای هر اتصال ایجاد شده، یک کوروتین پاسخ دهنده ایجاد می کند.
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()
پاسخ دهنده شبیه آغازگر است و همه اقدامات مشابه را منعکس می کند، اما حلقه بی نهایت خواندن پیام ها برای سادگی، بلافاصله شروع می شود. در حال حاضر، پروتکل دست دادن یک پیام از هر طرف ارسال می کند، اما در آینده دو پیام از آغازگر اتصال وجود خواهد داشت، پس از آن پیام های متنی می توانند بلافاصله ارسال شوند.
72 async def responder(reader, writer):
73 _id = writer.get_extra_info("peername")
74 logging.info("%s: connected", _id)
75 buf = b""
76 msg_expected = "handshake"
77 peer_name = None
78 while True:
79 # Read until we get Msg message {{{
80 data = await reader.read(MaxMsgLen)
81 if data == b"":
82 logging.info("%s: closed connection", _id)
83 break
84 buf += data
85 if len(buf) > MaxMsgLen:
86 logging.warning("%s: max buffer size exceeded", _id)
87 break
88 try:
89 msg, tail = Msg().decode(buf)
90 except ASN1Error:
91 continue
92 buf = tail
93 # }}}
94 if msg.choice != msg_expected:
95 logging.warning("%s: unexpected %s message", _id, msg.choice)
96 break
97 if msg_expected == "text":
98 try:
99 await msg_receiver(msg.value, peer_name)
100 except ValueError as err:
101 logging.warning("%s: %s", err)
102 break
103 # Process Handshake message {{{
104 elif msg_expected == "handshake":
105 logging.info("%s: got %s message", _id, msg_expected)
106 msg_handshake = msg.value
107 peer_name = str(msg_handshake["peerName"])
108 if peer_name not in THEIR_NAMES:
109 logging.warning("unknown peer name: %s", peer_name)
110 break
111 writer.write(Msg(("handshake", MsgHandshake((
112 ("peerName", OUR_NAME),
113 )))).encode())
114 await writer.drain()
115 logging.info("%s: session established: %s", _id, peer_name)
116 peer_alive = PEER_ALIVES.pop(peer_name, None)
117 if peer_alive is not None:
118 peer_alive.close()
119 await IN_QUEUES[peer_name].put(None)
120 PEER_ALIVES[peer_name] = writer
121 asyncio.ensure_future(msg_sender(peer_name, writer))
122 msg_expected = "text"
123 # }}}
124 logging.info("%s: disconnecting", _id)
125 if msg_expected == "text":
126 IN_QUEUES[peer_name].put(None)
127 writer.close()
پروتکل امن
وقت آن است که ارتباطات خود را ایمن کنیم. منظور ما از امنیت چیست و چه می خواهیم:
- محرمانه بودن پیام های ارسالی؛
- صحت و یکپارچگی پیام های ارسال شده - تغییرات آنها باید شناسایی شود.
- محافظت در برابر حملات بازپخش - واقعیت پیام های مفقود یا مکرر باید شناسایی شود (و ما تصمیم می گیریم اتصال را خاتمه دهیم).
- شناسایی و احراز هویت طرفین با استفاده از کلیدهای عمومی از قبل وارد شده - ما قبلاً تصمیم گرفتیم که یک شبکه دوست به دوست ایجاد کنیم. تنها پس از احراز هویت، متوجه می شویم که با چه کسی در ارتباط هستیم.
- در دسترس بودن خواص (PFS) - به خطر انداختن کلید امضای طولانی مدت ما نباید منجر به توانایی خواندن تمام مکاتبات قبلی شود. ضبط ترافیک رهگیری شده بی فایده می شود.
- اعتبار/اعتبار پیام ها (حمل و نقل و دست دادن) فقط در یک جلسه TCP. درج پیامهای امضا شده/تأیید شده از یک جلسه دیگر (حتی با همان مخاطب) نباید امکان پذیر باشد.
- یک ناظر غیرفعال نباید شناسه های کاربر، کلیدهای عمومی با عمر طولانی ارسال شده یا هش های آنها را ببیند. ناشناس بودن مشخص از ناظر منفعل.
با کمال تعجب، تقریباً همه مایلند که این حداقل را در هر پروتکل دست دادنی داشته باشند، و تعداد بسیار کمی از موارد فوق در نهایت برای پروتکلهای «بومی» برآورده میشود. حالا ما چیز جدیدی اختراع نمی کنیم. من قطعا استفاده را توصیه می کنم برای ساخت پروتکلها، اما بیایید چیز سادهتری را انتخاب کنیم.
دو پروتکل محبوب عبارتند از:
- - یک پروتکل بسیار پیچیده با سابقه طولانی باگ ها، موانع، آسیب پذیری ها، تفکر ضعیف، پیچیدگی و کاستی ها (با این حال، این ارتباط چندانی با TLS 1.3 ندارد). اما ما آن را در نظر نمی گیریم زیرا بیش از حد پیچیده است.
- с - مشکلات رمزنگاری جدی ندارند، اگرچه آنها نیز ساده نیستند. اگر در مورد IKEv1 و IKEv2 خوانده اید، منبع آنها است پروتکل های ISO/IEC IS 9798-3 و SIGMA (SIGn-and-MAc) - به اندازه کافی ساده برای پیاده سازی در یک شب.
چه چیزی در مورد SIGMA به عنوان آخرین پیوند در توسعه پروتکل های STS/ISO خوب است؟ همه الزامات ما (از جمله "پنهان کردن" شناسه های همکار) را برآورده می کند و هیچ مشکل رمزنگاری شناخته شده ای ندارد. حداقلی است - حذف حداقل یک عنصر از پیام پروتکل منجر به ناامنی آن می شود.
بیایید از ساده ترین پروتکل خانگی به SIGMA برویم. اساسی ترین عملیات مورد علاقه ما این است : تابعی که برای هر دو شرکت کننده مقدار یکسانی را به خروجی می دهد که می تواند به عنوان یک کلید متقارن استفاده شود. بدون پرداختن به جزئیات: هر یک از طرفین یک جفت کلید زودگذر (که فقط در یک جلسه استفاده میشود) (کلیدهای عمومی و خصوصی) تولید میکنند، کلیدهای عمومی را مبادله میکنند، تابع توافق را فراخوانی میکنند، که کلید خصوصی خود و عمومی را به ورودی آن ارسال میکنند. کلید مخاطب
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───│A└─┘┘┬└─┘┘ │ ╔══════════ - ═ ════════ ═══════════╝ │ IdB, PubB │ ╔═════════════════════ │<───────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚══════════════════ ───┐ ╔════ ═══╧════════════╗ │ ║Key = DH(PrvA، PubB)║ <───┘═════════ ═══════ ════╝ │ │ │ │
هر کسی می تواند به وسط بپرد و کلیدهای عمومی را با کلیدهای خود جایگزین کند - در این پروتکل احراز هویت مخاطبین وجود ندارد. بیایید یک امضا با کلیدهای طولانی مدت اضافه کنیم.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───┬└─┘──┬└─┘ └┬ sign(SignPrvA, (PubA)) │ ╔═ │──────────── ────────── ────- PubA = load()║ │ │ ║PrvA، PubA = DHgen() ║ │ │ ╚═══════ ═══════ ══════════════ , sign(SignPrvB, (PubB)) │ ╔══════════════ ═══════ ════ ─────────── - ══════════ ══════════════ ══╝ ────┐ ╔ ════════ ═════╗ │ │ ║ تایید( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚════════════════════ ══╝ │ │ │
چنین امضایی کار نخواهد کرد، زیرا به یک جلسه خاص مرتبط نیست. چنین پیام هایی برای جلسات با سایر شرکت کنندگان نیز "مناسب" هستند. کل زمینه باید مشترک شود. این ما را مجبور می کند که پیام دیگری از A اضافه کنیم.
علاوه بر این، افزودن شناسه خود در زیر امضا بسیار مهم است، زیرا در غیر این صورت میتوانیم IdXXX را جایگزین کنیم و پیام را با کلید یک همکار شناخته شده دیگر دوباره امضا کنیم. برای جلوگیری از ، لازم است که عناصر زیر امضا در مکان های کاملاً مشخص با توجه به معنای آنها قرار گیرند: اگر A علامت می دهد (PubA, PubB) پس B باید امضا کند (PubB, PubA). این همچنین نشان دهنده اهمیت انتخاب ساختار و قالب داده های سریالی است. به عنوان مثال، مجموعهها در رمزگذاری ASN.1 DER مرتب شدهاند: SET OF(PubA، PubB) با SET OF(PubB، PubA) یکسان خواهد بود.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───│A└─┘┘┬└─┘┘ │ ╔══════════ ═════════════════╗ │──────── ────────── ─────────────>│ ║SignPrvA، SignPubA = load()║ │ │ ║PrvA، PubA = DHgen( ═ ═══════ ═══════════════╝ │IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) │ ═════════ ═════ ════════════╗ │<────────────- ────────── ─────────│ ║SignPrvB، SignPubB = load()║ │ │ ║PrvB، PubB = DHgen() ║ │ ┐══════ ═ ════════ علامت ══════════╝ │ علامت(SignPrvA، (IdA، PubB، PubA)) ════╗ │─ ────────────────── ───>│ ║تأیید (SignPubB, ...) ║ │ │ ║ کلید = dh (prva، PUBB) ║ │ │ │
با این حال، ما هنوز "اثبات" نکرده ایم که کلید مشترک مشابهی را برای این جلسه ایجاد کرده ایم. در اصل، ما می توانیم بدون این مرحله انجام دهیم - اولین اتصال حمل و نقل نامعتبر خواهد بود، اما ما می خواهیم که وقتی دست دادن کامل شد، مطمئن باشیم که همه چیز واقعاً توافق شده است. در حال حاضر پروتکل ISO/IEC IS 9798-3 را در اختیار داریم.
ما می توانیم خود کلید تولید شده را امضا کنیم. این خطرناک است، زیرا ممکن است در الگوریتم امضای مورد استفاده نشتی وجود داشته باشد (حتی بیت در هر امضا، اما همچنان نشت کند). امضای هش کلید مشتق ممکن است، اما نشت حتی هش کلید مشتق شده میتواند در حمله brute-force به تابع مشتقسازی ارزشمند باشد. SIGMA از یک تابع MAC استفاده می کند که شناسه فرستنده را احراز هویت می کند.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───│A└─┘┘┬└─┘┘ │ ╔══════════ ═════════════════╗ │──────── ────────── ──────────────────>│ ║SignPrvA، SignPubA = بار()║ │ │ ── ╚ ═══════ ═══ │<───────────────── ──── ────────── ─│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ═════════ ═════════ ══╝ │ │ ╔════════════ ═════════╗ ═══╗ │ ║ کلید = DH( PvA, PubB) ║ │────────────────────── ────────── ─────>│ ║تأیید (کلید، IdB) ║ │ │ ║ تأیید (SignPubB، ...) ═════ ═╝ │ │
به عنوان یک بهینه سازی، برخی ممکن است بخواهند از کلیدهای زودگذر خود مجددا استفاده کنند (که البته برای PFS مایه تاسف است). به عنوان مثال، ما یک جفت کلید ایجاد کردیم، سعی کردیم متصل شویم، اما TCP در دسترس نبود یا جایی در وسط پروتکل قطع شد. حیف است که آنتروپی و منابع پردازشگر هدر رفته را برای یک جفت جدید هدر دهیم. بنابراین، ما به اصطلاح کوکی را معرفی می کنیم - یک مقدار شبه تصادفی که هنگام استفاده مجدد از کلیدهای عمومی زودگذر، از حملات احتمالی تکرار تصادفی محافظت می کند. به دلیل اتصال بین کوکی و کلید عمومی زودگذر، کلید عمومی طرف مقابل را می توان به عنوان غیر ضروری از امضا حذف کرد.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └───┬└─┘┘ └┬─ CookieA │ ╔════════ ═══════════════════╗ │─────── ────────── ────────────────── ─>│ ║SignPrvA، SignPubA = load( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚══════════════════════════ ══╝ │IdB، PubB، CookieB , sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔══════════════════════════ ═ ╗ │< ────────────────── ────────── - │ ╚══════ ═════════════════════╝ │ │ ╔════════ ═══════╗ │ علامت( SignPrvA، (CookieB، CookieA، PubA))، MAC(IdA) │ ║Key = DH(PrvA، PubB) ║ │───────────────── ─ ── ────────────────── ───────>│ ║ تأیید (کلید، شناسه) ║ │ │ ║ تأیید (SignPubB، ...)║ │ │ ╚═══════════════ │
در نهایت، ما می خواهیم حریم خصوصی شرکای مکالمه خود را از یک ناظر منفعل به دست آوریم. برای انجام این کار، SIGMA پیشنهاد میکند که ابتدا کلیدهای زودگذر را مبادله کرده و یک کلید مشترک ایجاد کند که روی آن پیامهای احراز هویت و شناسایی رمزگذاری شود. SIGMA دو گزینه را توضیح می دهد:
- SIGMA-I - آغازگر را از حملات فعال محافظت می کند، پاسخ دهنده را از حملات غیرفعال محافظت می کند: آغازگر پاسخ دهنده را احراز هویت می کند و اگر چیزی مطابقت نداشته باشد، شناسایی خود را اعلام نمی کند. اگر یک پروتکل فعال با او شروع شود، متهم هویت خود را اعلام می کند. ناظر منفعل چیزی یاد نمی گیرد.
SIGMA-R - از پاسخ دهنده در برابر حملات فعال، آغازگر از حملات غیرفعال محافظت می کند. همه چیز دقیقا برعکس است، اما در این پروتکل چهار پیام دست دادن قبلا منتقل شده است.ما SIGMA-I را انتخاب می کنیم زیرا بیشتر شبیه آنچه ما از چیزهای آشنای مشتری-سرور انتظار داریم است: مشتری فقط توسط سرور تأیید شده شناسایی می شود و همه از قبل سرور را می شناسند. به علاوه به دلیل پیام های دست دادن کمتر، پیاده سازی آن آسان تر است. تنها چیزی که به پروتکل اضافه می کنیم این است که بخشی از پیام را رمزگذاری کنیم و شناسه A را به قسمت رمزگذاری شده آخرین پیام منتقل کنیم:
PubA، CookieA │ ╔══════════ ════════════════════ ─────────── ───── ────────── ────- ─────────── ───── ──────>│ ║SignPrvA، SignPubA = load()║ │ │ ║PrvA، PubA = DHgen() ═════ ═════════ ═════════ ════╝ │ pubb ، کوکی ، Enc ((IDB ، Sign (signprvb ، (Cookiea ، Cookieb ، Pubb)) ، Mac (IDB))) │ ╔═════ ╔═════ ╔═════ ╔═════ ╔═════ ╔═════ ╔═════ ═══════════════ ═══════╗ │<─── ────────── ───── ────────── ║SignP rvB، SignPubB = load()║ │ │ ║ PrvB، PubB = DHgen( ════════ ════════════════╝ │ │ ╔══════════════ ══╗ │ Enc((IdA, sign( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │───────────────────── ── ────────────────── . ─────────── ──────>│ ║تأیید (کلید، IdB) ║ │ │ ║ تأیید ═════ ══╝ │ │
- GOST R برای امضا استفاده می شود الگوریتم با کلیدهای 256 بیتی.
- برای تولید کلید عمومی، از 34.10/2012/XNUMX VKO استفاده می شود.
- CMAC به عنوان MAC استفاده می شود. از نظر فنی، این یک حالت خاص از عملکرد یک رمز بلوکی است که در GOST R 34.13-2015 توضیح داده شده است. به عنوان یک تابع رمزگذاری برای این حالت - (34.12 2015).
- هش کلید عمومی او به عنوان شناسه مخاطب استفاده می شود. به عنوان هش استفاده می شود (34.11/2012/256 XNUMX بیت).
پس از دست دادن، در مورد یک کلید مشترک به توافق خواهیم رسید. ما می توانیم از آن برای رمزگذاری تایید شده پیام های حمل و نقل استفاده کنیم. این بخش بسیار ساده است و اشتباه کردن آن دشوار است: شمارنده پیام را افزایش می دهیم، پیام را رمزگذاری می کنیم، شمارنده را تأیید می کنیم (MAC) و متن رمز شده، ارسال می کنیم. هنگام دریافت پیام، بررسی می کنیم که شمارنده مقدار مورد انتظار را داشته باشد، متن رمز را با شمارنده احراز هویت می کنیم و آن را رمزگشایی می کنیم. از چه کلیدی برای رمزگذاری پیام های دست دادن، انتقال پیام ها و نحوه احراز هویت آنها استفاده کنم؟ استفاده از یک کلید برای همه این وظایف خطرناک و غیرعاقلانه است. تولید کلیدها با استفاده از توابع تخصصی ضروری است (تابع مشتق کلید). باز هم، بیایید موها را شکافته و چیزی اختراع نکنیم: مدتهاست که شناخته شده است، به خوبی تحقیق شده است و هیچ مشکل شناخته شده ای ندارد. متأسفانه کتابخانه بومی پایتون این تابع را ندارد، بنابراین ما از آن استفاده می کنیم کیسه پلاستیکی. HKDF داخلی استفاده می کند ، که به نوبه خود از یک تابع هش استفاده می کند. اجرای نمونه در پایتون در صفحه ویکیپدیا فقط چند خط کد نیاز دارد. همانطور که در مورد 34.10/2012/256، ما از Stribog-XNUMX به عنوان تابع هش استفاده خواهیم کرد. خروجی تابع توافق کلید ما، کلید جلسه نامیده می شود، که از آن متقارن های گمشده تولید می شوند:
kdf = Hkdf(None, key_session, hash=GOST34112012256) kdf.expand(b"handshake1-mac-identity") kdf.expand(b"handshake1-enc") kdf.expand(b"handshake1-mac") kdf.expand(b"handshake2-mac-identity") kdf.expand(b"handshake2-enc") kdf.expand(b"handshake2-mac") kdf.expand(b"transport-initiator-enc") kdf.expand(b"transport-initiator-mac") kdf.expand(b"transport-responder-enc") kdf.expand(b"transport-responder-mac")ساختارها/طرح ها
بیایید ببینیم که اکنون چه ساختارهای ASN.1 برای انتقال همه این داده ها داریم:
class Msg(Choice): schema = (( ("text", MsgText()), ("handshake0", MsgHandshake0(expl=tag_ctxc(0))), ("handshake1", MsgHandshake1(expl=tag_ctxc(1))), ("handshake2", MsgHandshake2(expl=tag_ctxc(2))), )) class MsgText(Sequence): schema = (( ("payload", MsgTextPayload()), ("payloadMac", MAC()), )) class MsgTextPayload(Sequence): schema = (( ("nonce", Integer(bounds=(0, float("+inf")))), ("ciphertext", OctetString(bounds=(1, MaxTextLen))), )) class MsgHandshake0(Sequence): schema = (( ("cookieInitiator", Cookie()), ("pubKeyInitiator", PubKey()), )) class MsgHandshake1(Sequence): schema = (( ("cookieResponder", Cookie()), ("pubKeyResponder", PubKey()), ("ukm", OctetString(bounds=(8, 8))), ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class MsgHandshake2(Sequence): schema = (( ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class HandshakeTBE(Sequence): schema = (( ("identity", OctetString(bounds=(32, 32))), ("signature", OctetString(bounds=(64, 64))), ("identityMac", MAC()), )) class HandshakeTBS(Sequence): schema = (( ("cookieTheir", Cookie()), ("cookieOur", Cookie()), ("pubKeyOur", PubKey()), )) class Cookie(OctetString): bounds = (16, 16) class PubKey(OctetString): bounds = (64, 64) class MAC(OctetString): bounds = (16, 16)HandshakeTBS چیزی است که امضا خواهد شد. HandshakeTBE - چه چیزی رمزگذاری خواهد شد. توجه شما را به فیلد ukm در MsgHandshake1 جلب می کنم. 34.10 VKO، برای تصادفی سازی حتی بیشتر کلیدهای تولید شده، شامل پارامتر UKM (مواد کلیدسازی کاربر) است - فقط آنتروپی اضافی.
اضافه کردن رمزنگاری به کد
بیایید فقط تغییرات ایجاد شده در کد اصلی را در نظر بگیریم، زیرا چارچوب یکسان باقی مانده است (در واقع، ابتدا پیاده سازی نهایی نوشته شد و سپس تمام رمزنگاری از آن بریده شد).
از آنجایی که احراز هویت و شناسایی مخاطبین با استفاده از کلیدهای عمومی انجام می شود، اکنون باید برای مدت طولانی در جایی ذخیره شوند. برای سادگی، ما از JSON به صورت زیر استفاده می کنیم:
{ "our": { "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98", "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1" }, "their": { "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce", "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a" } }ما - جفت کلید ما، کلیدهای خصوصی و عمومی هگزادسیمال. آنها - نام مخاطبین و کلیدهای عمومی آنها. بیایید آرگومان های خط فرمان را تغییر دهیم و پردازش پس از داده های JSON را اضافه کنیم:
from pygost import gost3410 from pygost.gost34112012256 import GOST34112012256 CURVE = gost3410.GOST3410Curve( *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"] ) parser = argparse.ArgumentParser(description="GOSTIM") parser.add_argument( "--keys-gen", action="store_true", help="Generate JSON with our new keypair", ) parser.add_argument( "--keys", default="keys.json", required=False, help="JSON with our and their keys", ) parser.add_argument( "--bind", default="::1", help="Address to listen on", ) parser.add_argument( "--port", type=int, default=6666, help="Port to listen on", ) args = parser.parse_args() if args.keys_gen: prv_raw = urandom(32) pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw)) pub_raw = gost3410.pub_marshal(pub) print(json.dumps({ "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)}, "their": {}, })) exit(0) # Parse and unmarshal our and their keys {{{ with open(args.keys, "rb") as fd: _keys = json.loads(fd.read().decode("utf-8")) KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"])) _pub = hexdec(_keys["our"]["pub"]) KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub) KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest()) for peer_name, pub_raw in _keys["their"].items(): _pub = hexdec(pub_raw) KEYS[GOST34112012256(_pub).digest()] = { "name": peer_name, "pub": gost3410.pub_unmarshal(_pub), } # }}}کلید خصوصی الگوریتم 34.10 یک عدد تصادفی است. اندازه 256 بیت برای منحنی های بیضوی 256 بیتی. PyGOST با مجموعه ای از بایت ها کار نمی کند، بلکه با ، بنابراین کلید خصوصی ما (urandom(32)) باید با استفاده از gost3410.prv_unmarshal () به عدد تبدیل شود. کلید عمومی به طور قطعی از کلید خصوصی با استفاده از gost3410.public_key () تعیین می شود. کلید عمومی 34.10 دو عدد بزرگ است که همچنین برای سهولت در ذخیره سازی و انتقال با استفاده از gost3410.pub_marshal باید به یک دنباله بایت تبدیل شوند.
پس از خواندن فایل JSON، کلیدهای عمومی بر این اساس باید با استفاده از gost3410.pub_unmarshal () دوباره تبدیل شوند. از آنجایی که ما شناسه های مخاطبین را به صورت هش از کلید عمومی دریافت می کنیم، می توان آنها را بلافاصله از قبل محاسبه کرد و برای جستجوی سریع در فرهنگ لغت قرار داد. هش Stribog-256 gost34112012256.GOST34112012256 است که به طور کامل رابط هشلب توابع هش را برآورده می کند.
روال آغازگر چگونه تغییر کرده است؟ همه چیز مطابق با طرح دست دادن است: ما یک کوکی (128 بیتی زیاد است)، یک جفت کلید زودگذر 34.10 تولید می کنیم که برای تابع توافق کلید VKO استفاده می شود.
395 async def initiator(host, port): 396 _id = repr((host, port)) 397 logging.info("%s: dialing", _id) 398 reader, writer = await asyncio.open_connection(host, port) 399 # Generate our ephemeral public key and cookie, send Handshake 0 message {{{ 400 cookie_our = Cookie(urandom(16)) 401 prv = gost3410.prv_unmarshal(urandom(32)) 402 pub_our = gost3410.public_key(CURVE, prv) 403 pub_our_raw = PubKey(gost3410.pub_marshal(pub_our)) 404 writer.write(Msg(("handshake0", MsgHandshake0(( 405 ("cookieInitiator", cookie_our), 406 ("pubKeyInitiator", pub_our_raw), 407 )))).encode()) 408 # }}} 409 await writer.drain()- ما منتظر پاسخ هستیم و پیام دریافتی را رمزگشایی می کنیم.
- مطمئن شوید که دست دادن1;
- رمزگشایی کلید عمومی زودگذر طرف مقابل و محاسبه کلید جلسه.
- ما کلیدهای متقارن لازم برای پردازش قسمت TBE پیام را تولید می کنیم.
423 logging.info("%s: got %s message", _id, msg.choice) 424 if msg.choice != "handshake1": 425 logging.warning("%s: unexpected message, disconnecting", _id) 426 writer.close() 427 return 428 # }}} 429 msg_handshake1 = msg.value 430 # Validate Handshake message {{{ 431 cookie_their = msg_handshake1["cookieResponder"] 432 pub_their_raw = msg_handshake1["pubKeyResponder"] 433 pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw)) 434 ukm_raw = bytes(msg_handshake1["ukm"]) 435 ukm = ukm_unmarshal(ukm_raw) 436 key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001) 437 kdf = Hkdf(None, key_session, hash=GOST34112012256) 438 key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity") 439 key_handshake1_enc = kdf.expand(b"handshake1-enc") 440 key_handshake1_mac = kdf.expand(b"handshake1-mac")UKM یک عدد 64 بیتی است (urandom(8))، که همچنین نیاز به deserialization از نمایش بایت آن با استفاده از gost3410_vko.ukm_unmarshal() دارد. تابع VKO برای 34.10/2012/256 3410 بیتی gost34102012256_vko.kek_XNUMX() (KEK - کلید رمزگذاری) است.
کلید جلسه تولید شده در حال حاضر یک توالی بایت شبه تصادفی 256 بیتی است. بنابراین، می توان آن را بلافاصله در توابع HKDF استفاده کرد. از آنجایی که GOST34112012256 رابط هشلب را برآورده می کند، می توان آن را بلافاصله در کلاس Hkdf استفاده کرد. ما salt (اول آرگومان Hkdf) را مشخص نمیکنیم، زیرا کلید تولید شده، به دلیل زودگذر بودن جفتهای کلیدی شرکتکننده، برای هر جلسه متفاوت خواهد بود و از قبل دارای آنتروپی کافی است. ()kdf.expand به طور پیش فرض کلیدهای 256 بیتی مورد نیاز برای Grasshopper را در آینده تولید می کند.
سپس قسمت های TBE و TBS پیام دریافتی بررسی می شوند:
- MAC روی متن رمز ورودی محاسبه و بررسی می شود.
- متن رمز رمزگشایی شده است.
- ساختار TBE رمزگشایی شده است.
- شناسه مخاطب از آن گرفته می شود و بررسی می شود که آیا او اصلاً برای ما شناخته شده است یا خیر.
- MAC روی این شناسه محاسبه و بررسی می شود.
- امضای ساختار TBS تأیید می شود که شامل کوکی هر دو طرف و کلید موقت عمومی طرف مقابل است. امضا با کلید امضای طولانی مدت طرف مقابل تأیید می شود.
441 try: 442 peer_name = validate_tbe( 443 msg_handshake1, 444 key_handshake1_mac_identity, 445 key_handshake1_enc, 446 key_handshake1_mac, 447 cookie_our, 448 cookie_their, 449 pub_their_raw, 450 ) 451 except ValueError as err: 452 logging.warning("%s: %s, disconnecting", _id, err) 453 writer.close() 454 return 455 # }}} 128 def validate_tbe( 129 msg_handshake: Union[MsgHandshake1, MsgHandshake2], 130 key_mac_identity: bytes, 131 key_enc: bytes, 132 key_mac: bytes, 133 cookie_their: Cookie, 134 cookie_our: Cookie, 135 pub_key_our: PubKey, 136 ) -> str: 137 ciphertext = bytes(msg_handshake["ciphertext"]) 138 mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext) 139 if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])): 140 raise ValueError("invalid MAC") 141 plaintext = ctr( 142 GOST3412Kuznechik(key_enc).encrypt, 143 KUZNECHIK_BLOCKSIZE, 144 ciphertext, 145 8 * b"x00", 146 ) 147 try: 148 tbe, _ = HandshakeTBE().decode(plaintext) 149 except ASN1Error: 150 raise ValueError("can not decode TBE") 151 key_sign_pub_hash = bytes(tbe["identity"]) 152 peer = KEYS.get(key_sign_pub_hash) 153 if peer is None: 154 raise ValueError("unknown identity") 155 mac_tag = mac( 156 GOST3412Kuznechik(key_mac_identity).encrypt, 157 KUZNECHIK_BLOCKSIZE, 158 key_sign_pub_hash, 159 ) 160 if not compare_digest(mac_tag, bytes(tbe["identityMac"])): 161 raise ValueError("invalid identity MAC") 162 tbs = HandshakeTBS(( 163 ("cookieTheir", cookie_their), 164 ("cookieOur", cookie_our), 165 ("pubKeyOur", pub_key_our), 166 )) 167 if not gost3410.verify( 168 CURVE, 169 peer["pub"], 170 GOST34112012256(tbs.encode()).digest(), 171 bytes(tbe["signature"]), 172 ): 173 raise ValueError("invalid signature") 174 return peer["name"]همانطور که در بالا نوشتم، 34.13/2015/XNUMX موارد مختلفی را توصیف می کند از 34.12/2015/3413. در میان آنها حالتی برای تولید درج های تقلیدی و محاسبات MAC وجود دارد. در PyGOST این () gost34.12.mac است. این حالت مستلزم عبور تابع رمزگذاری (دریافت و برگرداندن یک بلوک داده)، اندازه بلوک رمزگذاری و در واقع خود داده است. چرا نمی توانید اندازه بلوک رمزگذاری را کد سخت کنید؟ 2015/128/64 نه تنها رمز XNUMX بیتی Grasshopper، بلکه XNUMX بیتی را نیز توصیف می کند. - یک GOST 28147-89 کمی تغییر یافته که در KGB ایجاد شده است و هنوز یکی از بالاترین آستانه های ایمنی را دارد.
Kuznechik با فراخوانی gost.3412.GOST3412Kuznechik (کلید) مقداردهی اولیه می شود و یک شی با متدهای .encrypt()/.decrypt() مناسب برای ارسال به توابع 34.13 برمی گرداند. MAC به صورت زیر محاسبه می شود: gost3413.mac(GOST3412Kuznechik(key).رمزگذاری، KUZNECHIK_BLOCKSIZE، متن رمزی). برای مقایسه MAC محاسبهشده و دریافتی، نمیتوانید از مقایسه معمول (==) رشتههای بایت استفاده کنید، زیرا این عملیات زمان مقایسه را نشت میکند، که در حالت کلی، میتواند منجر به آسیبپذیریهای کشنده مانند شود. حملات به TLS پایتون یک تابع ویژه به نام hmac.compare_digest برای این کار دارد.
تابع رمز بلوک فقط می تواند یک بلوک از داده ها را رمزگذاری کند. برای تعداد بیشتر و حتی نه مضربی از طول، استفاده از حالت رمزگذاری ضروری است. 34.13-2015 موارد زیر را شرح می دهد: ECB، CTR، OFB، CBC، CFB. هر کدام حوزه های کاربردی و ویژگی های قابل قبول خود را دارند. متاسفانه هنوز استاندارد نشده ایم (مانند CCM، OCB، GCM و مانند آن) - ما مجبوریم حداقل خودمان MAC را اضافه کنیم. من انتخاب می کنم (CTR): به اندازه بلوک نیازی ندارد، می تواند موازی شود، فقط از عملکرد رمزگذاری استفاده می کند، می تواند به طور ایمن برای رمزگذاری تعداد زیادی پیام استفاده شود (برخلاف CBC که برخورد نسبتاً سریع دارد).
مانند .mac()، .ctr() ورودی مشابهی می گیرد: ciphertext = gost3413.ctr(GOST3412Kuznechik(key).رمزگذاری، KUZNECHIK_BLOCKSIZE، متن ساده، iv). لازم است یک بردار اولیه که دقیقاً نصف طول بلوک رمزگذاری باشد را مشخص کنید. اگر کلید رمزگذاری ما فقط برای رمزگذاری یک پیام (البته از چندین بلوک) استفاده میشود، آنگاه میتوان یک بردار اولیه صفر را تنظیم کرد. برای رمزگذاری پیام های دست دادن، هر بار از یک کلید جداگانه استفاده می کنیم.
تأیید امضای gost3410.verify () بی اهمیت است: ما از منحنی بیضی که در آن کار می کنیم عبور می کنیم (به سادگی آن را در پروتکل GOSTIM خود ثبت می کنیم)، کلید عمومی امضاکننده (فراموش نکنید که این باید دو تایی باشد. اعداد بزرگ، و نه یک رشته بایت)، هش 34.11/2012/XNUMX و خود امضا.
در مرحله بعد، در آغازگر ما یک پیام handshake را آماده می کنیم و به handshake2 می فرستیم، همان اقداماتی را که در حین تأیید انجام دادیم، فقط به صورت متقارن انجام می دهیم: امضا کردن روی کلیدهای خود به جای بررسی و غیره...
456 # Prepare and send Handshake 2 message {{{ 457 tbs = HandshakeTBS(( 458 ("cookieTheir", cookie_their), 459 ("cookieOur", cookie_our), 460 ("pubKeyOur", pub_our_raw), 461 )) 462 signature = gost3410.sign( 463 CURVE, 464 KEY_OUR_SIGN_PRV, 465 GOST34112012256(tbs.encode()).digest(), 466 ) 467 key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity") 468 mac_tag = mac( 469 GOST3412Kuznechik(key_handshake2_mac_identity).encrypt, 470 KUZNECHIK_BLOCKSIZE, 471 bytes(KEY_OUR_SIGN_PUB_HASH), 472 ) 473 tbe = HandshakeTBE(( 474 ("identity", KEY_OUR_SIGN_PUB_HASH), 475 ("signature", OctetString(signature)), 476 ("identityMac", MAC(mac_tag)), 477 )) 478 tbe_raw = tbe.encode() 479 key_handshake2_enc = kdf.expand(b"handshake2-enc") 480 key_handshake2_mac = kdf.expand(b"handshake2-mac") 481 ciphertext = ctr( 482 GOST3412Kuznechik(key_handshake2_enc).encrypt, 483 KUZNECHIK_BLOCKSIZE, 484 tbe_raw, 485 8 * b"x00", 486 ) 487 mac_tag = mac( 488 GOST3412Kuznechik(key_handshake2_mac).encrypt, 489 KUZNECHIK_BLOCKSIZE, 490 ciphertext, 491 ) 492 writer.write(Msg(("handshake2", MsgHandshake2(( 493 ("ciphertext", OctetString(ciphertext)), 494 ("ciphertextMac", MAC(mac_tag)), 495 )))).encode()) 496 # }}} 497 await writer.drain() 498 logging.info("%s: session established: %s", _id, peer_name)هنگامی که جلسه برقرار شد، کلیدهای انتقال تولید می شوند (یک کلید جداگانه برای رمزگذاری، برای احراز هویت، برای هر یک از طرفین)، و Grasshopper برای رمزگشایی و بررسی MAC مقداردهی اولیه می شود:
499 # Run text message sender, initialize transport decoder {{{ 500 key_initiator_enc = kdf.expand(b"transport-initiator-enc") 501 key_initiator_mac = kdf.expand(b"transport-initiator-mac") 502 key_responder_enc = kdf.expand(b"transport-responder-enc") 503 key_responder_mac = kdf.expand(b"transport-responder-mac") ... 509 asyncio.ensure_future(msg_sender( 510 peer_name, 511 key_initiator_enc, 512 key_initiator_mac, 513 writer, 514 )) 515 encrypter = GOST3412Kuznechik(key_responder_enc).encrypt 516 macer = GOST3412Kuznechik(key_responder_mac).encrypt 517 # }}} 519 nonce_expected = 0 520 # Wait for test messages {{{ 521 while True: 522 data = await reader.read(MaxMsgLen) ... 530 msg, tail = Msg().decode(buf) ... 537 try: 538 await msg_receiver( 539 msg.value, 540 nonce_expected, 541 macer, 542 encrypter, 543 peer_name, 544 ) 545 except ValueError as err: 546 logging.warning("%s: %s", err) 547 break 548 nonce_expected += 1 549 # }}}msg_sender coroutine اکنون پیامها را قبل از ارسال در اتصال TCP رمزگذاری میکند. هر پیام دارای یک نونس در حال افزایش یکنواخت است که هنگام رمزگذاری در حالت شمارنده، بردار اولیه است. هر بلوک پیام و پیام دارای یک مقدار متمایز متفاوت است.
async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None: nonce = 0 encrypter = GOST3412Kuznechik(key_enc).encrypt macer = GOST3412Kuznechik(key_mac).encrypt in_queue = IN_QUEUES[peer_name] while True: text = await in_queue.get() if text is None: break ciphertext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, text.encode("utf-8"), long2bytes(nonce, 8), ) payload = MsgTextPayload(( ("nonce", Integer(nonce)), ("ciphertext", OctetString(ciphertext)), )) mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) writer.write(Msg(("text", MsgText(( ("payload", payload), ("payloadMac", MAC(mac_tag)), )))).encode()) nonce += 1پیامهای دریافتی توسط برنامه msg_receiver پردازش میشوند که احراز هویت و رمزگشایی را مدیریت میکند:
async def msg_receiver( msg_text: MsgText, nonce_expected: int, macer, encrypter, peer_name: str, ) -> None: payload = msg_text["payload"] if int(payload["nonce"]) != nonce_expected: raise ValueError("unexpected nonce value") mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])): raise ValueError("invalid MAC") plaintext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, bytes(payload["ciphertext"]), long2bytes(nonce_expected, 8), ) text = plaintext.decode("utf-8") await OUT_QUEUES[peer_name].put(text)نتیجه
GOSTIM در نظر گرفته شده است که به طور انحصاری برای اهداف آموزشی مورد استفاده قرار گیرد (زیرا حداقل تحت پوشش آزمایشی نیست)! کد منبع برنامه قابل دانلود است (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа , , , ، GOSTIM کاملاً است تحت شرایط توزیع شده است .
, ، عضو , Python/Go-developer، متخصص ارشد .
منبع: www.habr.com
